astock-agent/backend/app/news/collector.py
2026-05-15 11:52:50 +08:00

423 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""多源新闻采集器。"""
from __future__ import annotations
import email.utils
import html
import logging
import re
import xml.etree.ElementTree as ET
from datetime import date, datetime, timedelta
from importlib import import_module
import httpx
import pandas as pd
from sqlalchemy import text
from app.config import settings
from app.data.tushare_client import tushare_client
from app.db.database import get_db
from app.news.models import NewsItem
logger = logging.getLogger(__name__)

# Rotating start index into the configured Tushare source list; advanced by
# _select_tushare_sources_for_run() on every run.
_tushare_source_cursor = 0

# Number of Tushare news calls already booked, keyed by ISO date string
# (date.today().isoformat()). Process-local: not persisted anywhere in this file.
_tushare_calls_by_day: dict[str, int] = {}

# Headers sent with every RSS feed request.
RSS_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
}

# Headers sent with every plain web-page request: a Referer plus the same
# User-Agent as the RSS requests.
WEB_HEADERS = {
    "Referer": "https://www.eastmoney.com/",
    **RSS_HEADERS,
}
async def collect_news_sources(
    lookback_hours: int | None = None,
    limit_per_source: int | None = None,
) -> list[NewsItem]:
    """Collect news from every configured source and de-duplicate the result.

    Sources are polled in a fixed order: Tushare, plain web pages, RSS feeds
    and finally the optional AKShare per-stock feed. A failure in one source
    is logged as a warning and does not stop the remaining sources.

    Args:
        lookback_hours: Age window in hours. A falsy value (``None`` or ``0``)
            falls back to ``settings.news_fetch_lookback_hours``.
        limit_per_source: Per-source item cap. A falsy value falls back to
            ``settings.news_fetch_limit_per_source``.

    Returns:
        All collected items after ``_dedup_in_memory``; because sources are
        appended in polling order, the earliest source wins a title clash.
    """
    hours = lookback_hours if lookback_hours else settings.news_fetch_lookback_hours
    cap = limit_per_source if limit_per_source else settings.news_fetch_limit_per_source
    collected: list[NewsItem] = []

    # 1) Tushare (quota-limited; the selector may return no sources at all).
    for tushare_source in _select_tushare_sources_for_run():
        try:
            batch = await _collect_tushare_news(tushare_source, hours, cap)
            collected.extend(batch)
        except Exception as exc:
            logger.warning("Tushare 新闻源采集失败 source=%s error=%s", tushare_source, exc)

    # 2) Plain web pages, sharing one HTTP client for all pages.
    web_sources = _parse_named_url_sources(settings.news_web_sources)
    if web_sources:
        async with httpx.AsyncClient(
            headers=WEB_HEADERS,
            timeout=10.0,
            follow_redirects=True,
        ) as web_client:
            for label, address in web_sources:
                try:
                    batch = await _collect_web_page(web_client, label, address, hours, cap)
                    collected.extend(batch)
                except Exception as exc:
                    logger.warning("网页新闻源采集失败 source=%s url=%s error=%s", label, address, exc)

    # 3) RSS feeds, again with one shared client.
    rss_sources = _parse_rss_sources(settings.news_rss_sources)
    if rss_sources:
        async with httpx.AsyncClient(
            headers=RSS_HEADERS,
            timeout=10.0,
            follow_redirects=True,
        ) as rss_client:
            for label, address in rss_sources:
                try:
                    batch = await _collect_rss(rss_client, label, address, hours, cap)
                    collected.extend(batch)
                except Exception as exc:
                    logger.warning("RSS 新闻源采集失败 source=%s url=%s error=%s", label, address, exc)

    # 4) Optional AKShare per-stock news.
    if settings.news_akshare_enabled:
        try:
            batch = await _collect_akshare_stock_news(hours, cap)
            collected.extend(batch)
        except Exception as exc:
            logger.warning("AKShare 个股新闻采集失败: %s", exc)

    return _dedup_in_memory(collected)
async def _collect_tushare_news(source: str, lookback_hours: int, limit: int) -> list[NewsItem]:
    """Fetch recent news for one Tushare source and map rows to ``NewsItem``.

    Args:
        source: Tushare news source identifier, passed through to the client
            and used as the ``tushare:<source>`` item source tag.
        lookback_hours: Size of the requested time window, ending now.
        limit: Row limit passed to the client; the result is also sliced to
            this length.

    Returns:
        Items whose cleaned title is at least
        ``settings.news_min_title_length`` characters long; an empty list
        when the client returns ``None`` or an empty frame.
    """
    # Sample the clock once so the window is exactly ``lookback_hours`` wide.
    window_end = datetime.now()
    # NOTE(review): get_news is called without await, so it presumably blocks
    # the event loop while the request runs — confirm whether that is acceptable.
    df = tushare_client.get_news(
        source=source,
        start_time=window_end - timedelta(hours=lookback_hours),
        end_time=window_end,
        limit=limit,
    )
    if df is None or df.empty:
        return []

    items: list[NewsItem] = []
    for _, row in df.iterrows():
        # _pick_row_value skips None/blank/NaN cells. Reading cells with
        # ``row.get(...) or ...`` does not: NaN is truthy, so it would block the
        # fallback to the next column and end up as the literal text "nan".
        title = _clean_text(_pick_row_value(row, "title"))
        if len(title) < settings.news_min_title_length:
            continue
        items.append(NewsItem(
            title=title,
            content=_clean_text(_pick_row_value(row, "content")),
            summary=_clean_text(_pick_row_value(row, "summary")),
            source=f"tushare:{source}",
            url=str(_pick_row_value(row, "url") or ""),
            # First populated timestamp column wins.
            published_at=_parse_datetime(
                _pick_row_value(row, "datetime", "time", "publish_time")
            ),
        ))
    return items[:limit]
async def _collect_web_page(
    client: httpx.AsyncClient,
    source: str,
    url: str,
    lookback_hours: int,
    limit: int,
) -> list[NewsItem]:
    """Download one web page and extract headline links from its HTML.

    Args:
        client: HTTP client used for the GET request.
        source: Configured source name; ``"eastmoney_roll"`` selects the
            Eastmoney-specific parser, anything else the generic link parser.
        url: Page address to fetch.
        lookback_hours: Accepted for signature parity with the other
            collectors; not used here.
        limit: Maximum number of items to return.

    Returns:
        Items produced by the selected parser.

    Raises:
        httpx.HTTPStatusError: When the response status is an error
            (via ``raise_for_status``).
    """
    response = await client.get(url)
    response.raise_for_status()
    page_html = response.text
    if source != "eastmoney_roll":
        return _parse_generic_links(page_html, source=source, limit=limit)
    return _parse_eastmoney_roll(page_html, limit)
async def _collect_rss(
    client: httpx.AsyncClient,
    source: str,
    url: str,
    lookback_hours: int,
    limit: int,
) -> list[NewsItem]:
    """Fetch an RSS feed and convert its ``<item>`` entries to ``NewsItem``.

    Only the first ``limit * 2`` entries of the feed are examined. Entries
    with a too-short title, or with a parseable ``pubDate`` older than the
    lookback window, are skipped; entries without a parseable date are kept.

    Args:
        client: HTTP client used for the GET request.
        source: Configured feed name, used as the ``rss:<source>`` tag.
        url: Feed address.
        lookback_hours: Maximum accepted age of an entry, in hours.
        limit: Maximum number of items to return.

    Returns:
        At most ``limit`` items, in feed order.

    Raises:
        httpx.HTTPStatusError: When the response status is an error.
        xml.etree.ElementTree.ParseError: When the body is not well-formed XML.
    """
    response = await client.get(url)
    response.raise_for_status()
    feed_root = ET.fromstring(response.content)
    oldest_allowed = datetime.now() - timedelta(hours=lookback_hours)

    collected: list[NewsItem] = []
    candidates = feed_root.findall(".//item")[: limit * 2]
    for entry in candidates:
        headline = _clean_text(_xml_text(entry, "title"))
        if len(headline) < settings.news_min_title_length:
            continue

        published_at = _parse_datetime(_xml_text(entry, "pubDate"))
        if published_at and published_at < oldest_allowed:
            continue

        description = _clean_text(_xml_text(entry, "description"))
        link = _clean_text(_xml_text(entry, "link"))
        collected.append(NewsItem(
            title=headline,
            content=description,
            summary=description[:240],
            source=f"rss:{source}",
            url=link,
            published_at=published_at,
        ))
        if len(collected) >= limit:
            break
    return collected
async def _collect_akshare_stock_news(lookback_hours: int, limit_per_source: int) -> list[NewsItem]:
    """Collect per-stock news for the symbols in the recommendation/watch pool.

    AKShare is an optional dependency: when it cannot be imported the
    function logs an info message and returns an empty list, so the main
    service is unaffected.

    Args:
        lookback_hours: Maximum accepted age of a news row, in hours.
        limit_per_source: Upper bound for the per-stock item count; the
            effective per-stock cap is
            ``max(1, min(settings.news_akshare_news_per_stock, limit_per_source))``.

    Returns:
        Items for all focus symbols, concatenated in symbol order. A symbol
        whose fetch raises is logged at debug level and skipped.
    """
    akshare = _load_akshare()
    if akshare is None:
        logger.info("未安装 AKShare跳过个股新闻补充源")
        return []

    focus_symbols = await _load_focus_stock_symbols(limit=settings.news_akshare_stock_limit)
    if not focus_symbols:
        return []

    collected: list[NewsItem] = []
    per_stock_cap = max(1, min(settings.news_akshare_news_per_stock, limit_per_source))
    oldest_allowed = datetime.now() - timedelta(hours=lookback_hours)
    for ts_code, stock_name in focus_symbols:
        try:
            frame = await _call_akshare_stock_news(akshare, _strip_market(ts_code))
        except Exception as exc:
            logger.debug("AKShare 个股新闻失败 ts_code=%s error=%s", ts_code, exc)
            continue
        stock_items = _parse_akshare_stock_news(
            frame,
            ts_code=ts_code,
            name=stock_name,
            cutoff=oldest_allowed,
            limit=per_stock_cap,
        )
        collected.extend(stock_items)
    return collected
def _load_akshare():
try:
return import_module("akshare")
except Exception:
return None
async def _call_akshare_stock_news(ak, symbol: str) -> pd.DataFrame:
import asyncio
return await asyncio.to_thread(ak.stock_news_em, symbol=symbol)
async def _load_focus_stock_symbols(limit: int) -> list[tuple[str, str]]:
    """Load distinct ``(ts_code, name)`` pairs from the recommendations table.

    Rows are restricted to three ``action_plan`` values and ordered by
    ``created_at`` then ``score``, both descending. The SQL ``LIMIT`` is
    applied before duplicates are removed, so fewer than ``limit`` pairs may
    be returned.

    Args:
        limit: Value bound to the SQL ``LIMIT`` parameter.

    Returns:
        Unique pairs in query order; rows with an empty ``ts_code`` are
        dropped and an empty ``name`` falls back to the ``ts_code``.
    """
    query = text(
        "SELECT ts_code, name FROM recommendations "
        "WHERE action_plan IN ('可操作', '重点关注', '观察') "
        "ORDER BY created_at DESC, score DESC LIMIT :limit"
    )
    async with get_db() as db:
        result = await db.execute(query, {"limit": limit})
        rows = result.fetchall()

    # A dict keeps first-seen order and gives O(1) duplicate detection.
    names_by_code: dict[str, str] = {}
    for record in rows:
        code = str(record._mapping["ts_code"] or "")
        if not code or code in names_by_code:
            continue
        names_by_code[code] = str(record._mapping["name"] or code)
    return list(names_by_code.items())
def _parse_akshare_stock_news(
    df: pd.DataFrame,
    ts_code: str,
    name: str,
    cutoff: datetime,
    limit: int,
) -> list[NewsItem]:
    """Convert an AKShare per-stock news frame into ``NewsItem`` objects.

    Only the first ``max(limit * 3, limit)`` rows are inspected. Each field is
    read from the first populated column among several Chinese/English
    aliases.

    Args:
        df: Frame returned by AKShare; ``None`` or empty yields ``[]``.
        ts_code: Stock code of the frame's subject (not used by the parser).
        name: Stock name, prefixed to titles that do not already contain it.
        cutoff: Rows with a parseable publish time older than this are
            skipped; rows without a parseable time are kept.
        limit: Maximum number of items to return.

    Returns:
        At most ``limit`` items in frame order.
    """
    if df is None or df.empty:
        return []

    parsed: list[NewsItem] = []
    rows_to_scan = max(limit * 3, limit)
    for _, record in df.head(rows_to_scan).iterrows():
        headline = _clean_text(_pick_row_value(record, "新闻标题", "title", "标题"))
        if not _is_useful_title(headline):
            continue

        published_at = _parse_datetime(_pick_row_value(record, "发布时间", "datetime", "time", "日期"))
        if published_at and published_at < cutoff:
            continue

        body = _clean_text(_pick_row_value(record, "新闻内容", "content", "内容", "摘要"))
        origin = _clean_text(_pick_row_value(record, "文章来源", "source", "来源")) or "东方财富"
        link = str(_pick_row_value(record, "新闻链接", "url", "链接") or "")

        # Prefix the stock name unless the headline already mentions it.
        if name and name not in headline:
            display_title = f"{name}: {headline}"
        else:
            display_title = headline

        parsed.append(NewsItem(
            title=display_title,
            content=body or headline,
            summary=body[:240] if body else headline,
            source=f"akshare:{origin}",
            url=link,
            published_at=published_at,
        ))
        if len(parsed) >= limit:
            break
    return parsed
def _pick_row_value(row, *keys: str):
for key in keys:
try:
value = row.get(key)
except Exception:
value = None
if value is not None and str(value).strip() and str(value).lower() != "nan":
return value
return ""
def _split_csv(value: str) -> list[str]:
return [item.strip() for item in (value or "").split(",") if item.strip()]
def _parse_rss_sources(value: str) -> list[tuple[str, str]]:
    """Parse the RSS source setting; same ``name|url`` format as web sources."""
    named_feeds = _parse_named_url_sources(value)
    return named_feeds
def _parse_named_url_sources(value: str) -> list[tuple[str, str]]:
    """Parse a ``name|url,name|url`` setting into ``(name, url)`` pairs.

    Entries are separated by commas and split on the first ``|``. Entries
    without a ``|``, or with an empty name or URL after trimming, are
    ignored.
    """
    pairs: list[tuple[str, str]] = []
    for entry in _split_csv(value):
        label, separator, address = entry.partition("|")
        if not separator:
            continue
        label, address = label.strip(), address.strip()
        if label and address:
            pairs.append((label, address))
    return pairs
def _select_tushare_sources_for_run() -> list[str]:
    """Pick the Tushare sources to poll in this run, round-robin.

    Tushare news is off by default and used sparingly, only when explicitly
    configured. Returns an empty list when the feature is disabled, no
    sources are configured, or today's quota is exhausted. Otherwise it
    selects between 1 and ``settings.news_tushare_sources_per_run`` sources
    (bounded by the source count and the remaining quota), advances the
    module-level cursor and books the selection against the daily quota.
    """
    global _tushare_source_cursor

    if not settings.news_tushare_enabled:
        return []

    sources = _split_csv(settings.news_tushare_sources)
    if not sources:
        return []

    remaining_quota = _remaining_tushare_quota()
    if remaining_quota <= 0:
        logger.info("Tushare 新闻日额度已用尽,跳过本轮采集")
        return []

    per_run = int(settings.news_tushare_sources_per_run or 1)
    take = max(1, min(per_run, len(sources), remaining_quota))

    total = len(sources)
    start = _tushare_source_cursor
    selected = [sources[(start + offset) % total] for offset in range(take)]

    # The next run continues where this one stopped.
    _tushare_source_cursor = (start + take) % total
    _consume_tushare_quota(len(selected))
    return selected
def _remaining_tushare_quota() -> int:
    """Return how many Tushare news calls may still be made today.

    A missing or non-positive ``settings.news_tushare_daily_quota`` means no
    calls are allowed. The result is never negative.
    """
    daily_quota = int(settings.news_tushare_daily_quota or 0)
    if daily_quota <= 0:
        return 0
    used_today = int(_tushare_calls_by_day.get(date.today().isoformat(), 0))
    return max(0, daily_quota - used_today)
def _consume_tushare_quota(count: int) -> None:
    """Add ``count`` calls to today's Tushare usage counter.

    Non-positive counts are ignored.
    """
    if count > 0:
        day_key = date.today().isoformat()
        already_used = int(_tushare_calls_by_day.get(day_key, 0))
        _tushare_calls_by_day[day_key] = already_used + count
def _parse_eastmoney_roll(text: str, limit: int) -> list[NewsItem]:
    """Extract headline links from the Eastmoney rolling-news page HTML.

    Only anchors whose ``href`` points at the ``finance``, ``stock`` or
    ``kuaixun`` sub-domains of eastmoney.com are considered. The anchor text
    becomes title, content and summary; no publish time is available.

    Args:
        text: Raw page HTML; a falsy value is treated as an empty page.
        limit: Maximum number of items to return.

    Returns:
        At most ``limit`` items in document order.
    """
    anchor_re = re.compile(
        r'<a[^>]+href=["\'](?P<url>https?://(?:finance|stock|kuaixun)\.eastmoney\.com/[^"\']+)["\'][^>]*>(?P<title>.*?)</a>',
        re.IGNORECASE | re.DOTALL,
    )
    found: list[NewsItem] = []
    for anchor in anchor_re.finditer(text or ""):
        headline = _clean_text(anchor.group("title"))
        link = _clean_text(anchor.group("url"))
        if _is_useful_title(headline):
            found.append(NewsItem(
                title=headline,
                content=headline,
                summary=headline,
                source="web:eastmoney_roll",
                url=link,
                published_at=None,
            ))
            if len(found) >= limit:
                break
    return found
def _parse_generic_links(text: str, source: str, limit: int) -> list[NewsItem]:
    """Extract headline links from arbitrary page HTML.

    Every anchor with an absolute ``http``/``https`` ``href`` is a candidate;
    its text becomes title, content and summary. No publish time is
    available.

    Args:
        text: Raw page HTML; a falsy value is treated as an empty page.
        source: Configured source name, used as the ``web:<source>`` tag.
        limit: Maximum number of items to return.

    Returns:
        At most ``limit`` items in document order.
    """
    anchor_re = re.compile(
        r'<a[^>]+href=["\'](?P<url>https?://[^"\']+)["\'][^>]*>(?P<title>.*?)</a>',
        re.IGNORECASE | re.DOTALL,
    )
    found: list[NewsItem] = []
    for anchor in anchor_re.finditer(text or ""):
        headline = _clean_text(anchor.group("title"))
        if _is_useful_title(headline):
            found.append(NewsItem(
                title=headline,
                content=headline,
                summary=headline,
                source=f"web:{source}",
                url=_clean_text(anchor.group("url")),
                published_at=None,
            ))
            if len(found) >= limit:
                break
    return found
def _is_useful_title(title: str) -> bool:
    """Tell whether ``title`` looks like a real headline.

    Rejects titles shorter than ``settings.news_min_title_length`` and titles
    containing any of a fixed set of boilerplate phrases (ads, app-download
    prompts, site navigation links).
    """
    if len(title) < settings.news_min_title_length:
        return False
    for boilerplate in ("广告", "下载APP", "扫一扫", "关于我们", "联系我们"):
        if boilerplate in title:
            return False
    return True
def _strip_market(ts_code: str) -> str:
text = str(ts_code or "").strip()
return text.split(".", 1)[0] if "." in text else text
def _xml_text(item: ET.Element, tag: str) -> str:
node = item.find(tag)
return node.text if node is not None and node.text else ""
def _clean_text(value) -> str:
text = html.unescape(str(value or ""))
text = re.sub(r"<[^>]+>", " ", text)
text = re.sub(r"\s+", " ", text)
return text.strip()
def _parse_datetime(value) -> datetime | None:
if value is None:
return None
if isinstance(value, datetime):
return value.replace(tzinfo=None)
text = str(value).strip()
if not text:
return None
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y%m%d%H%M%S", "%Y%m%d"):
try:
return datetime.strptime(text[: len(fmt)], fmt)
except Exception:
pass
try:
parsed = email.utils.parsedate_to_datetime(text)
return parsed.replace(tzinfo=None)
except Exception:
return None
def _dedup_in_memory(items: list[NewsItem]) -> list[NewsItem]:
seen: set[str] = set()
result: list[NewsItem] = []
for item in items:
key = re.sub(r"\W+", "", item.title.lower())[:80]
if not key or key in seen:
continue
seen.add(key)
result.append(item)
return result