财经热点话题分析与可视化系统是一个集数据采集、文本挖掘、主题分析和可视化展示于一体的综合应用。下面提供完整的系统实现代码,涵盖MySQL数据库设计、Python后端处理和前端可视化展示。
## 一、数据库设计与建表SQL
### 1.1 MySQL数据库表结构设计
```sql
-- 创建数据库
CREATE DATABASE IF NOT EXISTS finance_news DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE finance_news;
-- 1. 新闻数据表
CREATE TABLE IF NOT EXISTS news_articles (
id INT PRIMARY KEY AUTO_INCREMENT,
title VARCHAR(500) NOT NULL COMMENT '新闻标题',
content TEXT COMMENT '新闻内容',
source VARCHAR(100) COMMENT '来源网站',
publish_time DATETIME COMMENT '发布时间',
url VARCHAR(500) COMMENT '原文链接',
crawl_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '爬取时间',
category VARCHAR(50) COMMENT '新闻分类',
sentiment_score FLOAT COMMENT '情感分析得分',
keywords TEXT COMMENT '关键词JSON',
INDEX idx_publish_time (publish_time),
INDEX idx_category (category),
INDEX idx_sentiment (sentiment_score)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='新闻原始数据表' [ref_1];
-- 2. 热点话题表
CREATE TABLE IF NOT EXISTS hot_topics (
topic_id INT PRIMARY KEY AUTO_INCREMENT,
topic_name VARCHAR(200) NOT NULL COMMENT '话题名称',
keywords JSON COMMENT '话题关键词',
start_date DATE COMMENT '开始日期',
end_date DATE COMMENT '结束日期',
heat_score FLOAT COMMENT '热度分数',
related_news_count INT DEFAULT 0 COMMENT '关联新闻数量',
sentiment_trend JSON COMMENT '情感趋势数据',
created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
INDEX idx_heat_score (heat_score),
INDEX idx_time_range (start_date, end_date)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='热点话题表' [ref_4];
-- 3. 新闻-话题关联表
CREATE TABLE IF NOT EXISTS news_topic_relation (
id INT PRIMARY KEY AUTO_INCREMENT,
news_id INT NOT NULL COMMENT '新闻ID',
topic_id INT NOT NULL COMMENT '话题ID',
relevance_score FLOAT COMMENT '关联度分数',
FOREIGN KEY (news_id) REFERENCES news_articles(id) ON DELETE CASCADE,
FOREIGN KEY (topic_id) REFERENCES hot_topics(topic_id) ON DELETE CASCADE,
INDEX idx_news_topic (news_id, topic_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='新闻话题关联表';
-- 4. 可视化配置表
CREATE TABLE IF NOT EXISTS visualization_config (
config_id INT PRIMARY KEY AUTO_INCREMENT,
chart_type VARCHAR(50) COMMENT '图表类型',
data_source VARCHAR(200) COMMENT '数据源',
update_frequency VARCHAR(20) COMMENT '更新频率',
parameters JSON COMMENT '配置参数',
last_update TIMESTAMP NULL COMMENT '最后更新时间'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='可视化配置表' [ref_3];
```
## 二、Python后端核心代码实现
### 2.1 数据库连接与操作模块
```python
# database.py
import pymysql
import json
from datetime import datetime
from typing import List, Dict, Any
import pandas as pd
from sqlalchemy import create_engine
from config import DB_CONFIG
class DatabaseManager:
"""数据库管理类"""
def __init__(self):
self.connection = pymysql.connect(
host=DB_CONFIG['host'],
user=DB_CONFIG['user'],
password=DB_CONFIG['password'],
database=DB_CONFIG['database'],
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
self.engine = create_engine(
f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@"
f"{DB_CONFIG['host']}/{DB_CONFIG['database']}?charset=utf8mb4"
)
def insert_news(self, news_data: Dict) -> int:
"""插入新闻数据"""
with self.connection.cursor() as cursor:
sql = """
INSERT INTO news_articles
(title, content, source, publish_time, url, category, sentiment_score, keywords)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(sql, (
news_data['title'],
news_data['content'],
news_data.get('source', ''),
news_data.get('publish_time'),
news_data.get('url', ''),
news_data.get('category', '财经'),
news_data.get('sentiment_score', 0),
json.dumps(news_data.get('keywords', []), ensure_ascii=False)
))
self.connection.commit()
return cursor.lastrowid
def get_hot_topics(self, limit: int = 10, days: int = 7) -> List[Dict]:
"""获取热点话题"""
with self.connection.cursor() as cursor:
sql = """
SELECT ht.*, COUNT(ntr.news_id) as news_count
FROM hot_topics ht
LEFT JOIN news_topic_relation ntr ON ht.topic_id = ntr.topic_id
WHERE ht.start_date >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
GROUP BY ht.topic_id
ORDER BY ht.heat_score DESC
LIMIT %s
"""
cursor.execute(sql, (days, limit))
return cursor.fetchall()
def get_news_by_topic(self, topic_id: int, limit: int = 20) -> List[Dict]:
"""获取话题相关新闻"""
with self.connection.cursor() as cursor:
sql = """
SELECT na.*, ntr.relevance_score
FROM news_articles na
JOIN news_topic_relation ntr ON na.id = ntr.news_id
WHERE ntr.topic_id = %s
ORDER BY na.publish_time DESC
LIMIT %s
"""
cursor.execute(sql, (topic_id, limit))
return cursor.fetchall()
def close(self):
"""关闭数据库连接"""
self.connection.close()
# config.py
DB_CONFIG = {
'host': 'localhost',
'user': 'root',
'password': 'your_password',
'database': 'finance_news',
'port': 3306
}
```
### 2.2 文本挖掘与主题分析模块
```python
# text_mining.py
import jieba
import jieba.analyse
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation
import numpy as np
import re
from collections import Counter
import json
class TextMiningProcessor:
"""文本挖掘处理器"""
def __init__(self):
# 加载自定义词典
jieba.load_userdict('data/finance_dict.txt')
# 停用词表
with open('data/stopwords.txt', 'r', encoding='utf-8') as f:
self.stopwords = set([line.strip() for line in f])
def preprocess_text(self, text: str) -> str:
"""文本预处理"""
# 去除HTML标签
text = re.sub(r'<[^>]+>', '', text)
# 去除特殊字符
text = re.sub(r'[^\w\u4e00-\u9fa5]', ' ', text)
# 去除多余空格
text = re.sub(r'\s+', ' ', text)
return text.strip()
def extract_keywords(self, text: str, top_k: int = 10) -> List[str]:
"""提取关键词"""
# 使用TF-IDF提取关键词
keywords = jieba.analyse.extract_tags(
text,
topK=top_k,
withWeight=False,
allowPOS=('n', 'nr', 'ns', 'nt', 'nz', 'vn', 'v')
)
return keywords
def analyze_sentiment(self, text: str) -> float:
"""情感分析(简化版)"""
# 正面词表
positive_words = ['上涨', '增长', '利好', '盈利', '突破', '创新高', '复苏']
# 负面词表
negative_words = ['下跌', '下滑', '利空', '亏损', '跌破', '危机', '衰退']
words = jieba.lcut(text)
pos_count = sum(1 for word in words if word in positive_words)
neg_count = sum(1 for word in words if word in negative_words)
if pos_count + neg_count == 0:
return 0
return (pos_count - neg_count) / (pos_count + neg_count)
def topic_modeling(self, texts: List[str], n_topics: int = 5) -> Dict:
"""LDA主题建模"""
# 文本向量化
vectorizer = TfidfVectorizer(
max_df=0.95,
min_df=2,
max_features=1000,
stop_words=list(self.stopwords)
)
tfidf_matrix = vectorizer.fit_transform(texts)
# LDA模型训练
lda = LatentDirichletAllocation(
n_components=n_topics,
max_iter=10,
learning_method='online',
random_state=42
)
lda.fit(tfidf_matrix)
# 提取主题关键词
feature_names = vectorizer.get_feature_names_out()
topics = []
for topic_idx, topic in enumerate(lda.components_):
top_words_idx = topic.argsort()[:-10:-1]
top_words = [feature_names[i] for i in top_words_idx]
topics.append({
'topic_id': topic_idx,
'keywords': top_words,
'weight': topic.sum()
})
return {
'topics': topics,
'document_topic_matrix': lda.transform(tfidf_matrix).tolist()
}
```
### 2.3 热点话题检测模块
```python
# topic_detection.py
from datetime import datetime, timedelta
import numpy as np
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import CountVectorizer
from collections import defaultdict
import json
class HotTopicDetector:
"""热点话题检测器"""
def __init__(self, db_manager):
self.db = db_manager
def detect_hot_topics(self, time_window: int = 7) -> List[Dict]:
"""检测热点话题"""
# 获取指定时间窗口内的新闻
end_date = datetime.now()
start_date = end_date - timedelta(days=time_window)
# 从数据库获取新闻数据
with self.db.connection.cursor() as cursor:
sql = """
SELECT id, title, content, publish_time, keywords
FROM news_articles
WHERE publish_time BETWEEN %s AND %s
AND LENGTH(content) > 100
"""
cursor.execute(sql, (start_date, end_date))
news_list = cursor.fetchall()
if len(news_list) < 10:
return []
# 提取文本特征
texts = [news['title'] + ' ' + news['content'][:500] for news in news_list]
# 文本向量化
vectorizer = CountVectorizer(
max_features=1000,
stop_words=list(self.stopwords)
)
X = vectorizer.fit_transform(texts)
# 聚类分析
dbscan = DBSCAN(eps=0.5, min_samples=3, metric='cosine')
clusters = dbscan.fit_predict(X.toarray())
# 统计聚类结果
cluster_news = defaultdict(list)
for idx, cluster_id in enumerate(clusters):
if cluster_id != -1: # 排除噪声点
cluster_news[cluster_id].append(news_list[idx])
# 生成热点话题
hot_topics = []
for cluster_id, news_items in cluster_news.items():
if len(news_items) >= 3: # 至少3篇相关新闻
topic = self._create_topic_from_cluster(cluster_id, news_items)
hot_topics.append(topic)
# 按热度排序
hot_topics.sort(key=lambda x: x['heat_score'], reverse=True)
return hot_topics[:10] # 返回前10个热点话题
def _create_topic_from_cluster(self, cluster_id: int, news_items: List) -> Dict:
"""从聚类创建话题"""
# 提取共同关键词
all_keywords = []
for news in news_items:
if news['keywords']:
keywords = json.loads(news['keywords'])
all_keywords.extend(keywords)
# 统计关键词频率
keyword_counter = Counter(all_keywords)
top_keywords = [kw for kw, _ in keyword_counter.most_common(5)]
# 计算热度分数
news_count = len(news_items)
time_recency = self._calculate_time_recency(news_items)
media_coverage = len(set([n['source'] for n in news_items]))
heat_score = (
news_count * 0.4 +
time_recency * 0.3 +
media_coverage * 0.3
)
# 分析情感趋势
sentiment_scores = [n.get('sentiment_score', 0) for n in news_items]
avg_sentiment = np.mean(sentiment_scores) if sentiment_scores else 0
return {
'topic_name': '、'.join(top_keywords[:3]),
'keywords': top_keywords,
'news_count': news_count,
'heat_score': round(heat_score, 2),
'avg_sentiment': round(avg_sentiment, 2),
'related_news_ids': [n['id'] for n in news_items],
'start_date': min([n['publish_time'] for n in news_items]),
'end_date': max([n['publish_time'] for n in news_items])
}
def _calculate_time_recency(self, news_items: List) -> float:
"""计算时间新鲜度"""
now = datetime.now()
time_diffs = [(now - n['publish_time']).total_seconds() / 86400
for n in news_items]
avg_diff = np.mean(time_diffs)
# 时间越近分数越高(7天内为满分)
return max(0, 1 - avg_diff / 7)
```
### 2.4 Flask Web应用后端
```python
# app.py
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
import json
from datetime import datetime
from database import DatabaseManager
from topic_detection import HotTopicDetector
from text_mining import TextMiningProcessor
app = Flask(__name__)
CORS(app)
# 初始化组件
db_manager = DatabaseManager()
topic_detector = HotTopicDetector(db_manager)
text_processor = TextMiningProcessor()
@app.route('/')
def index():
"""首页"""
return render_template('index.html')
@app.route('/api/hot_topics', methods=['GET'])
def get_hot_topics():
"""获取热点话题API"""
days = request.args.get('days', 7, type=int)
limit = request.args.get('limit', 10, type=int)
topics = db_manager.get_hot_topics(limit=limit, days=days)
return jsonify({
'code': 200,
'message': 'success',
'data': topics
})
@app.route('/api/topic_detail/<int:topic_id>', methods=['GET'])
def get_topic_detail(topic_id):
"""获取话题详情"""
# 获取话题基本信息
with db_manager.connection.cursor() as cursor:
cursor.execute("SELECT * FROM hot_topics WHERE topic_id = %s", (topic_id,))
topic = cursor.fetchone()
if not topic:
return jsonify({'code': 404, 'message': 'Topic not found'})
# 获取相关新闻
cursor.execute("""
SELECT na.*
FROM news_articles na
JOIN news_topic_relation ntr ON na.id = ntr.news_id
WHERE ntr.topic_id = %s
ORDER BY na.publish_time DESC
LIMIT 20
""", (topic_id,))
related_news = cursor.fetchall()
# 获取情感趋势数据
if topic['sentiment_trend']:
sentiment_trend = json.loads(topic['sentiment_trend'])
else:
sentiment_trend = self._calculate_sentiment_trend(topic_id)
return jsonify({
'code': 200,
'data': {
'topic': topic,
'related_news': related_news,
'sentiment_trend': sentiment_trend
}
})
@app.route('/api/news/search', methods=['GET'])
def search_news():
"""搜索新闻"""
keyword = request.args.get('keyword', '')
category = request.args.get('category', '')
start_date = request.args.get('start_date', '')
end_date = request.args.get('end_date', '')
page = request.args.get('page', 1, type=int)
page_size = request.args.get('page_size', 20, type=int)
# 构建查询条件
conditions = []
params = []
if keyword:
conditions.append("(title LIKE %s OR content LIKE %s)")
params.extend([f'%{keyword}%', f'%{keyword}%'])
if category:
conditions.append("category = %s")
params.append(category)
if start_date:
conditions.append("publish_time >= %s")
params.append(start_date)
if end_date:
conditions.append("publish_time <= %s")
params.append(end_date)
where_clause = " AND ".join(conditions) if conditions else "1=1"
offset = (page - 1) * page_size
with db_manager.connection.cursor() as