diff --git a/backend/app/__pycache__/config.cpython-313.pyc b/backend/app/__pycache__/config.cpython-313.pyc index 987cbf32..46d4202c 100644 Binary files a/backend/app/__pycache__/config.cpython-313.pyc and b/backend/app/__pycache__/config.cpython-313.pyc differ diff --git a/backend/app/config.py b/backend/app/config.py index 9a6985de..42e83988 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -53,9 +53,16 @@ class Settings(BaseSettings): # 新闻/政策催化采集 news_collection_enabled: bool = True - news_tushare_sources: str = "sina,eastmoney,10jqka,wallstreetcn" + news_tushare_enabled: bool = False + news_tushare_sources: str = "" news_tushare_sources_per_run: int = 1 + news_tushare_daily_quota: int = 0 + news_tushare_alert_errors: bool = False news_rss_sources: str = "" # name|url,name|url + news_web_sources: str = "eastmoney_roll|https://roll.eastmoney.com/" + news_akshare_enabled: bool = True + news_akshare_stock_limit: int = 20 + news_akshare_news_per_stock: int = 3 news_fetch_lookback_hours: int = 24 news_fetch_limit_per_source: int = 30 news_analyze_limit_per_run: int = 50 diff --git a/backend/app/data/__pycache__/tushare_client.cpython-313.pyc b/backend/app/data/__pycache__/tushare_client.cpython-313.pyc index d91ba0d6..04fa933d 100644 Binary files a/backend/app/data/__pycache__/tushare_client.cpython-313.pyc and b/backend/app/data/__pycache__/tushare_client.cpython-313.pyc differ diff --git a/backend/app/data/tushare_client.py b/backend/app/data/tushare_client.py index f2218e56..2f16a5c3 100644 --- a/backend/app/data/tushare_client.py +++ b/backend/app/data/tushare_client.py @@ -14,6 +14,7 @@ from app.data.cache import cache from app.db.error_logger import log_error_background logger = logging.getLogger(__name__) +_NEWS_QUOTA_ERROR_KEYWORDS = ("频率超限", "每分钟", "每小时", "每天", "权限") class TushareClient: @@ -300,8 +301,14 @@ class TushareClient: return result except Exception as e: logger.warning("Tushare 新闻请求失败 source=%s: %s", source, e) - 
log_error_background("tushare_news", f"Tushare 新闻请求失败 source={source}: {e}") + if settings.news_tushare_alert_errors and not _is_news_quota_error(e): + log_error_background("tushare_news", f"Tushare 新闻请求失败 source={source}: {e}") return pd.DataFrame() tushare_client = TushareClient() + + +def _is_news_quota_error(exc: Exception) -> bool: + message = str(exc) + return any(keyword in message for keyword in _NEWS_QUOTA_ERROR_KEYWORDS) diff --git a/backend/app/db/__pycache__/database.cpython-313.pyc b/backend/app/db/__pycache__/database.cpython-313.pyc index 0cd65a82..5a4f4d2a 100644 Binary files a/backend/app/db/__pycache__/database.cpython-313.pyc and b/backend/app/db/__pycache__/database.cpython-313.pyc differ diff --git a/backend/app/db/__pycache__/tables.cpython-313.pyc b/backend/app/db/__pycache__/tables.cpython-313.pyc index a7eda3d2..126a9b0f 100644 Binary files a/backend/app/db/__pycache__/tables.cpython-313.pyc and b/backend/app/db/__pycache__/tables.cpython-313.pyc differ diff --git a/backend/app/news/collector.py b/backend/app/news/collector.py index 1c2f80a9..611558a5 100644 --- a/backend/app/news/collector.py +++ b/backend/app/news/collector.py @@ -7,20 +7,29 @@ import html import logging import re import xml.etree.ElementTree as ET -from datetime import datetime, timedelta +from datetime import date, datetime, timedelta +from importlib import import_module import httpx +import pandas as pd +from sqlalchemy import text from app.config import settings from app.data.tushare_client import tushare_client +from app.db.database import get_db from app.news.models import NewsItem logger = logging.getLogger(__name__) _tushare_source_cursor = 0 +_tushare_calls_by_day: dict[str, int] = {} RSS_HEADERS = { "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", } +WEB_HEADERS = { + "Referer": "https://www.eastmoney.com/", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36", +} async def collect_news_sources( 
@@ -38,6 +47,15 @@ async def collect_news_sources( except Exception as e: logger.warning("Tushare 新闻源采集失败 source=%s error=%s", source, e) + web_sources = _parse_named_url_sources(settings.news_web_sources) + if web_sources: + async with httpx.AsyncClient(headers=WEB_HEADERS, timeout=10.0, follow_redirects=True) as client: + for name, url in web_sources: + try: + items.extend(await _collect_web_page(client, name, url, lookback_hours, limit_per_source)) + except Exception as e: + logger.warning("网页新闻源采集失败 source=%s url=%s error=%s", name, url, e) + rss_sources = _parse_rss_sources(settings.news_rss_sources) if rss_sources: async with httpx.AsyncClient(headers=RSS_HEADERS, timeout=10.0, follow_redirects=True) as client: @@ -47,6 +65,12 @@ async def collect_news_sources( except Exception as e: logger.warning("RSS 新闻源采集失败 source=%s url=%s error=%s", name, url, e) + if settings.news_akshare_enabled: + try: + items.extend(await _collect_akshare_stock_news(lookback_hours, limit_per_source)) + except Exception as e: + logger.warning("AKShare 个股新闻采集失败: %s", e) + return _dedup_in_memory(items) @@ -77,6 +101,23 @@ async def _collect_tushare_news(source: str, lookback_hours: int, limit: int) -> return items[:limit] +async def _collect_web_page( + client: httpx.AsyncClient, + source: str, + url: str, + lookback_hours: int, + limit: int, +) -> list[NewsItem]: + resp = await client.get(url) + resp.raise_for_status() + text = resp.text + + if source == "eastmoney_roll": + return _parse_eastmoney_roll(text, limit) + + return _parse_generic_links(text, source=source, limit=limit) + + async def _collect_rss( client: httpx.AsyncClient, source: str, @@ -112,11 +153,124 @@ async def _collect_rss( return items +async def _collect_akshare_stock_news(lookback_hours: int, limit_per_source: int) -> list[NewsItem]: + """采集推荐池/关注池中标的的东方财富个股新闻。 + + AKShare 是可选增强依赖:安装时启用,未安装时安静跳过,避免影响主服务。 + """ + ak = _load_akshare() + if ak is None: + logger.info("未安装 AKShare,跳过个股新闻补充源") + return [] + + symbols 
= await _load_focus_stock_symbols(limit=settings.news_akshare_stock_limit) + if not symbols: + return [] + + items: list[NewsItem] = [] + per_stock = max(1, min(settings.news_akshare_news_per_stock, limit_per_source)) + cutoff = datetime.now() - timedelta(hours=lookback_hours) + for ts_code, name in symbols: + try: + df = await _call_akshare_stock_news(ak, _strip_market(ts_code)) + except Exception as e: + logger.debug("AKShare 个股新闻失败 ts_code=%s error=%s", ts_code, e) + continue + items.extend(_parse_akshare_stock_news(df, ts_code=ts_code, name=name, cutoff=cutoff, limit=per_stock)) + return items + + +def _load_akshare(): + try: + return import_module("akshare") + except Exception: + return None + + +async def _call_akshare_stock_news(ak, symbol: str) -> pd.DataFrame: + import asyncio + + return await asyncio.to_thread(ak.stock_news_em, symbol=symbol) + + +async def _load_focus_stock_symbols(limit: int) -> list[tuple[str, str]]: + async with get_db() as db: + result = await db.execute( + text( + "SELECT ts_code, name FROM recommendations " + "WHERE action_plan IN ('可操作', '重点关注', '观察') " + "ORDER BY created_at DESC, score DESC LIMIT :limit" + ), + {"limit": limit}, + ) + rows = result.fetchall() + + seen: set[str] = set() + symbols: list[tuple[str, str]] = [] + for row in rows: + ts_code = str(row._mapping["ts_code"] or "") + if not ts_code or ts_code in seen: + continue + seen.add(ts_code) + symbols.append((ts_code, str(row._mapping["name"] or ts_code))) + return symbols + + +def _parse_akshare_stock_news( + df: pd.DataFrame, + ts_code: str, + name: str, + cutoff: datetime, + limit: int, +) -> list[NewsItem]: + if df is None or df.empty: + return [] + + items: list[NewsItem] = [] + for _, row in df.head(max(limit * 3, limit)).iterrows(): + title = _clean_text(_pick_row_value(row, "新闻标题", "title", "标题")) + if not _is_useful_title(title): + continue + published_at = _parse_datetime(_pick_row_value(row, "发布时间", "datetime", "time", "日期")) + if published_at and 
published_at < cutoff: + continue + content = _clean_text(_pick_row_value(row, "新闻内容", "content", "内容", "摘要")) + source = _clean_text(_pick_row_value(row, "文章来源", "source", "来源")) or "东方财富" + url = str(_pick_row_value(row, "新闻链接", "url", "链接") or "") + summary = content[:240] if content else title + items.append(NewsItem( + title=f"{name}: {title}" if name and name not in title else title, + content=content or title, + summary=summary, + source=f"akshare:{source}", + url=url, + published_at=published_at, + )) + if len(items) >= limit: + break + return items + + +def _pick_row_value(row, *keys: str): + for key in keys: + try: + value = row.get(key) + except Exception: + value = None + if value is not None and str(value).strip() and str(value).lower() != "nan": + return value + return "" + + def _split_csv(value: str) -> list[str]: return [item.strip() for item in (value or "").split(",") if item.strip()] def _parse_rss_sources(value: str) -> list[tuple[str, str]]: + return _parse_named_url_sources(value) + + +def _parse_named_url_sources(value: str) -> list[tuple[str, str]]: result: list[tuple[str, str]] = [] for chunk in _split_csv(value): if "|" not in chunk: @@ -130,21 +284,100 @@ def _parse_rss_sources(value: str) -> list[tuple[str, str]]: def _select_tushare_sources_for_run() -> list[str]: - """Tushare news 免费/低权限账号通常限制 1 次/分钟,每轮只取少量源。""" + """Tushare news 默认关闭,仅在显式配置时少量使用。""" global _tushare_source_cursor + if not settings.news_tushare_enabled: + return [] + sources = _split_csv(settings.news_tushare_sources) if not sources: return [] - limit = max(1, min(int(settings.news_tushare_sources_per_run or 1), len(sources))) + remaining_quota = _remaining_tushare_quota() + if remaining_quota <= 0: + logger.info("Tushare 新闻日额度已用尽,跳过本轮采集") + return [] + + limit = max(1, min(int(settings.news_tushare_sources_per_run or 1), len(sources), remaining_quota)) selected: list[str] = [] for offset in range(limit): selected.append(sources[(_tushare_source_cursor + offset) % 
len(sources)]) _tushare_source_cursor = (_tushare_source_cursor + limit) % len(sources) + _consume_tushare_quota(len(selected)) return selected +def _remaining_tushare_quota() -> int: + quota = int(settings.news_tushare_daily_quota or 0) + if quota <= 0: + return 0 + today = date.today().isoformat() + return max(0, quota - int(_tushare_calls_by_day.get(today, 0))) + + +def _consume_tushare_quota(count: int) -> None: + if count <= 0: + return + today = date.today().isoformat() + _tushare_calls_by_day[today] = int(_tushare_calls_by_day.get(today, 0)) + count + + +def _parse_eastmoney_roll(text: str, limit: int) -> list[NewsItem]: + result: list[NewsItem] = [] + pattern = re.compile( + r'<a[^>]+href=["\'](?P<url>https?://(?:finance|stock|kuaixun)\.eastmoney\.com/[^"\']+)["\'][^>]*>(?P<title>.*?)</a>', + re.I | re.S, + ) + for match in pattern.finditer(text or ""): + title = _clean_text(match.group("title")) + url = _clean_text(match.group("url")) + if not _is_useful_title(title): + continue + result.append(NewsItem( + title=title, + content=title, + summary=title, + source="web:eastmoney_roll", + url=url, + published_at=None, + )) + if len(result) >= limit: + break + return result + + +def _parse_generic_links(text: str, source: str, limit: int) -> list[NewsItem]: + result: list[NewsItem] = [] + pattern = re.compile(r'<a[^>]+href=["\'](?P<url>https?://[^"\']+)["\'][^>]*>(?P<title>.*?)</a>', re.I | re.S) + for match in pattern.finditer(text or ""): + title = _clean_text(match.group("title")) + if not _is_useful_title(title): + continue + result.append(NewsItem( + title=title, + content=title, + summary=title, + source=f"web:{source}", + url=_clean_text(match.group("url")), + published_at=None, + )) + if len(result) >= limit: + break + return result + + +def _is_useful_title(title: str) -> bool: + if len(title) < settings.news_min_title_length: + return False + return not any(token in title for token in ("广告", "下载APP", "扫一扫", "关于我们", "联系我们")) + + +def _strip_market(ts_code: str) -> str: 
+ text = str(ts_code or "").strip() + return text.split(".", 1)[0] if "." in text else text + + def _xml_text(item: ET.Element, tag: str) -> str: node = item.find(tag) return node.text if node is not None and node.text else "" diff --git a/backend/requirements.txt b/backend/requirements.txt index e9e658bd..2f533f3c 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -6,6 +6,7 @@ sqlalchemy==2.0.36 aiosqlite==0.20.0 greenlet==3.3.2 tushare==1.4.20 +akshare==1.18.40 pandas==2.2.3 numpy==2.2.1 apscheduler==3.10.4