astock-agent/backend/app/research/report_agent.py
2026-06-10 08:36:25 +08:00

355 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Build and persist daily AI research reports."""
from __future__ import annotations
import json
from datetime import datetime
from typing import Any
from sqlalchemy import text
from app.db.database import get_db
from app.db import tables
from app.research.catalyst_agent import build_catalyst_summary
from app.research.feedback_agent import build_ranking_feedback
from app.research.market_agent import build_market_view
from app.research.ranking_agent import build_opportunity_cards
from app.research.risk_agent import build_risk_alerts
from app.research.sector_agent import build_theme_views, build_theme_views_async
from app.research.stock_research_agent import build_stock_research_notes, build_stock_research_notes_sync
def _latest_scan_from_result(result: dict, scan_session: str) -> dict:
for item in result.get("scan_logs", []) or []:
if item.get("stage") == "final_filter":
return item
return {
"scan_session": scan_session,
"scan_mode": result.get("scan_mode", ""),
"status": "empty" if not result.get("recommendations") else "ok",
"summary": "",
"elimination_reasons": {},
"created_at": datetime.now().isoformat(),
}
def build_research_report(result: dict, scan_session: str) -> dict:
    """Assemble a research report whose stock notes are built deterministically.

    Kept for fast API fallbacks and for tests. The regular scan pipeline calls
    ``build_research_report_async`` instead, so that stock research notes can
    be produced with the LLM.
    """
    notes_builder = build_stock_research_notes_sync
    return _assemble_research_report(result, scan_session, notes_builder)
async def build_research_report_async(result: dict, scan_session: str) -> dict:
    """Assemble a research report using the asynchronous builders.

    Thin public wrapper around ``_assemble_research_report_async``.
    """
    report = await _assemble_research_report_async(result, scan_session)
    return report
def _common_inputs(result: dict, scan_session: str) -> tuple[Any, dict, list, list, dict, dict, list, dict, list]:
    """Derive the shared report inputs using the synchronous theme builder.

    Returns, in order: ``market_temp``, ``strategy_profile``, ``sectors``,
    ``recommendations``, ``latest_scan``, ``market_view``, ``theme_views``,
    ``catalyst`` and ``risks``. Note that ``market_temp`` may be mutated in
    place by ``_calibrate_market_from_themes`` before the market view is built.
    """
    temp = result.get("market_temp")
    profile = result.get("strategy_profile") or {}
    hot_sectors = result.get("hot_sectors", []) or []
    recs = result.get("recommendations", []) or []
    scan_record = result.get("latest_scan") or _latest_scan_from_result(result, scan_session)

    themes = build_theme_views(hot_sectors)
    # Theme-level limit-up counts may raise the market snapshot's count before
    # the market view is summarised from it.
    _calibrate_market_from_themes(temp, themes)
    view = build_market_view(temp, profile)
    catalyst_summary = build_catalyst_summary(themes)
    risk_alerts = build_risk_alerts(recs, view, scan_record)
    return (temp, profile, hot_sectors, recs, scan_record, view, themes, catalyst_summary, risk_alerts)
def _assemble_research_report(result: dict, scan_session: str, notes_builder) -> dict:
    """Build a report synchronously, delegating stock notes to ``notes_builder``.

    ``notes_builder`` is called as ``notes_builder(recommendations, theme_views)``.
    No ranking feedback or DB-backed data-quality report is gathered here; both
    are passed to the finalizer as ``None`` so it uses its fallbacks.
    """
    inputs = _common_inputs(result, scan_session)
    market_temp, _profile, _sectors, recommendations, latest_scan, market_view, theme_views, catalyst, risks = inputs
    stock_notes = notes_builder(recommendations, theme_views)
    return _finalize_research_report(
        result,
        scan_session,
        market_temp,
        recommendations,
        latest_scan,
        market_view,
        theme_views,
        catalyst,
        risks,
        stock_notes,
        None,
        None,
    )
async def _assemble_research_report_async(result: dict, scan_session: str) -> dict:
    """Build a full report, awaiting the asynchronous pipeline steps in sequence.

    Mirrors ``_common_inputs`` but builds themes with ``build_theme_views_async``.
    It then additionally awaits the stock research notes (which also receive
    the risk alerts), the ranking feedback for ``days=60`` and the
    data-quality report before finalizing.
    """
    market_temp = result.get("market_temp")
    profile = result.get("strategy_profile") or {}
    hot_sectors = result.get("hot_sectors", []) or []
    recs = result.get("recommendations", []) or []
    scan_record = result.get("latest_scan") or _latest_scan_from_result(result, scan_session)

    themes = await build_theme_views_async(hot_sectors)
    _calibrate_market_from_themes(market_temp, themes)
    view = build_market_view(market_temp, profile)
    catalyst = build_catalyst_summary(themes)
    risk_alerts = build_risk_alerts(recs, view, scan_record)

    notes = await build_stock_research_notes(recs, themes, risk_alerts)
    feedback = await build_ranking_feedback(days=60)
    quality = await _build_data_quality_report(view)

    return _finalize_research_report(
        result,
        scan_session,
        market_temp,
        recs,
        scan_record,
        view,
        themes,
        catalyst,
        risk_alerts,
        notes,
        feedback,
        quality,
    )
def _finalize_research_report(
    result: dict,
    scan_session: str,
    market_temp: Any,
    recommendations: list,
    latest_scan: dict,
    market_view: dict,
    theme_views: list[dict],
    catalyst: dict,
    risks: list[dict],
    stock_notes: list[dict],
    feedback: dict | None,
    data_quality: dict | None,
) -> dict:
    """Combine the computed research pieces into the final report dict.

    Builds the opportunity cards and derives the trade date (from
    ``market_temp.trade_date``, else today's ``YYYYMMDD``). When there are no
    opportunity cards, ``no_trade_reason`` explains why. It also adds the
    industry-chain map, a risk summary and a short market/theme summary.

    Args:
        result: Raw scan result; only ``scan_mode`` is read here.
        scan_session: Identifier of the scan session the report belongs to.
        market_temp: Market snapshot object; ``trade_date`` is read from it.
        recommendations: Scan recommendations fed to the opportunity builder.
        latest_scan: Latest scan record; ``elimination_reasons`` is read.
        market_view: Market view dict; its ``summary`` key is required.
        theme_views: Theme dicts; each needs a ``theme`` key.
        catalyst: Catalyst summary, stored as-is.
        risks: Risk alert dicts (``reject`` / ``risk_type`` are summarised).
        stock_notes: Stock research notes, stored as-is.
        feedback: Ranking feedback, or ``None`` (stored as ``{}``).
        data_quality: Data-quality report, or ``None`` to fall back to
            ``_fallback_data_quality(market_view)``.

    Returns:
        The assembled report dict.
    """
    opportunities = build_opportunity_cards(recommendations, stock_notes, risks, feedback)
    trade_date = getattr(market_temp, "trade_date", "") or datetime.now().strftime("%Y%m%d")
    no_trade_reason = _build_no_trade_reason(opportunities, recommendations, latest_scan)

    top_theme_names = [item["theme"] for item in theme_views[:3]]
    # Separate the theme names; joining with "" fused them into one word.
    if top_theme_names:
        theme_summary = "当前主线关注 " + "、".join(top_theme_names)
    else:
        theme_summary = "暂未形成清晰主线。"

    report = {
        "trade_date": trade_date,
        "scan_session": scan_session,
        "scan_mode": result.get("scan_mode", ""),
        "scanned_at": datetime.now().isoformat(),
        "market_view": market_view,
        "theme_views": theme_views,
        "industry_chain_map": [
            {
                "theme": item["theme"],
                "chain_nodes": item.get("chain_nodes", []),
                "chain_items": item.get("chain_items", []),
                "leader_stocks": item.get("leader_stocks", []),
            }
            for item in theme_views
        ],
        "catalyst": catalyst,
        "stock_research_notes": stock_notes,
        "opportunity_cards": opportunities,
        "risk_alerts": risks,
        "ranking_feedback": feedback or {},
        "risk_summary": {
            "reject_count": sum(1 for item in risks if item.get("reject")),
            "warning_count": sum(1 for item in risks if not item.get("reject")),
            "types": sorted({item.get("risk_type", "") for item in risks if item.get("risk_type")}),
        },
        "data_quality": data_quality or _fallback_data_quality(market_view),
        "no_trade_reason": no_trade_reason,
        "summary": {
            "market": market_view["summary"],
            "theme": theme_summary,
            "opportunity_count": len(opportunities),
            "risk_count": len(risks),
        },
    }
    return report


def _build_no_trade_reason(opportunities: list, recommendations: list, latest_scan: dict) -> dict:
    """Explain why no opportunity cards were produced (empty reason when some were).

    Preference order for the explanation:
    1. the first three scan elimination reasons;
    2. a note that candidates existed but were filtered out by the risk /
       action tiering;
    3. a note that the scan produced no qualifying trade candidates.

    ``blocked_by`` lists up to five elimination-reason keys.
    """
    if opportunities:
        return {"has_scan": True, "reason": "", "blocked_by": []}
    elimination = latest_scan.get("elimination_reasons") or {}
    if elimination:
        # Separate the "key value" pairs; joining with "" ran them together
        # into a single unreadable token.
        reason = "；".join(f"{k} {v}" for k, v in list(elimination.items())[:3])
    elif recommendations:
        reason = "候选存在,但机会卡被风险或动作分层过滤。"
    else:
        reason = "本轮扫描没有形成满足条件的交易候选。"
    return {"has_scan": True, "reason": reason, "blocked_by": list(elimination.keys())[:5]}
async def _build_data_quality_report(market_view: dict) -> dict:
    """Summarise today's market-data quality for the research report.

    Reads up to 20 of today's ``error_logs`` rows, newest first, whose
    ``source`` contains ``eastmoney`` or ``tencent`` or equals
    ``market_breadth``. It combines them with ``market_view``'s
    ``data_status`` (default ``"fresh"``). The status is ``"degraded"`` when
    the market data is ``"estimated"`` or any matching error was logged today.
    At most 8 issues are returned.
    """
    # Naive local midnight: only errors logged since the start of today count.
    # The watched sources are encoded directly in the WHERE clause below.
    since = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
    issues = []
    async with get_db() as db:
        result = await db.execute(
            text(
                "SELECT source, level, message, created_at FROM error_logs "
                "WHERE created_at >= :since AND ("
                "source LIKE '%eastmoney%' OR source LIKE '%tencent%' OR source = 'market_breadth'"
                ") ORDER BY created_at DESC, id DESC LIMIT 20"
            ),
            {"since": since.strftime("%Y-%m-%d %H:%M:%S")},
        )
        for row in result.fetchall():
            item = dict(row._mapping)
            issues.append({
                "source": item.get("source", ""),
                "level": item.get("level", ""),
                "message": item.get("message", ""),
                "created_at": str(item.get("created_at") or ""),
            })
    warnings = []
    market_status = market_view.get("data_status") or "fresh"
    if market_status == "estimated":
        warnings.append("全市场涨跌停使用实时行情阈值估算,非涨跌停池精确口径。")
    if issues:
        warnings.append("今日存在行情源失败记录,盘中结论已降级处理。")
    status = "degraded" if warnings or issues else "ok"
    return {
        "status": status,
        "market_data_status": market_status,
        "market_source": market_view.get("source", ""),
        "limit_counts_reliable": bool(market_view.get("limit_counts_reliable", False)),
        "warnings": warnings,
        "issues": issues[:8],
    }
def _fallback_data_quality(market_view: dict) -> dict:
market_status = market_view.get("data_status") or "fresh"
warnings = []
if market_status == "estimated":
warnings.append("全市场涨跌停使用实时行情阈值估算。")
return {
"status": "degraded" if warnings else "ok",
"market_data_status": market_status,
"market_source": market_view.get("source", ""),
"limit_counts_reliable": bool(market_view.get("limit_counts_reliable", False)),
"warnings": warnings,
"issues": [],
}
def _calibrate_market_from_themes(market_temp: Any, theme_views: list[dict]) -> None:
if not market_temp or not theme_views:
return
theme_limit_up = sum(max(int(item.get("limit_up_count") or 0), 0) for item in theme_views)
if theme_limit_up <= int(getattr(market_temp, "limit_up_count", 0) or 0):
return
original_limit = int(getattr(market_temp, "limit_up_count", 0) or 0)
original_temp = float(getattr(market_temp, "temperature", 0) or 0)
market_temp.limit_up_count = theme_limit_up
if original_limit == 0:
market_temp.temperature = round(min(original_temp + min(theme_limit_up / 5, 8), 100), 1)
if not getattr(market_temp, "limit_counts_reliable", False):
market_temp.data_status = "estimated"
detail = getattr(market_temp, "source_detail", "") or ""
market_temp.source_detail = f"{detail};theme_limit_lower_bound"
async def save_research_report(report: dict) -> None:
    """Persist a research report and its per-row breakdowns for one scan session.

    Rows previously stored under the report's ``scan_session`` (``"manual"``
    when absent) are first deleted from the six research tables, so saving a
    session again replaces it. Then the following rows are inserted:
    - one ``research_reports`` row holding the whole report as JSON;
    - one ``theme_maps`` row per theme, plus one ``theme_chain_nodes`` row per
      chain node (``"未归类"``, i.e. "unclassified", when a theme has none);
    - one row per stock research note, risk alert and opportunity card.

    All rows share the same ``created_at`` timestamp, and a single commit is
    issued at the end.
    """
    trade_date = str(report.get("trade_date") or "")
    session_key = str(report.get("scan_session") or "manual")
    created = datetime.now()
    async with get_db() as db:
        # Clear any earlier save of this session so the new rows replace it.
        for table_name in (
            "research_reports",
            "theme_maps",
            "theme_chain_nodes",
            "stock_research_notes",
            "risk_events",
            "opportunity_cards",
        ):
            await db.execute(
                text(f"DELETE FROM {table_name} WHERE scan_session = :session"),
                {"session": session_key},
            )

        summary = report.get("summary", {})
        await db.execute(tables.research_reports_table.insert().values(
            scan_session=session_key,
            trade_date=trade_date,
            market_summary=summary.get("market", ""),
            theme_summary=summary.get("theme", ""),
            no_trade_reason=json.dumps(report.get("no_trade_reason", {}), ensure_ascii=False),
            report_json=json.dumps(report, ensure_ascii=False, default=str),
            created_at=created,
        ))

        for theme_view in report.get("theme_views", []):
            theme_name = theme_view.get("theme", "")
            await db.execute(tables.theme_maps_table.insert().values(
                scan_session=session_key,
                trade_date=trade_date,
                theme_name=theme_name,
                stage=theme_view.get("stage", ""),
                heat_score=theme_view.get("heat_score", 0),
                logic_summary=theme_view.get("logic", ""),
                lifecycle_status=theme_view.get("lifecycle_status", ""),
                created_at=created,
            ))
            for chain_node in theme_view.get("chain_nodes", []) or ["未归类"]:
                node_item = _chain_item_for_node(theme_view, chain_node)
                # A node without its own leaders falls back to the theme's leaders.
                node_leaders = node_item.get("leader_stocks", []) or theme_view.get("leader_stocks", [])
                await db.execute(tables.theme_chain_nodes_table.insert().values(
                    scan_session=session_key,
                    trade_date=trade_date,
                    theme_name=theme_name,
                    chain_node=chain_node,
                    related_stocks=json.dumps(node_item.get("related_stocks", []), ensure_ascii=False, default=str),
                    leader_stocks=json.dumps(node_leaders, ensure_ascii=False, default=str),
                    created_at=created,
                ))

        for research_note in report.get("stock_research_notes", []):
            await db.execute(tables.stock_research_notes_table.insert().values(
                scan_session=session_key,
                trade_date=trade_date,
                ts_code=research_note.get("ts_code", ""),
                name=research_note.get("name", ""),
                theme=research_note.get("theme", "未归类"),
                chain_node=research_note.get("chain_node", "未归类"),
                logic_score=research_note.get("logic_score", 0),
                logic_summary=research_note.get("logic_summary", ""),
                evidence_json=json.dumps(research_note.get("evidence", []), ensure_ascii=False, default=str),
                uncertainty=research_note.get("uncertainty", ""),
                stock_role=research_note.get("stock_role", "待归类"),
                disagreement=research_note.get("disagreement", ""),
                invalid_condition=research_note.get("invalid_condition", ""),
                generated_by=research_note.get("generated_by", "rules"),
                created_at=created,
            ))

        for alert in report.get("risk_alerts", []):
            await db.execute(tables.risk_events_table.insert().values(
                scan_session=session_key,
                trade_date=trade_date,
                ts_code=alert.get("ts_code", ""),
                risk_type=alert.get("risk_type", ""),
                severity=alert.get("severity", "warning"),
                reject=bool(alert.get("reject")),
                reason=alert.get("reason", ""),
                source=alert.get("source", "research_agent"),
                created_at=created,
            ))

        for opp in report.get("opportunity_cards", []):
            await db.execute(tables.opportunity_cards_table.insert().values(
                scan_session=session_key,
                trade_date=trade_date,
                ts_code=opp.get("ts_code", ""),
                name=opp.get("name", ""),
                theme=opp.get("theme", ""),
                chain_node=opp.get("chain_node", "未归类"),
                stock_role=opp.get("stock_role", "待归类"),
                opportunity_type=opp.get("opportunity_type", "观察"),
                # Prefer the adjusted score; fall back to the raw score, then 0.
                score=opp.get("adjusted_score", opp.get("score", 0)),
                alpha_type=opp.get("alpha_type", "观察线索"),
                alpha_score=opp.get("alpha_score", 0),
                beta_dependency=opp.get("beta_dependency", ""),
                beta_dependency_score=opp.get("beta_dependency_score", 0),
                ambush_score=opp.get("ambush_score", 0),
                expectation_gap_score=opp.get("expectation_gap_score", 0),
                risk_gate=opp.get("risk_gate", "通过"),
                setup_quality=opp.get("setup_quality", "仅观察"),
                alpha_reason=opp.get("alpha_reason", ""),
                action_plan=opp.get("action_plan", "观察"),
                trigger=opp.get("trigger", ""),
                invalid_condition=opp.get("invalid_condition", ""),
                created_at=created,
            ))

        await db.commit()
def _chain_item_for_node(theme: dict, node: str) -> dict:
for item in theme.get("chain_items", []) or []:
if item.get("chain_node") == node:
return item
return {}
async def load_latest_research_report() -> dict | None:
    """Load the most recently saved research report.

    Selects the ``research_reports`` row with the newest ``created_at`` (ties
    broken by the highest ``id``) and decodes its ``report_json`` column; a
    NULL/empty value decodes as ``{}``.

    Returns:
        The decoded report dict, or ``None`` when no report is stored, the
        stored payload is not valid JSON text, or it does not decode to a
        JSON object.
    """
    async with get_db() as db:
        result = await db.execute(text("SELECT report_json FROM research_reports ORDER BY created_at DESC, id DESC LIMIT 1"))
        row = result.fetchone()
        if not row:
            return None
        raw = row._mapping["report_json"] or "{}"
    try:
        report = json.loads(raw)
    except (TypeError, ValueError):
        # Corrupt or non-text payload (JSONDecodeError is a ValueError):
        # treat it as "no usable report" rather than failing the caller.
        return None
    # The declared contract is dict | None; a stored JSON array or scalar is
    # not a usable report.
    return report if isinstance(report, dict) else None