"""LLM 逐股深度分析 量化筛选完成后,对每只候选股票单独调用 LLM 做深度分析, 让 AI 独立判断入场时机并给出具体买卖价格。 """ import asyncio import logging import re from app.config import settings logger = logging.getLogger(__name__) async def analyze_single_stock(candidate: dict, market_summary: str) -> dict: """对单只股票做 LLM 深度分析 candidate: 包含 ts_code, name, sector, quant_score, position_score, kline_summary, capital_flow_summary market_summary: 市场环境摘要 返回: { "signal": "BUY"/"HOLD"/"SKIP", "strength": "强"/"中"/"弱", "entry_price": float or None, "target_price": float or None, "stop_loss": float or None, "analysis": str, } """ from app.llm.prompts import SINGLE_STOCK_ANALYSIS_PROMPT from app.llm.client import get_client # 构建 prompt — 不传 signal_type,让 LLM 独立判断 stock_text = f"""\ 股票: {candidate['name']}({candidate['ts_code']}) 板块: {candidate.get('sector', '未知')} 量化评分: {candidate.get('quant_score', 0)}/100 位置安全: {candidate.get('position_score', 50)}/100 当前价: {candidate.get('current_price', '未知')}""" if candidate.get("kline_summary"): stock_text += f"\n\n## 技术分析结论\n{candidate['kline_summary']}" if candidate.get("capital_flow_summary"): stock_text += f"\n\n## 资金流向\n{candidate['capital_flow_summary']}" user_msg = f"{SINGLE_STOCK_ANALYSIS_PROMPT}\n\n## 市场环境\n{market_summary}\n\n{stock_text}\n\n请给出你的分析。" try: client = get_client() response = await client.chat.completions.create( model=settings.deepseek_model, messages=[ { "role": "system", "content": ( "你是一位专业的A股趋势交易分析师,专注于中短线(1-5日)交易。" "你根据技术分析结论独立判断入场时机,给出具体的买卖价格建议。" "不要被量化评分束缚,给出你真实的判断。" ), }, {"role": "user", "content": user_msg}, ], max_tokens=800, temperature=0.3, ) content = response.choices[0].message.content.strip() return _parse_single_response(content) except Exception as e: logger.error(f"LLM 分析 {candidate.get('ts_code')} 失败: {e}") return { "signal": "HOLD", "strength": "弱", "entry_price": None, "target_price": None, "stop_loss": None, "analysis": "AI分析暂不可用", } def _parse_single_response(text: str) -> dict: """解析单只股票的 LLM 返回""" # 提取信号 signal = "HOLD" signal_match = re.search(r"信号[:\s]*(BUY|HOLD|SKIP)", text) if signal_match: signal = signal_match.group(1) # 提取信号强度 strength = "中" strength_match = re.search(r"信号强度[:\s]*(强|中|弱)", text) if strength_match: strength = strength_match.group(1) # 提取买入价 entry_price = None entry_match = re.search(r"买入价[:\s]*(\d+(?:\.\d+)?)", text) if entry_match: entry_price = float(entry_match.group(1)) # 提取止盈价 target_price = None target_match = re.search(r"止盈价[:\s]*(\d+(?:\.\d+)?)", text) if target_match: target_price = float(target_match.group(1)) # 提取止损价 stop_loss = None stop_match = re.search(r"止损价[:\s]*(\d+(?:\.\d+)?)", text) if stop_match: stop_loss = float(stop_match.group(1)) # 提取分析 analysis = "" analysis_match = re.search(r"分析[:\s]*(.+)", text, re.DOTALL) if analysis_match: analysis = analysis_match.group(1).strip() return { "signal": signal, "strength": strength, "entry_price": entry_price, "target_price": target_price, "stop_loss": stop_loss, "analysis": analysis or "暂无分析", } async def analyze_candidates_individually( candidates: list[dict], market_summary: str, max_concurrent: int = 3 ) -> dict[str, dict]: """对候选股票逐个做 LLM 分析(控制并发数) 返回: {ts_code: {"signal", "strength", "entry_price", ...}} """ if not settings.deepseek_api_key or not candidates: return {} results = {} semaphore = asyncio.Semaphore(max_concurrent) async def _analyze_with_semaphore(c: dict): async with semaphore: ts_code = c["ts_code"] logger.info(f"LLM 分析: {c.get('name', ts_code)}") result = await analyze_single_stock(c, market_summary) logger.info( f"LLM 结果: {c.get('name', ts_code)} → " f"信号={result['signal']} 强度={result['strength']} " f"买入={result.get('entry_price')} 止盈={result.get('target_price')} " f"止损={result.get('stop_loss')}" ) return ts_code, result tasks = [_analyze_with_semaphore(c) for c in candidates] completed = await asyncio.gather(*tasks, return_exceptions=True) for item in completed: if isinstance(item, Exception): logger.error(f"LLM 分析任务异常: {item}") continue ts_code, result = item results[ts_code] = result logger.info(f"LLM 逐股分析完成: {len(results)}/{len(candidates)} 只") return results