stock-ai-agent/backend/app/agent/smart_agent.py
2026-02-10 00:04:22 +08:00

3034 lines
110 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
智能Agent - 真正使用LLM进行全面分析
"""
import re
import json
import asyncio
from typing import Dict, Any, Optional, List
from app.config import get_settings
from app.agent.context import ContextManager
from app.agent.skill_manager import skill_manager
from app.agent.question_analyzer import QuestionAnalyzer
from app.agent.skill_planner import SkillPlanner
from app.skills.market_data import MarketDataSkill
from app.skills.technical_analysis import TechnicalAnalysisSkill
from app.skills.fundamental import FundamentalSkill
from app.skills.visualization import VisualizationSkill
from app.skills.advanced_data import AdvancedDataSkill
from app.skills.us_stock_skill import USStockSkill
from app.skills.brave_search import BraveSearchSkill
from app.services.llm_service import llm_service
from app.services.tushare_service import tushare_service
from app.utils.logger import logger
class SmartStockAgent:
"""智能股票分析Agent - 深度集成LLM"""
def __init__(self):
"""初始化Agent"""
self.context_manager = ContextManager()
self.settings = get_settings()
# 初始化智能组件
self.question_analyzer = QuestionAnalyzer()
self.skill_planner = SkillPlanner()
# 注册技能
self._register_skills()
# 获取配置的模型
self.model_override = getattr(self.settings, 'smart_agent_model', None)
# 检查LLM是否可用
self.use_llm = bool(self.settings.zhipuai_api_key) and llm_service.client is not None
if self.use_llm:
logger.info(f"Smart Agent初始化完成智能模式 + LLM深度集成 + Tushare Pro高级数据模型: {self.model_override or '默认'}")
else:
logger.warning("Smart Agent初始化完成规则模式建议配置LLM")
async def _call_llm_async(self, messages: List[Dict[str, str]], temperature: float = 0.7, max_tokens: int = 2000) -> Optional[str]:
"""异步调用LLM避免阻塞事件循环"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: llm_service.chat(messages, temperature, max_tokens, model_override=self.model_override)
)
def _register_skills(self):
"""注册所有技能"""
skill_manager.register(MarketDataSkill())
skill_manager.register(TechnicalAnalysisSkill())
skill_manager.register(FundamentalSkill())
skill_manager.register(VisualizationSkill())
skill_manager.register(AdvancedDataSkill())
skill_manager.register(USStockSkill())
skill_manager.register(BraveSearchSkill())
logger.info("技能注册完成Tushare Pro高级数据 + 美股支持 + Brave搜索")
async def process_message(
self,
message: str,
session_id: str,
user_id: Optional[str] = None
) -> Dict[str, Any]:
"""
处理用户消息(非流式,已废弃,保留用于兼容)
实际使用 process_message_stream 进行流式输出
"""
# 收集流式输出
full_response = ""
async for chunk in self.process_message_stream(message, session_id, user_id):
full_response += chunk
return {
"message": full_response,
"metadata": {"type": "text"}
}
def _is_comprehensive_analysis(self, message: str) -> bool:
"""
判断是否需要全面分析
默认情况下,如果用户只是简单提到股票名称或代码,就进行全面分析
只有明确要求特定信息时(如"技术指标""K线图"等),才做单一查询
"""
# 明确要求单一查询的关键词
single_query_keywords = [
"k线", "图表", "走势图", "kline",
"技术指标", "macd", "rsi", "均线", "kdj",
"基本面", "公司信息", "行业",
"实时行情", "价格", "涨跌"
]
# 如果明确要求单一查询返回False
if any(keyword in message.lower() for keyword in single_query_keywords):
return False
# 默认进行全面分析
return True
async def _comprehensive_analysis(
self,
stock_code: str,
stock_name: Optional[str],
message: str
) -> Dict[str, Any]:
"""
全面分析:使用 skill_planner 智能规划技能 + LLM深度分析
Args:
stock_code: 股票代码
stock_name: 股票名称
message: 用户消息
Returns:
综合分析结果
"""
logger.info(f"执行全面分析: {stock_code}")
display_name = stock_name or stock_code
try:
# 1. 使用 QuestionAnalyzer 分析问题意图
intent = await self.question_analyzer.analyze_question(
question=message,
context=[],
session_id=""
)
# 确保 intent 包含股票信息
if 'target' not in intent:
intent['target'] = {}
intent['target']['stock_code'] = stock_code
intent['target']['stock_name'] = stock_name
logger.info(f"问题意图分析: dimensions={intent.get('dimensions')}")
# 2. 使用 SkillPlanner 规划技能(包括 brave_search
plan = self.skill_planner.plan_skills(intent)
logger.info(f"技能规划完成: {[s['name'] for s in plan['skills']]}, 策略: {plan['execution_strategy']}")
# 3. 执行技能规划
execution_results = await skill_manager.execute_plan(
plan=plan,
stock_code=stock_code
)
if execution_results['errors']:
logger.warning(f"技能执行有错误: {execution_results['errors']}")
# 4. 整合数据(兼容旧格式)
results = execution_results['results']
all_data = {
"stock_code": stock_code,
"stock_name": display_name,
"quote": results.get("market_data"),
"technical": results.get("technical_analysis"),
"fundamental": results.get("fundamental"),
"advanced": results.get("advanced_data"),
"news": results.get("brave_search") # 新增:新闻数据
}
# 5. 使用LLM进行深度分析
if self.use_llm:
analysis = await self._llm_comprehensive_analysis(all_data, message)
else:
analysis = self._rule_based_analysis(all_data)
return {
"message": analysis,
"metadata": {
"type": "comprehensive",
"data": all_data,
"plan": plan
}
}
except Exception as e:
logger.error(f"全面分析失败: {e}")
import traceback
logger.error(traceback.format_exc())
return {
"message": f"分析{display_name}时出错:{str(e)}",
"metadata": {"type": "error"}
}
async def _llm_comprehensive_analysis(
self,
data: Dict[str, Any],
user_message: str
) -> str:
"""使用LLM进行深度综合分析"""
# 获取当前时间
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
# 获取行情数据的交易日期
quote_date = "未知"
if data.get('quote') and data['quote'].get('trade_date'):
quote_date = data['quote']['trade_date']
# 构建高级数据摘要
advanced_summary = ""
if data.get('advanced'):
advanced_data = data['advanced']
advanced_summary = "\n【高级财务数据】Tushare Pro 5000+积分)\n"
# 财务指标
if advanced_data.get('financial'):
financial = advanced_data['financial']
if financial.get('indicators'):
indicators = financial['indicators'].get('indicators', {})
advanced_summary += f"财务指标(截止:{financial['indicators'].get('end_date', '未知')}\n"
advanced_summary += f" ROE: {indicators.get('roe', 'N/A')}%\n"
advanced_summary += f" ROA: {indicators.get('roa', 'N/A')}%\n"
advanced_summary += f" 毛利率: {indicators.get('gross_margin', 'N/A')}%\n"
advanced_summary += f" 资产负债率: {indicators.get('debt_to_assets', 'N/A')}%\n"
advanced_summary += f" 流动比率: {indicators.get('current_ratio', 'N/A')}\n\n"
# 估值数据
if advanced_data.get('valuation'):
valuation = advanced_data['valuation']
advanced_summary += f"估值指标:\n"
advanced_summary += f" PE(市盈率): {valuation.get('pe', 'N/A')}\n"
advanced_summary += f" PB(市净率): {valuation.get('pb', 'N/A')}\n"
advanced_summary += f" PS(市销率): {valuation.get('ps', 'N/A')}\n"
advanced_summary += f" 总市值: {valuation.get('total_mv', 'N/A')}万元\n"
advanced_summary += f" 流通市值: {valuation.get('circ_mv', 'N/A')}万元\n"
advanced_summary += f" 换手率: {valuation.get('turnover_rate', 'N/A')}%\n\n"
# 资金流向(最近一天)
if advanced_data.get('money_flow') and len(advanced_data['money_flow']) > 0:
latest_flow = advanced_data['money_flow'][0]
advanced_summary += f"资金流向({latest_flow.get('trade_date', '最近')}\n"
advanced_summary += f" 主力净流入: {latest_flow.get('net_mf_amount', 'N/A')}万元\n"
advanced_summary += f" 超大单净流入: {latest_flow.get('buy_elg_amount', 0) - latest_flow.get('sell_elg_amount', 0):.2f}万元\n"
advanced_summary += f" 大单净流入: {latest_flow.get('buy_lg_amount', 0) - latest_flow.get('sell_lg_amount', 0):.2f}万元\n\n"
# 融资融券
if advanced_data.get('margin') and len(advanced_data['margin']) > 0:
latest_margin = advanced_data['margin'][0]
advanced_summary += f"融资融券({latest_margin.get('trade_date', '最近')}\n"
advanced_summary += f" 融资余额: {latest_margin.get('rzye', 'N/A')}\n"
advanced_summary += f" 融券余额: {latest_margin.get('rqye', 'N/A')}\n\n"
# 重大公告
if advanced_data.get('announcements') and len(advanced_data['announcements']) > 0:
advanced_summary += "最近公告:\n"
for ann in advanced_data['announcements'][:3]:
advanced_summary += f" · {ann.get('title', '无标题')} ({ann.get('ann_date', '')})\n"
advanced_summary += "\n"
else:
advanced_summary = "\n【高级财务数据】\n暂无高级数据\n"
# 格式化新闻数据
news_section = ""
news_data = data.get('news', {})
if news_data and not news_data.get('error'):
results = news_data.get('results', [])
if results:
news_section = "\n【最新新闻和舆情】来源Brave Search\n"
for i, item in enumerate(results[:5], 1):
title = item.get('title', '无标题')
description = item.get('description', '')
source = item.get('source', '')
published = item.get('published', '')
news_section += f"{i}. {title}\n"
if description:
news_section += f" 摘要:{description[:100]}...\n"
if source:
news_section += f" 来源:{source}\n"
if published:
news_section += f" 发布时间:{published}\n"
news_section += "\n"
else:
news_section = "\n【最新新闻和舆情】\n暂无相关新闻\n"
else:
news_section = "\n【最新新闻和舆情】\n暂无相关新闻\n"
# 构建详细的分析提示
prompt = f"""你是一位专业的股票分析师。请对{data['stock_name']}({data['stock_code']})进行全面分析,用简洁专业但易懂的语言回答。
用户问题:{user_message}
【实时行情数据】
数据来源Tushare Pro API
交易日期:{quote_date}
{json.dumps(data.get('quote'), ensure_ascii=False, indent=2) if data.get('quote') else '数据获取失败'}
【技术指标数据】
数据来源Tushare Pro API基于历史K线数据计算
计算截止日期:{quote_date}
{json.dumps(data.get('technical'), ensure_ascii=False, indent=2) if data.get('technical') else '数据获取失败'}
【基本面数据】
数据来源Tushare Pro API
{json.dumps(data.get('fundamental'), ensure_ascii=False, indent=2) if data.get('fundamental') else '数据获取失败'}
{advanced_summary}
{news_section}
请按以下结构进行分析,并在每个部分明确标注数据来源和时效性:
## 一、基本面分析
分段说明公司情况,每个要点独立成段:
- 第一段:公司主营业务和行业地位
- 第二段财务健康度分析基于ROE、资产负债率、流动比率等财务指标
- 第三段估值水平分析PE、PB、PS是否合理
- 第四段:所属行业发展前景
- 第五段:如果有公告,简要分析对公司的影响
## 二、技术面分析(数据截止:{quote_date}
使用清晰的分段结构,每个技术指标独立成段:
**价格走势**
当前价格走势特征(上涨/下跌/震荡),结合成交量分析。
**均线系统**
短期均线MA5、MA10与长期均线MA20、MA60的位置关系判断当前趋势多头/空头/震荡)。
**MACD指标**
DIF和DEA的位置关系MACD柱状图变化判断动能强弱和买卖信号。
**RSI指标**
当前RSI值的位置是否超买>70或超卖<30短期走势预判。
**支撑与压力**
关键支撑位和压力位的具体价格区间。
## 三、市场情绪和新闻分析
分段分析市场情绪:
- 第一段:资金流向分析(主力资金、大单资金流入/流出情况)
- 第二段:融资融券情况(如有)
- 第三段:**基于最新新闻分析市场情绪和舆情,识别可能影响股价的重要事件**
- 第四段:当前市场情绪(乐观/谨慎/悲观)及原因
- 第五段:短期可能的催化因素
## 四、投资建议
基于技术面分析,给出具体的操作建议和点位:
**短期1-2周操作建议**
- 明确的操作建议:买入/持有/观望/减仓
- **具体点位建议**
- 如果建议买入:给出建议买入价格区间(基于支撑位)
- 如果建议卖出:给出建议卖出价格区间(基于压力位)
- 止损位:明确的止损价格点位
- 止盈位:明确的止盈价格点位
- 操作理由:基于技术指标的具体分析
**中期1-3个月策略**
- 趋势判断(上涨/下跌/震荡)
- 关键价格区间:
- 上方目标位:具体价格
- 下方支撑位:具体价格
- 策略建议
**长期(半年以上)**
投资价值评估和长期持有建议。
**风险提示**
- 主要风险点和注意事项
- 需要关注的关键价格位
## 五、总结
用一句话概括核心观点。
---
**数据说明**
- 行情数据来源Tushare Pro截止{quote_date}
- 技术指标基于历史K线数据计算截止{quote_date}
- 财务数据Tushare Pro 5000+积分接口(利润表、资产负债表、财务指标)
- 估值数据Tushare ProPE、PB、PS、市值等
- 资金流向Tushare Pro主力资金、大单资金
- 融资融券Tushare Pro如有
- 公告数据Tushare Pro重大公告
- 新闻舆情Brave Search最新市场资讯
写作要求:
1. 语言简洁专业,避免过度修饰和比喻
2. 专业术语后用括号简单解释,例如"RSI超买指标>70股价可能回调"
3. **重要:每个分析点必须独立成段,段落之间用空行分隔**
4. **技术面分析部分,每个指标必须使用加粗标题(**标题**)并独立成段**
5. 分析要客观理性,基于数据而非情绪
6. 充分利用提供的财务数据、估值数据、资金流向等高级数据进行分析
7. 结论要明确,不要模棱两可
8. 控制在800-1000字由于数据更丰富可以写得更详细
9. 最后必须声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
try:
analysis = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=3000 # 增加到3000因为分析更详细了
)
if analysis:
return f"{data['stock_name']}({data['stock_code']}) - AI深度分析】\n\n{analysis}"
else:
return self._rule_based_analysis(data)
except Exception as e:
logger.error(f"LLM分析失败: {e}")
return self._rule_based_analysis(data)
async def _llm_single_analysis(
self,
intent: Dict[str, Any],
result: Dict[str, Any],
stock_code: str,
stock_name: Optional[str],
user_message: str
) -> Dict[str, Any]:
"""使用LLM对单一查询进行分析"""
data = result.get("data", result)
display_name = stock_name or stock_code
# 根据查询类型构建不同的prompt
if intent["type"] == "technical":
prompt = f"""你是一位专业的股票分析师。用户询问了{display_name}({stock_code})的技术指标。
用户问题:{user_message}
【技术指标数据】
{json.dumps(data, ensure_ascii=False, indent=2)}
请进行专业的技术分析:
## 技术指标解读
1. 均线系统分析:
- 短期均线MA5、MA10与长期均线MA20、MA60的位置关系
- 判断当前趋势(多头/空头/震荡)
2. MACD指标分析
- DIF和DEA的位置关系
- MACD柱状图的变化趋势
- 判断动能强弱
3. RSI指标分析
- 当前RSI值的位置超买/超卖/中性)
- 短期可能的走势
4. KDJ指标分析如有
- K、D、J值的位置关系
- 金叉/死叉信号
## 综合判断
- 短期走势预判1-2周
- 关键支撑位和压力位
- 操作建议(买入/持有/观望/减仓)
## 风险提示
- 主要技术风险点
写作要求:
1. 语言简洁专业,直接给出分析结论
2. 基于数据进行分析,不要编造
3. 控制在300-400字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
elif intent["type"] == "quote":
prompt = f"""你是一位专业的股票分析师。用户询问了{display_name}({stock_code})的实时行情。
用户问题:{user_message}
【实时行情数据】
{json.dumps(data, ensure_ascii=False, indent=2)}
请进行专业的行情分析:
## 行情解读
1. 当日表现:
- 涨跌幅分析
- 成交量分析
- 振幅分析
2. 价格位置:
- 当前价格相对开盘价、最高价、最低价的位置
- 判断多空力量对比
3. 短期判断:
- 当日走势特征
- 短期可能的走势
- 操作建议
写作要求:
1. 语言简洁专业,直接给出分析结论
2. 基于数据进行分析,不要编造
3. 控制在200-300字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
elif intent["type"] == "fundamental":
prompt = f"""你是一位专业的股票分析师。用户询问了{display_name}({stock_code})的基本面信息。
用户问题:{user_message}
【基本面数据】
{json.dumps(data, ensure_ascii=False, indent=2)}
请进行专业的基本面分析:
## 公司概况
- 公司主营业务
- 所属行业和地域
- 上市时间和市场
## 行业分析
- 所属行业的发展前景
- 行业地位和竞争优势
## 投资价值
- 基本面评估
- 长期投资价值
- 关注要点
写作要求:
1. 语言简洁专业,直接给出分析结论
2. 基于数据进行分析,不要编造
3. 控制在200-300字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
else:
# 其他类型,使用通用分析
prompt = f"""你是一位专业的股票分析师。用户询问了{display_name}({stock_code})的相关信息。
用户问题:{user_message}
【数据】
{json.dumps(data, ensure_ascii=False, indent=2)}
请基于提供的数据进行专业分析,给出有价值的见解和建议。
写作要求:
1. 语言简洁专业,直接给出分析结论
2. 基于数据进行分析,不要编造
3. 控制在200-300字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
try:
analysis = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1500
)
if analysis:
return {
"message": f"{display_name}({stock_code}) - AI分析】\n\n{analysis}",
"metadata": {"type": intent["type"], "data": data}
}
else:
# LLM失败使用原始格式化
return self._format_response(intent, result, stock_code, stock_name)
except Exception as e:
logger.error(f"LLM单一分析失败: {e}")
return self._format_response(intent, result, stock_code, stock_name)
async def _comprehensive_index_analysis(
self,
index_code: str,
index_name: str,
message: str
) -> Dict[str, Any]:
"""
指数全面分析使用Tushare获取指数数据 + LLM深度分析
Args:
index_code: 指数代码如000001.SH
index_name: 指数名称
message: 用户消息
Returns:
综合分析结果
"""
logger.info(f"执行指数全面分析: {index_code}")
try:
# 从tushare_advanced_service获取指数数据
from app.services.tushare_advanced_service import tushare_advanced_service
# 获取指数日线数据
index_data = tushare_advanced_service.get_index_daily(
ts_code=index_code,
start_date=None, # 默认180天
end_date=None
)
if not index_data or len(index_data) == 0:
return {
"message": f"抱歉,未能获取{index_name}的数据。",
"metadata": {"type": "error"}
}
# 获取最新数据
latest = index_data[-1]
# 计算技术指标(简单版)
closes = [d['close'] for d in index_data]
# 计算均线
ma5 = sum(closes[-5:]) / 5 if len(closes) >= 5 else None
ma10 = sum(closes[-10:]) / 10 if len(closes) >= 10 else None
ma20 = sum(closes[-20:]) / 20 if len(closes) >= 20 else None
ma60 = sum(closes[-60:]) / 60 if len(closes) >= 60 else None
# 整合数据
all_data = {
"index_code": index_code,
"index_name": index_name,
"latest": latest,
"ma": {
"ma5": ma5,
"ma10": ma10,
"ma20": ma20,
"ma60": ma60
},
"history_days": len(index_data)
}
# 使用LLM进行深度分析
if self.use_llm:
analysis = await self._llm_index_analysis(all_data, message)
else:
analysis = f"{index_name}】最新数据\n\n" \
f"日期:{latest['trade_date']}\n" \
f"收盘:{latest['close']:.2f}\n" \
f"涨跌幅:{latest['pct_chg']:+.2f}%\n" \
f"成交量:{latest['vol']:.0f}"
return {
"message": analysis,
"metadata": {
"type": "index_analysis",
"data": all_data
}
}
except Exception as e:
logger.error(f"指数分析失败: {e}")
import traceback
logger.error(traceback.format_exc())
return {
"message": f"分析{index_name}时出错:{str(e)}",
"metadata": {"type": "error"}
}
async def _llm_index_analysis(
self,
data: Dict[str, Any],
user_message: str
) -> str:
"""使用LLM进行指数深度分析"""
latest = data['latest']
ma = data['ma']
prompt = f"""你是一位专业的股票分析师。请对{data['index_name']}({data['index_code']})进行全面分析,用简洁专业但易懂的语言回答。
用户问题:{user_message}
【指数最新数据】
数据来源Tushare Pro API
交易日期:{latest['trade_date']}
收盘点位:{latest['close']:.2f}
开盘点位:{latest['open']:.2f}
最高点位:{latest['high']:.2f}
最低点位:{latest['low']:.2f}
涨跌幅:{latest['pct_chg']:+.2f}%
涨跌点数:{latest['change']:+.2f}
成交量:{latest['vol']:.0f}
成交额:{latest['amount']:.0f}千元
【技术指标】
MA5{f"{ma['ma5']:.2f}" if ma['ma5'] else '计算中'}
MA10{f"{ma['ma10']:.2f}" if ma['ma10'] else '计算中'}
MA20{f"{ma['ma20']:.2f}" if ma['ma20'] else '计算中'}
MA60{f"{ma['ma60']:.2f}" if ma['ma60'] else '计算中'}
当前点位:{latest['close']:.2f}
请按以下结构进行分析:
## 一、市场现状
- 当前点位分析(相对历史位置)
- 当日表现(涨跌幅、成交量)
## 二、技术面分析
**均线系统**
分析当前点位与各均线的关系,判断趋势(多头/空头/震荡)。
**支撑与压力**
基于近期走势,给出关键支撑位和压力位。
## 三、市场情绪
- 当前市场情绪(乐观/谨慎/悲观)
- 成交量分析
## 四、投资建议
**短期1-2周**
- 趋势判断
- 关键点位(支撑位、压力位)
- 操作建议
**中期1-3个月**
- 趋势展望
- 策略建议
**风险提示**
主要风险点和注意事项
## 五、总结
用一句话概括核心观点。
---
**数据说明**
- 数据来源Tushare Pro截止{latest['trade_date']}
- 分析周期:基于近{data['history_days']}个交易日数据
写作要求:
1. 语言简洁专业但易懂
2. 每个分析点独立成段
3. 控制在600-800字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
try:
analysis = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2500
)
if analysis:
return f"{data['index_name']}({data['index_code']}) - AI深度分析】\n\n{analysis}"
else:
return f"{data['index_name']}】分析生成失败"
except Exception as e:
logger.error(f"LLM指数分析失败: {e}")
return f"{data['index_name']}】分析生成失败"
def _rule_based_analysis(self, data: Dict[str, Any]) -> str:
"""基于规则的分析LLM不可用时的备选方案"""
parts = [f"{data['stock_name']}({data['stock_code']}) - 综合分析】\n"]
# 行情信息
if data.get('quote'):
quote = data['quote']
parts.append("## 一、实时行情")
parts.append(f"最新价:{quote.get('close', 0):.2f}")
parts.append(f"涨跌幅:{quote.get('pct_chg', 0):.2f}%")
parts.append(f"成交量:{quote.get('vol', 0):.0f}")
parts.append("")
# 技术分析
if data.get('technical'):
tech = data['technical'].get('indicators', {})
parts.append("## 二、技术指标")
if 'ma' in tech:
ma = tech['ma']
parts.append(f"均线系统MA5={ma.get('ma5')}, MA10={ma.get('ma10')}, MA20={ma.get('ma20')}")
if 'macd' in tech:
macd = tech['macd']
parts.append(f"MACDDIF={macd.get('dif')}, DEA={macd.get('dea')}")
if 'rsi' in tech:
rsi = tech['rsi']
rsi6 = rsi.get('rsi6', 50)
if rsi6 > 70:
parts.append(f"RSI{rsi6:.1f}(超买区域,注意回调风险)")
elif rsi6 < 30:
parts.append(f"RSI{rsi6:.1f}(超卖区域,可能存在反弹机会)")
else:
parts.append(f"RSI{rsi6:.1f}(中性区域)")
parts.append("")
# 基本面
if data.get('fundamental'):
fund = data['fundamental']
parts.append("## 三、基本信息")
parts.append(f"所属行业:{fund.get('industry', '未知')}")
parts.append(f"上市日期:{fund.get('list_date', '未知')}")
parts.append("")
# 简单建议
parts.append("## 四、参考建议")
parts.append("建议结合更多信息进行综合判断。")
parts.append("")
parts.append("⚠️ 以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。")
return "\n".join(parts)
async def _single_query(
self,
stock_code: str,
stock_name: Optional[str],
message: str
) -> Dict[str, Any]:
"""单一查询处理 - 使用LLM进行分析"""
# 识别意图
intent = self._recognize_intent(message, stock_code)
# 执行技能
result = await skill_manager.execute_skill(
intent["skill"],
**intent["params"]
)
# 格式化响应
if not result.get("success", True):
return {
"message": f"查询失败:{result.get('error', '未知错误')}",
"metadata": {"type": "error"}
}
# 所有查询都使用LLM进行分析除了可视化
if intent["type"] != "visualization" and self.use_llm:
return await self._llm_single_analysis(intent, result, stock_code, stock_name, message)
else:
return self._format_response(intent, result, stock_code, stock_name)
def _recognize_intent(self, message: str, stock_code: str) -> Dict[str, Any]:
"""识别查询意图"""
message_lower = message.lower()
# K线图
if any(kw in message_lower for kw in ["k线", "图表", "走势图", "kline"]):
return {
"type": "visualization",
"skill": "visualization",
"params": {"stock_code": stock_code}
}
# 技术分析
if any(kw in message_lower for kw in ["技术", "指标", "macd", "rsi", "均线"]):
return {
"type": "technical",
"skill": "technical_analysis",
"params": {"stock_code": stock_code, "indicators": ["ma", "macd", "rsi"]}
}
# 基本面
if any(kw in message_lower for kw in ["基本面", "公司", "行业", "信息"]):
return {
"type": "fundamental",
"skill": "fundamental",
"params": {"stock_code": stock_code}
}
# 默认:实时行情
return {
"type": "quote",
"skill": "market_data",
"params": {"stock_code": stock_code, "data_type": "quote"}
}
def _format_response(
self,
intent: Dict[str, Any],
result: Dict[str, Any],
stock_code: str,
stock_name: Optional[str]
) -> Dict[str, Any]:
"""格式化响应"""
data = result.get("data", result)
display_name = stock_name or stock_code
if intent["type"] == "quote":
message = f"""{display_name}】实时行情
交易日期:{data.get('trade_date', '')}
最新价:{data.get('close', 0):.2f}
涨跌幅:{data.get('pct_chg', 0):+.2f}%
涨跌额:{data.get('change', 0):+.2f}
开盘价:{data.get('open', 0):.2f}
最高价:{data.get('high', 0):.2f}
最低价:{data.get('low', 0):.2f}
成交量:{data.get('vol', 0):.0f}
成交额:{data.get('amount', 0):.0f}千元"""
return {
"message": message,
"metadata": {"type": "quote", "data": data}
}
elif intent["type"] == "technical":
indicators = data.get("indicators", {})
parts = [f"{display_name}】技术指标\n"]
if "ma" in indicators:
ma = indicators["ma"]
parts.append(f"均线MA5={ma.get('ma5')}, MA10={ma.get('ma10')}, MA20={ma.get('ma20')}")
if "macd" in indicators:
macd = indicators["macd"]
parts.append(f"MACDDIF={macd.get('dif')}, DEA={macd.get('dea')}, MACD={macd.get('macd')}")
if "rsi" in indicators:
rsi = indicators["rsi"]
parts.append(f"RSIRSI6={rsi.get('rsi6')}, RSI12={rsi.get('rsi12')}")
return {
"message": "\n".join(parts),
"metadata": {"type": "technical", "data": data}
}
elif intent["type"] == "visualization":
return {
"message": f"已生成【{display_name}】的K线图",
"metadata": {"type": "chart", "data": data}
}
elif intent["type"] == "fundamental":
message = f"""{display_name}】基本信息
股票代码:{data.get('ts_code', '')}
所属地域:{data.get('area', '')}
所属行业:{data.get('industry', '')}
上市市场:{data.get('market', '')}
上市日期:{data.get('list_date', '')}"""
return {
"message": message,
"metadata": {"type": "fundamental", "data": data}
}
return {
"message": "查询完成",
"metadata": {"type": "data", "data": data}
}
async def _analyze_question_intent(self, message: str, session_id: str) -> Optional[Dict[str, Any]]:
"""
使用LLM分析问题意图支持上下文
Args:
message: 用户消息
session_id: 会话ID
Returns:
意图分析结果: {
'type': 'stock_specific' | 'macro_finance' | 'knowledge' | 'general_chat',
'description': '问题描述',
'keywords': ['关键词列表'],
'stock_names': ['股票名称'] (如果是stock_specific类型)
}
"""
if not self.use_llm:
logger.warning("LLM未配置无法分析意图")
return None
# 获取历史对话上下文
history = self.context_manager.get_context(session_id)
context_str = ""
if history:
context_str = "\n\n【对话历史】\n"
# 只取最近4条消息
for msg in history[-4:]:
role = "用户" if msg["role"] == "user" else "助手"
content = msg['content'][:100] # 限制长度
context_str += f"{role}: {content}\n"
prompt = f"""你是一个专业的金融智能助手。分析用户的问题,理解用户意图。
{context_str}
【当前问题】
用户: {message}
请分析这个问题属于以下哪一类:
1. **stock_specific** - 针对特定股票或指数的问题
例如:"贵州茅台怎么样""分析一下比亚迪""600519的技术指标""帮我看看这只股票"
**重要**:指数查询也属于此类,例如:"上证指数怎么样""分析大盘""A股指数走势""深证成指"
**美股支持**:美股查询也属于此类,例如:"苹果股票怎么样""AAPL分析""特斯拉走势""TSLA技术指标"
2. **macro_finance** - 宏观金融/市场问题(不针对特定股票或指数)
例如:"最近有什么投资机会""现在适合买股票吗""市场情绪如何"
3. **knowledge** - 金融知识问答
例如:"什么是MACD""如何看K线图""价值投资是什么""市盈率怎么算"
4. **general_chat** - 一般对话/问候/不明确的问题
例如:"你好""在吗""你能做什么""帮我"(没有具体说明)
**重要提示**
- 如果用户问题不明确,但可能与金融相关,优先归类为 general_chat以便引导用户
- 如果用户提到"这只股票"""等代词,查看对话历史判断是否指特定股票
- **如果用户提到"大盘""上证""深证""A股指数"等,归类为 stock_specific并在stock_names中填入对应的指数名称**
- **如果用户提到美股公司名称如苹果、特斯拉、微软或美股代码如AAPL、TSLA、MSFT归类为 stock_specific并在stock_names中填入对应的股票名称或代码**
- 对于模糊的问题,不要强行归类,使用 general_chat 类型
请以JSON格式返回分析结果
{{
"type": "问题类型",
"description": "问题的简要描述(用一句话概括用户想了解什么)",
"keywords": ["关键词1", "关键词2"],
"stock_names": ["股票名称或指数名称或美股代码"] (仅当type为stock_specific时如果有的话),
"market": "A股""美股" (仅当type为stock_specific时根据股票类型判断)
}}
只返回JSON不要有任何其他内容。"""
try:
result = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=300
)
if not result:
logger.warning("LLM返回空结果")
return None
# 清理结果移除可能的markdown代码块标记
result = result.strip()
if result.startswith("```json"):
result = result[7:]
if result.startswith("```"):
result = result[3:]
if result.endswith("```"):
result = result[:-3]
result = result.strip()
# 检查是否为空
if not result:
logger.warning("LLM返回内容为空")
return None
# 解析JSON
intent = json.loads(result)
logger.info(f"意图分析结果: {intent}")
return intent
except json.JSONDecodeError as e:
logger.error(f"意图分析JSON解析失败: {e}, 原始响应: {result[:200] if result else 'None'}")
return None
except Exception as e:
logger.error(f"意图分析失败: {e}")
return None
async def _handle_stock_question(
self,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理针对特定股票或指数的问题"""
stock_names = intent_analysis.get('stock_names', [])
market = intent_analysis.get('market', 'A股') # 默认A股
if not stock_names:
return {
"message": "抱歉,我没有识别到您提到的股票或指数。请提供更明确的股票代码、名称或指数名称。",
"metadata": {"type": "error"}
}
# 提取第一个股票或指数
stock_keyword = stock_names[0]
# 检测是否为美股
is_us_stock = self._is_us_stock(stock_keyword, market)
if is_us_stock:
# 处理美股
return await self._handle_us_stock(stock_keyword, message)
# 处理A股和指数 - 使用LLM进行智能匹配
stock_info = await self._match_stock_with_llm(stock_keyword)
if not stock_info:
return {
"message": f"抱歉,未找到股票或指数\"{stock_keyword}\"。请确认名称或代码是否正确。",
"metadata": {"type": "error"}
}
stock_code = stock_info['code']
stock_name = stock_info['name']
is_index = stock_info['is_index']
logger.info(f"处理{'指数' if is_index else '股票'}问题: {stock_name}({stock_code})")
# 判断是否需要全面分析
is_comprehensive = self._is_comprehensive_analysis(message)
if is_comprehensive:
if is_index:
return await self._comprehensive_index_analysis(stock_code, stock_name, message)
else:
return await self._comprehensive_analysis(stock_code, stock_name, message)
else:
return await self._single_query(stock_code, stock_name, message)
async def _match_stock_with_llm(self, keyword: str) -> Optional[Dict[str, Any]]:
"""
使用LLM智能匹配股票或指数
Args:
keyword: 用户输入的关键词
Returns:
匹配结果: {'code': '股票代码', 'name': '股票名称', 'is_index': bool}
"""
if not self.use_llm:
# 降级方案使用Tushare搜索
search_results = tushare_service.search_stock(keyword)
if search_results:
return {
'code': search_results[0]['symbol'],
'name': search_results[0]['name'],
'is_index': False
}
return None
prompt = f"""你是一个专业的A股市场专家。请根据用户输入的关键词识别对应的股票代码或指数代码。
用户输入:{keyword}
常见指数代码:
- 上证指数/大盘/沪指/A股 → 000001.SH
- 深证成指/深证/深指 → 399001.SZ
- 创业板指/创业板 → 399006.SZ
- 科创50 → 000688.SH
- 沪深300 → 000300.SH
- 中证500 → 000905.SH
如果是指数,请直接返回对应的指数代码。
如果是股票名称或代码请使用Tushare数据库进行搜索匹配。
请以JSON格式返回
{{
"is_index": true/false,
"code": "股票或指数代码如000001.SH",
"name": "股票或指数名称",
"confidence": 0.0-1.0
}}
如果无法匹配,返回:
{{
"is_index": false,
"code": null,
"name": null,
"confidence": 0.0
}}
只返回JSON不要有任何其他内容。"""
try:
result = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=200
)
if not result:
logger.warning("LLM匹配返回空结果")
return None
# 清理结果
result = result.strip()
if result.startswith("```json"):
result = result[7:]
if result.startswith("```"):
result = result[3:]
if result.endswith("```"):
result = result[:-3]
result = result.strip()
# 解析JSON
match_result = json.loads(result)
# 如果LLM无法匹配或置信度太低使用Tushare搜索
if not match_result.get('code') or match_result.get('confidence', 0) < 0.5:
logger.info(f"LLM匹配置信度低使用Tushare搜索: {keyword}")
search_results = tushare_service.search_stock(keyword)
if search_results:
return {
'code': search_results[0]['symbol'],
'name': search_results[0]['name'],
'is_index': False
}
return None
logger.info(f"LLM匹配成功: {keyword} -> {match_result['name']}({match_result['code']})")
return {
'code': match_result['code'],
'name': match_result['name'],
'is_index': match_result['is_index']
}
except json.JSONDecodeError as e:
logger.error(f"LLM匹配JSON解析失败: {e}, 原始响应: {result[:200] if result else 'None'}")
# 降级方案
search_results = tushare_service.search_stock(keyword)
if search_results:
return {
'code': search_results[0]['symbol'],
'name': search_results[0]['name'],
'is_index': False
}
return None
except Exception as e:
logger.error(f"LLM匹配失败: {e}")
# 降级方案
search_results = tushare_service.search_stock(keyword)
if search_results:
return {
'code': search_results[0]['symbol'],
'name': search_results[0]['name'],
'is_index': False
}
return None
async def _handle_macro_question(
self,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理宏观金融问题(智能模式)"""
description = intent_analysis.get('description', '')
keywords = intent_analysis.get('keywords', [])
logger.info(f"[智能模式] 处理宏观问题: {description}")
try:
# 使用智能、自然的prompt
prompt = f"""你是一位专业的金融分析师。用户询问了宏观金融问题。
用户问题:{message}
请用自然、专业的语言回答用户的问题。不要使用固定的格式或标题,而是像和朋友聊天一样,直接回答用户关心的内容。
要求:
- 直接回答用户的问题,不要添加"【宏观市场分析】"等标题
- 根据问题的具体内容调整回答的重点
- 语言自然、专业但易懂
- 如果用户问的是短期趋势,重点讲短期;如果问的是投资机会,重点讲机会
- 控制在300-500字
- 最后声明:"以上分析仅供参考,不构成投资建议。"
"""
analysis = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1500
)
if analysis:
return {
"message": analysis,
"metadata": {"type": "macro_analysis"}
}
except Exception as e:
logger.error(f"宏观问题处理失败: {e}")
# 降级方案
return {
"message": f"抱歉,我暂时无法回答这个问题。您可以问我具体股票或指数的分析。",
"metadata": {"type": "error"}
}
async def _handle_knowledge_question(
self,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理金融知识问答(智能模式)"""
description = intent_analysis.get('description', '')
logger.info(f"[智能模式] 处理知识问答: {description}")
# 使用智能、自然的prompt
prompt = f"""你是一位专业的金融教育专家。用户询问了金融知识问题。
用户问题:{message}
请用自然、通俗的语言回答用户的问题。不要使用固定的格式(如"## 核心概念"),而是像老师给学生讲解一样,直接、清晰地解释。
要求:
- 直接回答问题,不要添加"【金融知识解答】"等标题
- 用通俗易懂的语言,避免过多专业术语
- 如果必须用专业术语,简单解释一下
- 可以举例子帮助理解
- 控制在200-400字
"""
try:
answer = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1200
)
if answer:
return {
"message": answer,
"metadata": {"type": "knowledge"}
}
except Exception as e:
logger.error(f"知识问答处理失败: {e}")
return {
"message": "抱歉,暂时无法回答您的问题。请稍后再试。",
"metadata": {"type": "error"}
}
async def _handle_general_chat(
self,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理一般对话(智能模式)"""
description = intent_analysis.get('description', '')
logger.info(f"[智能模式] 处理一般对话: {description}")
# 使用智能、自然的prompt
prompt = f"""你是一位专业且友好的金融智能助手。用户发来了一条消息。
用户消息:{message}
请用自然、友好的语言回应用户。不要使用固定的格式,而是像真人对话一样。
要求:
- 如果是问候(如"你好"),友好回应并简单介绍你能做什么
- 如果问题不明确,礼貌地询问用户想了解什么
- 语言自然、友好,不要太正式
- 控制在100-200字
"""
try:
response = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.8,
max_tokens=500
)
if response:
return {
"message": response,
"metadata": {"type": "chat"}
}
except Exception as e:
logger.error(f"一般对话处理失败: {e}")
# 降级方案
return {
"message": """您好!我是您的金融智能助手 👋
我可以帮您:
• 📊 分析具体股票(如"分析比亚迪"
• 📈 解读市场走势(如"现在大盘怎么样"
• 📚 解答金融知识(如"什么是市盈率"
请告诉我您想了解什么?""",
"metadata": {"type": "chat"}
}
def _is_us_stock(self, keyword: str, market: str) -> bool:
"""
判断是否为美股
Args:
keyword: 股票关键词
market: 市场类型从LLM意图分析中获取
Returns:
是否为美股
"""
# 如果LLM已经判断为美股
if market == "美股":
return True
# 检查是否为全大写字母(美股代码特征)
if keyword.isupper() and keyword.isalpha() and len(keyword) <= 5:
return True
return False
def _get_us_stock_symbol(self, keyword: str) -> str:
"""
获取美股代码(同步版本,用于兼容)
Args:
keyword: 股票关键词
Returns:
美股代码
"""
# 如果已经是代码格式,直接返回
if keyword.isupper() and keyword.isalpha():
return keyword
# 默认返回大写形式
return keyword.upper()
async def _handle_us_stock(self, keyword: str, message: str) -> Dict[str, Any]:
"""
处理美股查询(兼容旧接口,内部调用 _handle_us_stock_with_code
Args:
keyword: 股票关键词(可能是代码或名称)
message: 用户消息
Returns:
分析结果
"""
# 如果是大写字母,直接作为代码使用
if keyword.isupper() and keyword.isalpha() and len(keyword) <= 5:
return await self._handle_us_stock_with_code(keyword, keyword, message)
# 否则需要通过 QuestionAnalyzer 重新分析获取代码
# 这种情况理论上不应该发生,因为 QuestionAnalyzer 应该已经返回了代码
logger.warning(f"_handle_us_stock 收到非代码格式的关键词: {keyword}")
return {
"message": f"抱歉,无法识别美股 \"{keyword}\"。请直接输入美股代码(如 BABA、AAPL、TSLA进行查询。",
"metadata": {"type": "error"}
}
async def _handle_us_stock_with_code(self, symbol: str, stock_name: str, message: str) -> Dict[str, Any]:
"""
处理美股查询(使用已知的股票代码)
Args:
symbol: 美股代码(如 BABA、AAPL
stock_name: 股票名称
message: 用户消息
Returns:
分析结果
"""
logger.info(f"处理美股查询: {stock_name} -> {symbol}")
try:
# 1. 使用 QuestionAnalyzer 分析问题意图
intent = await self.question_analyzer.analyze_question(
question=message,
context=[],
session_id=""
)
# 确保 intent 包含股票信息
if 'target' not in intent:
intent['target'] = {}
intent['target']['stock_code'] = symbol
intent['target']['stock_name'] = stock_name
intent['target']['market'] = '美股'
logger.info(f"美股问题意图分析: dimensions={intent.get('dimensions')}")
# 2. 使用 SkillPlanner 规划技能(会自动识别美股并使用正确的技能)
plan = self.skill_planner.plan_skills(intent)
logger.info(f"美股技能规划完成: {[s['name'] for s in plan['skills']]}, 策略: {plan['execution_strategy']}")
# 3. 执行技能规划
execution_results = await skill_manager.execute_plan(
plan=plan,
stock_code=symbol
)
if execution_results['errors']:
logger.warning(f"美股技能执行有错误: {execution_results['errors']}")
# 5. 检查 us_stock_analysis 是否成功
us_stock_data = execution_results['results'].get('us_stock_analysis')
if not us_stock_data or 'error' in us_stock_data:
return {
"message": f"抱歉,未找到美股 {symbol}。请确认股票代码是否正确。\n\n提示:美股代码通常为大写字母,如 AAPL苹果、TSLA特斯拉、MSFT微软等。",
"metadata": {"type": "error"}
}
# 6. 整合数据
all_data = {
"symbol": symbol,
"name": stock_name,
**us_stock_data,
"news": execution_results['results'].get("brave_search") # 新增:新闻数据
}
# 7. 使用LLM分析美股数据
if self.use_llm:
analysis = await self._llm_us_stock_analysis(all_data, message)
else:
analysis = self._format_us_stock_data(all_data)
return {
"message": analysis,
"metadata": {
"type": "us_stock_analysis",
"data": all_data,
"plan": plan
}
}
except Exception as e:
logger.error(f"美股查询失败: {e}")
return {
"message": f"查询美股 {symbol} 时出错:{str(e)}",
"metadata": {"type": "error"}
}
async def _handle_hk_stock_with_code(self, symbol: str, stock_name: str, message: str) -> Dict[str, Any]:
"""
处理港股查询(使用已知的股票代码)
Args:
symbol: 港股代码(如 0700.HK、9988.HK
stock_name: 股票名称
message: 用户消息
Returns:
分析结果
"""
logger.info(f"处理港股查询: {stock_name} -> {symbol}")
try:
# 1. 使用 QuestionAnalyzer 分析问题意图
intent = await self.question_analyzer.analyze_question(
question=message,
context=[],
session_id=""
)
# 确保 intent 包含股票信息
if 'target' not in intent:
intent['target'] = {}
intent['target']['stock_code'] = symbol
intent['target']['stock_name'] = stock_name
intent['target']['market'] = '港股'
logger.info(f"港股问题意图分析: dimensions={intent.get('dimensions')}")
# 2. 使用 SkillPlanner 规划技能(会自动识别港股并使用正确的技能)
plan = self.skill_planner.plan_skills(intent)
logger.info(f"港股技能规划完成: {[s['name'] for s in plan['skills']]}, 策略: {plan['execution_strategy']}")
# 3. 执行技能规划
execution_results = await skill_manager.execute_plan(
plan=plan,
stock_code=symbol
)
if execution_results['errors']:
logger.warning(f"港股技能执行有错误: {execution_results['errors']}")
# 5. 检查 us_stock_analysis 是否成功
hk_stock_data = execution_results['results'].get('us_stock_analysis')
if not hk_stock_data or 'error' in hk_stock_data:
return {
"message": f"抱歉,未找到港股 {symbol}。请确认股票代码是否正确。\n\n提示:港股代码格式为数字加.HK后缀如 0700.HK腾讯、9988.HK阿里巴巴等。",
"metadata": {"type": "error"}
}
# 6. 整合数据
all_data = {
"symbol": symbol,
"name": stock_name,
"market": "港股",
**hk_stock_data,
"news": execution_results['results'].get("brave_search")
}
# 7. 使用LLM分析港股数据
if self.use_llm:
analysis = await self._llm_hk_stock_analysis(all_data, message)
else:
analysis = self._format_us_stock_data(all_data)
return {
"message": analysis,
"metadata": {
"type": "hk_stock_analysis",
"data": all_data,
"plan": plan
}
}
except Exception as e:
logger.error(f"港股查询失败: {e}")
return {
"message": f"查询港股 {symbol} 时出错:{str(e)}",
"metadata": {"type": "error"}
}
async def _llm_hk_stock_analysis(self, data: Dict[str, Any], user_message: str) -> str:
"""使用LLM分析港股数据"""
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
# 提取关键数据(使用 us_stock_service 返回的扁平结构)
symbol = data.get('symbol', '')
name = data.get('name', symbol)
technical = data.get('technical_indicators', {})
news = data.get('news', [])
# 构建数据摘要
data_summary = f"""
【港股数据】{name}{symbol}
查询时间:{current_time}
【基本信息】
- 公司名称:{name}
- 行业:{data.get('industry', '未知')}
- 板块:{data.get('sector', '未知')}
- 市值:{data.get('market_cap', '未知')}
【最新行情】
- 当前价格:{data.get('current_price', '未知')}
- 今日涨跌:{data.get('change_percent', '未知')}%
- 52周最高{data.get('52_week_high', '未知')}
- 52周最低{data.get('52_week_low', '未知')}
【估值指标】
- 市盈率(PE){data.get('pe_ratio', '未知')}
- 市净率(PB){data.get('pb_ratio', '未知')}
- 股息率:{data.get('dividend_yield', '未知')}
【技术指标】
- MA5{technical.get('ma5', '未知')}
- MA20{technical.get('ma20', '未知')}
- RSI{technical.get('rsi', '未知')}
- MACD{technical.get('macd', '未知')}
"""
# 添加新闻摘要
if news:
data_summary += "\n【相关新闻】\n"
for i, item in enumerate(news[:3], 1):
if isinstance(item, dict):
title = item.get('title', '')
data_summary += f"{i}. {title}\n"
prompt = f"""你是一个专业的港股分析师。请根据以下数据,回答用户的问题。
{data_summary}
用户问题:{user_message}
请提供专业、客观的分析,包括:
1. 直接回答用户的问题
2. 基于数据的分析和判断
3. 潜在的风险提示
注意:港股以港币计价,交易时间为港交所交易时段。"""
try:
result = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2000
)
return result or self._format_us_stock_data(data)
except Exception as e:
logger.error(f"LLM港股分析失败: {e}")
return self._format_us_stock_data(data)
async def _llm_us_stock_analysis(self, data: Dict[str, Any], user_message: str) -> str:
"""使用LLM分析美股数据"""
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
# 提取关键数据
symbol = data.get("symbol", "")
name = data.get("name", "")
sector = data.get("sector", "")
industry = data.get("industry", "")
current_price = data.get("current_price", 0)
change = data.get("change", 0)
change_pct = data.get("change_percent", 0)
volume = data.get("volume", 0)
market_cap = data.get("market_cap", 0)
pe_ratio = data.get("pe_ratio", 0)
pb_ratio = data.get("pb_ratio", 0)
dividend_yield = data.get("dividend_yield", 0)
week_52_high = data.get("52_week_high", 0)
week_52_low = data.get("52_week_low", 0)
technical = data.get("technical_indicators", {})
description = data.get("description", "")
news_data = data.get("news", {}) # 新增:获取新闻数据
# 格式化市值
market_cap_str = f"${market_cap / 1e9:.2f}B" if market_cap > 1e9 else f"${market_cap / 1e6:.2f}M"
# 格式化新闻数据
news_section = ""
if news_data and not news_data.get('error'):
results = news_data.get('results', [])
if results:
news_section = "\n【最新新闻和舆情】来源Brave Search\n"
for i, item in enumerate(results[:5], 1):
title = item.get('title', '无标题')
description = item.get('description', '')
source = item.get('source', '')
published = item.get('published', '')
news_section += f"{i}. {title}\n"
if description:
news_section += f" 摘要:{description[:100]}...\n"
if source:
news_section += f" 来源:{source}\n"
if published:
news_section += f" 发布时间:{published}\n"
news_section += "\n"
else:
news_section = "\n【最新新闻和舆情】\n暂无相关新闻\n"
else:
news_section = "\n【最新新闻和舆情】\n暂无相关新闻\n"
# 构建分析提示
prompt = f"""你是一位专业的美股分析师。请基于以下数据对 {name} ({symbol}) 进行全面分析。
**重要提示:当前日期是 {current_time},请在分析中使用这个日期,不要使用其他日期。**
【基本信息】
股票代码:{symbol}
公司名称:{name}
所属行业:{sector} - {industry}
公司简介:{description[:300] if description else '暂无'}
【实时行情】(数据时间:{current_time}
当前价格:${current_price:.2f}
涨跌额:${change:.2f}
涨跌幅:{change_pct:.2f}%
成交量:{volume:,}
市值:{market_cap_str}
【估值指标】
市盈率(PE){f"{pe_ratio:.2f}" if pe_ratio else '暂无'}
市净率(PB){f"{pb_ratio:.2f}" if pb_ratio else '暂无'}
股息率:{f"{dividend_yield * 100:.2f}%" if dividend_yield else '暂无'}
52周最高${week_52_high:.2f}
52周最低${week_52_low:.2f}
【技术指标】
MA5{f"${technical.get('ma5'):.2f}" if technical.get('ma5') else '计算中'}
MA10{f"${technical.get('ma10'):.2f}" if technical.get('ma10') else '计算中'}
MA20{f"${technical.get('ma20'):.2f}" if technical.get('ma20') else '计算中'}
MA60{f"${technical.get('ma60'):.2f}" if technical.get('ma60') else '计算中'}
RSI{f"{technical.get('rsi'):.2f}" if technical.get('rsi') else '计算中'}
MACD{f"{technical.get('macd'):.4f}" if technical.get('macd') else '计算中'}
{news_section}
用户问题:{user_message}
请提供专业的分析报告,包括:
## 📊 行情概览
简要总结当前股价表现和市场表现2-3句话
## 💼 公司基本面
- 行业地位和竞争优势
- 估值水平分析PE、PB是否合理
- 盈利能力和成长性
## 📈 技术面分析
- 当前趋势判断(基于均线系统)
- 关键支撑位和压力位
- RSI和MACD信号解读
## 📰 市场情绪和新闻分析
- 基于最新新闻分析市场情绪和舆情
- 识别可能影响股价的重要事件或消息
- 评估新闻对短期和中期走势的影响
## 💡 投资建议
- 短期操作建议1-2周
- 中期投资价值1-3个月
- 风险提示
写作要求:
1. 语言专业但易懂,避免过度修饰
2. 分析客观理性,基于数据和事实
3. 每个部分独立成段,段落间用空行分隔
4. **充分利用新闻数据进行市场情绪分析**
5. 控制在600-800字
6. **不要在报告中添加日期标题,直接开始分析内容**
7. 最后声明:"以上分析仅供参考,不构成投资建议。美股投资有风险,请谨慎决策。"
"""
try:
analysis = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2500 # 增加 token 数量以容纳新闻分析
)
if analysis:
return f"【美股分析】{name} ({symbol})\n\n{analysis}"
else:
return self._format_us_stock_data(data)
except Exception as e:
logger.error(f"LLM美股分析失败: {e}")
return self._format_us_stock_data(data)
def _format_us_stock_data(self, data: Dict[str, Any]) -> str:
"""格式化美股数据(降级方案)"""
symbol = data.get("symbol", "")
name = data.get("name", "")
current_price = data.get("current_price", 0)
change = data.get("change", 0)
change_pct = data.get("change_percent", 0)
market_cap = data.get("market_cap", 0)
pe_ratio = data.get("pe_ratio", 0)
technical = data.get("technical_indicators", {})
market_cap_str = f"${market_cap / 1e9:.2f}B" if market_cap > 1e9 else f"${market_cap / 1e6:.2f}M"
change_emoji = "📈" if change >= 0 else "📉"
return f"""【美股行情】{name} ({symbol})
{change_emoji} 当前价格:${current_price:.2f}
涨跌:${change:.2f} ({change_pct:+.2f}%)
市值:{market_cap_str}
市盈率:{pe_ratio:.2f if pe_ratio else '暂无'}
【技术指标】
MA5${technical.get('ma5', 0):.2f if technical.get('ma5') else '计算中'}
MA20${technical.get('ma20', 0):.2f if technical.get('ma20') else '计算中'}
RSI{technical.get('rsi', 0):.2f if technical.get('rsi') else '计算中'}
以上数据仅供参考,不构成投资建议。"""
async def process_message_stream(
self,
message: str,
session_id: str,
user_id: Optional[str] = None
):
"""
流式处理用户消息(智能模式)
Args:
message: 用户消息
session_id: 会话ID
user_id: 用户ID
Yields:
响应文本片段
"""
logger.info(f"[智能模式-流式] 处理消息: {message[:50]}...")
# 转换 user_id 为整数(如果是字符串)
user_id_int = int(user_id) if user_id else None
# 1. 保存用户消息
self.context_manager.add_message(session_id, "user", message, user_id=user_id_int)
# 2. 提取上下文信息
context_info = self.context_manager.extract_context_info(session_id)
logger.info(f"[智能模式-流式] 上下文信息: last_stock={context_info.get('last_stock')}")
# 3. 深度问题分析
intent = await self.question_analyzer.analyze_question(
question=message,
context=self.context_manager.get_context(session_id),
session_id=session_id
)
logger.info(f"[智能模式-流式] 问题分析: type={intent.get('type')}, dimensions={intent.get('dimensions')}")
# 4. 处理上下文引用(代词解析)
if intent.get('context_references', {}).get('refers_to_previous'):
intent = self._resolve_context_references(intent, context_info)
logger.info(f"[智能模式-流式] 上下文解析后: target={intent.get('target')}")
# 5. 根据问题类型分发(流式)
full_response = ""
if intent['type'] == 'stock_analysis':
async for chunk in self._handle_stock_analysis_stream(intent, message):
full_response += chunk
yield chunk
elif intent['type'] == 'market_overview':
response = await self._handle_macro_question(intent, message)
full_response = response["message"]
for char in full_response:
yield char
elif intent['type'] == 'knowledge':
response = await self._handle_knowledge_question(intent, message)
full_response = response["message"]
for char in full_response:
yield char
else:
response = await self._handle_general_chat(intent, message)
full_response = response["message"]
for char in full_response:
yield char
# 6. 保存助手响应
self.context_manager.add_message(session_id, "assistant", full_response, user_id=user_id_int)
async def _handle_other_question(
self,
question_type: str,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理非股票分析的其他问题"""
if question_type == 'macro_finance':
return await self._handle_macro_question(intent_analysis, message)
elif question_type == 'knowledge':
return await self._handle_knowledge_question(intent_analysis, message)
elif question_type == 'general_chat':
return await self._handle_general_chat(intent_analysis, message)
else:
return {"message": "抱歉,我无法理解您的问题。"}
async def _handle_stock_question_stream(
self,
intent_analysis: Dict[str, Any],
message: str
):
"""流式处理股票问题"""
stock_names = intent_analysis.get('stock_names', [])
market = intent_analysis.get('market', 'A股')
if not stock_names:
yield "抱歉,我没有识别到您提到的股票或指数。请提供更明确的股票代码、名称或指数名称。"
return
stock_keyword = stock_names[0]
is_us_stock = self._is_us_stock(stock_keyword, market)
if is_us_stock:
# 美股分析 - 流式
async for chunk in self._handle_us_stock_stream(stock_keyword, message):
yield chunk
else:
# A股分析 - 流式
async for chunk in self._handle_a_stock_stream(stock_keyword, message):
yield chunk
async def _handle_a_stock_stream(self, stock_keyword: str, message: str):
"""流式处理A股分析使用 skill_planner"""
# 使用LLM进行智能匹配
stock_info = await self._match_stock_with_llm(stock_keyword)
if not stock_info:
yield f"抱歉,未找到股票或指数\"{stock_keyword}\"。请确认名称或代码是否正确。"
return
stock_code = stock_info['code']
stock_name = stock_info['name']
is_index = stock_info['is_index']
logger.info(f"[流式] A股分析: {stock_name}({stock_code})")
try:
# 1. 使用 QuestionAnalyzer 分析问题意图
intent = await self.question_analyzer.analyze_question(
question=message,
context=[],
session_id=""
)
# 确保 intent 包含股票信息
if 'target' not in intent:
intent['target'] = {}
intent['target']['stock_code'] = stock_code
intent['target']['stock_name'] = stock_name
logger.info(f"[流式] A股问题意图分析: dimensions={intent.get('dimensions')}")
# 2. 使用 SkillPlanner 规划技能(包括 brave_search
plan = self.skill_planner.plan_skills(intent)
logger.info(f"[流式] A股技能规划完成: {[s['name'] for s in plan['skills']]}, 策略: {plan['execution_strategy']}")
# 3. 执行技能规划
execution_results = await skill_manager.execute_plan(
plan=plan,
stock_code=stock_code
)
if execution_results['errors']:
logger.warning(f"[流式] A股技能执行有错误: {execution_results['errors']}")
# 4. 整合数据(兼容旧格式)
results = execution_results['results']
all_data = {
"stock_code": stock_code,
"stock_name": stock_name,
"quote": results.get("market_data"),
"technical": results.get("technical_analysis"),
"fundamental": results.get("fundamental"),
"advanced": results.get("advanced_data"),
"news": results.get("brave_search") # 新增:新闻数据
}
# 5. 使用LLM流式分析
if self.use_llm:
async for chunk in self._llm_comprehensive_analysis_stream(all_data, message, is_index):
yield chunk
else:
yield self._rule_based_analysis(all_data)
except Exception as e:
logger.error(f"A股分析失败: {e}")
import traceback
logger.error(traceback.format_exc())
yield f"分析{stock_name}时出错:{str(e)}"
async def _handle_us_stock_stream(self, keyword: str, message: str):
"""流式处理美股分析(兼容旧接口)"""
# 如果是大写字母,直接作为代码使用
if keyword.isupper() and keyword.isalpha() and len(keyword) <= 5:
async for chunk in self._handle_us_stock_stream_with_code(keyword, keyword, message):
yield chunk
return
# 否则报错
logger.warning(f"_handle_us_stock_stream 收到非代码格式的关键词: {keyword}")
yield f"抱歉,无法识别美股 \"{keyword}\"。请直接输入美股代码(如 BABA、AAPL、TSLA进行查询。"
async def _handle_us_stock_stream_with_code(self, symbol: str, stock_name: str, message: str):
"""流式处理美股分析(使用已知的股票代码)"""
logger.info(f"[智能模式-流式] 美股查询: {stock_name} -> {symbol}")
try:
# 1. 使用 QuestionAnalyzer 分析问题意图
intent = await self.question_analyzer.analyze_question(
question=message,
context=[],
session_id=""
)
# 确保 intent 包含股票信息
if 'target' not in intent:
intent['target'] = {}
intent['target']['stock_code'] = symbol
intent['target']['stock_name'] = stock_name
intent['target']['market'] = '美股'
logger.info(f"[流式] 美股问题意图分析: dimensions={intent.get('dimensions')}")
# 2. 使用 SkillPlanner 规划技能(会自动识别美股并使用正确的技能)
plan = self.skill_planner.plan_skills(intent)
logger.info(f"[流式] 美股技能规划完成: {[s['name'] for s in plan['skills']]}, 策略: {plan['execution_strategy']}")
# 3. 执行技能规划
execution_results = await skill_manager.execute_plan(
plan=plan,
stock_code=symbol
)
if execution_results['errors']:
logger.warning(f"[流式] 美股技能执行有错误: {execution_results['errors']}")
# 5. 检查 us_stock_analysis 是否成功
us_stock_data = execution_results['results'].get('us_stock_analysis')
if not us_stock_data or 'error' in us_stock_data:
yield f"抱歉,未找到美股 {symbol}。请确认股票代码是否正确。"
return
# 6. 整合数据
all_data = {
"symbol": symbol,
"name": stock_name,
**us_stock_data,
"news": execution_results['results'].get("brave_search") # 新增:新闻数据
}
# 7. 使用智能模式的动态prompt生成
if self.use_llm:
# 构建美股数据的动态prompt
prompt = self._build_us_stock_dynamic_prompt(all_data, symbol, message)
# 流式生成
stream = llm_service.chat_stream(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2500, # 增加 token 数量以容纳新闻分析
model_override=self.model_override
)
for chunk in stream:
yield chunk
else:
yield self._format_us_stock_data(all_data)
except Exception as e:
logger.error(f"美股查询失败: {e}")
import traceback
logger.error(traceback.format_exc())
yield f"查询美股 {symbol} 时出错:{str(e)}"
async def _handle_hk_stock_stream_with_code(self, symbol: str, stock_name: str, message: str):
"""流式处理港股分析(使用已知的股票代码)"""
logger.info(f"[智能模式-流式] 港股查询: {stock_name} -> {symbol}")
try:
# 1. 使用 QuestionAnalyzer 分析问题意图
intent = await self.question_analyzer.analyze_question(
question=message,
context=[],
session_id=""
)
# 确保 intent 包含股票信息
if 'target' not in intent:
intent['target'] = {}
intent['target']['stock_code'] = symbol
intent['target']['stock_name'] = stock_name
intent['target']['market'] = '港股'
logger.info(f"[流式] 港股问题意图分析: dimensions={intent.get('dimensions')}")
# 2. 使用 SkillPlanner 规划技能(会自动识别港股并使用正确的技能)
plan = self.skill_planner.plan_skills(intent)
logger.info(f"[流式] 港股技能规划完成: {[s['name'] for s in plan['skills']]}, 策略: {plan['execution_strategy']}")
logger.info(f"[流式] 港股技能规划完成: {[s['name'] for s in plan['skills']]}, 策略: {plan['execution_strategy']}")
# 4. 执行技能规划
execution_results = await skill_manager.execute_plan(
plan=plan,
stock_code=symbol
)
if execution_results['errors']:
logger.warning(f"[流式] 港股技能执行有错误: {execution_results['errors']}")
# 5. 检查 us_stock_analysis 是否成功
hk_stock_data = execution_results['results'].get('us_stock_analysis')
if not hk_stock_data or 'error' in hk_stock_data:
yield f"抱歉,未找到港股 {symbol}。请确认股票代码是否正确。\n\n提示:港股代码格式为数字加.HK后缀如 0700.HK腾讯、9988.HK阿里巴巴等。"
return
# 6. 整合数据
all_data = {
"symbol": symbol,
"name": stock_name,
"market": "港股",
**hk_stock_data,
"news": execution_results['results'].get("brave_search")
}
# 7. 使用智能模式的动态prompt生成
if self.use_llm:
# 构建港股数据的动态prompt
prompt = self._build_hk_stock_dynamic_prompt(all_data, symbol, message)
# 流式生成
stream = llm_service.chat_stream(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2500,
model_override=self.model_override
)
for chunk in stream:
yield chunk
else:
yield self._format_us_stock_data(all_data)
except Exception as e:
logger.error(f"港股查询失败: {e}")
import traceback
logger.error(traceback.format_exc())
yield f"查询港股 {symbol} 时出错:{str(e)}"
def _build_hk_stock_dynamic_prompt(self, data: Dict[str, Any], symbol: str, user_message: str) -> str:
"""构建港股分析的动态prompt"""
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
# 提取关键数据(使用 us_stock_service 返回的扁平结构)
name = data.get('name', symbol)
technical = data.get('technical_indicators', {})
news = data.get('news', [])
# 构建数据摘要
data_summary = f"""
【港股数据】{name}{symbol}
查询时间:{current_time}
【基本信息】
- 公司名称:{name}
- 行业:{data.get('industry', '未知')}
- 板块:{data.get('sector', '未知')}
- 市值:{data.get('market_cap', '未知')}
【最新行情】
- 当前价格:{data.get('current_price', '未知')}
- 今日涨跌:{data.get('change_percent', '未知')}%
- 52周最高{data.get('52_week_high', '未知')}
- 52周最低{data.get('52_week_low', '未知')}
【估值指标】
- 市盈率(PE){data.get('pe_ratio', '未知')}
- 市净率(PB){data.get('pb_ratio', '未知')}
- 股息率:{data.get('dividend_yield', '未知')}
【技术指标】
- MA5{technical.get('ma5', '未知')}
- MA20{technical.get('ma20', '未知')}
- RSI{technical.get('rsi', '未知')}
- MACD{technical.get('macd', '未知')}
"""
# 添加新闻摘要
if news:
data_summary += "\n【相关新闻】\n"
if isinstance(news, dict) and 'results' in news:
news_list = news.get('results', [])
else:
news_list = news if isinstance(news, list) else []
for i, item in enumerate(news_list[:3], 1):
if isinstance(item, dict):
title = item.get('title', '')
data_summary += f"{i}. {title}\n"
prompt = f"""你是一个专业的港股分析师。请根据以下数据,回答用户的问题。
{data_summary}
用户问题:{user_message}
请提供专业、客观的分析,包括:
1. 直接回答用户的问题
2. 基于数据的分析和判断
3. 潜在的风险提示
注意:港股以港币(HKD)计价交易时间为港交所交易时段北京时间9:30-12:00, 13:00-16:00"""
return prompt
def _format_news_section(self, news_data: Dict[str, Any]) -> str:
"""
格式化新闻数据为统一格式
Args:
news_data: 新闻数据字典
Returns:
格式化后的新闻文本
"""
if not news_data or news_data.get('error'):
return "\n**最新新闻和舆情**: 暂无相关新闻\n"
results = news_data.get('results', [])
if not results:
return "\n**最新新闻和舆情**: 暂无相关新闻\n"
news_section = "\n**最新新闻和舆情**来源Brave Search:\n"
for i, item in enumerate(results[:5], 1):
title = item.get('title', '无标题')
description = item.get('description', '')
source = item.get('source', '')
published = item.get('published', '')
news_section += f"{i}. {title}\n"
if description:
news_section += f" 摘要:{description[:100]}...\n"
if source:
news_section += f" 来源:{source}\n"
if published:
news_section += f" 发布时间:{published}\n"
news_section += "\n"
return news_section
def _build_unified_analysis_prompt(
self,
market: str,
stock_info: Dict[str, Any],
data: Dict[str, Any],
user_message: str
) -> str:
"""
统一的分析提示词构建函数(自然语言风格)
Args:
market: 市场类型('A股''美股'
stock_info: 股票基本信息 {'code': ..., 'name': ...}
data: 市场数据(行情、技术、基本面、新闻等)
user_message: 用户问题
Returns:
构建好的prompt字符串
"""
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
stock_code = stock_info.get('code', '')
stock_name = stock_info.get('name', '')
# 格式化新闻数据(统一格式)
news_section = self._format_news_section(data.get('news', {}))
if market == '美股':
# 美股数据格式
current_price = data.get('current_price', 0)
change = data.get('change', 0)
change_percent = data.get('change_percent', 0)
volume = data.get('volume', 0)
market_cap = data.get('market_cap', 0)
pe_ratio = data.get('pe_ratio', 0)
pb_ratio = data.get('pb_ratio', 0)
technical = data.get('technical_indicators', {})
ma5 = technical.get('ma5', 0)
ma10 = technical.get('ma10', 0)
ma20 = technical.get('ma20', 0)
rsi = technical.get('rsi', 0)
macd = technical.get('macd', 0)
market_data_section = f"""
**行情数据**:
- 最新价: ${current_price:.2f}
- 涨跌: ${change:+.2f} ({change_percent:+.2f}%)
- 成交量: {volume:,.0f}
- 市值: ${market_cap:,.0f}
- 市盈率(PE): {pe_ratio:.2f}
- 市净率(PB): {pb_ratio:.2f}
**技术指标**:
- 均线: MA5=${ma5:.2f}, MA10=${ma10:.2f}, MA20=${ma20:.2f}
- RSI: {rsi:.2f}
- MACD: {macd:.4f}
"""
analyst_role = "美股分析师"
risk_disclaimer = "以上分析仅供参考,不构成投资建议。美股投资有风险,请谨慎决策。"
else: # A股
# A股数据格式
quote = data.get('quote', {})
technical = data.get('technical', {})
fundamental = data.get('fundamental', {})
advanced = data.get('advanced', {})
# 获取交易日期
quote_date = quote.get('trade_date', '未知') if quote else '未知'
# 构建行情数据部分
if quote:
market_data_section = f"""
**行情数据**(截止:{quote_date}:
- 最新价: ¥{quote.get('close', 0):.2f}
- 涨跌幅: {quote.get('pct_chg', 0):+.2f}%
- 成交量: {quote.get('vol', 0):,.0f}
- 成交额: {quote.get('amount', 0):,.0f}千元
- 换手率: {quote.get('turnover_rate', 0):.2f}%
"""
else:
market_data_section = "\n**行情数据**: 数据获取失败\n"
# 添加技术指标
if technical:
market_data_section += f"""
**技术指标**(截止:{quote_date}:
- 均线: MA5=¥{technical.get('ma5', 0):.2f}, MA10=¥{technical.get('ma10', 0):.2f}, MA20=¥{technical.get('ma20', 0):.2f}
- RSI: {technical.get('rsi', 0):.2f}
- MACD: {technical.get('macd', 0):.4f}
"""
# 添加基本面数据
if fundamental:
market_data_section += f"""
**基本面数据**:
- 市盈率(PE): {fundamental.get('pe', 0):.2f}
- 市净率(PB): {fundamental.get('pb', 0):.2f}
- 总市值: {fundamental.get('total_mv', 0):,.0f}亿元
- ROE: {fundamental.get('roe', 0):.2f}%
"""
# 添加高级数据(资金流向、融资融券等)
if advanced:
if advanced.get('money_flow'):
money_flow = advanced['money_flow'][0] if advanced['money_flow'] else {}
market_data_section += f"""
**资金流向**:
- 主力净流入: {money_flow.get('net_mf_amount', 0):,.0f}万元
- 大单净流入: {money_flow.get('buy_lg_amount', 0) - money_flow.get('sell_lg_amount', 0):,.0f}万元
"""
if advanced.get('margin'):
margin = advanced['margin'][0] if advanced['margin'] else {}
market_data_section += f"""
**融资融券**:
- 融资余额: {margin.get('rzye', 0):,.0f}
- 融券余额: {margin.get('rqye', 0):,.0f}
"""
analyst_role = "A股分析师"
risk_disclaimer = "以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
# 构建统一的prompt
prompt = f"""你是一位专业的{analyst_role}。请根据以下数据分析【{stock_name}({stock_code})】。
**用户问题**: {user_message}
**【重要】最新新闻和舆情(必须分析)**
{news_section}
## 数据信息
{market_data_section}
## 分析要求
请根据用户的问题,提供自然、有针对性的分析。不要使用固定格式,而是像专业分析师一样,用自然的语言回答用户的问题。
**重要:必须在分析中包含对上面提供的最新新闻的分析,评估新闻对股价的影响。**
分析时请注意:
- 如果用户关注价格走势,重点分析价格和趋势,并结合新闻分析市场情绪
- 如果用户关注技术指标,重点分析技术面,并结合新闻判断短期走势
- 如果用户关注基本面,重点分析公司情况和估值,并结合新闻评估投资价值
- **无论用户问什么,都要在分析中提及最新新闻对股价的潜在影响**
请直接开始分析不要添加日期标题。控制在600-800字。最后声明"{risk_disclaimer}"
"""
return prompt
def _build_us_stock_dynamic_prompt(
self,
data: Dict[str, Any],
symbol: str,
user_message: str
) -> str:
"""
为美股构建动态prompt调用统一函数
Args:
data: 美股数据
symbol: 股票代码
user_message: 用户消息
Returns:
prompt字符串
"""
stock_info = {
'code': symbol,
'name': data.get('name', symbol)
}
return self._build_unified_analysis_prompt(
market='美股',
stock_info=stock_info,
data=data,
user_message=user_message
)
async def _llm_comprehensive_analysis_stream(self, data: Dict[str, Any], user_message: str, is_index: bool = False):
"""使用LLM流式进行综合分析调用统一函数"""
stock_info = {
'code': data.get('stock_code', ''),
'name': data.get('stock_name', '')
}
# 使用统一的prompt构建函数
prompt = self._build_unified_analysis_prompt(
market='A股',
stock_info=stock_info,
data=data,
user_message=user_message
)
# 流式调用LLM
import asyncio
stream = llm_service.chat_stream(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2500,
model_override=self.model_override
)
# 在线程中迭代同步生成器,避免阻塞事件循环
for chunk in stream:
# 每次yield后让出控制权
await asyncio.sleep(0)
yield chunk
async def _llm_us_stock_analysis_stream(self, data: Dict[str, Any], user_message: str):
"""使用LLM流式分析美股"""
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
symbol = data.get("symbol", "")
name = data.get("name", "")
sector = data.get("sector", "")
industry = data.get("industry", "")
current_price = data.get("current_price", 0)
change = data.get("change", 0)
change_pct = data.get("change_percent", 0)
volume = data.get("volume", 0)
market_cap = data.get("market_cap", 0)
pe_ratio = data.get("pe_ratio", 0)
pb_ratio = data.get("pb_ratio", 0)
dividend_yield = data.get("dividend_yield", 0)
week_52_high = data.get("52_week_high", 0)
week_52_low = data.get("52_week_low", 0)
technical = data.get("technical_indicators", {})
description = data.get("description", "")
market_cap_str = f"${market_cap / 1e9:.2f}B" if market_cap > 1e9 else f"${market_cap / 1e6:.2f}M"
prompt = f"""你是一位专业的美股分析师。请基于以下数据对 {name} ({symbol}) 进行全面分析。
**重要提示:当前日期是 {current_time},请在分析中使用这个日期,不要使用其他日期。**
【基本信息】
股票代码:{symbol}
公司名称:{name}
所属行业:{sector} - {industry}
公司简介:{description[:300] if description else '暂无'}
【实时行情】(数据时间:{current_time}
当前价格:${current_price:.2f}
涨跌额:${change:.2f}
涨跌幅:{change_pct:.2f}%
成交量:{volume:,}
市值:{market_cap_str}
【估值指标】
市盈率(PE){f"{pe_ratio:.2f}" if pe_ratio else '暂无'}
市净率(PB){f"{pb_ratio:.2f}" if pb_ratio else '暂无'}
股息率:{f"{dividend_yield * 100:.2f}%" if dividend_yield else '暂无'}
52周最高${week_52_high:.2f}
52周最低${week_52_low:.2f}
【技术指标】
MA5{f"${technical.get('ma5'):.2f}" if technical.get('ma5') else '计算中'}
MA10{f"${technical.get('ma10'):.2f}" if technical.get('ma10') else '计算中'}
MA20{f"${technical.get('ma20'):.2f}" if technical.get('ma20') else '计算中'}
MA60{f"${technical.get('ma60'):.2f}" if technical.get('ma60') else '计算中'}
RSI{f"{technical.get('rsi'):.2f}" if technical.get('rsi') else '计算中'}
MACD{f"{technical.get('macd'):.4f}" if technical.get('macd') else '计算中'}
用户问题:{user_message}
请提供专业的分析报告,包括:
## 📊 行情概览
简要总结当前股价表现和市场表现2-3句话
## 💼 公司基本面
- 行业地位和竞争优势
- 估值水平分析PE、PB是否合理
- 盈利能力和成长性
## 📈 技术面分析
- 当前趋势判断(基于均线系统)
- 关键支撑位和压力位
- RSI和MACD信号解读
## 💡 投资建议
- 短期操作建议1-2周
- 中期投资价值1-3个月
- 风险提示
写作要求:
1. 语言专业但易懂,避免过度修饰
2. 分析客观理性,基于数据和事实
3. 每个部分独立成段,段落间用空行分隔
4. 控制在500-600字
5. **不要在报告中添加日期标题,直接开始分析内容**
6. 最后声明:"以上分析仅供参考,不构成投资建议。美股投资有风险,请谨慎决策。"
"""
# 流式调用LLM同步生成器使用线程避免阻塞
import asyncio
stream = llm_service.chat_stream(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2500, # 增加 token 数量以容纳新闻分析
model_override=self.model_override
)
# 在线程中迭代同步生成器,避免阻塞事件循环
for chunk in stream:
# 每次yield后让出控制权
await asyncio.sleep(0)
yield chunk
# ==================== 新增:智能模式方法 ====================
async def _handle_stock_analysis_v2(
self,
intent: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""
处理股票分析请求(智能模式 V2
Args:
intent: 问题意图
message: 用户消息
Returns:
响应结果
"""
target = intent.get('target', {})
stock_code = target.get('stock_code')
stock_name = target.get('stock_name')
market = target.get('market', 'A股')
# QuestionAnalyzer 应该已经返回了股票代码
if not stock_code:
return {
"message": f"抱歉,我没有识别到您提到的股票「{stock_name or ''}」。请提供更明确的股票代码或名称。",
"metadata": {"type": "error"}
}
# 根据市场类型处理
if market == '美股':
# 美股处理
return await self._handle_us_stock_with_code(stock_code, stock_name or stock_code, message)
if market == '港股':
# 港股处理(使用 yfinance与美股类似
return await self._handle_hk_stock_with_code(stock_code, stock_name or stock_code, message)
logger.info(f"[智能模式] 分析股票: {stock_name}({stock_code})")
# 1. 技能规划
plan = self.skill_planner.plan_skills(intent)
logger.info(f"[智能模式] 技能规划: {[s['name'] for s in plan['skills']]}")
# 2. 执行技能
execution_results = await skill_manager.execute_plan(
plan=plan,
stock_code=stock_code
)
if execution_results['errors']:
logger.warning(f"[智能模式] 技能执行有错误: {execution_results['errors']}")
# 3. 智能生成回答
analysis = await self._generate_intelligent_response(
intent=intent,
execution_results=execution_results['results'],
stock_code=stock_code,
stock_name=stock_name,
user_message=message
)
return {
"message": analysis,
"metadata": {
"type": "stock_analysis",
"intent": intent,
"plan": plan,
"data": {
"stock_code": stock_code,
"stock_name": stock_name,
**execution_results['results']
}
}
}
async def _generate_intelligent_response(
self,
intent: Dict[str, Any],
execution_results: Dict[str, Any],
stock_code: str,
stock_name: str,
user_message: str
) -> str:
"""
智能生成回答 - 根据用户意图定制
Args:
intent: 问题意图
execution_results: 技能执行结果
stock_code: 股票代码
stock_name: 股票名称
user_message: 用户消息
Returns:
分析报告
"""
# 1. 构建动态prompt
prompt = self._build_dynamic_prompt(
intent=intent,
data=execution_results,
stock_code=stock_code,
stock_name=stock_name,
user_message=user_message
)
# 2. 调用LLM生成
max_tokens = self._calculate_max_tokens(intent)
response = await self._call_llm_async(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=max_tokens
)
if not response:
# 降级到规则化格式
return self._format_fallback_response(execution_results, stock_name)
return response
def _build_dynamic_prompt(
self,
intent: Dict[str, Any],
data: Dict[str, Any],
stock_code: str,
stock_name: str,
user_message: str
) -> str:
"""
根据意图动态构建prompt
Args:
intent: 问题意图
data: 执行结果数据
stock_code: 股票代码
stock_name: 股票名称
user_message: 用户消息
Returns:
prompt字符串
"""
dimensions = intent.get('dimensions', {})
time_scope = intent.get('time_scope', {})
specific_concerns = intent.get('specific_concerns', [])
user_style = intent.get('user_style', {})
# 基础部分
prompt_parts = [
f"你是一个专业的股票分析师。请根据以下数据分析【{stock_name}({stock_code})】。",
"",
f"**用户问题**: {user_message}",
""
]
# 添加用户关注点
if specific_concerns:
prompt_parts.append(f"**用户特别关注**: {', '.join(specific_concerns)}")
prompt_parts.append("")
# 添加数据部分
prompt_parts.append("## 数据信息")
prompt_parts.append("")
# 根据维度添加相应数据
if dimensions.get('price_trend') and 'market_data' in data:
prompt_parts.append(self._format_market_data_section(data['market_data']))
if dimensions.get('technical') and 'technical_analysis' in data:
prompt_parts.append(self._format_technical_section(data['technical_analysis']))
if dimensions.get('fundamental') and 'fundamental' in data:
prompt_parts.append(self._format_fundamental_section(data['fundamental']))
if dimensions.get('valuation') or dimensions.get('money_flow'):
if 'advanced_data' in data:
prompt_parts.append(self._format_advanced_section(data['advanced_data']))
# 分析要求
prompt_parts.append("")
prompt_parts.append("## 分析要求")
prompt_parts.append("")
# 根据时间范围调整
if time_scope.get('short_term'):
prompt_parts.append("- 重点分析短期走势1-2周")
if time_scope.get('medium_term'):
prompt_parts.append("- 分析中期趋势1-3个月")
if time_scope.get('long_term'):
prompt_parts.append("- 评估长期投资价值(半年以上)")
# 根据用户风格调整
if user_style.get('tone') == 'casual':
prompt_parts.append("- 使用通俗易懂的语言,避免过多专业术语")
else:
prompt_parts.append("- 使用专业的金融术语和分析方法")
if user_style.get('detail_level') == 'brief':
prompt_parts.append("- 简洁回答控制在200-300字")
else:
prompt_parts.append("- 详细分析控制在500-600字")
# 输出格式
prompt_parts.append("")
prompt_parts.append("请直接开始分析,不要添加日期标题。最后声明:\"以上分析仅供参考,不构成投资建议。\"")
return "\n".join(prompt_parts)
def _format_market_data_section(self, data: Dict) -> str:
"""格式化行情数据部分"""
if 'error' in data:
return "**行情数据**: 暂时无法获取"
return f"""**行情数据**:
- 最新价: {data.get('close', 0):.2f}
- 涨跌幅: {data.get('pct_chg', 0):+.2f}%
- 成交量: {data.get('vol', 0):.0f}
- 成交额: {data.get('amount', 0):.0f}千元
"""
def _format_technical_section(self, data: Dict) -> str:
"""格式化技术指标部分"""
if 'error' in data:
return "**技术指标**: 暂时无法获取"
indicators = data.get('indicators', {})
parts = ["**技术指标**:"]
if 'ma' in indicators:
ma = indicators['ma']
parts.append(f"- 均线: MA5={ma.get('ma5', 0):.2f}, MA10={ma.get('ma10', 0):.2f}, MA20={ma.get('ma20', 0):.2f}")
if 'macd' in indicators:
macd = indicators['macd']
parts.append(f"- MACD: DIF={macd.get('dif', 0):.4f}, DEA={macd.get('dea', 0):.4f}, MACD={macd.get('macd', 0):.4f}")
if 'rsi' in indicators:
rsi = indicators['rsi']
parts.append(f"- RSI: RSI6={rsi.get('rsi6', 0):.2f}, RSI12={rsi.get('rsi12', 0):.2f}")
return "\n".join(parts)
def _format_fundamental_section(self, data: Dict) -> str:
"""格式化基本面部分"""
if 'error' in data:
return "**基本面**: 暂时无法获取"
return f"""**基本面**:
- 公司名称: {data.get('name', '')}
- 所属行业: {data.get('industry', '')}
- 所属地域: {data.get('area', '')}
- 上市市场: {data.get('market', '')}
"""
def _format_advanced_section(self, data: Dict) -> str:
"""格式化高级数据部分"""
if 'error' in data:
return "**高级数据**: 暂时无法获取"
parts = ["**高级数据**:"]
if 'valuation' in data:
val = data['valuation']
parts.append(f"- 估值: PE={val.get('pe', 0):.2f}, PB={val.get('pb', 0):.2f}")
if 'money_flow' in data and data['money_flow']:
mf = data['money_flow'][0] if isinstance(data['money_flow'], list) else data['money_flow']
parts.append(f"- 资金流向: 净流入={mf.get('net_mf_amount', 0):.2f}万元")
return "\n".join(parts)
def _calculate_max_tokens(self, intent: Dict[str, Any]) -> int:
"""根据意图计算max_tokens"""
depth = intent.get('analysis_depth', 'standard')
detail_level = intent.get('user_style', {}).get('detail_level', 'detailed')
if depth == 'quick' or detail_level == 'brief':
return 800
elif depth == 'deep' or detail_level == 'detailed':
return 2000
else:
return 1500
def _format_fallback_response(self, data: Dict, stock_name: str) -> str:
"""降级响应格式"""
parts = [f"{stock_name}】分析报告\n"]
if 'market_data' in data and 'error' not in data['market_data']:
md = data['market_data']
parts.append(f"最新价: {md.get('close', 0):.2f}")
parts.append(f"涨跌幅: {md.get('pct_chg', 0):+.2f}%\n")
parts.append("以上分析仅供参考,不构成投资建议。")
return "\n".join(parts)
async def _handle_stock_analysis_stream(
self,
intent: Dict[str, Any],
message: str
):
"""
流式处理股票分析请求(智能模式)
Args:
intent: 问题意图
message: 用户消息
Yields:
响应文本片段
"""
target = intent.get('target', {})
stock_code = target.get('stock_code')
stock_name = target.get('stock_name')
market = target.get('market', 'A股')
# QuestionAnalyzer 应该已经返回了股票代码
if not stock_code:
yield f"抱歉,我没有识别到您提到的股票「{stock_name or ''}」。请提供更明确的股票代码或名称。"
return
# 根据市场类型处理
if market == '美股':
# 美股处理流程
async for chunk in self._handle_us_stock_stream_with_code(stock_code, stock_name or stock_code, message):
yield chunk
return
if market == '港股':
# 港股处理流程(使用 yfinance与美股类似
async for chunk in self._handle_hk_stock_stream_with_code(stock_code, stock_name or stock_code, message):
yield chunk
return
# A股处理流程
logger.info(f"[智能模式-流式] 分析股票: {stock_name}({stock_code})")
# 1. 技能规划
plan = self.skill_planner.plan_skills(intent)
logger.info(f"[智能模式-流式] 技能规划: {[s['name'] for s in plan['skills']]}")
# 2. 执行技能
execution_results = await skill_manager.execute_plan(
plan=plan,
stock_code=stock_code
)
if execution_results['errors']:
logger.warning(f"[智能模式-流式] 技能执行有错误: {execution_results['errors']}")
# 3. 智能生成回答(流式)
async for chunk in self._generate_intelligent_response_stream(
intent=intent,
execution_results=execution_results['results'],
stock_code=stock_code,
stock_name=stock_name,
user_message=message
):
yield chunk
async def _generate_intelligent_response_stream(
self,
intent: Dict[str, Any],
execution_results: Dict[str, Any],
stock_code: str,
stock_name: str,
user_message: str
):
"""
智能生成回答(流式) - 根据用户意图定制
Args:
intent: 问题意图
execution_results: 技能执行结果
stock_code: 股票代码
stock_name: 股票名称
user_message: 用户消息
Yields:
响应文本片段
"""
# 1. 构建动态prompt
prompt = self._build_dynamic_prompt(
intent=intent,
data=execution_results,
stock_code=stock_code,
stock_name=stock_name,
user_message=user_message
)
# 2. 调用LLM流式生成
if self.use_llm:
stream = llm_service.chat_stream(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=self._calculate_max_tokens(intent),
model_override=self.model_override
)
for chunk in stream:
yield chunk
else:
# 降级到规则化格式
fallback = self._format_fallback_response(execution_results, stock_name)
for char in fallback:
yield char
def _resolve_context_references(
self,
intent: Dict[str, Any],
context_info: Dict
) -> Dict[str, Any]:
"""
解析上下文引用(代词解析)
Args:
intent: 问题意图
context_info: 上下文信息
Returns:
更新后的意图
"""
target = intent.get('target', {})
# 如果用户说"这只股票"、"它"等,从上下文中提取
if not target.get('stock_code') and context_info.get('last_stock'):
target['stock_code'] = context_info['last_stock']
intent['target'] = target
logger.info(f"[智能模式] 从上下文解析股票代码: {target['stock_code']}")
return intent
# 创建全局实例
smart_agent = SmartStockAgent()