"""用户自选股分析服务""" from __future__ import annotations import json import logging import re from sqlalchemy import text from app.analysis.signals import generate_signals from app.data import tencent_client from app.db.database import get_db from app.db import tables from app.llm.client import chat_completion logger = logging.getLogger(__name__) async def analyze_watchlist_for_all_users(mode: str = "scheduled") -> int: """批量分析所有启用中的用户自选股。""" async with get_db() as db: rows = (await db.execute( text( "SELECT w.id, w.user_id, w.ts_code, w.name, w.note, w.watch_group, w.cost_price " "FROM user_watchlists w " "WHERE COALESCE(w.is_active, 1) = 1 " "ORDER BY w.user_id, w.id" ) )).fetchall() count = 0 for row in rows: item = row._mapping await analyze_watchlist_item( watchlist_id=item["id"], user_id=item["user_id"], ts_code=item["ts_code"], name=item["name"], note=item.get("note") or "", watch_group=item.get("watch_group") or "observe", cost_price=item.get("cost_price"), mode=mode, ) count += 1 return count async def analyze_watchlist_item( watchlist_id: int, user_id: int, ts_code: str, name: str, note: str = "", watch_group: str = "observe", cost_price: float | None = None, mode: str = "manual", ) -> dict: """分析单只自选股并保存结果。""" recommendation = await _load_latest_recommendation(ts_code) latest_tracking = await _load_latest_tracking(recommendation["id"]) if recommendation else None try: quote = await tencent_client.get_realtime_quote(ts_code) except Exception: logger.exception("获取自选股实时行情失败: %s", ts_code) quote = None try: signals = generate_signals(ts_code) except Exception: logger.exception("生成自选股信号失败: %s", ts_code) signals = None summary = _build_summary(ts_code, name, recommendation, latest_tracking, quote, signals, note, watch_group, cost_price) llm_result = await _generate_watchlist_advice(summary) structured = _extract_structured_result(llm_result, recommendation, latest_tracking) async with get_db() as db: await db.execute( tables.watchlist_analyses_table.insert().values( user_id=user_id, watchlist_id=watchlist_id, ts_code=ts_code, name=name, conclusion=structured["conclusion"], advice=structured["advice"], trigger_condition=structured["trigger_condition"], risk_note=structured["risk_note"], summary=structured["summary"], full_analysis=structured["full_analysis"], score_reference=structured["score_reference"], analysis_mode=mode, ) ) await db.commit() return structured async def _load_latest_recommendation(ts_code: str) -> dict | None: async with get_db() as db: row = (await db.execute( text( "SELECT * FROM recommendations " "WHERE ts_code = :code " "ORDER BY created_at DESC, id DESC LIMIT 1" ), {"code": ts_code}, )).fetchone() return dict(row._mapping) if row else None async def _load_latest_tracking(recommendation_id: int) -> dict | None: async with get_db() as db: row = (await db.execute( text( "SELECT * FROM recommendation_tracking " "WHERE recommendation_id = :rid " "ORDER BY track_date DESC, id DESC LIMIT 1" ), {"rid": recommendation_id}, )).fetchone() return dict(row._mapping) if row else None def _build_summary( ts_code: str, name: str, recommendation: dict | None, latest_tracking: dict | None, quote, signals, note: str, watch_group: str, cost_price: float | None, ) -> str: quote_str = "" if quote: quote_str = f"当前价 {quote.price},涨跌幅 {quote.pct_chg}%,换手率 {quote.turnover_rate}%,量比 {quote.volume_ratio}。" recommendation_str = "暂无推荐归档。" if recommendation: recommendation_str = ( f"推荐归档:结论 {recommendation.get('action_plan') or '观察'}," f"触发条件 {recommendation.get('trigger_condition') or '暂无'}," f"失效条件 {recommendation.get('invalidation_condition') or '暂无'}," f"风险提示 {recommendation.get('risk_note') or '暂无'}。" ) tracking_str = "" if latest_tracking: tracking_str = ( f"最近跟踪:收益 {latest_tracking.get('pct_from_entry') or 0}%," f"最大浮盈 {latest_tracking.get('max_return_pct') or 0}%," f"最大回撤 {latest_tracking.get('max_drawdown_pct') or 0}%," f"备注 {latest_tracking.get('review_note') or '暂无'}。" ) signal_str = "技术快照暂无。" if signals: signal_str = ( f"技术快照:趋势强度 {signals.trend_score},辅助信号 {signals.signal_count}/7," f"位置安全 {signals.position_score},近5日涨幅 {signals.rally_pct_5d}% ,近10日涨幅 {signals.rally_pct_10d}%。" ) group_str = f"用户分组:{watch_group}。" cost_str = f"持仓成本 {cost_price}。" if cost_price and cost_price > 0 else "暂无持仓成本。" note_str = f"用户备注:{note}" if note else "用户未填写备注。" return f"{ts_code} {name}。{group_str} {cost_str} {quote_str} {recommendation_str} {tracking_str} {signal_str} {note_str}" async def _generate_watchlist_advice(summary: str) -> str: message = await chat_completion([ { "role": "system", "content": ( "你是A股投研作战台的用户自选股助手。" "你需要针对单只用户自选股给出简洁、可执行的建议。" "输出必须是 JSON 字符串,包含字段 conclusion、advice、trigger_condition、risk_note、summary。" "conclusion 只能是 可操作 / 重点关注 / 观察 / 回避。" "summary 必须是一句中文短句。advice 需要明确用户下一步该看什么、等什么或做什么。" ), }, { "role": "user", "content": f"请基于以下信息输出 JSON:{summary}", }, ]) if not message or not getattr(message, "content", None): return "" return message.content def _extract_structured_result(content: str, recommendation: dict | None, latest_tracking: dict | None) -> dict: default = { "conclusion": recommendation.get("action_plan") if recommendation else "观察", "advice": recommendation.get("trigger_condition") if recommendation else "继续观察量价配合、板块强弱和回踩承接。", "trigger_condition": recommendation.get("trigger_condition") if recommendation else "", "risk_note": recommendation.get("risk_note") if recommendation else (latest_tracking.get("review_note") if latest_tracking else ""), "summary": latest_tracking.get("review_note") if latest_tracking else "当前信息不足以升级为明确操作,先保留观察。", "full_analysis": content or "", "score_reference": float(recommendation.get("score") or 0) if recommendation else 0, } if not content: return default try: parsed = json.loads(_extract_json_string(content)) return { "conclusion": parsed.get("conclusion") or default["conclusion"], "advice": parsed.get("advice") or default["advice"], "trigger_condition": parsed.get("trigger_condition") or default["trigger_condition"], "risk_note": parsed.get("risk_note") or default["risk_note"], "summary": parsed.get("summary") or default["summary"], "full_analysis": content, "score_reference": default["score_reference"], } except Exception: logger.warning("自选股分析 JSON 解析失败,回退默认结构") default["full_analysis"] = content return default def _extract_json_string(content: str) -> str: cleaned = content.strip() if cleaned.startswith("```"): fenced = re.search(r"```(?:json)?\s*(\{.*\})\s*```", cleaned, re.DOTALL) if fenced: return fenced.group(1) start = cleaned.find("{") end = cleaned.rfind("}") if start != -1 and end != -1 and end > start: return cleaned[start : end + 1] return cleaned