423 lines
14 KiB
Python
423 lines
14 KiB
Python
"""多源新闻采集器。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import email.utils
|
||
import html
|
||
import logging
|
||
import re
|
||
import xml.etree.ElementTree as ET
|
||
from datetime import date, datetime, timedelta
|
||
from importlib import import_module
|
||
|
||
import httpx
|
||
import pandas as pd
|
||
from sqlalchemy import text
|
||
|
||
from app.config import settings
|
||
from app.data.tushare_client import tushare_client
|
||
from app.db.database import get_db
|
||
from app.news.models import NewsItem
|
||
|
||
logger = logging.getLogger(__name__)
|
||
_tushare_source_cursor = 0
|
||
_tushare_calls_by_day: dict[str, int] = {}
|
||
|
||
RSS_HEADERS = {
|
||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
|
||
}
|
||
WEB_HEADERS = {
|
||
"Referer": "https://www.eastmoney.com/",
|
||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
|
||
}
|
||
|
||
|
||
async def collect_news_sources(
|
||
lookback_hours: int | None = None,
|
||
limit_per_source: int | None = None,
|
||
) -> list[NewsItem]:
|
||
"""从配置的数据源采集新闻。失败源隔离,不影响其他源。"""
|
||
lookback_hours = lookback_hours or settings.news_fetch_lookback_hours
|
||
limit_per_source = limit_per_source or settings.news_fetch_limit_per_source
|
||
items: list[NewsItem] = []
|
||
|
||
for source in _select_tushare_sources_for_run():
|
||
try:
|
||
items.extend(await _collect_tushare_news(source, lookback_hours, limit_per_source))
|
||
except Exception as e:
|
||
logger.warning("Tushare 新闻源采集失败 source=%s error=%s", source, e)
|
||
|
||
web_sources = _parse_named_url_sources(settings.news_web_sources)
|
||
if web_sources:
|
||
async with httpx.AsyncClient(headers=WEB_HEADERS, timeout=10.0, follow_redirects=True) as client:
|
||
for name, url in web_sources:
|
||
try:
|
||
items.extend(await _collect_web_page(client, name, url, lookback_hours, limit_per_source))
|
||
except Exception as e:
|
||
logger.warning("网页新闻源采集失败 source=%s url=%s error=%s", name, url, e)
|
||
|
||
rss_sources = _parse_rss_sources(settings.news_rss_sources)
|
||
if rss_sources:
|
||
async with httpx.AsyncClient(headers=RSS_HEADERS, timeout=10.0, follow_redirects=True) as client:
|
||
for name, url in rss_sources:
|
||
try:
|
||
items.extend(await _collect_rss(client, name, url, lookback_hours, limit_per_source))
|
||
except Exception as e:
|
||
logger.warning("RSS 新闻源采集失败 source=%s url=%s error=%s", name, url, e)
|
||
|
||
if settings.news_akshare_enabled:
|
||
try:
|
||
items.extend(await _collect_akshare_stock_news(lookback_hours, limit_per_source))
|
||
except Exception as e:
|
||
logger.warning("AKShare 个股新闻采集失败: %s", e)
|
||
|
||
return _dedup_in_memory(items)
|
||
|
||
|
||
async def _collect_tushare_news(source: str, lookback_hours: int, limit: int) -> list[NewsItem]:
|
||
df = tushare_client.get_news(
|
||
source=source,
|
||
start_time=datetime.now() - timedelta(hours=lookback_hours),
|
||
end_time=datetime.now(),
|
||
limit=limit,
|
||
)
|
||
if df.empty:
|
||
return []
|
||
|
||
items: list[NewsItem] = []
|
||
for _, row in df.iterrows():
|
||
title = _clean_text(row.get("title", ""))
|
||
if len(title) < settings.news_min_title_length:
|
||
continue
|
||
content = _clean_text(row.get("content", ""))
|
||
items.append(NewsItem(
|
||
title=title,
|
||
content=content,
|
||
summary=_clean_text(row.get("summary", "")),
|
||
source=f"tushare:{source}",
|
||
url=str(row.get("url", "") or ""),
|
||
published_at=_parse_datetime(row.get("datetime") or row.get("time") or row.get("publish_time")),
|
||
))
|
||
return items[:limit]
|
||
|
||
|
||
async def _collect_web_page(
|
||
client: httpx.AsyncClient,
|
||
source: str,
|
||
url: str,
|
||
lookback_hours: int,
|
||
limit: int,
|
||
) -> list[NewsItem]:
|
||
resp = await client.get(url)
|
||
resp.raise_for_status()
|
||
text = resp.text
|
||
|
||
if source == "eastmoney_roll":
|
||
return _parse_eastmoney_roll(text, limit)
|
||
|
||
return _parse_generic_links(text, source=source, limit=limit)
|
||
|
||
|
||
async def _collect_rss(
|
||
client: httpx.AsyncClient,
|
||
source: str,
|
||
url: str,
|
||
lookback_hours: int,
|
||
limit: int,
|
||
) -> list[NewsItem]:
|
||
resp = await client.get(url)
|
||
resp.raise_for_status()
|
||
root = ET.fromstring(resp.content)
|
||
cutoff = datetime.now() - timedelta(hours=lookback_hours)
|
||
items: list[NewsItem] = []
|
||
|
||
for item in root.findall(".//item")[: limit * 2]:
|
||
title = _clean_text(_xml_text(item, "title"))
|
||
if len(title) < settings.news_min_title_length:
|
||
continue
|
||
published_at = _parse_datetime(_xml_text(item, "pubDate"))
|
||
if published_at and published_at < cutoff:
|
||
continue
|
||
summary = _clean_text(_xml_text(item, "description"))
|
||
items.append(NewsItem(
|
||
title=title,
|
||
content=summary,
|
||
summary=summary[:240],
|
||
source=f"rss:{source}",
|
||
url=_clean_text(_xml_text(item, "link")),
|
||
published_at=published_at,
|
||
))
|
||
if len(items) >= limit:
|
||
break
|
||
|
||
return items
|
||
|
||
|
||
async def _collect_akshare_stock_news(lookback_hours: int, limit_per_source: int) -> list[NewsItem]:
|
||
"""采集推荐池/关注池中标的的东方财富个股新闻。
|
||
|
||
AKShare 是可选增强依赖:安装时启用,未安装时安静跳过,避免影响主服务。
|
||
"""
|
||
ak = _load_akshare()
|
||
if ak is None:
|
||
logger.info("未安装 AKShare,跳过个股新闻补充源")
|
||
return []
|
||
|
||
symbols = await _load_focus_stock_symbols(limit=settings.news_akshare_stock_limit)
|
||
if not symbols:
|
||
return []
|
||
|
||
items: list[NewsItem] = []
|
||
per_stock = max(1, min(settings.news_akshare_news_per_stock, limit_per_source))
|
||
cutoff = datetime.now() - timedelta(hours=lookback_hours)
|
||
for ts_code, name in symbols:
|
||
try:
|
||
df = await _call_akshare_stock_news(ak, _strip_market(ts_code))
|
||
except Exception as e:
|
||
logger.debug("AKShare 个股新闻失败 ts_code=%s error=%s", ts_code, e)
|
||
continue
|
||
items.extend(_parse_akshare_stock_news(df, ts_code=ts_code, name=name, cutoff=cutoff, limit=per_stock))
|
||
return items
|
||
|
||
|
||
def _load_akshare():
|
||
try:
|
||
return import_module("akshare")
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
async def _call_akshare_stock_news(ak, symbol: str) -> pd.DataFrame:
|
||
import asyncio
|
||
|
||
return await asyncio.to_thread(ak.stock_news_em, symbol=symbol)
|
||
|
||
|
||
async def _load_focus_stock_symbols(limit: int) -> list[tuple[str, str]]:
|
||
async with get_db() as db:
|
||
result = await db.execute(
|
||
text(
|
||
"SELECT ts_code, name FROM recommendations "
|
||
"WHERE action_plan IN ('可操作', '重点关注', '观察') "
|
||
"ORDER BY created_at DESC, score DESC LIMIT :limit"
|
||
),
|
||
{"limit": limit},
|
||
)
|
||
rows = result.fetchall()
|
||
|
||
seen: set[str] = set()
|
||
symbols: list[tuple[str, str]] = []
|
||
for row in rows:
|
||
ts_code = str(row._mapping["ts_code"] or "")
|
||
if not ts_code or ts_code in seen:
|
||
continue
|
||
seen.add(ts_code)
|
||
symbols.append((ts_code, str(row._mapping["name"] or ts_code)))
|
||
return symbols
|
||
|
||
|
||
def _parse_akshare_stock_news(
|
||
df: pd.DataFrame,
|
||
ts_code: str,
|
||
name: str,
|
||
cutoff: datetime,
|
||
limit: int,
|
||
) -> list[NewsItem]:
|
||
if df is None or df.empty:
|
||
return []
|
||
|
||
items: list[NewsItem] = []
|
||
for _, row in df.head(max(limit * 3, limit)).iterrows():
|
||
title = _clean_text(_pick_row_value(row, "新闻标题", "title", "标题"))
|
||
if not _is_useful_title(title):
|
||
continue
|
||
published_at = _parse_datetime(_pick_row_value(row, "发布时间", "datetime", "time", "日期"))
|
||
if published_at and published_at < cutoff:
|
||
continue
|
||
content = _clean_text(_pick_row_value(row, "新闻内容", "content", "内容", "摘要"))
|
||
source = _clean_text(_pick_row_value(row, "文章来源", "source", "来源")) or "东方财富"
|
||
url = str(_pick_row_value(row, "新闻链接", "url", "链接") or "")
|
||
summary = content[:240] if content else title
|
||
items.append(NewsItem(
|
||
title=f"{name}: {title}" if name and name not in title else title,
|
||
content=content or title,
|
||
summary=summary,
|
||
source=f"akshare:{source}",
|
||
url=url,
|
||
published_at=published_at,
|
||
))
|
||
if len(items) >= limit:
|
||
break
|
||
return items
|
||
|
||
|
||
def _pick_row_value(row, *keys: str):
|
||
for key in keys:
|
||
try:
|
||
value = row.get(key)
|
||
except Exception:
|
||
value = None
|
||
if value is not None and str(value).strip() and str(value).lower() != "nan":
|
||
return value
|
||
return ""
|
||
|
||
|
||
def _split_csv(value: str) -> list[str]:
|
||
return [item.strip() for item in (value or "").split(",") if item.strip()]
|
||
|
||
|
||
def _parse_rss_sources(value: str) -> list[tuple[str, str]]:
|
||
return _parse_named_url_sources(value)
|
||
|
||
|
||
def _parse_named_url_sources(value: str) -> list[tuple[str, str]]:
|
||
result: list[tuple[str, str]] = []
|
||
for chunk in _split_csv(value):
|
||
if "|" not in chunk:
|
||
continue
|
||
name, url = chunk.split("|", 1)
|
||
name = name.strip()
|
||
url = url.strip()
|
||
if name and url:
|
||
result.append((name, url))
|
||
return result
|
||
|
||
|
||
def _select_tushare_sources_for_run() -> list[str]:
|
||
"""Tushare news 默认关闭,仅在显式配置时少量使用。"""
|
||
global _tushare_source_cursor
|
||
|
||
if not settings.news_tushare_enabled:
|
||
return []
|
||
|
||
sources = _split_csv(settings.news_tushare_sources)
|
||
if not sources:
|
||
return []
|
||
|
||
remaining_quota = _remaining_tushare_quota()
|
||
if remaining_quota <= 0:
|
||
logger.info("Tushare 新闻日额度已用尽,跳过本轮采集")
|
||
return []
|
||
|
||
limit = max(1, min(int(settings.news_tushare_sources_per_run or 1), len(sources), remaining_quota))
|
||
selected: list[str] = []
|
||
for offset in range(limit):
|
||
selected.append(sources[(_tushare_source_cursor + offset) % len(sources)])
|
||
_tushare_source_cursor = (_tushare_source_cursor + limit) % len(sources)
|
||
_consume_tushare_quota(len(selected))
|
||
return selected
|
||
|
||
|
||
def _remaining_tushare_quota() -> int:
|
||
quota = int(settings.news_tushare_daily_quota or 0)
|
||
if quota <= 0:
|
||
return 0
|
||
today = date.today().isoformat()
|
||
return max(0, quota - int(_tushare_calls_by_day.get(today, 0)))
|
||
|
||
|
||
def _consume_tushare_quota(count: int) -> None:
|
||
if count <= 0:
|
||
return
|
||
today = date.today().isoformat()
|
||
_tushare_calls_by_day[today] = int(_tushare_calls_by_day.get(today, 0)) + count
|
||
|
||
|
||
def _parse_eastmoney_roll(text: str, limit: int) -> list[NewsItem]:
|
||
result: list[NewsItem] = []
|
||
pattern = re.compile(
|
||
r'<a[^>]+href=["\'](?P<url>https?://(?:finance|stock|kuaixun)\.eastmoney\.com/[^"\']+)["\'][^>]*>(?P<title>.*?)</a>',
|
||
re.I | re.S,
|
||
)
|
||
for match in pattern.finditer(text or ""):
|
||
title = _clean_text(match.group("title"))
|
||
url = _clean_text(match.group("url"))
|
||
if not _is_useful_title(title):
|
||
continue
|
||
result.append(NewsItem(
|
||
title=title,
|
||
content=title,
|
||
summary=title,
|
||
source="web:eastmoney_roll",
|
||
url=url,
|
||
published_at=None,
|
||
))
|
||
if len(result) >= limit:
|
||
break
|
||
return result
|
||
|
||
|
||
def _parse_generic_links(text: str, source: str, limit: int) -> list[NewsItem]:
|
||
result: list[NewsItem] = []
|
||
pattern = re.compile(r'<a[^>]+href=["\'](?P<url>https?://[^"\']+)["\'][^>]*>(?P<title>.*?)</a>', re.I | re.S)
|
||
for match in pattern.finditer(text or ""):
|
||
title = _clean_text(match.group("title"))
|
||
if not _is_useful_title(title):
|
||
continue
|
||
result.append(NewsItem(
|
||
title=title,
|
||
content=title,
|
||
summary=title,
|
||
source=f"web:{source}",
|
||
url=_clean_text(match.group("url")),
|
||
published_at=None,
|
||
))
|
||
if len(result) >= limit:
|
||
break
|
||
return result
|
||
|
||
|
||
def _is_useful_title(title: str) -> bool:
|
||
if len(title) < settings.news_min_title_length:
|
||
return False
|
||
return not any(token in title for token in ("广告", "下载APP", "扫一扫", "关于我们", "联系我们"))
|
||
|
||
|
||
def _strip_market(ts_code: str) -> str:
|
||
text = str(ts_code or "").strip()
|
||
return text.split(".", 1)[0] if "." in text else text
|
||
|
||
|
||
def _xml_text(item: ET.Element, tag: str) -> str:
|
||
node = item.find(tag)
|
||
return node.text if node is not None and node.text else ""
|
||
|
||
|
||
def _clean_text(value) -> str:
|
||
text = html.unescape(str(value or ""))
|
||
text = re.sub(r"<[^>]+>", " ", text)
|
||
text = re.sub(r"\s+", " ", text)
|
||
return text.strip()
|
||
|
||
|
||
def _parse_datetime(value) -> datetime | None:
|
||
if value is None:
|
||
return None
|
||
if isinstance(value, datetime):
|
||
return value.replace(tzinfo=None)
|
||
text = str(value).strip()
|
||
if not text:
|
||
return None
|
||
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y%m%d%H%M%S", "%Y%m%d"):
|
||
try:
|
||
return datetime.strptime(text[: len(fmt)], fmt)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
parsed = email.utils.parsedate_to_datetime(text)
|
||
return parsed.replace(tzinfo=None)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def _dedup_in_memory(items: list[NewsItem]) -> list[NewsItem]:
|
||
seen: set[str] = set()
|
||
result: list[NewsItem] = []
|
||
for item in items:
|
||
key = re.sub(r"\W+", "", item.title.lower())[:80]
|
||
if not key or key in seen:
|
||
continue
|
||
seen.add(key)
|
||
result.append(item)
|
||
return result
|