stock-ai-agent/backend/app/services/multi_llm_service.py
2026-03-04 12:14:07 +08:00

451 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
多模型LLM服务 - 支持智谱AI和DeepSeek
"""
import asyncio
import html
import json
import re
import traceback
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any

from app.config import get_settings
from app.utils.logger import logger

# Zhipu AI (optional dependency)
try:
    from zhipuai import ZhipuAI
    ZHIPUAI_AVAILABLE = True
except ImportError:
    ZHIPUAI_AVAILABLE = False
    logger.warning("zhipuai包未安装")

# DeepSeek (via the OpenAI-compatible client; optional dependency)
try:
    from openai import OpenAI
    from openai import APIStatusError, APIError
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    logger.warning("openai包未安装")
class MultiLLMService:
    """Multi-provider LLM service.

    Wraps the Zhipu AI (GLM) and DeepSeek chat-completion clients behind a single
    ``chat`` / ``chat_stream`` API, tracks which provider is active, and sends a
    rate-limited Telegram alert when a provider reports insufficient balance.
    """

    # Minimum interval (seconds) between two balance alerts for the same provider.
    BALANCE_ERROR_COOLDOWN = 3600  # at most one alert per hour

    # Temperature range accepted by each provider; requests are clamped into it.
    _TEMPERATURE_RANGE = {
        'zhipu': (0.0, 1.0),
        'deepseek': (0.0, 2.0),
    }

    # Numeric error codes must match as whole numbers: a plain substring test for "130"
    # also fires on longer codes such as 1301/1302/1303, which Zhipu uses for unrelated
    # failures (NOTE(review): content-safety / rate-limit per Zhipu's error-code table).
    _ZHIPU_LEGACY_BALANCE_CODE = re.compile(r"(?<!\d)130(?!\d)")
    # Matches `"code": "1113"` inside a Zhipu error body.
    # NOTE(review): Zhipu documents 1113 as "账户已欠费" (account in arrears) - confirm.
    _ZHIPU_ARREARS_CODE = re.compile(r"""['"]code['"]\s*:\s*['"]?1113(?!\d)""")

    def __init__(self):
        """Create a client for every usable provider and choose the active model.

        A provider is initialised when its SDK imported successfully and its API key is
        configured. The active model is ``settings.smart_agent_model`` if that provider
        was initialised, otherwise DeepSeek, otherwise Zhipu, otherwise ``None``.
        """
        settings = get_settings()
        self.clients = {}
        self.current_model = None
        self.model_info = {}
        # When the last balance alert was sent, per provider: {provider: datetime}.
        self._balance_error_notified = {}
        # Strong references to in-flight alert tasks: the event loop only keeps weak
        # references to tasks, so an unreferenced task may vanish before it finishes.
        self._background_tasks = set()

        # Zhipu AI
        if ZHIPUAI_AVAILABLE and settings.zhipuai_api_key:
            try:
                api_key = settings.zhipuai_api_key.strip()
                # Only keys containing '.' and longer than 10 chars are accepted
                # (presumably Zhipu's "<id>.<secret>" format); others are skipped.
                if '.' in api_key and len(api_key) > 10:
                    self.clients['zhipu'] = ZhipuAI(api_key=api_key)
                    self.model_info['zhipu'] = {
                        'name': 'GLM-4-Flash',
                        'model_id': 'glm-4-flash',
                        'provider': 'zhipu',
                        'available': True,
                    }
                    logger.info("智谱AI初始化成功 (使用模型: glm-4-flash)")
                else:
                    # Previously this case was skipped silently, hiding misconfiguration.
                    logger.warning("智谱AI API Key格式无效,已跳过初始化")
            except Exception as e:
                logger.error(f"智谱AI初始化失败: {e}")

        # DeepSeek (OpenAI-compatible endpoint)
        if OPENAI_AVAILABLE and settings.deepseek_api_key:
            try:
                self.clients['deepseek'] = OpenAI(
                    # Strip surrounding whitespace, consistent with the Zhipu key above.
                    api_key=settings.deepseek_api_key.strip(),
                    base_url="https://api.deepseek.com",
                )
                self.model_info['deepseek'] = {
                    'name': 'DeepSeek',
                    'model_id': 'deepseek-chat',
                    'provider': 'deepseek',
                    'available': True,
                }
                logger.info("DeepSeek初始化成功")
            except Exception as e:
                logger.error(f"DeepSeek初始化失败: {e}")

        # Default model: the configured preference wins if that provider is available.
        preferred_model = getattr(settings, 'smart_agent_model', None)
        if preferred_model and preferred_model in self.clients:
            self.current_model = preferred_model
            logger.info(f"使用配置的模型: {preferred_model}")
        elif 'deepseek' in self.clients:
            self.current_model = 'deepseek'
        elif 'zhipu' in self.clients:
            self.current_model = 'zhipu'

        if self.current_model:
            logger.info(f"当前使用模型: {self.model_info[self.current_model]['name']}")
        else:
            logger.warning("没有可用的LLM模型")

    def get_available_models(self) -> List[Dict[str, Any]]:
        """Return the info dicts of all models whose ``available`` flag is set."""
        return [info for info in self.model_info.values() if info['available']]

    def get_current_model_info(self) -> Optional[Dict[str, Any]]:
        """Return the info dict of the active model, or ``None`` if there is none."""
        if self.current_model:
            return self.model_info[self.current_model]
        return None

    def switch_model(self, provider: str) -> bool:
        """Make *provider* the active model.

        Args:
            provider: Provider key, ``'zhipu'`` or ``'deepseek'``.

        Returns:
            True if the provider has an initialised client and was selected,
            False otherwise (the active model is left unchanged).
        """
        if provider in self.clients:
            self.current_model = provider
            logger.info(f"切换到模型: {self.model_info[provider]['name']}")
            return True
        logger.error(f"模型不可用: {provider}")
        return False

    def _is_balance_error(self, error: Exception, provider: str) -> bool:
        """Return True if *error* looks like an insufficient-balance error from *provider*.

        Detection is heuristic: it inspects the lower-cased ``str(error)`` for
        provider-specific wording and error codes.

        Args:
            error: The exception raised by the provider SDK.
            provider: Provider key, ``'zhipu'`` or ``'deepseek'``.
        """
        error_str = str(error).lower()
        error_type = type(error).__name__

        if provider == 'deepseek':
            # e.g. APIStatusError: "Error code: 402 - {'error': {'message': 'Insufficient Balance'...";
            # the 402 form is a special case of this keyword test.
            return 'balance' in error_str and 'insufficient' in error_str

        if provider == 'zhipu':
            mentions_balance = '余额' in error_str or 'balance' in error_str
            mentions_shortage = 'insufficient' in error_str or '不足' in error_str
            if mentions_balance and mentions_shortage:
                return True
            # Account-in-arrears wording / code (see _ZHIPU_ARREARS_CODE note).
            if '欠费' in error_str or self._ZHIPU_ARREARS_CODE.search(error_str):
                return True
            # Original heuristic: error code 130 on an APIError, now as a whole number.
            if error_type == 'APIError' and self._ZHIPU_LEGACY_BALANCE_CODE.search(error_str):
                return True

        return False

    async def _notify_balance_error(self, provider: str, error: Exception):
        """Send an insufficient-balance Telegram alert, at most once per cooldown window.

        Args:
            provider: Provider key the error came from.
            error: The exception that triggered the alert (first 200 chars are included).
        """
        now = datetime.now()
        last_notified = self._balance_error_notified.get(provider)
        if last_notified:
            time_since_last = (now - last_notified).total_seconds()
            if time_since_last < self.BALANCE_ERROR_COOLDOWN:
                logger.info(f"{provider} 余额错误通知冷却中,剩余 {int(self.BALANCE_ERROR_COOLDOWN - time_since_last)}")
                return

        # Reserve the cooldown slot *before* awaiting the send; otherwise two alerts
        # scheduled concurrently would both see the old timestamp and both send.
        self._balance_error_notified[provider] = now
        try:
            from app.services.telegram_service import get_telegram_service

            telegram = get_telegram_service()
            provider_name = {
                'zhipu': '智谱AI (GLM-4)',
                'deepseek': 'DeepSeek',
            }.get(provider, provider)
            # The message is sent with parse_mode="HTML": escape interpolated text so a
            # '<' or '&' in the error does not make Telegram reject the whole alert.
            safe_name = html.escape(provider_name, quote=False)
            safe_detail = html.escape(str(error)[:200], quote=False)
            message = f"""🚨 <b>LLM API 余额不足警告</b>
━━━━━━━━━━━━━━━━━━━━
📊 <b>服务商:</b> {safe_name}
⚠️ <b>错误类型:</b> 余额不足 (Insufficient Balance)
🔍 <b>错误信息:</b> {safe_detail}
━━━━━━━━━━━━━━━━━━━━
<i>请及时充值,否则智能体将无法正常工作</i>"""
            await telegram.send_message(message, parse_mode="HTML")
            logger.warning(f"已发送 {provider} 余额不足Telegram通知")
        except Exception as e:
            # Sending failed: release the reserved slot so the next balance error retries.
            if last_notified is None:
                self._balance_error_notified.pop(provider, None)
            else:
                self._balance_error_notified[provider] = last_notified
            logger.error(f"发送余额不足通知失败: {e}")

    def _schedule_balance_notification(self, provider: str, error: Exception) -> None:
        """Run ``_notify_balance_error`` from synchronous code.

        If this thread has a running event loop (a sync call made from inside a
        coroutine), the alert is scheduled as a task and a strong reference is kept
        until it finishes. Otherwise, including worker threads where
        ``get_event_loop()`` would raise, it is run to completion with ``asyncio.run``.
        Failures are logged, never raised.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        try:
            if loop is None:
                asyncio.run(self._notify_balance_error(provider, error))
            else:
                task = loop.create_task(self._notify_balance_error(provider, error))
                self._background_tasks.add(task)
                task.add_done_callback(self._background_tasks.discard)
        except Exception as notify_error:
            logger.error(f"发送余额通知异常: {notify_error}")

    @classmethod
    def _clamp_temperature(cls, provider: str, temperature: float) -> float:
        """Clamp *temperature* into the range accepted by *provider*."""
        low, high = cls._TEMPERATURE_RANGE[provider]
        return max(low, min(high, temperature))

    @staticmethod
    def _extract_content(response) -> Optional[str]:
        """Pull the reply text out of a (non-streaming) chat-completion response.

        Uses ``choices[0].message.content``; if that is empty or not a string, it falls
        back to the ``reasoning_content`` / ``text`` / ``result`` message attributes.
        Responses without choices fall back to ``response.content``.
        Returns None if no non-blank text is found.
        """
        logger.debug(f"响应对象类型: {type(response)}")
        choices = getattr(response, 'choices', None)
        if not choices:
            logger.warning(f"LLM响应中没有choices。响应: {response}")
            # Possibly a different response shape.
            content = getattr(response, 'content', None)
            if isinstance(content, str) and content.strip():
                logger.info(f"从response.content获取内容长度: {len(content)}")
                return content
            return None

        choice = choices[0]
        logger.debug(f"Choice类型: {type(choice)}, 索引: {getattr(choice, 'index', 'N/A')}")
        message = choice.message
        logger.debug(f"Message类型: {type(message)}")

        # isinstance guard: a non-str content (e.g. a list of parts) used to crash .strip().
        content = getattr(message, 'content', None)
        if isinstance(content, str) and content.strip():
            logger.info(f"LLM响应成功长度: {len(content)}")
            return content

        logger.warning(f"LLM响应content为空或空白: content={repr(content)}")
        # Some providers (Zhipu possibly) put the text in other fields.
        for attr in ('reasoning_content', 'text', 'result'):
            alt_content = getattr(message, attr, None)
            if isinstance(alt_content, str) and alt_content.strip():
                logger.info(f"{attr} 获取内容,长度: {len(alt_content)}")
                return alt_content

        # Dump the full message object to help debug unexpected shapes.
        logger.debug(f"完整Message对象: {message}")
        if hasattr(message, '__dict__'):
            logger.debug(f"Message属性: {message.__dict__}")
        return None

    def chat(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2000,
        model_override: Optional[str] = None
    ) -> Optional[str]:
        """Send a chat-completion request and return the reply text.

        Args:
            messages: Chat messages (``{"role": ..., "content": ...}`` dicts).
            temperature: Sampling temperature; clamped to the provider's range.
            max_tokens: Maximum number of tokens to generate.
            model_override: Provider to use for this call instead of the active one.

        Returns:
            The reply text, or None if no client is available, the reply is empty, or
            the call failed. Failures are logged. A balance error also triggers a
            rate-limited Telegram alert.
        """
        provider = model_override or self.current_model
        if not provider or provider not in self.clients:
            logger.error("没有可用的LLM客户端")
            return None

        try:
            client = self.clients[provider]
            model_id = self.model_info[provider]['model_id']
            logger.info(f"调用LLM: provider={provider}, model={model_id}, messages={len(messages)}")

            if provider not in self._TEMPERATURE_RANGE:
                logger.error(f"未知的模型提供商: {provider}")
                return None
            safe_temperature = self._clamp_temperature(provider, temperature)

            if provider == 'zhipu':
                logger.debug(f"智谱AI请求参数: model={model_id}, temperature={safe_temperature}, max_tokens={max_tokens}")
                logger.debug(f"消息内容: {messages[-1]['content'][:200] if messages else 'empty'}...")

            # Both SDKs expose the same OpenAI-style completions interface.
            response = client.chat.completions.create(
                model=model_id,
                messages=messages,
                temperature=safe_temperature,
                max_tokens=max_tokens,
            )

            if provider == 'zhipu':
                logger.debug(f"智谱AI原始响应: {response}")

            return self._extract_content(response)
        except Exception as e:
            logger.error(f"LLM调用失败: {type(e).__name__}: {e}")
            logger.error(f"详细错误: {traceback.format_exc()}")
            if self._is_balance_error(e, provider):
                self._schedule_balance_notification(provider, e)
            return None

    def chat_stream(
        self,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
        max_tokens: int = 2000,
        model_override: Optional[str] = None
    ):
        """Stream a chat completion, yielding reply text fragments as they arrive.

        Args:
            messages: Chat messages (``{"role": ..., "content": ...}`` dicts).
            temperature: Sampling temperature; clamped to the provider's range.
            max_tokens: Maximum number of tokens to generate.
            model_override: Provider to use for this call instead of the active one.

        Yields:
            Non-empty ``delta.content`` strings.

        Errors are logged and end the stream instead of propagating. A balance error
        also triggers a rate-limited Telegram alert.
        """
        provider = model_override or self.current_model
        if not provider or provider not in self.clients:
            logger.error("没有可用的LLM客户端")
            return

        try:
            client = self.clients[provider]
            model_id = self.model_info[provider]['model_id']
            logger.info(f"流式调用LLM: provider={provider}, model={model_id}, messages={len(messages)}")

            if provider not in self._TEMPERATURE_RANGE:
                logger.error(f"未知的模型提供商: {provider}")
                return

            response = client.chat.completions.create(
                model=model_id,
                messages=messages,
                temperature=self._clamp_temperature(provider, temperature),
                max_tokens=max_tokens,
                stream=True,
            )
            try:
                for chunk in response:
                    if chunk.choices and chunk.choices[0].delta.content:
                        yield chunk.choices[0].delta.content
            finally:
                # Release the HTTP stream even when the consumer stops iterating early
                # (generator closed -> GeneratorExit raised at the yield above).
                close = getattr(response, 'close', None)
                if callable(close):
                    close()

            logger.info("LLM流式响应完成")
        except Exception as e:
            logger.error(f"LLM流式调用失败: {type(e).__name__}: {e}")
            logger.error(f"详细错误: {traceback.format_exc()}")
            if self._is_balance_error(e, provider):
                self._schedule_balance_notification(provider, e)
            return

    @staticmethod
    def _parse_intent_json(text: str) -> Optional[Dict[str, Any]]:
        """Parse the JSON object out of an LLM reply.

        Accepts a bare JSON object, one wrapped in a ```/```json fence, or one
        surrounded by prose (the outermost ``{...}`` span is tried). Returns None if no
        JSON object can be decoded or the decoded value is not a dict.
        """
        cleaned = text.strip()
        fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", cleaned, re.DOTALL | re.IGNORECASE)
        if fenced:
            cleaned = fenced.group(1)
        try:
            parsed = json.loads(cleaned)
        except json.JSONDecodeError:
            start, end = cleaned.find('{'), cleaned.rfind('}')
            if start == -1 or end <= start:
                return None
            try:
                parsed = json.loads(cleaned[start:end + 1])
            except json.JSONDecodeError:
                return None
        return parsed if isinstance(parsed, dict) else None

    def analyze_intent(self, user_message: str) -> Dict[str, Any]:
        """Classify a user's stock-query intent with the LLM.

        Returns:
            The dict parsed from the model's JSON reply (the prompt asks for ``type``,
            ``confidence`` and ``stock_name``), or ``{"type": "unknown", "confidence": 0}``
            when there is no model, the call fails, or the reply is not a JSON object.
        """
        if not self.current_model:
            return {"type": "unknown", "confidence": 0}

        prompt = f"""你是一个股票分析助手的意图识别模块。请分析用户的查询意图。
用户消息:{user_message}
请识别以下意图类型之一:
1. market_data - 查询实时行情、价格
2. technical_analysis - 技术分析、技术指标
3. fundamental - 基本面信息、公司信息
4. visualization - K线图、图表
5. unknown - 无法识别
请以JSON格式返回
{{
"type": "意图类型",
"confidence": 0.0-1.0,
"stock_name": "提取的股票名称(如果有)"
}}
"""
        try:
            response = self.chat([{"role": "user", "content": prompt}], temperature=0.3)
            if response:
                result = self._parse_intent_json(response)
                if result is not None:
                    return result
                logger.error(f"意图分析失败: 无法解析JSON: {response[:200]}")
        except Exception as e:
            logger.error(f"意图分析失败: {e}")
        return {"type": "unknown", "confidence": 0}
# Shared module-level service instance; built once, when this module is first imported.
multi_llm_service: MultiLLMService = MultiLLMService()