""" 新闻 LLM 分析模块 使用 LLM 分析新闻内容并生成交易建议 """ import json from typing import Dict, Any, List, Optional from datetime import datetime from app.utils.logger import logger from app.news_agent.fetcher import NewsItem from app.config import get_settings from openai import OpenAI class NewsAnalyzer: """新闻 LLM 分析器 (DeepSeek)""" def __init__(self): self.settings = get_settings() self.client = None try: # 使用 DeepSeek API self.client = OpenAI( api_key=self.settings.deepseek_api_key, base_url="https://api.deepseek.com" ) except Exception as e: logger.error(f"LLM 客户端初始化失败: {e}") # 批量分析配置 self.batch_size = 10 # 每次最多分析 10 条新闻(只传标题,可以增加数量) self.max_retries = 2 def _build_analysis_prompt(self, news_item: NewsItem) -> str: """构建单条新闻的分析提示词""" prompt = f"""你是一名专业的金融新闻分析师。请分析以下新闻标题,并以 JSON 格式输出结果。 **新闻标题**: {news_item.title} **新闻来源**: {news_item.source} **新闻分类**: {news_item.category} 请按以下 JSON 格式输出(不要包含其他内容): ```json {{ "market_impact": "high/medium/low", "impact_type": "bullish/bearish/neutral", "sentiment": "positive/negative/neutral", "summary": "简洁的新闻摘要(1句话,不超过50字)", "key_points": ["关键点1", "关键点2", "关键点3"], "trading_advice": "简洁的交易建议(1句话,不超过30字)", "relevant_symbols": ["相关的币种或股票代码"], "confidence": 85 }} ``` **分析要求**: 1. market_impact: 对市场的潜在影响(high/medium/low) - **high**: 对市场或公司有**实质性、深远影响**的事件 * 改变行业格局或公司生存状态 * 监管政策重大变化(批准、禁止、调查) * 系统性风险事件(破产、退市、重大欺诈) - **medium**: 对价格有**短期影响**但不会改变长期趋势的事件 * 财报业绩、管理层变动、一般并购 * 评级调整、业务合作或重组 - **low**: 常规信息,影响有限 * 分析师观点、价格波动、一般评论 **判断原则**: 问自己"这条新闻会改变市场/公司的长期格局吗?" 如果会→high,如果只是短期波动→medium,如果无关紧要→low 2. impact_type: 对价格的影响方向(bullish=利好, bearish=利空, neutral=中性) 3. sentiment: 新闻情绪(positive=正面, negative=负面, neutral=中性) 4. summary: 根据标题推断并总结新闻核心内容 5. key_points: 基于标题推断3-5个关键信息点 6. trading_advice: 给出简明的交易建议 7. relevant_symbols: 根据标题列出相关的交易代码(如 BTC, ETH, NVDA, TSLA 等) 8. confidence: 分析置信度(0-100) 请只输出 JSON,不要包含其他解释。 """ return prompt def _build_batch_analysis_prompt(self, news_items: List[NewsItem]) -> str: """构建批量分析提示词""" news_text = "" for i, item in enumerate(news_items, 1): news_text += f""" --- 新闻 {i} --- 标题: {item.title} 来源: {item.source} 分类: {item.category} --- """ prompt = f"""你是一名专业的金融新闻分析师。请分析以下 {len(news_items)} 条新闻标题,并以 JSON 数组格式输出结果。 {news_text} 请按以下 JSON 格式输出(不要包含其他内容): ```json [ {{ "title": "新闻标题", "market_impact": "high/medium/low", "impact_type": "bullish/bearish/neutral", "sentiment": "positive/negative/neutral", "summary": "简洁的新闻摘要(1句话,不超过50字)", "key_points": ["关键点1", "关键点2"], "trading_advice": "简洁的交易建议(1句话,不超过30字)", "relevant_symbols": ["相关代码"], "confidence": 85 }} ] ``` **market_impact 判断标准**: - **high**: 对市场或公司有**实质性、深远影响**的事件(改变行业格局或公司生存状态) - **medium**: 对价格有**短期影响**但不会改变长期趋势的事件(财报、管理层变动、一般并购等) - **low**: 常规信息,影响有限 **判断原则**: 问自己"这条新闻会改变市场/公司的长期格局吗?" 如果会→high,如果只是短期波动→medium。 请只输出 JSON 数组,不要包含其他解释。 """ return prompt def _parse_llm_response(self, response: str) -> Optional[Dict[str, Any]]: """解析 LLM 响应""" try: # 尝试提取 JSON response = response.strip() # 移除可能的 markdown 代码块标记 if response.startswith("```json"): response = response[7:] if response.startswith("```"): response = response[3:] if response.endswith("```"): response = response[:-3] response = response.strip() # 解析 JSON return json.loads(response) except json.JSONDecodeError as e: # 尝试修复截断的 JSON logger.warning(f"JSON 解析失败,尝试修复: {e}") try: # 查找最后一个完整的对象 response = response.strip() # 如果是数组,找到最后一个完整的对象 if response.startswith('['): # 找到每个完整对象的结束位置 brace_count = 0 last_complete = 0 for i, char in enumerate(response): if char == '{': brace_count += 1 elif char == '}': brace_count -= 1 if brace_count == 0: last_complete = i + 1 break if last_complete > 0: # 提取完整的数组 fixed = response[:last_complete] if not fixed.endswith(']'): fixed += ']' if not fixed.endswith('}'): fixed += '}' return json.loads(fixed) except: pass logger.error(f"JSON 解析失败: {e}, 响应: {response[:500]}") return None def _parse_llm_array_response(self, response: str) -> Optional[List[Dict[str, Any]]]: """解析 LLM 数组响应""" try: # 尝试提取 JSON response = response.strip() # 移除可能的 markdown 代码块标记 if response.startswith("```json"): response = response[7:] if response.startswith("```"): response = response[3:] if response.endswith("```"): response = response[:-3] response = response.strip() # 解析 JSON 数组 result = json.loads(response) if isinstance(result, list): return result elif isinstance(result, dict) and 'title' in result: # 如果返回单个对象,包装成数组 return [result] return None except json.JSONDecodeError as e: # 尝试修复截断的 JSON 数组 logger.warning(f"JSON 数组解析失败,尝试修复: {e}") try: response = response.strip() if response.startswith('['): # 找到每个完整对象 objects = [] brace_count = 0 obj_start = -1 for i, char in enumerate(response): if char == '{': if obj_start == -1: obj_start = i brace_count += 1 elif char == '}': brace_count -= 1 if brace_count == 0 and obj_start >= 0: # 提取完整对象 obj_str = response[obj_start:i + 1] try: obj = json.loads(obj_str) if isinstance(obj, dict) and 'title' in obj: objects.append(obj) except: pass obj_start = -1 if objects: return objects except: pass logger.error(f"JSON 数组解析失败: {e}, 响应: {response[:500]}") return None def analyze_single(self, news_item: NewsItem) -> Optional[Dict[str, Any]]: """ 分析单条新闻 Args: news_item: 新闻项 Returns: 分析结果字典或 None """ if not self.client: logger.warning("LLM 客户端未初始化") return None try: prompt = self._build_analysis_prompt(news_item) for attempt in range(self.max_retries): try: response = self.client.chat.completions.create( model="deepseek-chat", messages=[ {"role": "system", "content": "你是一名专业的金融新闻分析师,擅长分析新闻标题对市场的影响。"}, {"role": "user", "content": prompt} ], temperature=0.3, max_tokens=1000 # 只传标题,减少输出token ) result = self._parse_llm_response(response.choices[0].message.content) if result: logger.info(f"新闻分析成功: {news_item.title[:50]}... -> {result.get('market_impact')}") return result except Exception as e: logger.warning(f"分析失败 (尝试 {attempt + 1}/{self.max_retries}): {e}") logger.error(f"新闻分析失败,已达最大重试次数: {news_item.title[:50]}") return None except Exception as e: logger.error(f"分析新闻时出错: {e}") return None def analyze_batch(self, news_items: List[NewsItem]) -> List[Optional[Dict[str, Any]]]: """ 批量分析新闻 Args: news_items: 新闻项列表 Returns: 分析结果列表(与输入顺序一致) """ if not self.client: logger.warning("LLM 客户端未初始化") return [None] * len(news_items) results = [] # 分批处理 for i in range(0, len(news_items), self.batch_size): batch = news_items[i:i + self.batch_size] try: prompt = self._build_batch_analysis_prompt(batch) response = self.client.chat.completions.create( model="deepseek-chat", messages=[ {"role": "system", "content": "你是一名专业的金融新闻分析师,擅长分析新闻标题对市场的影响。"}, {"role": "user", "content": prompt} ], temperature=0.3, max_tokens=2000 # 批量分析需要更多 token ) batch_results = self._parse_llm_array_response(response.choices[0].message.content) if batch_results: # 按标题匹配结果 title_to_result = {r.get('title'): r for r in batch_results if r and isinstance(r, dict)} for item in batch: result = title_to_result.get(item.title) results.append(result) if result: logger.info(f"新闻分析成功: {item.title[:50]}... -> {result.get('market_impact')}") else: results.extend([None] * len(batch)) except Exception as e: logger.error(f"批量分析失败: {e}") results.extend([None] * len(batch)) return results def calculate_priority(self, analysis: Dict[str, Any], quality_score: float = 0.5) -> float: """ 根据分析结果计算优先级 Args: analysis: LLM 分析结果 quality_score: 质量分数 Returns: 优先级分数 """ score = 0.0 # 市场影响 impact_weights = {'high': 50, 'medium': 30, 'low': 10} score += impact_weights.get(analysis.get('market_impact', 'low'), 10) # 方向性(利空利好比中性重要) if analysis.get('impact_type') in ['bullish', 'bearish']: score += 15 # 置信度 score += (analysis.get('confidence', 50) / 100) * 10 # 质量分数 score += quality_score * 20 # 是否有相关代码 if analysis.get('relevant_symbols'): score += 5 return score class NewsAnalyzerSimple: """简化版新闻分析器(仅关键词规则,不使用 LLM)""" def __init__(self): pass def analyze_single(self, news_item: NewsItem) -> Dict[str, Any]: """ 基于规则分析新闻 Args: news_item: 新闻项 Returns: 分析结果字典 """ # 使用已有的影响评分 impact_score = getattr(news_item, 'impact_score', 0.0) # 根据 impact_score 确定市场影响 if impact_score >= 1.0: market_impact = 'high' elif impact_score >= 0.7: market_impact = 'medium' else: market_impact = 'low' # 检查关键词确定方向 text = f"{news_item.title} {news_item.content}".lower() bullish_keywords = ['上涨', '增长', '突破', '新高', 'bullish', 'surge', 'rally', 'gain', '批准', '合作'] bearish_keywords = ['下跌', '暴跌', '崩盘', 'ban', 'bearish', 'crash', 'plunge', 'fall', '禁令', '风险'] bullish_count = sum(1 for k in bullish_keywords if k in text) bearish_count = sum(1 for k in bearish_keywords if k in text) if bullish_count > bearish_count: impact_type = 'bullish' sentiment = 'positive' elif bearish_count > bullish_count: impact_type = 'bearish' sentiment = 'negative' else: impact_type = 'neutral' sentiment = 'neutral' # 获取相关代码 relevant_symbols = list(set(getattr(news_item, 'relevant_symbols', []))) return { 'market_impact': market_impact, 'impact_type': impact_type, 'sentiment': sentiment, 'summary': news_item.title, 'key_points': [news_item.title[:100]], 'trading_advice': getattr(news_item, 'impact_reason', '关注市场动态'), 'relevant_symbols': relevant_symbols, 'confidence': 70, 'analyzed_by': 'rules' }