"""个股分析 API""" import json import logging from datetime import datetime, timedelta from fastapi import APIRouter from starlette.responses import StreamingResponse from app.data.tushare_client import tushare_client from app.data import tencent_client from app.analysis.technical import add_all_indicators from app.analysis.signals import generate_signals from app.db.database import get_db from app.db import tables logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/stocks", tags=["stocks"]) @router.get("/{ts_code}/quote") async def get_quote(ts_code: str): """获取个股实时行情""" quote = await tencent_client.get_realtime_quote(ts_code) if not quote: return {"error": "获取行情失败"} return quote.model_dump() @router.get("/{ts_code}/kline") async def get_kline(ts_code: str, days: int = 120): """获取个股K线数据(含技术指标)""" df = tushare_client.get_stock_daily(ts_code, days=days) if df.empty: return [] df = df.sort_values("trade_date").reset_index(drop=True) df = add_all_indicators(df) # 替换 NaN 为 None(JSON 兼容) import math records = df.to_dict(orient="records") for rec in records: for k, v in rec.items(): if isinstance(v, float) and (math.isnan(v) or math.isinf(v)): rec[k] = None return records @router.get("/{ts_code}/signals") async def get_signals(ts_code: str): """获取个股技术面买卖信号""" signal = generate_signals(ts_code) return signal.model_dump() @router.get("/{ts_code}/capital_flow") async def get_capital_flow(ts_code: str, days: int = 10): """获取个股资金流向(含大/中/小单分拆)""" df = tushare_client.get_stock_moneyflow(ts_code, days=days) if df.empty: return [] df = df.sort_values("trade_date") records = [] for _, row in df.iterrows(): main_net = ( (row.get("buy_elg_amount", 0) or 0) - (row.get("sell_elg_amount", 0) or 0) + (row.get("buy_lg_amount", 0) or 0) - (row.get("sell_lg_amount", 0) or 0) ) records.append({ "trade_date": row["trade_date"], "main_net_inflow": round(main_net, 2), "net_mf_amount": round(float(row.get("net_mf_amount", 0) or 0), 2), "elg_net": round( (row.get("buy_elg_amount", 0) or 0) - (row.get("sell_elg_amount", 0) or 0), 2 ), "lg_net": round( (row.get("buy_lg_amount", 0) or 0) - (row.get("sell_lg_amount", 0) or 0), 2 ), "md_net": round( (row.get("buy_md_amount", 0) or 0) - (row.get("sell_md_amount", 0) or 0), 2 ), "sm_net": round( (row.get("buy_sm_amount", 0) or 0) - (row.get("sell_sm_amount", 0) or 0), 2 ), }) return records @router.get("/search") async def search_stock(keyword: str): """搜索股票""" basic = tushare_client.get_stock_basic() if basic.empty: return [] matches = basic[ basic["name"].str.contains(keyword, na=False) | basic["ts_code"].str.contains(keyword, na=False) | basic["symbol"].str.contains(keyword, na=False) ].head(20) return matches[["ts_code", "name", "industry"]].to_dict(orient="records") @router.get("/{ts_code}/diagnose/history") async def get_diagnose_history(ts_code: str): """获取个股最近5次诊断历史""" try: from sqlalchemy import text async with get_db() as db: result = await db.execute( text( "SELECT id, ts_code, name, diagnosis, created_at " "FROM stock_diagnoses " "WHERE ts_code = :code " "ORDER BY created_at DESC LIMIT 5" ), {"code": ts_code}, ) rows = result.fetchall() history = [] for row in rows: r = row._mapping history.append({ "id": r["id"], "ts_code": r["ts_code"], "name": r["name"], "diagnosis": r["diagnosis"], "created_at": str(r["created_at"]), }) return history except Exception as e: logger.error(f"获取诊断历史失败: {e}") return [] @router.post("/{ts_code}/diagnose") async def diagnose_stock(ts_code: str): """AI 诊断个股(SSE 流式返回)""" from app.config import settings if not settings.deepseek_api_key: return {"status": "error", "message": "未配置 LLM API Key"} from app.llm.client import get_client from sqlalchemy import text # ── 检查是否有最近30分钟内的诊断记录,若有则直接返回 ── try: async with get_db() as db: result = await db.execute( text( "SELECT id, ts_code, name, diagnosis, created_at " "FROM stock_diagnoses " "WHERE ts_code = :code " "AND created_at >= datetime('now', '-30 minutes', 'localtime') " "ORDER BY created_at DESC LIMIT 1" ), {"code": ts_code}, ) recent_row = result.fetchone() if recent_row: r = recent_row._mapping # 直接返回缓存结果 async def _cached_stream(): yield f"data: {json.dumps({'cached': True, 'diagnosis': r['diagnosis']}, ensure_ascii=False)}\n\n" yield f"data: {json.dumps({'done': True, 'ts_code': ts_code})}\n\n" return StreamingResponse(_cached_stream(), media_type="text/event-stream") except Exception as e: logger.warning(f"检查诊断缓存失败: {e}") # ── 收集数据 ── quote = await tencent_client.get_realtime_quote(ts_code) signals = generate_signals(ts_code) df_daily = tushare_client.get_stock_daily(ts_code, days=120) df_flow = tushare_client.get_stock_moneyflow(ts_code, days=10) # ── 数据新鲜度检查 ── freshness_note = "" data_stale = False current_price_source = "" if not df_daily.empty: df_daily = df_daily.sort_values("trade_date") latest_kline_date = str(df_daily.iloc[-1]["trade_date"]) # 检查 K 线数据是否超过 10 天未更新 cutoff_date = (datetime.now() - timedelta(days=10)).strftime("%Y%m%d") if latest_kline_date < cutoff_date: logger.warning(f"K线数据过时 {ts_code}: 最新={latest_kline_date}, 10天前阈值={cutoff_date}") data_stale = True # 如果最新 K 线日期不是今天,添加新鲜度提示 today_str = datetime.now().strftime("%Y%m%d") if latest_kline_date != today_str: freshness_note = f"\n\n注意:K线数据最新日期为{latest_kline_date},非当日数据,部分分析可能滞后。" # 数据过时时,使用实时报价价格作为"当前价"替代 if data_stale and quote and quote.price > 0: current_price_source = f"(实时报价价 {quote.price},K线收盘价 {df_daily.iloc[-1]['close']} 可能滞后)" # 构建数据摘要 quote_str = "" if quote: quote_str = ( f"当前价: {quote.price}{current_price_source}, 涨跌幅: {quote.pct_chg}%, " f"换手率: {quote.turnover_rate}%, 量比: {quote.volume_ratio}, " f"PE: {quote.pe}, PB: {quote.pb}, " f"总市值: {quote.total_mv}亿, 流通市值: {quote.circ_mv}亿" ) signal_str = ( f"推荐体系评分: 趋势评分={signals.trend_score}/100(均线排列+高低点结构+MA20方向,主评分10%权重), " f"辅助信号计数={signals.signal_count}/7(触发计分,仅供参考不参与主评分), " f"均线多头: {signals.ma_bullish}, " f"放量突破: {signals.volume_breakout}, " f"MACD金叉: {signals.macd_golden}, " f"RSI健康: {signals.rsi_healthy}, " f"缩量回踩: {signals.pullback_support}, " f"放量长阳: {signals.big_yang}, " f"布林支撑: {signals.boll_support}, " f"支撑位: {signals.support_price}, " f"压力位: {signals.resist_price}, " f"止损位: {signals.stop_loss_price}" ) position_str = ( f"位置安全评分: {signals.position_score}/100(越高表示位置越低越安全,96分以上表示处于相对低位), " f"近5日涨幅: {signals.rally_pct_5d}%, " f"近10日涨幅: {signals.rally_pct_10d}%, " f"距60日高点: {signals.distance_from_high}%" ) trend_str = "" ma_info = "" if not df_daily.empty: latest = df_daily.iloc[-1] if len(df_daily) >= 5: pct_5d = (latest["close"] - df_daily.iloc[-5]["close"]) / df_daily.iloc[-5]["close"] * 100 trend_str += f"5日涨幅: {pct_5d:.2f}%, " if len(df_daily) >= 20: pct_20d = (latest["close"] - df_daily.iloc[-20]["close"]) / df_daily.iloc[-20]["close"] * 100 trend_str += f"20日涨幅: {pct_20d:.2f}%, " vol_avg_5 = df_daily.tail(5)["vol"].mean() vol_latest = latest["vol"] trend_str += f"量比(5日均): {vol_latest / vol_avg_5:.2f}" if vol_avg_5 > 0 else "" # MA 信息 if "ma5" in latest and "ma20" in latest: ma5 = latest.get("ma5", 0) ma10 = latest.get("ma10", 0) ma20 = latest.get("ma20", 0) ma60 = latest.get("ma60", 0) price = latest["close"] ma_info = ( f"价格与均线关系: 现价{price:.2f}, " f"MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f}, MA60={ma60:.2f}, " f"{'价格在MA5上方' if price > ma5 else '价格在MA5下方'}, " f"{'价格在MA20上方' if price > ma20 else '价格在MA20下方'}, " f"{'均线多头排列' if ma5 > ma10 > ma20 else '均线未多头排列'}" ) flow_str = "" if not df_flow.empty: df_flow = df_flow.sort_values("trade_date") latest_flow_date = str(df_flow.iloc[-1]["trade_date"]) recent_3 = df_flow.tail(3) total_main = 0 for _, r in recent_3.iterrows(): main_net = ( (r.get("buy_elg_amount", 0) or 0) - (r.get("sell_elg_amount", 0) or 0) + (r.get("buy_lg_amount", 0) or 0) - (r.get("sell_lg_amount", 0) or 0) ) total_main += main_net flow_str = f"近3日主力净流入: {total_main:.0f}万" # 资金流向数据新鲜度标注 today_str = datetime.now().strftime("%Y%m%d") if latest_flow_date != today_str: flow_str += f"(数据截至{latest_flow_date},盘中可能滞后一日)" # 基本信息 basic_info = "" stock_name = "" industry = "" basic_df = tushare_client.get_stock_basic() if not basic_df.empty: row = basic_df[basic_df["ts_code"] == ts_code] if not row.empty: r = row.iloc[0] stock_name = r["name"] industry = r.get("industry", "") or "" basic_info = f"名称: {r['name']}, 行业: {industry}" # 推荐体系评分(如果该股票在推荐列表中) rec_score_str = "" try: async with get_db() as db: rec_result = await db.execute( text( "SELECT score, supply_demand_score, price_action_score, " "technical_score, position_score, sector, signal " "FROM recommendations " "WHERE ts_code = :code AND score >= 60 " "ORDER BY created_at DESC LIMIT 1" ), {"code": ts_code}, ) rec_row = rec_result.fetchone() if rec_row: rm = rec_row._mapping rec_score_str = ( f"\n推荐体系评分: 综合={rm['score']}, " f"供需={rm['supply_demand_score']}(50%权重), " f"形态={rm['price_action_score']}(40%权重), " f"趋势={rm['technical_score']}(10%权重), " f"位置安全={rm['position_score']}, " f"板块={rm['sector']}, " f"信号={rm['signal']}" ) except Exception: pass # 板块热度(如果有该行业板块数据) sector_str = "" if industry: try: async with get_db() as db: sector_result = await db.execute( text( "SELECT sector_name, pct_change, heat_score, stage, " "days_continuous, limit_up_count " "FROM sector_heat " "WHERE sector_name LIKE :industry " "ORDER BY created_at DESC LIMIT 1" ), {"industry": f"%{industry}%"}, ) s_row = sector_result.fetchone() if s_row: sm = s_row._mapping sector_str = ( f"板块热度: {sm['sector_name']} 涨幅={sm['pct_change']}%, " f"热度={sm['heat_score']}, 阶段={sm['stage']}, " f"连续{sm['days_continuous']}天, 涨停数={sm['limit_up_count']}" ) except Exception: pass user_msg = f"""请对以下A股进行全面诊断分析: 股票: {ts_code} ({basic_info}) {quote_str} 技术面: {signal_str} 位置安全: {position_str} 趋势: {trend_str} {ma_info} 资金面: {flow_str} {rec_score_str} {sector_str} 重要提示: 1. 趋势评分是推荐体系的技术面核心分数(均线排列40+高低点结构35+MA20方向25=满分100),辅助信号计数仅供参考不参与主评分。 2. 位置安全评分高(>80)表示股价处于相对低位,低(<40)表示可能追高。 3. 如果有推荐体系评分,请作为主要分析依据;趋势评分和信号计数从不同维度描述技术面状态。 {freshness_note} 请从以下维度分析(Markdown格式,简洁专业): ## 综合评级 (给出1-5星评级和一句话总结,综合趋势评分、位置安全和供需形态) ## 技术面分析 (趋势方向、均线关系、支撑压力、量价配合,优先参考趋势评分而非信号计数) ## 资金面分析 (主力资金态度、板块联动效应) ## 操作建议 (适合什么类型的投资者、入场时机、风险提示)""" # ── SSE 流式返回 ── async def _stream_diagnosis(): full_content = "" try: client = get_client() stream = await client.chat.completions.create( model=settings.deepseek_model, messages=[ {"role": "system", "content": "你是一位专业的A股分析师,擅长技术面和资金面分析。回复使用Markdown格式,简洁专业,客观理性。"}, {"role": "user", "content": user_msg}, ], max_tokens=1500, temperature=0.5, stream=True, ) async for chunk in stream: if chunk.choices and chunk.choices[0].delta: token = chunk.choices[0].delta.content or "" if token: full_content += token yield f"data: {json.dumps({'token': token}, ensure_ascii=False)}\n\n" # 流式完成后,保存到数据库 full_content = full_content.strip() if full_content: try: async with get_db() as db: await db.execute( tables.stock_diagnoses_table.insert().values( ts_code=ts_code, name=stock_name or ts_code, diagnosis=full_content, ) ) await db.commit() logger.info(f"已保存诊断结果到数据库: {ts_code}") except Exception as e: logger.error(f"保存诊断结果到数据库失败: {e}") yield f"data: {json.dumps({'done': True, 'ts_code': ts_code}, ensure_ascii=False)}\n\n" except Exception as e: error_msg = str(e) logger.error(f"诊断流式调用失败: {error_msg}") yield f"data: {json.dumps({'error': error_msg}, ensure_ascii=False)}\n\n" yield f"data: {json.dumps({'done': True, 'ts_code': ts_code}, ensure_ascii=False)}\n\n" return StreamingResponse(_stream_diagnosis(), media_type="text/event-stream")