"""个股分析 API""" import json import logging import traceback from datetime import datetime, timedelta from fastapi import APIRouter, Query from starlette.responses import StreamingResponse from app.data.tushare_client import tushare_client from app.data import tencent_client from app.analysis.technical import add_all_indicators from app.analysis.signals import generate_signals from app.db.database import get_db from app.db import tables logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/stocks", tags=["stocks"]) @router.get("/search") async def search_stock(keyword: str): """搜索股票""" basic = tushare_client.get_stock_basic() if basic.empty: return [] matches = basic[ basic["name"].str.contains(keyword, na=False) | basic["ts_code"].str.contains(keyword, na=False) | basic["symbol"].str.contains(keyword, na=False) ].head(20) return matches[["ts_code", "name", "industry"]].to_dict(orient="records") @router.get("/{ts_code}/thesis") async def get_stock_thesis(ts_code: str): """获取个股推荐推演归档(只读缓存,不触发扫描或 LLM)。""" from sqlalchemy import text async with get_db() as db: rec_result = await db.execute( text( "SELECT * FROM recommendations " "WHERE ts_code = :code " "ORDER BY created_at DESC, id DESC LIMIT 1" ), {"code": ts_code}, ) rec_row = rec_result.fetchone() tracking_rows = [] diagnosis_rows = [] if rec_row: rec_id = rec_row._mapping["id"] tracking_result = await db.execute( text( "SELECT * FROM recommendation_tracking " "WHERE recommendation_id = :rid " "ORDER BY track_date DESC, id DESC LIMIT 10" ), {"rid": rec_id}, ) tracking_rows = tracking_result.fetchall() diagnosis_result = await db.execute( text( "SELECT id, diagnosis, created_at FROM stock_diagnoses " "WHERE ts_code = :code " "ORDER BY created_at DESC LIMIT 3" ), {"code": ts_code}, ) diagnosis_rows = diagnosis_result.fetchall() if not rec_row: return { "ts_code": ts_code, "name": ts_code, "has_recommendation": False, "recommendation": None, "latest_tracking": None, "tracking_history": [], "diagnoses": [ { "id": row._mapping["id"], "diagnosis": row._mapping["diagnosis"] or "", "created_at": str(row._mapping["created_at"] or ""), } for row in diagnosis_rows ], "decision_points": [], "data_freshness": { "recommendation_created_at": "", "tracking_date": "", "status": "no_recommendation", "message": "暂无推荐归档,可从 AI 诊断页生成个股诊断。", }, } r = rec_row._mapping def _safe_json_list(value: str | None) -> list: if not value: return [] try: parsed = json.loads(value) return parsed if isinstance(parsed, list) else [] except Exception: return [] tracking_history = [] for row in tracking_rows: t = row._mapping tracking_history.append({ "track_date": t["track_date"], "current_price": t["current_price"], "pct_from_entry": t["pct_from_entry"], "max_return_pct": t["max_return_pct"], "max_drawdown_pct": t["max_drawdown_pct"], "days_since_recommendation": t["days_since_recommendation"], "hit_target": bool(t["hit_target"]), "hit_stop_loss": bool(t["hit_stop_loss"]), "close_reason": t["close_reason"] or "", "review_note": t["review_note"] or "", "status": t["status"] or "", }) latest_tracking = tracking_history[0] if tracking_history else None decision_points = [ {"label": "操作计划", "value": r["action_plan"] or "观察"}, {"label": "召回来源", "value": " / ".join(_safe_json_list(r.get("recall_tags"))) or "未归档"}, {"label": "AI预筛", "value": r.get("prefilter_decision") or "未执行"}, {"label": "触发条件", "value": r["trigger_condition"] or "等待触发条件归档"}, {"label": "失效条件", "value": r["invalidation_condition"] or "等待失效条件归档"}, {"label": "建议仓位", "value": f"{r['suggested_position_pct']}%" if r["suggested_position_pct"] is not None else "未设置"}, {"label": "复盘周期", "value": f"{r['review_after_days'] or 3}个交易日"}, ] freshness_status = "fresh" freshness_message = "推荐归档可用" if not latest_tracking: freshness_status = "needs_tracking" freshness_message = "暂无跟踪记录,建议管理员执行跟踪更新。" elif latest_tracking.get("track_date"): freshness_message = f"最近跟踪日期 {latest_tracking['track_date']}" return { "ts_code": r["ts_code"], "name": r["name"], "has_recommendation": True, "recommendation": { "id": r["id"], "ts_code": r["ts_code"], "name": r["name"], "sector": r["sector"] or "", "score": r["score"] or 0, "market_temp_score": r["market_temp_score"] or 0, "sector_score": r["sector_score"] or 0, "capital_score": r["capital_score"] or 0, "technical_score": r["technical_score"] or 0, "supply_demand_score": r["supply_demand_score"] or 0, "price_action_score": r["price_action_score"] or 0, "position_score": r["position_score"] or 50, "valuation_score": r["valuation_score"] or 50, "entry_price": r["entry_price"], "target_price": r["target_price"], "stop_loss": r["stop_loss"], "reasons": _safe_json_list(r["reasons"]), "risk_note": r["risk_note"] or "", "action_plan": r["action_plan"] or "观察", "trigger_condition": r["trigger_condition"] or "", "invalidation_condition": r["invalidation_condition"] or "", "suggested_position_pct": r["suggested_position_pct"] or 0, "review_after_days": r["review_after_days"] or 3, "lifecycle_status": r["lifecycle_status"] or "candidate", "data_freshness": r["data_freshness"] or "", "llm_analysis": r["llm_analysis"] or "", "llm_score": r["llm_score"], "recall_tags": _safe_json_list(r.get("recall_tags")), "prefilter_decision": r.get("prefilter_decision") or "", "prefilter_reason": r.get("prefilter_reason") or "", "focus_points": _safe_json_list(r.get("focus_points")), "strategy": r["strategy"] or "trend_breakout", "entry_signal_type": r["entry_signal_type"] or "none", "entry_timing": r["entry_timing"] or "", "scan_session": r["scan_session"] or "", "created_at": str(r["created_at"] or ""), }, "latest_tracking": latest_tracking, "tracking_history": tracking_history, "diagnoses": [ { "id": row._mapping["id"], "diagnosis": row._mapping["diagnosis"] or "", "created_at": str(row._mapping["created_at"] or ""), } for row in diagnosis_rows ], "decision_points": decision_points, "data_freshness": { "recommendation_created_at": str(r["created_at"] or ""), "tracking_date": latest_tracking["track_date"] if latest_tracking else "", "status": freshness_status, "message": freshness_message, }, } @router.get("/{ts_code}/quote") async def get_quote(ts_code: str): """获取个股实时行情""" quote = await tencent_client.get_realtime_quote(ts_code) if not quote: return {"error": "获取行情失败"} return quote.model_dump() @router.get("/{ts_code}/kline") async def get_kline(ts_code: str, days: int = 120): """获取个股K线数据(含技术指标)""" df = tushare_client.get_stock_daily(ts_code, days=days) if df.empty: return [] df = df.sort_values("trade_date").reset_index(drop=True) df = add_all_indicators(df) # 替换 NaN 为 None(JSON 兼容) import math records = df.to_dict(orient="records") for rec in records: for k, v in rec.items(): if isinstance(v, float) and (math.isnan(v) or math.isinf(v)): rec[k] = None return records @router.get("/{ts_code}/signals") async def get_signals(ts_code: str): """获取个股技术面买卖信号""" signal = generate_signals(ts_code) return signal.model_dump() @router.get("/{ts_code}/capital_flow") async def get_capital_flow(ts_code: str, days: int = 10): """获取个股资金流向(含大/中/小单分拆)""" df = tushare_client.get_stock_moneyflow(ts_code, days=days) if df.empty: return [] df = df.sort_values("trade_date") records = [] for _, row in df.iterrows(): main_net = ( (row.get("buy_elg_amount", 0) or 0) - (row.get("sell_elg_amount", 0) or 0) + (row.get("buy_lg_amount", 0) or 0) - (row.get("sell_lg_amount", 0) or 0) ) records.append({ "trade_date": row["trade_date"], "main_net_inflow": round(main_net, 2), "net_mf_amount": round(float(row.get("net_mf_amount", 0) or 0), 2), "elg_net": round( (row.get("buy_elg_amount", 0) or 0) - (row.get("sell_elg_amount", 0) or 0), 2 ), "lg_net": round( (row.get("buy_lg_amount", 0) or 0) - (row.get("sell_lg_amount", 0) or 0), 2 ), "md_net": round( (row.get("buy_md_amount", 0) or 0) - (row.get("sell_md_amount", 0) or 0), 2 ), "sm_net": round( (row.get("buy_sm_amount", 0) or 0) - (row.get("sell_sm_amount", 0) or 0), 2 ), }) return records @router.get("/{ts_code}/diagnose/history") async def get_diagnose_history(ts_code: str): """获取个股最近5次诊断历史""" try: from sqlalchemy import text async with get_db() as db: result = await db.execute( text( "SELECT id, ts_code, name, diagnosis, created_at " "FROM stock_diagnoses " "WHERE ts_code = :code " "ORDER BY created_at DESC LIMIT 5" ), {"code": ts_code}, ) rows = result.fetchall() history = [] for row in rows: r = row._mapping history.append({ "id": r["id"], "ts_code": r["ts_code"], "name": r["name"], "diagnosis_mode": r.get("diagnosis_mode", "entry"), "diagnosis": r["diagnosis"], "created_at": str(r["created_at"]), }) return history except Exception as e: logger.error(f"获取诊断历史失败: {e}") await log_error("stocks", f"获取诊断历史失败: {e}", detail=traceback.format_exc()) return [] @router.post("/{ts_code}/diagnose") async def diagnose_stock(ts_code: str, mode: str = Query("entry")): """AI 诊断个股(SSE 流式返回)""" from app.config import settings if not settings.deepseek_api_key: return {"status": "error", "message": "未配置 LLM API Key"} from app.llm.client import get_client from sqlalchemy import text # ── 检查是否有最近30分钟内的诊断记录,若有则直接返回 ── try: async with get_db() as db: result = await db.execute( text( "SELECT id, ts_code, name, diagnosis, created_at " "FROM stock_diagnoses " "WHERE ts_code = :code " "AND diagnosis_mode = :mode " "AND created_at >= datetime('now', '-30 minutes', 'localtime') " "ORDER BY created_at DESC LIMIT 1" ), {"code": ts_code, "mode": mode}, ) recent_row = result.fetchone() if recent_row: r = recent_row._mapping # 直接返回缓存结果 async def _cached_stream(): yield f"data: {json.dumps({'cached': True, 'diagnosis': r['diagnosis']}, ensure_ascii=False)}\n\n" yield f"data: {json.dumps({'done': True, 'ts_code': ts_code})}\n\n" return StreamingResponse(_cached_stream(), media_type="text/event-stream") except Exception as e: logger.warning(f"检查诊断缓存失败: {e}") # ── 收集数据 ── quote = await tencent_client.get_realtime_quote(ts_code) signals = generate_signals(ts_code) df_daily = tushare_client.get_stock_daily(ts_code, days=120) df_flow = tushare_client.get_stock_moneyflow(ts_code, days=10) # ── 数据新鲜度检查 ── freshness_note = "" data_stale = False current_price_source = "" if not df_daily.empty: df_daily = df_daily.sort_values("trade_date") latest_kline_date = str(df_daily.iloc[-1]["trade_date"]) # 检查 K 线数据是否超过 10 天未更新 cutoff_date = (datetime.now() - timedelta(days=10)).strftime("%Y%m%d") if latest_kline_date < cutoff_date: logger.warning(f"K线数据过时 {ts_code}: 最新={latest_kline_date}, 10天前阈值={cutoff_date}") data_stale = True # 如果最新 K 线日期不是今天,添加新鲜度提示 today_str = datetime.now().strftime("%Y%m%d") if latest_kline_date != today_str: freshness_note = f"\n\n注意:K线数据最新日期为{latest_kline_date},非当日数据,部分分析可能滞后。" # 数据过时时,使用实时报价价格作为"当前价"替代 if data_stale and quote and quote.price > 0: current_price_source = f"(实时报价价 {quote.price},K线收盘价 {df_daily.iloc[-1]['close']} 可能滞后)" # 构建数据摘要 quote_str = "" if quote: quote_str = ( f"当前价: {quote.price}{current_price_source}, 涨跌幅: {quote.pct_chg}%, " f"换手率: {quote.turnover_rate}%, 量比: {quote.volume_ratio}, " f"PE: {quote.pe}, PB: {quote.pb}, " f"总市值: {quote.total_mv}亿, 流通市值: {quote.circ_mv}亿" ) signal_str = ( f"推荐体系评分: 趋势评分={signals.trend_score}/100(均线排列+高低点结构+MA20方向,主评分辅助项), " f"辅助信号计数={signals.signal_count}/7(触发计分,仅供节奏参考,不作为主评分裁判), " f"均线多头: {signals.ma_bullish}, " f"放量突破: {signals.volume_breakout}, " f"MACD金叉: {signals.macd_golden}, " f"RSI节奏区间: {signals.rsi_healthy}, " f"缩量回踩: {signals.pullback_support}, " f"放量长阳: {signals.big_yang}, " f"布林支撑: {signals.boll_support}, " f"支撑位: {signals.support_price}, " f"压力位: {signals.resist_price}, " f"止损位: {signals.stop_loss_price}" ) position_str = ( f"位置安全评分: {signals.position_score}/100(越高表示位置越低越安全,96分以上表示处于相对低位), " f"近5日涨幅: {signals.rally_pct_5d}%, " f"近10日涨幅: {signals.rally_pct_10d}%, " f"距60日高点: {signals.distance_from_high}%" ) trend_str = "" ma_info = "" if not df_daily.empty: latest = df_daily.iloc[-1] if len(df_daily) >= 5: pct_5d = (latest["close"] - df_daily.iloc[-5]["close"]) / df_daily.iloc[-5]["close"] * 100 trend_str += f"5日涨幅: {pct_5d:.2f}%, " if len(df_daily) >= 20: pct_20d = (latest["close"] - df_daily.iloc[-20]["close"]) / df_daily.iloc[-20]["close"] * 100 trend_str += f"20日涨幅: {pct_20d:.2f}%, " vol_avg_5 = df_daily.tail(5)["vol"].mean() vol_latest = latest["vol"] trend_str += f"量比(5日均): {vol_latest / vol_avg_5:.2f}" if vol_avg_5 > 0 else "" # MA 信息 if "ma5" in latest and "ma20" in latest: ma5 = latest.get("ma5", 0) ma10 = latest.get("ma10", 0) ma20 = latest.get("ma20", 0) ma60 = latest.get("ma60", 0) price = latest["close"] ma_info = ( f"价格与均线关系: 现价{price:.2f}, " f"MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}, MA60={ma60:.2f}, " f"{'价格在MA5上方' if price > ma5 else '价格在MA5下方'}, " f"{'价格在MA20上方' if price > ma20 else '价格在MA20下方'}, " f"{'均线多头排列' if ma5 > ma10 > ma20 else '均线未多头排列'}" ) flow_str = "" if not df_flow.empty: df_flow = df_flow.sort_values("trade_date") latest_flow_date = str(df_flow.iloc[-1]["trade_date"]) recent_3 = df_flow.tail(3) total_main = 0 for _, r in recent_3.iterrows(): main_net = ( (r.get("buy_elg_amount", 0) or 0) - (r.get("sell_elg_amount", 0) or 0) + (r.get("buy_lg_amount", 0) or 0) - (r.get("sell_lg_amount", 0) or 0) ) total_main += main_net flow_str = f"近3日主力净流入: {total_main:.0f}万" # 资金流向数据新鲜度标注 today_str = datetime.now().strftime("%Y%m%d") if latest_flow_date != today_str: flow_str += f"(数据截至{latest_flow_date},盘中可能滞后一日)" # 基本信息 basic_info = "" stock_name = "" industry = "" basic_df = tushare_client.get_stock_basic() if not basic_df.empty: row = basic_df[basic_df["ts_code"] == ts_code] if not row.empty: r = row.iloc[0] stock_name = r["name"] industry = r.get("industry", "") or "" basic_info = f"名称: {r['name']}, 行业: {industry}" # 推荐体系评分(如果该股票在推荐列表中) rec_score_str = "" try: async with get_db() as db: rec_result = await db.execute( text( "SELECT score, supply_demand_score, price_action_score, " "capital_score, technical_score, position_score, sector, signal, entry_signal_type " "FROM recommendations " "WHERE ts_code = :code " "AND (action_plan IN ('可操作', '重点关注') OR COALESCE(llm_score, 0) >= 6 OR score >= 56) " "ORDER BY created_at DESC LIMIT 1" ), {"code": ts_code}, ) rec_row = rec_result.fetchone() if rec_row: rm = rec_row._mapping rec_score_str = ( f"\n推荐体系评分: 综合={rm['score']}, " f"资金={rm['capital_score']}(资金顺势核心), " f"供需={rm['supply_demand_score']}, " f"形态={rm['price_action_score']}, " f"趋势={rm['technical_score']}, " f"位置安全={rm['position_score']}, " f"板块={rm['sector']}, " f"信号={rm['signal']}, " f"入场类型={rm['entry_signal_type']}" ) except Exception: pass # 板块热度(如果有该行业板块数据) sector_str = "" if industry: try: async with get_db() as db: sector_result = await db.execute( text( "SELECT sector_name, pct_change, heat_score, stage, " "days_continuous, limit_up_count " "FROM sector_heat " "WHERE sector_name LIKE :industry " "ORDER BY created_at DESC LIMIT 1" ), {"industry": f"%{industry}%"}, ) s_row = sector_result.fetchone() if s_row: sm = s_row._mapping sector_str = ( f"板块热度: {sm['sector_name']} 涨幅={sm['pct_change']}%, " f"热度={sm['heat_score']}, 阶段={sm['stage']}, " f"连续{sm['days_continuous']}天, 涨停数={sm['limit_up_count']}" ) except Exception: pass mode_instruction_map = { "entry": "这是建仓前诊断。必须明确当前是可操作、重点关注、观察还是回避,并给出触发与失效边界。", "holding": "这是持仓复核。必须回答逻辑是否还成立,当前更适合持有、减仓、退出还是继续观察。", "review": "这是回撤复盘。重点拆清问题来自市场、板块还是个股执行,并提出下一轮修正动作。", "tracking": "这是继续跟踪。必须说明保留理由、升级条件和移除条件,避免空泛表述。", } mode_label_map = { "entry": "建仓前诊断", "holding": "持仓复核", "review": "回撤复盘", "tracking": "继续跟踪", } mode_instruction = mode_instruction_map.get(mode, mode_instruction_map["entry"]) mode_label = mode_label_map.get(mode, mode_label_map["entry"]) user_msg = f"""请基于当前 AI 推荐体系,对以下A股进行结构化个股会诊: 诊断模式: {mode_label} 模式要求: {mode_instruction} 股票: {ts_code} ({basic_info}) {quote_str} 资金面: {flow_str} {rec_score_str} {sector_str} 价格行为与趋势: {trend_str} {ma_info} 位置安全: {position_str} 技术指标备注: {signal_str} 重要提示: 1. 你不是在写传统研报,而是在给交易作战台输出结构化会诊意见。 2. 如果有推荐体系评分、操作计划、跟踪信息,请优先沿用当前推荐体系,而不是另起一套标准。 3. 分析优先级必须是:资金流向与主线板块 > 量价关系与价格行为 > 位置和交易边界 > 技术指标备注。 4. 技术指标只做节奏和风控确认,不能成为核心判断;不要把 MA/MACD/RSI/KDJ 放在主要依据的第一位。 5. RSI、MACD、KDJ 等滞后指标不能单独决定买卖;RSI 超买只提示追高风险,超卖只提示弱势或反弹弹性,不等于可买或不可买。 6. 位置安全评分高(>80)表示股价处于相对低位,低(<40)表示可能追高。 7. 板块信息、资金面、量价承接和推荐体系信息优先级高于单一技术指标。 8. 先给结论和动作,再解释原因;不要先铺陈背景再拖到最后才下结论。 9. 如果证据不足,也要明确给出“观察”或“回避”,不能写成模糊建议。 {freshness_note} 请严格按以下 Markdown 结构输出,不要写成泛泛长文: ## 当前结论 - 结论: 只能从「可操作 / 重点关注 / 观察 / 回避」中选一个 - 一句话判断: 用一句话解释为什么 - 当前动作: 只能从「执行 / 等确认 / 继续跟踪 / 暂不参与」中选一个 - 适配模式: 说明更适合启动试错、分歧回流、趋势跟随还是只观察 ## 资金与主线 - 市场环境: 当前大盘和风格是否支持这只票 - 板块位置: 所属板块是主线、次主线还是观察线 - 个股角色: 龙头 / 跟风 / 独立逻辑 / 非核心 - 资金状态: 主力资金是持续流入、分歧流入、脉冲流入还是流出 ## 量价与价格行为 - 价格行为: 启动、突破、回踩、分歧回流、冲高回落、放量滞涨或弱反弹 - 量价关系: 放量是否有承接,回调是否缩量,突破是否站稳 - 关键证据: 只提最重要的两到三条证据,不要抄原始数据 ## 位置与边界 - 位置阶段: 低位启动 / 中位加速 / 高位博弈 / 退潮反抽 / 弱势震荡 - 盈亏比: 当前是否还值得参与 - 主要边界: 最关键的一条支撑或压力 ## 执行动作 - 触发条件: 什么情况下才可以行动 - 失效条件: 什么情况下放弃 - 仓位建议: 用低 / 中 / 高 或百分比表达 - 适合谁: 适合激进试错、低吸等待、还是不适合参与 - 跟踪重点: 下一交易时段最该盯住什么 ## 风险清单 - 风险1: - 风险2: - 风险3: ## 技术指标备注 - 指标状态: 只说明 MA/MACD/RSI/BOLL 对节奏或风险的辅助含义 - 不能作为结论的原因: 如果指标和资金/量价冲突,以资金和价格行为为主 ## 复盘问题 - 如果后续走势不符合预期,优先检查哪两个问题 ## 会诊纪要 - 用两到三句话总结本次会诊,不要写成长文,不要复制前面的条目 要求: - 结论必须明确,不能模糊两可 - 不要把技术指标写成核心逻辑第一段 - 少写形容词,多写交易判断 - 不要重复原始数据 - 文字保持简洁,避免旧式研报语气 - 每个条目尽量一句话说清,不要堆砌长段落""" # ── SSE 流式返回 ── async def _stream_diagnosis(): full_content = "" try: client = get_client() stream = await client.chat.completions.create( model=settings.deepseek_model, messages=[ {"role": "system", "content": "你是A股AI投研作战台中的个股会诊模块。你输出的是交易会诊单,不是传统研报。必须先给明确结论,再给执行动作、风险边界和跟踪重点。回复必须使用Markdown,结构严格、结论清晰、语言简短,禁止空泛抒情。"}, {"role": "user", "content": user_msg}, ], max_tokens=1500, temperature=0.5, stream=True, ) async for chunk in stream: if chunk.choices and chunk.choices[0].delta: token = chunk.choices[0].delta.content or "" if token: full_content += token yield f"data: {json.dumps({'token': token}, ensure_ascii=False)}\n\n" # 流式完成后,保存到数据库 full_content = full_content.strip() if full_content: try: async with get_db() as db: await db.execute( tables.stock_diagnoses_table.insert().values( ts_code=ts_code, name=stock_name or ts_code, diagnosis_mode=mode, diagnosis=full_content, ) ) await db.commit() logger.info(f"已保存诊断结果到数据库: {ts_code}") except Exception as e: logger.error(f"保存诊断结果到数据库失败: {e}") from app.db.error_logger import log_error await log_error("stocks", f"保存诊断结果到数据库失败: {e}", detail=traceback.format_exc()) yield f"data: {json.dumps({'done': True, 'ts_code': ts_code}, ensure_ascii=False)}\n\n" except Exception as e: error_msg = str(e) logger.error(f"诊断流式调用失败: {error_msg}") await log_error( "stocks", f"诊断流式调用失败: {error_msg}", detail=traceback.format_exc(), context={"method": "POST", "path": f"/api/stocks/{ts_code}/diagnose"}, ) yield f"data: {json.dumps({'error': error_msg}, ensure_ascii=False)}\n\n" yield f"data: {json.dumps({'done': True, 'ts_code': ts_code}, ensure_ascii=False)}\n\n" return StreamingResponse(_stream_diagnosis(), media_type="text/event-stream")