stock-ai-agent/backend/app/agent/smart_agent.py
2026-02-03 22:48:45 +08:00

2037 lines
73 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
智能Agent - 真正使用LLM进行全面分析
"""
import re
import json
from typing import Dict, Any, Optional, List
from app.config import get_settings
from app.agent.context import ContextManager
from app.agent.skill_manager import skill_manager
from app.skills.market_data import MarketDataSkill
from app.skills.technical_analysis import TechnicalAnalysisSkill
from app.skills.fundamental import FundamentalSkill
from app.skills.visualization import VisualizationSkill
from app.skills.advanced_data import AdvancedDataSkill
from app.skills.us_stock_skill import USStockSkill
from app.services.llm_service import llm_service
from app.services.tushare_service import tushare_service
from app.utils.logger import logger
class SmartStockAgent:
"""智能股票分析Agent - 深度集成LLM"""
def __init__(self):
"""初始化Agent"""
self.context_manager = ContextManager()
self.settings = get_settings()
# 注册技能
self._register_skills()
# 检查LLM是否可用
self.use_llm = bool(self.settings.zhipuai_api_key) and llm_service.client is not None
if self.use_llm:
logger.info("Smart Agent初始化完成LLM深度集成模式 + Tushare Pro高级数据")
else:
logger.warning("Smart Agent初始化完成规则模式建议配置LLM")
def _register_skills(self):
"""注册所有技能"""
skill_manager.register(MarketDataSkill())
skill_manager.register(TechnicalAnalysisSkill())
skill_manager.register(FundamentalSkill())
skill_manager.register(VisualizationSkill())
skill_manager.register(AdvancedDataSkill())
skill_manager.register(USStockSkill())
logger.info("技能注册完成Tushare Pro高级数据 + 美股支持)")
async def process_message(
self,
message: str,
session_id: str,
user_id: Optional[str] = None
) -> Dict[str, Any]:
"""
处理用户消息(智能版)
Args:
message: 用户消息
session_id: 会话ID
user_id: 用户ID
Returns:
响应结果
"""
logger.info(f"处理消息: {message[:50]}...")
# 保存用户消息
self.context_manager.add_message(session_id, "user", message)
# 第一步使用LLM理解问题意图
intent_analysis = await self._analyze_question_intent(message, session_id)
if not intent_analysis:
# 不直接说"无法理解",而是引导用户
response = {
"message": """我是您的金融智能助手,可以帮您:
📊 **股票分析** - 分析个股走势、技术指标、基本面
📈 **市场观察** - 解读大盘走势、行业热点
📚 **知识问答** - 解答金融投资相关问题
您可以这样问我:
"分析一下贵州茅台"
"现在A股市场怎么样"
"什么是MACD指标"
请告诉我您想了解什么?""",
"metadata": {"type": "guide"}
}
self.context_manager.add_message(session_id, "assistant", response["message"])
return response
# 第二步:根据意图类型处理
question_type = intent_analysis['type']
if question_type == 'stock_specific':
# 针对特定股票的问题
response = await self._handle_stock_question(intent_analysis, message)
elif question_type == 'macro_finance':
# 宏观金融问题
response = await self._handle_macro_question(intent_analysis, message)
elif question_type == 'knowledge':
# 金融知识问答
response = await self._handle_knowledge_question(intent_analysis, message)
elif question_type == 'general_chat':
# 一般对话,引导用户
response = await self._handle_general_chat(intent_analysis, message)
else:
# 未知类型,智能引导
response = {
"message": f"""我理解您想了解:{intent_analysis.get('description', '相关信息')}
作为金融智能助手,我擅长:
• 📊 分析具体股票(如"分析比亚迪"
• 📈 解读市场走势(如"现在大盘怎么样"
• 📚 解答金融知识(如"什么是市盈率"
能否更具体地告诉我您想了解什么?""",
"metadata": {"type": "guide"}
}
# 保存助手响应
self.context_manager.add_message(
session_id,
"assistant",
response["message"],
metadata=response.get("metadata")
)
return response
def _is_comprehensive_analysis(self, message: str) -> bool:
"""
判断是否需要全面分析
默认情况下,如果用户只是简单提到股票名称或代码,就进行全面分析
只有明确要求特定信息时(如"技术指标""K线图"等),才做单一查询
"""
# 明确要求单一查询的关键词
single_query_keywords = [
"k线", "图表", "走势图", "kline",
"技术指标", "macd", "rsi", "均线", "kdj",
"基本面", "公司信息", "行业",
"实时行情", "价格", "涨跌"
]
# 如果明确要求单一查询返回False
if any(keyword in message.lower() for keyword in single_query_keywords):
return False
# 默认进行全面分析
return True
async def _comprehensive_analysis(
self,
stock_code: str,
stock_name: Optional[str],
message: str
) -> Dict[str, Any]:
"""
全面分析:整合多个数据源 + LLM深度分析
Args:
stock_code: 股票代码
stock_name: 股票名称
message: 用户消息
Returns:
综合分析结果
"""
logger.info(f"执行全面分析: {stock_code}")
display_name = stock_name or stock_code
# 1. 并行获取所有数据
try:
# 获取实时行情
quote_result = await skill_manager.execute_skill(
"market_data",
stock_code=stock_code,
data_type="quote"
)
# 获取技术指标
technical_result = await skill_manager.execute_skill(
"technical_analysis",
stock_code=stock_code,
indicators=["ma", "macd", "rsi", "kdj"]
)
# 获取基本面
fundamental_result = await skill_manager.execute_skill(
"fundamental",
stock_code=stock_code
)
# 获取高级数据Tushare Pro 5000+积分)
advanced_result = await skill_manager.execute_skill(
"advanced_data",
stock_code=stock_code,
data_type="all"
)
# 整合数据
all_data = {
"stock_code": stock_code,
"stock_name": display_name,
"quote": quote_result.get("data") if quote_result.get("success") else None,
"technical": technical_result.get("data") if technical_result.get("success") else None,
"fundamental": fundamental_result.get("data") if fundamental_result.get("success") else None,
"advanced": advanced_result.get("data") if advanced_result.get("success") else None
}
# 2. 使用LLM进行深度分析
if self.use_llm:
analysis = await self._llm_comprehensive_analysis(all_data, message)
else:
analysis = self._rule_based_analysis(all_data)
return {
"message": analysis,
"metadata": {
"type": "comprehensive",
"data": all_data
}
}
except Exception as e:
logger.error(f"全面分析失败: {e}")
return {
"message": f"分析{display_name}时出错:{str(e)}",
"metadata": {"type": "error"}
}
async def _llm_comprehensive_analysis(
self,
data: Dict[str, Any],
user_message: str
) -> str:
"""使用LLM进行深度综合分析"""
# 获取当前时间
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
# 获取行情数据的交易日期
quote_date = "未知"
if data.get('quote') and data['quote'].get('trade_date'):
quote_date = data['quote']['trade_date']
# 构建高级数据摘要
advanced_summary = ""
if data.get('advanced'):
advanced_data = data['advanced']
advanced_summary = "\n【高级财务数据】Tushare Pro 5000+积分)\n"
# 财务指标
if advanced_data.get('financial'):
financial = advanced_data['financial']
if financial.get('indicators'):
indicators = financial['indicators'].get('indicators', {})
advanced_summary += f"财务指标(截止:{financial['indicators'].get('end_date', '未知')}\n"
advanced_summary += f" ROE: {indicators.get('roe', 'N/A')}%\n"
advanced_summary += f" ROA: {indicators.get('roa', 'N/A')}%\n"
advanced_summary += f" 毛利率: {indicators.get('gross_margin', 'N/A')}%\n"
advanced_summary += f" 资产负债率: {indicators.get('debt_to_assets', 'N/A')}%\n"
advanced_summary += f" 流动比率: {indicators.get('current_ratio', 'N/A')}\n\n"
# 估值数据
if advanced_data.get('valuation'):
valuation = advanced_data['valuation']
advanced_summary += f"估值指标:\n"
advanced_summary += f" PE(市盈率): {valuation.get('pe', 'N/A')}\n"
advanced_summary += f" PB(市净率): {valuation.get('pb', 'N/A')}\n"
advanced_summary += f" PS(市销率): {valuation.get('ps', 'N/A')}\n"
advanced_summary += f" 总市值: {valuation.get('total_mv', 'N/A')}万元\n"
advanced_summary += f" 流通市值: {valuation.get('circ_mv', 'N/A')}万元\n"
advanced_summary += f" 换手率: {valuation.get('turnover_rate', 'N/A')}%\n\n"
# 资金流向(最近一天)
if advanced_data.get('money_flow') and len(advanced_data['money_flow']) > 0:
latest_flow = advanced_data['money_flow'][0]
advanced_summary += f"资金流向({latest_flow.get('trade_date', '最近')}\n"
advanced_summary += f" 主力净流入: {latest_flow.get('net_mf_amount', 'N/A')}万元\n"
advanced_summary += f" 超大单净流入: {latest_flow.get('buy_elg_amount', 0) - latest_flow.get('sell_elg_amount', 0):.2f}万元\n"
advanced_summary += f" 大单净流入: {latest_flow.get('buy_lg_amount', 0) - latest_flow.get('sell_lg_amount', 0):.2f}万元\n\n"
# 融资融券
if advanced_data.get('margin') and len(advanced_data['margin']) > 0:
latest_margin = advanced_data['margin'][0]
advanced_summary += f"融资融券({latest_margin.get('trade_date', '最近')}\n"
advanced_summary += f" 融资余额: {latest_margin.get('rzye', 'N/A')}\n"
advanced_summary += f" 融券余额: {latest_margin.get('rqye', 'N/A')}\n\n"
# 重大公告
if advanced_data.get('announcements') and len(advanced_data['announcements']) > 0:
advanced_summary += "最近公告:\n"
for ann in advanced_data['announcements'][:3]:
advanced_summary += f" · {ann.get('title', '无标题')} ({ann.get('ann_date', '')})\n"
advanced_summary += "\n"
else:
advanced_summary = "\n【高级财务数据】\n暂无高级数据\n"
# 构建详细的分析提示
prompt = f"""你是一位专业的股票分析师。请对{data['stock_name']}({data['stock_code']})进行全面分析,用简洁专业但易懂的语言回答。
用户问题:{user_message}
【实时行情数据】
数据来源Tushare Pro API
交易日期:{quote_date}
{json.dumps(data.get('quote'), ensure_ascii=False, indent=2) if data.get('quote') else '数据获取失败'}
【技术指标数据】
数据来源Tushare Pro API基于历史K线数据计算
计算截止日期:{quote_date}
{json.dumps(data.get('technical'), ensure_ascii=False, indent=2) if data.get('technical') else '数据获取失败'}
【基本面数据】
数据来源Tushare Pro API
{json.dumps(data.get('fundamental'), ensure_ascii=False, indent=2) if data.get('fundamental') else '数据获取失败'}
{advanced_summary}
请按以下结构进行分析,并在每个部分明确标注数据来源和时效性:
## 一、基本面分析
分段说明公司情况,每个要点独立成段:
- 第一段:公司主营业务和行业地位
- 第二段财务健康度分析基于ROE、资产负债率、流动比率等财务指标
- 第三段估值水平分析PE、PB、PS是否合理
- 第四段:所属行业发展前景
- 第五段:如果有公告,简要分析对公司的影响
## 二、技术面分析(数据截止:{quote_date}
使用清晰的分段结构,每个技术指标独立成段:
**价格走势**
当前价格走势特征(上涨/下跌/震荡),结合成交量分析。
**均线系统**
短期均线MA5、MA10与长期均线MA20、MA60的位置关系判断当前趋势多头/空头/震荡)。
**MACD指标**
DIF和DEA的位置关系MACD柱状图变化判断动能强弱和买卖信号。
**RSI指标**
当前RSI值的位置是否超买>70或超卖<30短期走势预判。
**支撑与压力**
关键支撑位和压力位的具体价格区间。
## 三、市场情绪分析
分段分析市场情绪:
- 第一段:资金流向分析(主力资金、大单资金流入/流出情况)
- 第二段:融资融券情况(如有)
- 第三段:当前市场情绪(乐观/谨慎/悲观)及原因
- 第四段:短期可能的催化因素
## 四、投资建议
基于技术面分析,给出具体的操作建议和点位:
**短期1-2周操作建议**
- 明确的操作建议:买入/持有/观望/减仓
- **具体点位建议**
- 如果建议买入:给出建议买入价格区间(基于支撑位)
- 如果建议卖出:给出建议卖出价格区间(基于压力位)
- 止损位:明确的止损价格点位
- 止盈位:明确的止盈价格点位
- 操作理由:基于技术指标的具体分析
**中期1-3个月策略**
- 趋势判断(上涨/下跌/震荡)
- 关键价格区间:
- 上方目标位:具体价格
- 下方支撑位:具体价格
- 策略建议
**长期(半年以上)**
投资价值评估和长期持有建议。
**风险提示**
- 主要风险点和注意事项
- 需要关注的关键价格位
## 五、总结
用一句话概括核心观点。
---
**数据说明**
- 行情数据来源Tushare Pro截止{quote_date}
- 技术指标基于历史K线数据计算截止{quote_date}
- 财务数据Tushare Pro 5000+积分接口(利润表、资产负债表、财务指标)
- 估值数据Tushare ProPE、PB、PS、市值等
- 资金流向Tushare Pro主力资金、大单资金
- 融资融券Tushare Pro如有
- 公告数据Tushare Pro重大公告
写作要求:
1. 语言简洁专业,避免过度修饰和比喻
2. 专业术语后用括号简单解释,例如"RSI超买指标>70股价可能回调"
3. **重要:每个分析点必须独立成段,段落之间用空行分隔**
4. **技术面分析部分,每个指标必须使用加粗标题(**标题**)并独立成段**
5. 分析要客观理性,基于数据而非情绪
6. 充分利用提供的财务数据、估值数据、资金流向等高级数据进行分析
7. 结论要明确,不要模棱两可
8. 控制在800-1000字由于数据更丰富可以写得更详细
9. 最后必须声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
try:
analysis = llm_service.chat(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=3000 # 增加到3000因为分析更详细了
)
if analysis:
return f"{data['stock_name']}({data['stock_code']}) - AI深度分析】\n\n{analysis}"
else:
return self._rule_based_analysis(data)
except Exception as e:
logger.error(f"LLM分析失败: {e}")
return self._rule_based_analysis(data)
async def _llm_single_analysis(
self,
intent: Dict[str, Any],
result: Dict[str, Any],
stock_code: str,
stock_name: Optional[str],
user_message: str
) -> Dict[str, Any]:
"""使用LLM对单一查询进行分析"""
data = result.get("data", result)
display_name = stock_name or stock_code
# 根据查询类型构建不同的prompt
if intent["type"] == "technical":
prompt = f"""你是一位专业的股票分析师。用户询问了{display_name}({stock_code})的技术指标。
用户问题:{user_message}
【技术指标数据】
{json.dumps(data, ensure_ascii=False, indent=2)}
请进行专业的技术分析:
## 技术指标解读
1. 均线系统分析:
- 短期均线MA5、MA10与长期均线MA20、MA60的位置关系
- 判断当前趋势(多头/空头/震荡)
2. MACD指标分析
- DIF和DEA的位置关系
- MACD柱状图的变化趋势
- 判断动能强弱
3. RSI指标分析
- 当前RSI值的位置超买/超卖/中性)
- 短期可能的走势
4. KDJ指标分析如有
- K、D、J值的位置关系
- 金叉/死叉信号
## 综合判断
- 短期走势预判1-2周
- 关键支撑位和压力位
- 操作建议(买入/持有/观望/减仓)
## 风险提示
- 主要技术风险点
写作要求:
1. 语言简洁专业,直接给出分析结论
2. 基于数据进行分析,不要编造
3. 控制在300-400字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
elif intent["type"] == "quote":
prompt = f"""你是一位专业的股票分析师。用户询问了{display_name}({stock_code})的实时行情。
用户问题:{user_message}
【实时行情数据】
{json.dumps(data, ensure_ascii=False, indent=2)}
请进行专业的行情分析:
## 行情解读
1. 当日表现:
- 涨跌幅分析
- 成交量分析
- 振幅分析
2. 价格位置:
- 当前价格相对开盘价、最高价、最低价的位置
- 判断多空力量对比
3. 短期判断:
- 当日走势特征
- 短期可能的走势
- 操作建议
写作要求:
1. 语言简洁专业,直接给出分析结论
2. 基于数据进行分析,不要编造
3. 控制在200-300字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
elif intent["type"] == "fundamental":
prompt = f"""你是一位专业的股票分析师。用户询问了{display_name}({stock_code})的基本面信息。
用户问题:{user_message}
【基本面数据】
{json.dumps(data, ensure_ascii=False, indent=2)}
请进行专业的基本面分析:
## 公司概况
- 公司主营业务
- 所属行业和地域
- 上市时间和市场
## 行业分析
- 所属行业的发展前景
- 行业地位和竞争优势
## 投资价值
- 基本面评估
- 长期投资价值
- 关注要点
写作要求:
1. 语言简洁专业,直接给出分析结论
2. 基于数据进行分析,不要编造
3. 控制在200-300字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
else:
# 其他类型,使用通用分析
prompt = f"""你是一位专业的股票分析师。用户询问了{display_name}({stock_code})的相关信息。
用户问题:{user_message}
【数据】
{json.dumps(data, ensure_ascii=False, indent=2)}
请基于提供的数据进行专业分析,给出有价值的见解和建议。
写作要求:
1. 语言简洁专业,直接给出分析结论
2. 基于数据进行分析,不要编造
3. 控制在200-300字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
try:
analysis = llm_service.chat(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1500
)
if analysis:
return {
"message": f"{display_name}({stock_code}) - AI分析】\n\n{analysis}",
"metadata": {"type": intent["type"], "data": data}
}
else:
# LLM失败使用原始格式化
return self._format_response(intent, result, stock_code, stock_name)
except Exception as e:
logger.error(f"LLM单一分析失败: {e}")
return self._format_response(intent, result, stock_code, stock_name)
async def _comprehensive_index_analysis(
self,
index_code: str,
index_name: str,
message: str
) -> Dict[str, Any]:
"""
指数全面分析使用Tushare获取指数数据 + LLM深度分析
Args:
index_code: 指数代码如000001.SH
index_name: 指数名称
message: 用户消息
Returns:
综合分析结果
"""
logger.info(f"执行指数全面分析: {index_code}")
try:
# 从tushare_advanced_service获取指数数据
from app.services.tushare_advanced_service import tushare_advanced_service
# 获取指数日线数据
index_data = tushare_advanced_service.get_index_daily(
ts_code=index_code,
start_date=None, # 默认180天
end_date=None
)
if not index_data or len(index_data) == 0:
return {
"message": f"抱歉,未能获取{index_name}的数据。",
"metadata": {"type": "error"}
}
# 获取最新数据
latest = index_data[-1]
# 计算技术指标(简单版)
closes = [d['close'] for d in index_data]
# 计算均线
ma5 = sum(closes[-5:]) / 5 if len(closes) >= 5 else None
ma10 = sum(closes[-10:]) / 10 if len(closes) >= 10 else None
ma20 = sum(closes[-20:]) / 20 if len(closes) >= 20 else None
ma60 = sum(closes[-60:]) / 60 if len(closes) >= 60 else None
# 整合数据
all_data = {
"index_code": index_code,
"index_name": index_name,
"latest": latest,
"ma": {
"ma5": ma5,
"ma10": ma10,
"ma20": ma20,
"ma60": ma60
},
"history_days": len(index_data)
}
# 使用LLM进行深度分析
if self.use_llm:
analysis = await self._llm_index_analysis(all_data, message)
else:
analysis = f"{index_name}】最新数据\n\n" \
f"日期:{latest['trade_date']}\n" \
f"收盘:{latest['close']:.2f}\n" \
f"涨跌幅:{latest['pct_chg']:+.2f}%\n" \
f"成交量:{latest['vol']:.0f}"
return {
"message": analysis,
"metadata": {
"type": "index_analysis",
"data": all_data
}
}
except Exception as e:
logger.error(f"指数分析失败: {e}")
import traceback
logger.error(traceback.format_exc())
return {
"message": f"分析{index_name}时出错:{str(e)}",
"metadata": {"type": "error"}
}
async def _llm_index_analysis(
self,
data: Dict[str, Any],
user_message: str
) -> str:
"""使用LLM进行指数深度分析"""
latest = data['latest']
ma = data['ma']
prompt = f"""你是一位专业的股票分析师。请对{data['index_name']}({data['index_code']})进行全面分析,用简洁专业但易懂的语言回答。
用户问题:{user_message}
【指数最新数据】
数据来源Tushare Pro API
交易日期:{latest['trade_date']}
收盘点位:{latest['close']:.2f}
开盘点位:{latest['open']:.2f}
最高点位:{latest['high']:.2f}
最低点位:{latest['low']:.2f}
涨跌幅:{latest['pct_chg']:+.2f}%
涨跌点数:{latest['change']:+.2f}
成交量:{latest['vol']:.0f}
成交额:{latest['amount']:.0f}千元
【技术指标】
MA5{f"{ma['ma5']:.2f}" if ma['ma5'] else '计算中'}
MA10{f"{ma['ma10']:.2f}" if ma['ma10'] else '计算中'}
MA20{f"{ma['ma20']:.2f}" if ma['ma20'] else '计算中'}
MA60{f"{ma['ma60']:.2f}" if ma['ma60'] else '计算中'}
当前点位:{latest['close']:.2f}
请按以下结构进行分析:
## 一、市场现状
- 当前点位分析(相对历史位置)
- 当日表现(涨跌幅、成交量)
## 二、技术面分析
**均线系统**
分析当前点位与各均线的关系,判断趋势(多头/空头/震荡)。
**支撑与压力**
基于近期走势,给出关键支撑位和压力位。
## 三、市场情绪
- 当前市场情绪(乐观/谨慎/悲观)
- 成交量分析
## 四、投资建议
**短期1-2周**
- 趋势判断
- 关键点位(支撑位、压力位)
- 操作建议
**中期1-3个月**
- 趋势展望
- 策略建议
**风险提示**
主要风险点和注意事项
## 五、总结
用一句话概括核心观点。
---
**数据说明**
- 数据来源Tushare Pro截止{latest['trade_date']}
- 分析周期:基于近{data['history_days']}个交易日数据
写作要求:
1. 语言简洁专业但易懂
2. 每个分析点独立成段
3. 控制在600-800字
4. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
try:
analysis = llm_service.chat(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2500
)
if analysis:
return f"{data['index_name']}({data['index_code']}) - AI深度分析】\n\n{analysis}"
else:
return f"{data['index_name']}】分析生成失败"
except Exception as e:
logger.error(f"LLM指数分析失败: {e}")
return f"{data['index_name']}】分析生成失败"
def _rule_based_analysis(self, data: Dict[str, Any]) -> str:
"""基于规则的分析LLM不可用时的备选方案"""
parts = [f"{data['stock_name']}({data['stock_code']}) - 综合分析】\n"]
# 行情信息
if data.get('quote'):
quote = data['quote']
parts.append("## 一、实时行情")
parts.append(f"最新价:{quote.get('close', 0):.2f}")
parts.append(f"涨跌幅:{quote.get('pct_chg', 0):.2f}%")
parts.append(f"成交量:{quote.get('vol', 0):.0f}")
parts.append("")
# 技术分析
if data.get('technical'):
tech = data['technical'].get('indicators', {})
parts.append("## 二、技术指标")
if 'ma' in tech:
ma = tech['ma']
parts.append(f"均线系统MA5={ma.get('ma5')}, MA10={ma.get('ma10')}, MA20={ma.get('ma20')}")
if 'macd' in tech:
macd = tech['macd']
parts.append(f"MACDDIF={macd.get('dif')}, DEA={macd.get('dea')}")
if 'rsi' in tech:
rsi = tech['rsi']
rsi6 = rsi.get('rsi6', 50)
if rsi6 > 70:
parts.append(f"RSI{rsi6:.1f}(超买区域,注意回调风险)")
elif rsi6 < 30:
parts.append(f"RSI{rsi6:.1f}(超卖区域,可能存在反弹机会)")
else:
parts.append(f"RSI{rsi6:.1f}(中性区域)")
parts.append("")
# 基本面
if data.get('fundamental'):
fund = data['fundamental']
parts.append("## 三、基本信息")
parts.append(f"所属行业:{fund.get('industry', '未知')}")
parts.append(f"上市日期:{fund.get('list_date', '未知')}")
parts.append("")
# 简单建议
parts.append("## 四、参考建议")
parts.append("建议结合更多信息进行综合判断。")
parts.append("")
parts.append("⚠️ 以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。")
return "\n".join(parts)
async def _single_query(
self,
stock_code: str,
stock_name: Optional[str],
message: str
) -> Dict[str, Any]:
"""单一查询处理 - 使用LLM进行分析"""
# 识别意图
intent = self._recognize_intent(message, stock_code)
# 执行技能
result = await skill_manager.execute_skill(
intent["skill"],
**intent["params"]
)
# 格式化响应
if not result.get("success", True):
return {
"message": f"查询失败:{result.get('error', '未知错误')}",
"metadata": {"type": "error"}
}
# 所有查询都使用LLM进行分析除了可视化
if intent["type"] != "visualization" and self.use_llm:
return await self._llm_single_analysis(intent, result, stock_code, stock_name, message)
else:
return self._format_response(intent, result, stock_code, stock_name)
def _recognize_intent(self, message: str, stock_code: str) -> Dict[str, Any]:
"""识别查询意图"""
message_lower = message.lower()
# K线图
if any(kw in message_lower for kw in ["k线", "图表", "走势图", "kline"]):
return {
"type": "visualization",
"skill": "visualization",
"params": {"stock_code": stock_code}
}
# 技术分析
if any(kw in message_lower for kw in ["技术", "指标", "macd", "rsi", "均线"]):
return {
"type": "technical",
"skill": "technical_analysis",
"params": {"stock_code": stock_code, "indicators": ["ma", "macd", "rsi"]}
}
# 基本面
if any(kw in message_lower for kw in ["基本面", "公司", "行业", "信息"]):
return {
"type": "fundamental",
"skill": "fundamental",
"params": {"stock_code": stock_code}
}
# 默认:实时行情
return {
"type": "quote",
"skill": "market_data",
"params": {"stock_code": stock_code, "data_type": "quote"}
}
def _format_response(
self,
intent: Dict[str, Any],
result: Dict[str, Any],
stock_code: str,
stock_name: Optional[str]
) -> Dict[str, Any]:
"""格式化响应"""
data = result.get("data", result)
display_name = stock_name or stock_code
if intent["type"] == "quote":
message = f"""{display_name}】实时行情
交易日期:{data.get('trade_date', '')}
最新价:{data.get('close', 0):.2f}
涨跌幅:{data.get('pct_chg', 0):+.2f}%
涨跌额:{data.get('change', 0):+.2f}
开盘价:{data.get('open', 0):.2f}
最高价:{data.get('high', 0):.2f}
最低价:{data.get('low', 0):.2f}
成交量:{data.get('vol', 0):.0f}
成交额:{data.get('amount', 0):.0f}千元"""
return {
"message": message,
"metadata": {"type": "quote", "data": data}
}
elif intent["type"] == "technical":
indicators = data.get("indicators", {})
parts = [f"{display_name}】技术指标\n"]
if "ma" in indicators:
ma = indicators["ma"]
parts.append(f"均线MA5={ma.get('ma5')}, MA10={ma.get('ma10')}, MA20={ma.get('ma20')}")
if "macd" in indicators:
macd = indicators["macd"]
parts.append(f"MACDDIF={macd.get('dif')}, DEA={macd.get('dea')}, MACD={macd.get('macd')}")
if "rsi" in indicators:
rsi = indicators["rsi"]
parts.append(f"RSIRSI6={rsi.get('rsi6')}, RSI12={rsi.get('rsi12')}")
return {
"message": "\n".join(parts),
"metadata": {"type": "technical", "data": data}
}
elif intent["type"] == "visualization":
return {
"message": f"已生成【{display_name}】的K线图",
"metadata": {"type": "chart", "data": data}
}
elif intent["type"] == "fundamental":
message = f"""{display_name}】基本信息
股票代码:{data.get('ts_code', '')}
所属地域:{data.get('area', '')}
所属行业:{data.get('industry', '')}
上市市场:{data.get('market', '')}
上市日期:{data.get('list_date', '')}"""
return {
"message": message,
"metadata": {"type": "fundamental", "data": data}
}
return {
"message": "查询完成",
"metadata": {"type": "data", "data": data}
}
async def _analyze_question_intent(self, message: str, session_id: str) -> Optional[Dict[str, Any]]:
"""
使用LLM分析问题意图支持上下文
Args:
message: 用户消息
session_id: 会话ID
Returns:
意图分析结果: {
'type': 'stock_specific' | 'macro_finance' | 'knowledge' | 'general_chat',
'description': '问题描述',
'keywords': ['关键词列表'],
'stock_names': ['股票名称'] (如果是stock_specific类型)
}
"""
if not self.use_llm:
logger.warning("LLM未配置无法分析意图")
return None
# 获取历史对话上下文
history = self.context_manager.get_context(session_id)
context_str = ""
if history:
context_str = "\n\n【对话历史】\n"
# 只取最近4条消息
for msg in history[-4:]:
role = "用户" if msg["role"] == "user" else "助手"
content = msg['content'][:100] # 限制长度
context_str += f"{role}: {content}\n"
prompt = f"""你是一个专业的金融智能助手。分析用户的问题,理解用户意图。
{context_str}
【当前问题】
用户: {message}
请分析这个问题属于以下哪一类:
1. **stock_specific** - 针对特定股票或指数的问题
例如:"贵州茅台怎么样""分析一下比亚迪""600519的技术指标""帮我看看这只股票"
**重要**:指数查询也属于此类,例如:"上证指数怎么样""分析大盘""A股指数走势""深证成指"
**美股支持**:美股查询也属于此类,例如:"苹果股票怎么样""AAPL分析""特斯拉走势""TSLA技术指标"
2. **macro_finance** - 宏观金融/市场问题(不针对特定股票或指数)
例如:"最近有什么投资机会""现在适合买股票吗""市场情绪如何"
3. **knowledge** - 金融知识问答
例如:"什么是MACD""如何看K线图""价值投资是什么""市盈率怎么算"
4. **general_chat** - 一般对话/问候/不明确的问题
例如:"你好""在吗""你能做什么""帮我"(没有具体说明)
**重要提示**
- 如果用户问题不明确,但可能与金融相关,优先归类为 general_chat以便引导用户
- 如果用户提到"这只股票"""等代词,查看对话历史判断是否指特定股票
- **如果用户提到"大盘""上证""深证""A股指数"等,归类为 stock_specific并在stock_names中填入对应的指数名称**
- **如果用户提到美股公司名称如苹果、特斯拉、微软或美股代码如AAPL、TSLA、MSFT归类为 stock_specific并在stock_names中填入对应的股票名称或代码**
- 对于模糊的问题,不要强行归类,使用 general_chat 类型
请以JSON格式返回分析结果
{{
"type": "问题类型",
"description": "问题的简要描述(用一句话概括用户想了解什么)",
"keywords": ["关键词1", "关键词2"],
"stock_names": ["股票名称或指数名称或美股代码"] (仅当type为stock_specific时如果有的话),
"market": "A股""美股" (仅当type为stock_specific时根据股票类型判断)
}}
只返回JSON不要有任何其他内容。"""
try:
result = llm_service.chat(
messages=[{"role": "user", "content": prompt}],
temperature=0.3,
max_tokens=300
)
if not result:
logger.warning("LLM返回空结果")
return None
# 清理结果移除可能的markdown代码块标记
result = result.strip()
if result.startswith("```json"):
result = result[7:]
if result.startswith("```"):
result = result[3:]
if result.endswith("```"):
result = result[:-3]
result = result.strip()
# 检查是否为空
if not result:
logger.warning("LLM返回内容为空")
return None
# 解析JSON
intent = json.loads(result)
logger.info(f"意图分析结果: {intent}")
return intent
except json.JSONDecodeError as e:
logger.error(f"意图分析JSON解析失败: {e}, 原始响应: {result[:200] if result else 'None'}")
return None
except Exception as e:
logger.error(f"意图分析失败: {e}")
return None
async def _handle_stock_question(
self,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理针对特定股票或指数的问题"""
stock_names = intent_analysis.get('stock_names', [])
market = intent_analysis.get('market', 'A股') # 默认A股
if not stock_names:
return {
"message": "抱歉,我没有识别到您提到的股票或指数。请提供更明确的股票代码、名称或指数名称。",
"metadata": {"type": "error"}
}
# 提取第一个股票或指数
stock_keyword = stock_names[0]
# 检测是否为美股
is_us_stock = self._is_us_stock(stock_keyword, market)
if is_us_stock:
# 处理美股
return await self._handle_us_stock(stock_keyword, message)
# 处理A股和指数
# 指数映射表
index_mapping = {
"上证指数": "000001.SH",
"上证": "000001.SH",
"大盘": "000001.SH",
"沪指": "000001.SH",
"深证成指": "399001.SZ",
"深证": "399001.SZ",
"深指": "399001.SZ",
"创业板指": "399006.SZ",
"创业板": "399006.SZ",
"科创50": "000688.SH",
"沪深300": "000300.SH",
"中证500": "000905.SH",
"A股": "000001.SH" # 默认用上证指数代表A股
}
# 检查是否是指数查询
stock_code = None
stock_name = None
is_index = False
for key, code in index_mapping.items():
if key in stock_keyword or stock_keyword in key:
stock_code = code
stock_name = key if key in stock_keyword else stock_keyword
is_index = True
logger.info(f"识别为指数查询: {stock_name} -> {stock_code}")
break
# 如果不是指数使用Tushare搜索股票
if not is_index:
search_results = tushare_service.search_stock(stock_keyword)
if not search_results:
return {
"message": f"抱歉,未找到股票\"{stock_keyword}\"。请确认股票名称或代码是否正确。",
"metadata": {"type": "error"}
}
stock = search_results[0]
stock_code = stock['symbol']
stock_name = stock['name']
logger.info(f"处理{'指数' if is_index else '股票'}问题: {stock_name}({stock_code})")
# 判断是否需要全面分析
is_comprehensive = self._is_comprehensive_analysis(message)
if is_comprehensive:
if is_index:
return await self._comprehensive_index_analysis(stock_code, stock_name, message)
else:
return await self._comprehensive_analysis(stock_code, stock_name, message)
else:
return await self._single_query(stock_code, stock_name, message)
async def _handle_macro_question(
self,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理宏观金融问题"""
keywords = intent_analysis.get('keywords', [])
description = intent_analysis.get('description', '')
logger.info(f"处理宏观问题: {description}")
try:
# 使用LLM进行分析基于Tushare数据和公开信息
prompt = f"""你是一位专业的金融分析师。用户询问了宏观金融问题。
用户问题:{message}
问题分析:{description}
关键词:{', '.join(keywords)}
请基于你的金融知识和市场经验,给出专业且易懂的分析:
## 市场现状分析
简要说明当前市场整体情况和主要影响因素(分段说明,每个要点独立成段)
## 趋势判断
- 短期趋势1-2周
- 中期展望1-3个月
## 投资建议
- 具体的投资策略建议
- 需要关注的风险点
写作要求:
1. 语言简洁专业但易懂,避免过度修饰
2. 分析要客观理性,基于事实和数据
3. 每个分析点独立成段,段落之间用空行分隔
4. 控制在400-500字
5. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
analysis = llm_service.chat(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1500
)
if analysis:
return {
"message": f"【宏观市场分析】\n\n{analysis}",
"metadata": {"type": "macro_analysis"}
}
except Exception as e:
logger.error(f"宏观问题处理失败: {e}")
# 降级方案:提供友好的引导
return {
"message": f"""我理解您想了解:{description}
目前我可以帮您:
• 📊 分析具体股票的走势和投资价值
• 📈 查看大盘指数(如上证指数、深证成指)
• 📚 解答金融投资相关问题
您可以更具体地问我,比如:
"现在上证指数怎么样"
"分析一下创业板的走势"
"最近哪些行业比较热门"
请告诉我您想了解什么?""",
"metadata": {"type": "guide"}
}
async def _handle_knowledge_question(
self,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理金融知识问答"""
description = intent_analysis.get('description', '')
keywords = intent_analysis.get('keywords', [])
logger.info(f"处理知识问答: {description}")
# 直接使用LLM回答
prompt = f"""你是一位专业的金融教育专家。用户询问了金融知识问题。
用户问题:{message}
请用通俗易懂的语言解释这个概念或回答这个问题:
## 核心概念
- 清晰定义和解释
## 实际应用
- 如何在投资中应用
- 注意事项
## 举例说明
- 用简单的例子帮助理解
写作要求:
1. 语言通俗易懂,避免过多专业术语
2. 如果使用专业术语,要简单解释
3. 控制在300-400字
4. 重点是帮助用户理解,而不是炫耀知识
"""
try:
answer = llm_service.chat(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=1200
)
if answer:
return {
"message": f"【金融知识解答】\n\n{answer}",
"metadata": {"type": "knowledge"}
}
except Exception as e:
logger.error(f"知识问答处理失败: {e}")
return {
"message": "抱歉,暂时无法回答您的问题。请稍后再试。",
"metadata": {"type": "error"}
}
async def _handle_general_chat(
self,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理一般对话,智能引导用户"""
description = intent_analysis.get('description', '')
logger.info(f"处理一般对话: {description}")
# 使用LLM生成友好的引导回复
prompt = f"""你是一位专业且友好的金融智能助手。用户发来了一条消息,但不够具体。
用户消息:{message}
问题分析:{description}
你的任务是:
1. 如果是问候(如"你好""在吗"),友好回应并介绍你的能力
2. 如果问题不明确(如"帮我""看看"),礼貌地询问用户想了解什么
3. 如果可能与金融相关但不具体,给出具体的提问示例引导用户
你可以提供的服务:
• 📊 股票分析 - 分析个股走势、技术指标、基本面
• 📈 市场观察 - 解读大盘走势、行业热点、投资机会
• 📚 知识问答 - 解答金融投资相关问题
回复要求:
1. 语气友好、专业但不生硬
2. 简洁明了不要太长150字以内
3. 给出2-3个具体的提问示例
4. 不要使用"抱歉""无法理解"等负面词汇
5. 用emoji让回复更友好但不要过度使用
直接返回回复内容,不要有其他格式。"""
try:
reply = llm_service.chat(
messages=[{"role": "user", "content": prompt}],
temperature=0.8,
max_tokens=400
)
if reply:
return {
"message": reply,
"metadata": {"type": "chat"}
}
except Exception as e:
logger.error(f"一般对话处理失败: {e}")
# 降级方案
return {
"message": """您好!我是您的金融智能助手 👋
我可以帮您:
• 📊 分析具体股票(如"分析比亚迪"
• 📈 解读市场走势(如"现在大盘怎么样"
• 📚 解答金融知识(如"什么是市盈率"
请告诉我您想了解什么?""",
"metadata": {"type": "chat"}
}
def _is_us_stock(self, keyword: str, market: str) -> bool:
"""
判断是否为美股
Args:
keyword: 股票关键词
market: 市场类型从LLM意图分析中获取
Returns:
是否为美股
"""
# 如果LLM已经判断为美股
if market == "美股":
return True
# 检查是否为全大写字母(美股代码特征)
if keyword.isupper() and keyword.isalpha() and len(keyword) <= 5:
return True
# 常见美股公司名称映射
us_stock_names = {
"苹果": "AAPL",
"特斯拉": "TSLA",
"微软": "MSFT",
"谷歌": "GOOGL",
"亚马逊": "AMZN",
"Meta": "META",
"脸书": "META",
"英伟达": "NVDA",
"奈飞": "NFLX",
"网飞": "NFLX",
"迪士尼": "DIS",
"可口可乐": "KO",
"麦当劳": "MCD",
"星巴克": "SBUX",
"耐克": "NKE",
"波音": "BA",
"英特尔": "INTC",
"AMD": "AMD",
"高通": "QCOM",
"推特": "TWTR",
"优步": "UBER",
"Uber": "UBER",
"Airbnb": "ABNB",
"爱彼迎": "ABNB",
}
if keyword in us_stock_names:
return True
return False
def _get_us_stock_symbol(self, keyword: str) -> str:
"""
获取美股代码
Args:
keyword: 股票关键词
Returns:
美股代码
"""
# 如果已经是代码格式,直接返回
if keyword.isupper() and keyword.isalpha():
return keyword
# 中文名称映射
us_stock_names = {
"苹果": "AAPL",
"特斯拉": "TSLA",
"微软": "MSFT",
"谷歌": "GOOGL",
"亚马逊": "AMZN",
"Meta": "META",
"脸书": "META",
"英伟达": "NVDA",
"奈飞": "NFLX",
"网飞": "NFLX",
"迪士尼": "DIS",
"可口可乐": "KO",
"麦当劳": "MCD",
"星巴克": "SBUX",
"耐克": "NKE",
"波音": "BA",
"英特尔": "INTC",
"AMD": "AMD",
"高通": "QCOM",
"推特": "TWTR",
"优步": "UBER",
"Uber": "UBER",
"Airbnb": "ABNB",
"爱彼迎": "ABNB",
}
return us_stock_names.get(keyword, keyword.upper())
async def _handle_us_stock(self, keyword: str, message: str) -> Dict[str, Any]:
"""
处理美股查询
Args:
keyword: 股票关键词
message: 用户消息
Returns:
分析结果
"""
# 获取美股代码
symbol = self._get_us_stock_symbol(keyword)
logger.info(f"处理美股查询: {keyword} -> {symbol}")
try:
# 调用美股分析技能
result = await skill_manager.execute_skill(
"us_stock_analysis",
symbol=symbol,
analysis_type="comprehensive"
)
if not result.get("success"):
return {
"message": f"抱歉,未找到美股 {symbol}。请确认股票代码是否正确。\n\n提示:美股代码通常为大写字母,如 AAPL苹果、TSLA特斯拉、MSFT微软等。",
"metadata": {"type": "error"}
}
# 使用LLM分析美股数据
if self.use_llm:
analysis = await self._llm_us_stock_analysis(result["data"], message)
else:
analysis = self._format_us_stock_data(result["data"])
return {
"message": analysis,
"metadata": {
"type": "us_stock_analysis",
"data": result["data"]
}
}
except Exception as e:
logger.error(f"美股查询失败: {e}")
return {
"message": f"查询美股 {symbol} 时出错:{str(e)}",
"metadata": {"type": "error"}
}
async def _llm_us_stock_analysis(self, data: Dict[str, Any], user_message: str) -> str:
"""使用LLM分析美股数据"""
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
# 提取关键数据
symbol = data.get("symbol", "")
name = data.get("name", "")
sector = data.get("sector", "")
industry = data.get("industry", "")
current_price = data.get("current_price", 0)
change = data.get("change", 0)
change_pct = data.get("change_percent", 0)
volume = data.get("volume", 0)
market_cap = data.get("market_cap", 0)
pe_ratio = data.get("pe_ratio", 0)
pb_ratio = data.get("pb_ratio", 0)
dividend_yield = data.get("dividend_yield", 0)
week_52_high = data.get("52_week_high", 0)
week_52_low = data.get("52_week_low", 0)
technical = data.get("technical_indicators", {})
description = data.get("description", "")
# 格式化市值
market_cap_str = f"${market_cap / 1e9:.2f}B" if market_cap > 1e9 else f"${market_cap / 1e6:.2f}M"
# 构建分析提示
prompt = f"""你是一位专业的美股分析师。请基于以下数据对 {name} ({symbol}) 进行全面分析。
**重要提示:当前日期是 {current_time},请在分析中使用这个日期,不要使用其他日期。**
【基本信息】
股票代码:{symbol}
公司名称:{name}
所属行业:{sector} - {industry}
公司简介:{description[:300] if description else '暂无'}
【实时行情】(数据时间:{current_time}
当前价格:${current_price:.2f}
涨跌额:${change:.2f}
涨跌幅:{change_pct:.2f}%
成交量:{volume:,}
市值:{market_cap_str}
【估值指标】
市盈率(PE){f"{pe_ratio:.2f}" if pe_ratio else '暂无'}
市净率(PB){f"{pb_ratio:.2f}" if pb_ratio else '暂无'}
股息率:{f"{dividend_yield * 100:.2f}%" if dividend_yield else '暂无'}
52周最高${week_52_high:.2f}
52周最低${week_52_low:.2f}
【技术指标】
MA5{f"${technical.get('ma5'):.2f}" if technical.get('ma5') else '计算中'}
MA10{f"${technical.get('ma10'):.2f}" if technical.get('ma10') else '计算中'}
MA20{f"${technical.get('ma20'):.2f}" if technical.get('ma20') else '计算中'}
MA60{f"${technical.get('ma60'):.2f}" if technical.get('ma60') else '计算中'}
RSI{f"{technical.get('rsi'):.2f}" if technical.get('rsi') else '计算中'}
MACD{f"{technical.get('macd'):.4f}" if technical.get('macd') else '计算中'}
用户问题:{user_message}
请提供专业的分析报告,包括:
## 📊 行情概览
简要总结当前股价表现和市场表现2-3句话
## 💼 公司基本面
- 行业地位和竞争优势
- 估值水平分析PE、PB是否合理
- 盈利能力和成长性
## 📈 技术面分析
- 当前趋势判断(基于均线系统)
- 关键支撑位和压力位
- RSI和MACD信号解读
## 💡 投资建议
- 短期操作建议1-2周
- 中期投资价值1-3个月
- 风险提示
写作要求:
1. 语言专业但易懂,避免过度修饰
2. 分析客观理性,基于数据和事实
3. 每个部分独立成段,段落间用空行分隔
4. 控制在500-600字
5. **不要在报告中添加日期标题,直接开始分析内容**
6. 最后声明:"以上分析仅供参考,不构成投资建议。美股投资有风险,请谨慎决策。"
"""
try:
analysis = llm_service.chat(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2000
)
if analysis:
return f"【美股分析】{name} ({symbol})\n\n{analysis}"
else:
return self._format_us_stock_data(data)
except Exception as e:
logger.error(f"LLM美股分析失败: {e}")
return self._format_us_stock_data(data)
def _format_us_stock_data(self, data: Dict[str, Any]) -> str:
"""格式化美股数据(降级方案)"""
symbol = data.get("symbol", "")
name = data.get("name", "")
current_price = data.get("current_price", 0)
change = data.get("change", 0)
change_pct = data.get("change_percent", 0)
market_cap = data.get("market_cap", 0)
pe_ratio = data.get("pe_ratio", 0)
technical = data.get("technical_indicators", {})
market_cap_str = f"${market_cap / 1e9:.2f}B" if market_cap > 1e9 else f"${market_cap / 1e6:.2f}M"
change_emoji = "📈" if change >= 0 else "📉"
return f"""【美股行情】{name} ({symbol})
{change_emoji} 当前价格:${current_price:.2f}
涨跌:${change:.2f} ({change_pct:+.2f}%)
市值:{market_cap_str}
市盈率:{pe_ratio:.2f if pe_ratio else '暂无'}
【技术指标】
MA5${technical.get('ma5', 0):.2f if technical.get('ma5') else '计算中'}
MA20${technical.get('ma20', 0):.2f if technical.get('ma20') else '计算中'}
RSI{technical.get('rsi', 0):.2f if technical.get('rsi') else '计算中'}
以上数据仅供参考,不构成投资建议。"""
async def process_message_stream(
self,
message: str,
session_id: str,
user_id: Optional[str] = None
):
"""
流式处理用户消息
Args:
message: 用户消息
session_id: 会话ID
user_id: 用户ID
Yields:
响应文本片段
"""
logger.info(f"流式处理消息: {message[:50]}...")
# 保存用户消息
self.context_manager.add_message(session_id, "user", message)
# 第一步使用LLM理解问题意图
intent_analysis = await self._analyze_question_intent(message, session_id)
if not intent_analysis:
# 引导用户
guide_message = """我是您的金融智能助手,可以帮您:
📊 **股票分析** - 分析个股走势、技术指标、基本面
📈 **市场观察** - 解读大盘走势、行业热点
📚 **知识问答** - 解答金融投资相关问题
您可以这样问我:
"分析一下贵州茅台"
"现在A股市场怎么样"
"什么是MACD指标"
请告诉我您想了解什么?"""
self.context_manager.add_message(session_id, "assistant", guide_message)
yield guide_message
return
# 第二步:根据意图类型处理(流式)
question_type = intent_analysis['type']
full_response = ""
if question_type == 'stock_specific':
# 针对特定股票的问题 - 流式输出
async for chunk in self._handle_stock_question_stream(intent_analysis, message):
full_response += chunk
yield chunk
elif question_type in ['macro_finance', 'knowledge', 'general_chat']:
# 其他类型 - 直接流式输出(不需要特殊处理)
response = await self._handle_other_question(question_type, intent_analysis, message)
full_response = response["message"]
# 逐字yield
for char in full_response:
yield char
else:
# 未知类型
guide_message = f"""我理解您想了解:{intent_analysis.get('description', '相关信息')}
作为金融智能助手,我擅长:
• 📊 分析具体股票(如"分析比亚迪"
• 📈 解读市场走势(如"现在大盘怎么样"
• 📚 解答金融知识(如"什么是市盈率"
能否更具体地告诉我您想了解什么?"""
full_response = guide_message
yield guide_message
# 保存助手响应
self.context_manager.add_message(session_id, "assistant", full_response)
async def _handle_other_question(
self,
question_type: str,
intent_analysis: Dict[str, Any],
message: str
) -> Dict[str, Any]:
"""处理非股票分析的其他问题"""
if question_type == 'macro_finance':
return await self._handle_macro_question(intent_analysis, message)
elif question_type == 'knowledge':
return await self._handle_knowledge_question(intent_analysis, message)
elif question_type == 'general_chat':
return await self._handle_general_chat(intent_analysis, message)
else:
return {"message": "抱歉,我无法理解您的问题。"}
async def _handle_stock_question_stream(
self,
intent_analysis: Dict[str, Any],
message: str
):
"""流式处理股票问题"""
stock_names = intent_analysis.get('stock_names', [])
market = intent_analysis.get('market', 'A股')
if not stock_names:
yield "抱歉,我没有识别到您提到的股票或指数。请提供更明确的股票代码、名称或指数名称。"
return
stock_keyword = stock_names[0]
is_us_stock = self._is_us_stock(stock_keyword, market)
if is_us_stock:
# 美股分析 - 流式
async for chunk in self._handle_us_stock_stream(stock_keyword, message):
yield chunk
else:
# A股分析 - 流式
async for chunk in self._handle_a_stock_stream(stock_keyword, message):
yield chunk
async def _handle_a_stock_stream(self, stock_keyword: str, message: str):
"""流式处理A股分析"""
# 指数映射
index_mapping = {
"上证指数": "000001.SH", "上证": "000001.SH", "大盘": "000001.SH", "沪指": "000001.SH",
"深证成指": "399001.SZ", "深证": "399001.SZ", "深指": "399001.SZ",
"创业板指": "399006.SZ", "创业板": "399006.SZ",
"科创50": "000688.SH", "沪深300": "000300.SH", "中证500": "000905.SH",
"A股": "000001.SH"
}
stock_code = None
stock_name = None
is_index = False
for key, code in index_mapping.items():
if key in stock_keyword or stock_keyword in key:
stock_code = code
stock_name = key if key in stock_keyword else stock_keyword
is_index = True
break
if not is_index:
search_results = tushare_service.search_stock(stock_keyword)
if not search_results:
yield f"抱歉,未找到股票\"{stock_keyword}\"。请确认股票名称或代码是否正确。"
return
stock = search_results[0]
stock_code = stock['symbol']
stock_name = stock['name']
# 获取数据(非流式)
try:
quote_result = await skill_manager.execute_skill("market_data", stock_code=stock_code, data_type="quote")
technical_result = await skill_manager.execute_skill("technical_analysis", stock_code=stock_code, indicators=["ma", "macd", "rsi", "kdj"])
fundamental_result = await skill_manager.execute_skill("fundamental", stock_code=stock_code)
advanced_result = await skill_manager.execute_skill("advanced_data", stock_code=stock_code, data_type="all")
all_data = {
"stock_code": stock_code,
"stock_name": stock_name,
"quote": quote_result.get("data") if quote_result.get("success") else None,
"technical": technical_result.get("data") if technical_result.get("success") else None,
"fundamental": fundamental_result.get("data") if fundamental_result.get("success") else None,
"advanced": advanced_result.get("data") if advanced_result.get("success") else None
}
# 使用LLM流式分析
if self.use_llm:
async for chunk in self._llm_comprehensive_analysis_stream(all_data, message, is_index):
yield chunk
else:
yield self._rule_based_analysis(all_data)
except Exception as e:
logger.error(f"A股分析失败: {e}")
yield f"分析{stock_name}时出错:{str(e)}"
async def _handle_us_stock_stream(self, keyword: str, message: str):
"""流式处理美股分析"""
symbol = self._get_us_stock_symbol(keyword)
logger.info(f"流式处理美股查询: {keyword} -> {symbol}")
try:
result = await skill_manager.execute_skill("us_stock_analysis", symbol=symbol, analysis_type="comprehensive")
if not result.get("success"):
yield f"抱歉,未找到美股 {symbol}。请确认股票代码是否正确。"
return
# 使用LLM流式分析
if self.use_llm:
async for chunk in self._llm_us_stock_analysis_stream(result["data"], message):
yield chunk
else:
yield self._format_us_stock_data(result["data"])
except Exception as e:
logger.error(f"美股查询失败: {e}")
yield f"查询美股 {symbol} 时出错:{str(e)}"
async def _llm_comprehensive_analysis_stream(self, data: Dict[str, Any], user_message: str, is_index: bool = False):
"""使用LLM流式进行综合分析"""
from datetime import datetime
import json
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
# 获取行情数据的交易日期
quote_date = "未知"
if data.get('quote') and data['quote'].get('trade_date'):
quote_date = data['quote']['trade_date']
# 构建高级数据摘要
advanced_summary = ""
if data.get('advanced'):
advanced_data = data['advanced']
advanced_summary = "\n【高级财务数据】Tushare Pro 5000+积分)\n"
# 财务指标
if advanced_data.get('financial'):
financial = advanced_data['financial']
if financial.get('indicators'):
indicators = financial['indicators'].get('indicators', {})
advanced_summary += f"财务指标(截止:{financial['indicators'].get('end_date', '未知')}\n"
advanced_summary += f" ROE: {indicators.get('roe', 'N/A')}%\n"
advanced_summary += f" ROA: {indicators.get('roa', 'N/A')}%\n"
advanced_summary += f" 毛利率: {indicators.get('gross_margin', 'N/A')}%\n"
advanced_summary += f" 资产负债率: {indicators.get('debt_to_assets', 'N/A')}%\n"
advanced_summary += f" 流动比率: {indicators.get('current_ratio', 'N/A')}\n\n"
# 估值数据
if advanced_data.get('valuation'):
valuation = advanced_data['valuation']
advanced_summary += f"估值指标:\n"
advanced_summary += f" PE(市盈率): {valuation.get('pe', 'N/A')}\n"
advanced_summary += f" PB(市净率): {valuation.get('pb', 'N/A')}\n"
advanced_summary += f" PS(市销率): {valuation.get('ps', 'N/A')}\n"
advanced_summary += f" 总市值: {valuation.get('total_mv', 'N/A')}万元\n"
advanced_summary += f" 流通市值: {valuation.get('circ_mv', 'N/A')}万元\n"
advanced_summary += f" 换手率: {valuation.get('turnover_rate', 'N/A')}%\n\n"
# 资金流向(最近一天)
if advanced_data.get('money_flow') and len(advanced_data['money_flow']) > 0:
latest_flow = advanced_data['money_flow'][0]
advanced_summary += f"资金流向({latest_flow.get('trade_date', '最近')}\n"
advanced_summary += f" 主力净流入: {latest_flow.get('net_mf_amount', 'N/A')}万元\n"
advanced_summary += f" 超大单净流入: {latest_flow.get('buy_elg_amount', 0) - latest_flow.get('sell_elg_amount', 0):.2f}万元\n"
advanced_summary += f" 大单净流入: {latest_flow.get('buy_lg_amount', 0) - latest_flow.get('sell_lg_amount', 0):.2f}万元\n\n"
# 融资融券
if advanced_data.get('margin') and len(advanced_data['margin']) > 0:
latest_margin = advanced_data['margin'][0]
advanced_summary += f"融资融券({latest_margin.get('trade_date', '最近')}\n"
advanced_summary += f" 融资余额: {latest_margin.get('rzye', 'N/A')}\n"
advanced_summary += f" 融券余额: {latest_margin.get('rqye', 'N/A')}\n\n"
else:
advanced_summary = "\n【高级财务数据】\n暂无高级数据\n"
# 构建详细的分析提示
prompt = f"""你是一位专业的股票分析师。请对{data['stock_name']}({data['stock_code']})进行全面分析,用简洁专业但易懂的语言回答。
用户问题:{user_message}
【实时行情数据】
数据来源Tushare Pro API
交易日期:{quote_date}
{json.dumps(data.get('quote'), ensure_ascii=False, indent=2) if data.get('quote') else '数据获取失败'}
【技术指标数据】
数据来源Tushare Pro API基于历史K线数据计算
计算截止日期:{quote_date}
{json.dumps(data.get('technical'), ensure_ascii=False, indent=2) if data.get('technical') else '数据获取失败'}
【基本面数据】
数据来源Tushare Pro API
{json.dumps(data.get('fundamental'), ensure_ascii=False, indent=2) if data.get('fundamental') else '数据获取失败'}
{advanced_summary}
请按以下结构进行分析:
## 一、基本面分析
分段说明公司情况,每个要点独立成段。
## 二、技术面分析(数据截止:{quote_date}
使用清晰的分段结构,每个技术指标独立成段。
## 三、市场情绪分析
分段分析市场情绪和资金流向。
## 四、投资建议
基于分析给出具体的操作建议和点位。
写作要求:
1. 语言简洁专业但易懂
2. 分析客观理性,基于数据
3. 每个分析点独立成段
4. 控制在600-800字
5. 最后声明:"以上分析仅供参考,不构成投资建议。股市有风险,投资需谨慎。"
"""
# 流式调用LLM同步生成器
stream = llm_service.chat_stream(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2000
)
for chunk in stream:
yield chunk
async def _llm_us_stock_analysis_stream(self, data: Dict[str, Any], user_message: str):
"""使用LLM流式分析美股"""
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M")
symbol = data.get("symbol", "")
name = data.get("name", "")
sector = data.get("sector", "")
industry = data.get("industry", "")
current_price = data.get("current_price", 0)
change = data.get("change", 0)
change_pct = data.get("change_percent", 0)
volume = data.get("volume", 0)
market_cap = data.get("market_cap", 0)
pe_ratio = data.get("pe_ratio", 0)
pb_ratio = data.get("pb_ratio", 0)
dividend_yield = data.get("dividend_yield", 0)
week_52_high = data.get("52_week_high", 0)
week_52_low = data.get("52_week_low", 0)
technical = data.get("technical_indicators", {})
description = data.get("description", "")
market_cap_str = f"${market_cap / 1e9:.2f}B" if market_cap > 1e9 else f"${market_cap / 1e6:.2f}M"
prompt = f"""你是一位专业的美股分析师。请基于以下数据对 {name} ({symbol}) 进行全面分析。
**重要提示:当前日期是 {current_time},请在分析中使用这个日期,不要使用其他日期。**
【基本信息】
股票代码:{symbol}
公司名称:{name}
所属行业:{sector} - {industry}
公司简介:{description[:300] if description else '暂无'}
【实时行情】(数据时间:{current_time}
当前价格:${current_price:.2f}
涨跌额:${change:.2f}
涨跌幅:{change_pct:.2f}%
成交量:{volume:,}
市值:{market_cap_str}
【估值指标】
市盈率(PE){f"{pe_ratio:.2f}" if pe_ratio else '暂无'}
市净率(PB){f"{pb_ratio:.2f}" if pb_ratio else '暂无'}
股息率:{f"{dividend_yield * 100:.2f}%" if dividend_yield else '暂无'}
52周最高${week_52_high:.2f}
52周最低${week_52_low:.2f}
【技术指标】
MA5{f"${technical.get('ma5'):.2f}" if technical.get('ma5') else '计算中'}
MA10{f"${technical.get('ma10'):.2f}" if technical.get('ma10') else '计算中'}
MA20{f"${technical.get('ma20'):.2f}" if technical.get('ma20') else '计算中'}
MA60{f"${technical.get('ma60'):.2f}" if technical.get('ma60') else '计算中'}
RSI{f"{technical.get('rsi'):.2f}" if technical.get('rsi') else '计算中'}
MACD{f"{technical.get('macd'):.4f}" if technical.get('macd') else '计算中'}
用户问题:{user_message}
请提供专业的分析报告,包括:
## 📊 行情概览
简要总结当前股价表现和市场表现2-3句话
## 💼 公司基本面
- 行业地位和竞争优势
- 估值水平分析PE、PB是否合理
- 盈利能力和成长性
## 📈 技术面分析
- 当前趋势判断(基于均线系统)
- 关键支撑位和压力位
- RSI和MACD信号解读
## 💡 投资建议
- 短期操作建议1-2周
- 中期投资价值1-3个月
- 风险提示
写作要求:
1. 语言专业但易懂,避免过度修饰
2. 分析客观理性,基于数据和事实
3. 每个部分独立成段,段落间用空行分隔
4. 控制在500-600字
5. **不要在报告中添加日期标题,直接开始分析内容**
6. 最后声明:"以上分析仅供参考,不构成投资建议。美股投资有风险,请谨慎决策。"
"""
# 流式调用LLM同步生成器
stream = llm_service.chat_stream(
messages=[{"role": "user", "content": prompt}],
temperature=0.7,
max_tokens=2000
)
for chunk in stream:
yield chunk
# 创建全局实例
smart_agent = SmartStockAgent()