astock-agent/backend/app/llm/analysis_agent.py
2026-04-17 01:03:27 +08:00

253 lines
9.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""AI 深度分析
预先获取 K 线、资金流、技术信号等数据,一次性传入 LLM 生成结构化分析报告。
不依赖 tool calling避免 DeepSeek DSML 标签问题。
"""
import asyncio
import json
import logging
import re
import traceback
from app.llm.client import chat_completion
from app.llm.prompts import TREND_BREAKOUT_ANALYSIS_PROMPT
from app.config import settings
logger = logging.getLogger(__name__)
async def analyze_recommendations(result: dict) -> None:
"""对所有推荐股票执行 AI 深度分析"""
recommendations = result.get("recommendations", [])
if not recommendations or not settings.deepseek_api_key:
return
try:
await _do_analyze(result, recommendations)
except Exception as e:
logger.error(f"AI 分析任务异常: {e}")
from app.db.error_logger import log_error
await log_error("analysis_agent", f"AI 分析任务异常: {e}", detail=traceback.format_exc())
for rec in recommendations:
if not rec.llm_analysis:
rec.llm_analysis = "AI 分析暂时不可用"
await _broadcast_llm_ready(recommendations)
async def _do_analyze(result: dict, recommendations: list) -> None:
"""分析核心逻辑"""
market_temp = result.get("market_temp")
hot_sectors = result.get("hot_sectors", [])
# 构建板块文本
sectors_text = "\n".join(
f"- {s.sector_name}: 涨幅{s.pct_change}%, 资金流入{s.capital_inflow}万, "
f"涨停{s.limit_up_count}家, 热度{s.heat_score}分, 阶段={s.stage}"
for s in hot_sectors[:5]
) if hot_sectors else "暂无板块数据"
# 温度等级
temp_val = market_temp.temperature if market_temp else 0
if temp_val >= 60:
temp_level = "积极"
elif temp_val >= 30:
temp_level = "谨慎"
else:
temp_level = "低迷"
enhanced_count = 0
for rec in recommendations:
try:
# 预先获取该股票的详细数据
stock_data = await _fetch_stock_data(rec.ts_code, rec.sector)
strategy_label = "趋势突破"
signal_map = {"breakout": "突破型", "pullback": "回踩型", "launch": "启动型", "none": "无信号"}
entry_label = signal_map.get(rec.entry_signal_type, "无信号")
system_prompt = TREND_BREAKOUT_ANALYSIS_PROMPT
user_msg = _build_user_message(
rec=rec,
strategy_label=strategy_label,
entry_label=entry_label,
market_temp=market_temp,
temp_level=temp_level,
sectors_text=sectors_text,
stock_data=stock_data,
)
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_msg},
]
resp = await chat_completion(messages)
if resp and resp.content:
analysis = resp.content.strip()
rec.llm_analysis = analysis
rec.llm_score = _extract_score(analysis)
enhanced_count += 1
else:
rec.llm_analysis = "AI 分析暂时不可用"
except asyncio.CancelledError:
logger.warning(f"AI 分析 {rec.ts_code} 被取消")
break
except Exception as e:
logger.error(f"AI 分析 {rec.ts_code} 失败: {e}")
rec.llm_analysis = "AI 分析暂时不可用"
# 无论成功失败都保存并广播
await _save_llm_analysis_to_db(recommendations)
await _broadcast_llm_ready(recommendations)
logger.info(f"AI 深度分析完成: {enhanced_count}/{len(recommendations)}")
async def _fetch_stock_data(ts_code: str, sector: str) -> str:
"""预先获取个股详细数据,拼接为文本供 LLM 分析"""
from app.llm.tool_executor import (
_get_stock_kline,
_get_stock_capital_flow,
_get_stock_technical_signal,
_get_sector_performance,
)
parts = []
# K 线(最近 30 天摘要)
try:
kline_text = await _get_stock_kline(ts_code, 60)
kline_data = json.loads(kline_text)
if isinstance(kline_data, list) and kline_data:
# 只取最近 10 条以控制 token
recent = kline_data[-10:]
kline_summary = "\n".join(
f" {d.get('trade_date', '')}: 收{d.get('close', '')} "
f"涨跌{d.get('pct_chg', '')}% 量{d.get('vol', '')} "
f"MA5={d.get('ma5', '')} MA10={d.get('ma10', '')} MA20={d.get('ma20', '')} "
f"DIF={d.get('dif', '')} DEA={d.get('dea', '')} RSI={d.get('rsi14', '')}"
for d in recent
)
parts.append(f"## K线数据近10日\n{kline_summary}")
except Exception as e:
logger.debug(f"获取K线数据失败 {ts_code}: {e}")
# 资金流向
try:
flow_text = await _get_stock_capital_flow(ts_code, 5)
flow_data = json.loads(flow_text)
if isinstance(flow_data, list) and flow_data:
flow_summary = "\n".join(
f" {d.get('trade_date', '')}: 主力净流入{d.get('main_net_inflow', 0)}"
for d in flow_data[-5:]
)
parts.append(f"## 资金流向近5日\n{flow_summary}")
except Exception as e:
logger.debug(f"获取资金流向失败 {ts_code}: {e}")
# 技术信号
try:
signal_text = await _get_stock_technical_signal(ts_code)
parts.append(f"## 技术信号\n{signal_text}")
except Exception as e:
logger.debug(f"获取技术信号失败 {ts_code}: {e}")
# 板块表现
if sector:
try:
sector_text = await _get_sector_performance(sector)
parts.append(f"## 板块数据\n{sector_text}")
except Exception as e:
logger.debug(f"获取板块数据失败 {sector}: {e}")
return "\n\n".join(parts) if parts else "暂无额外数据"
def _build_user_message(
rec,
strategy_label: str,
entry_label: str,
market_temp,
temp_level: str,
sectors_text: str,
stock_data: str,
) -> str:
"""构建完整的用户消息(含预获取的数据)"""
return f"""## 量化系统数据
- 股票: {rec.name}({rec.ts_code})
- 所属板块: {rec.sector}
- 策略类型: {strategy_label}
- 入场信号: {entry_label}
- 综合评分: {rec.score}分({rec.level}
- 各维度: 市场{rec.market_temp_score} | 板块{rec.sector_score} | 资金{rec.capital_score} | 技术{rec.technical_score} | 位置{rec.position_score} | 估值{rec.valuation_score}
- 信号: {rec.signal}
- 参考价: 入场{rec.entry_price or 'N/A'} / 目标{rec.target_price or 'N/A'} / 止损{rec.stop_loss or 'N/A'}
- 量化理由: {"".join(rec.reasons) if rec.reasons else ""}
## 市场环境
- 市场温度: {market_temp.temperature if market_temp else 'N/A'}/100{temp_level}
- 涨跌比: {market_temp.up_count if market_temp else 0}涨 / {market_temp.down_count if market_temp else 0}
- 涨停: {market_temp.limit_up_count if market_temp else 0}
## 热门板块
{sectors_text}
## 个股详细数据
{stock_data}
请根据以上所有数据,按照指定格式输出深度分析报告。"""
def _extract_score(text: str) -> float | None:
"""从 AI 分析报告中提取评分1-10"""
match = re.search(r"###\s*AI\s*评分[^\d]*(\d+(?:\.\d+)?)", text)
if match:
score = float(match.group(1))
return min(max(score, 1), 10)
return None
async def _save_llm_analysis_to_db(recommendations: list) -> None:
"""将 AI 分析结果更新到数据库"""
try:
from app.db.database import get_db
from sqlalchemy import text
async with get_db() as db:
for rec in recommendations:
if not rec.llm_analysis:
continue
await db.execute(
text(
"UPDATE recommendations SET llm_analysis = :analysis, "
"llm_score = :score "
"WHERE ts_code = :code AND date(created_at) = date('now', 'localtime') "
"AND scan_session = :session"
),
{
"analysis": rec.llm_analysis,
"score": rec.llm_score,
"code": rec.ts_code,
"session": rec.scan_session,
},
)
await db.commit()
except Exception as e:
logger.error(f"保存 AI 分析到数据库失败: {e}")
from app.db.error_logger import log_error
await log_error("analysis_agent", f"保存 AI 分析到数据库失败: {e}", detail=traceback.format_exc())
async def _broadcast_llm_ready(recommendations: list) -> None:
"""通过 WebSocket 广播 AI 分析完成事件"""
try:
from app.api.websocket import broadcast_update
await broadcast_update({
"type": "llm_analysis_ready",
"count": len([r for r in recommendations if r.llm_analysis]),
})
except Exception as e:
logger.error(f"广播 AI 分析完成失败: {e}")