diff --git a/backend/app/crypto_agent/llm_signal_analyzer.py b/backend/app/crypto_agent/llm_signal_analyzer.py index ee017b2..1e4e3be 100644 --- a/backend/app/crypto_agent/llm_signal_analyzer.py +++ b/backend/app/crypto_agent/llm_signal_analyzer.py @@ -480,15 +480,64 @@ class LLMSignalAnalyzer: - 理想的风险收益比应该在 1:3 以上 - 即:潜在风险 3%,潜在收益 9% 以上 +## 九、基本面分析(重要补充) +**对于股票交易,基本面分析是重要的参考维度:** + +### 估值指标分析 +- **PE(市盈率)**: + - PE < 15:估值偏低,价值投资机会 + - PE 15-25:估值合理 + - PE 25-40:估值偏高 + - PE > 40:估值过高,风险较大 +- **PB(市净率)**:PB < 1.5 通常表示被低估 +- **PEG(市盈率相对盈利增长比率)**:PEG < 1 表示被低估 + +### 盈利能力分析 +- **ROE(净资产收益率)**: + - ROE > 20%:优秀公司,盈利能力强 + - ROE 15-20%:良好 + - ROE 10-15%:一般 + - ROE < 10%:盈利能力较弱 +- **净利率**:净利率 > 20% 表示盈利质量高 +- **毛利率**:毛利率 > 40% 表示有竞争优势 + +### 成长性分析 +- **营收增长率**: + - > 30%:高成长 + - 20-30%:稳定成长 + - 10-20%:一般成长 + - < 10%:成长性不足 +- **盈利增长率**:与营收增长同步更健康 + +### 财务健康分析 +- **债务股本比**: + - < 1:财务健康 + - 1-2:可控范围 + - > 2:风险较高 +- **流动比率**:> 2 表示偿债能力强 + +### 基本面与技术面结合 +1. **基本面优秀 + 技术面突破** = 高质量做多机会(可提高置信度) +2. **基本面差 + 技术面破位** = 高质量做空机会(可提高置信度) +3. **基本面优秀 + 技术面回调** = 低吸机会(中线/长线) +4. **基本面差 + 技术面上涨** = 谨慎(可能是诱多) + +### 基本面评分参考 +- **80分以上(A级)**:基本面优秀,技术信号确认时可提高置信度 +- **60-80分(B级)**:基本面良好,可作为参考 +- **40-60分(C级)**:基本面一般,主要依赖技术分析 +- **40分以下(D级)**:基本面较差,降低信号置信度 + ## 重要原则 1. **量价优先** - 任何信号都必须有量能配合才可靠 2. **精选机会** - 股票不需要频繁交易,等待高质量信号 3. **多周期确认** - 日线决定方向,小周期决定入场 4. **结构止损** - 止损必须基于关键支撑/阻力位(前低前高、均线) 5. **合理止盈** - 根据交易周期设置合理的止盈目标 -6. **reason 字段必须包含量价分析**(如"放量突破+RSI=45,量比1.8确认有效") -7. **entry_type 必须明确**:信号已触发用 market,等待更好价位用 limit -8. **position_size 必须明确**:根据信号质量给出 heavy/medium/light""" +6. **基本面参考** - 结合基本面评分和技术面综合判断,提高信号质量 +7. **reason 字段必须包含量价分析**(如"放量突破+RSI=45,量比1.8确认有效") +8. **entry_type 必须明确**:信号已触发用 market,等待更好价位用 limit +9. **position_size 必须明确**:根据信号质量给出 heavy/medium/light""" # 兼容旧代码,使用加密货币提示词作为默认值 SYSTEM_PROMPT = CRYPTO_SYSTEM_PROMPT @@ -534,7 +583,9 @@ class LLMSignalAnalyzer: async def analyze(self, symbol: str, data: Dict[str, pd.DataFrame], symbols: List[str] = None, - position_info: Dict[str, Any] = None) -> Dict[str, Any]: + position_info: Dict[str, Any] = None, + fundamental_data: Dict[str, Any] = None, + fundamental_summary: str = "") -> Dict[str, Any]: """ 使用 LLM 分析市场数据 @@ -547,6 +598,8 @@ class LLMSignalAnalyzer: - total_position_value: 总持仓价值 - current_leverage: 当前杠杆倍数 - positions: 各交易对持仓列表 + fundamental_data: 基本面数据(仅股票) + fundamental_summary: 基本面摘要文本(仅股票) Returns: 分析结果 @@ -576,7 +629,8 @@ class LLMSignalAnalyzer: system_prompt = self.CRYPTO_SYSTEM_PROMPT # 构建数据提示 - data_prompt = self._build_data_prompt(symbol, data, news_text, position_info, futures_data) + data_prompt = self._build_data_prompt(symbol, data, news_text, position_info, futures_data, + fundamental_data, fundamental_summary) # 调用 LLM(使用异步方法避免阻塞事件循环) response = await llm_service.achat([ @@ -681,9 +735,77 @@ class LLMSignalAnalyzer: return "\n".join(lines) + def _format_fundamental_data(self, fundamental_data: Dict[str, Any]) -> str: + """格式化基本面数据供 LLM 参考""" + lines = [] + + # 基本信息 + company_name = fundamental_data.get('company_name', 'N/A') + sector = fundamental_data.get('sector', 'N/A') + industry = fundamental_data.get('industry', 'N/A') + market_cap = fundamental_data.get('market_cap', 0) + + lines.append(f"**公司**: {company_name}") + lines.append(f"**行业**: {sector} / {industry}") + if market_cap: + lines.append(f"**市值**: ${market_cap:,.0f}") + + # 基本面评分 + score_data = fundamental_data.get('score', {}) + total_score = score_data.get('total', 0) + rating = score_data.get('rating', 'N/A') + if total_score > 0: + lines.append(f"**基本面评分**: {total_score:.0f}/100 ({rating}级)") + + # 估值指标 + valuation = fundamental_data.get('valuation', {}) + if valuation.get('pe_ratio'): + pe = valuation['pe_ratio'] + pb = valuation.get('pb_ratio', 'N/A') + ps = valuation.get('ps_ratio', 'N/A') + lines.append(f"**估值**: PE={pe:.2f} | PB={pb} | PS={ps}") + + # 盈利能力 + profitability = fundamental_data.get('profitability', {}) + if profitability.get('return_on_equity'): + roe = profitability['return_on_equity'] + profit_margin = profitability.get('profit_margin') + gross_margin = profitability.get('gross_margin') + pm_str = f"{profit_margin:.1f}" if profit_margin is not None else "N/A" + gm_str = f"{gross_margin:.1f}" if gross_margin is not None else "N/A" + lines.append(f"**盈利**: ROE={roe:.2f}% | 净利率={pm_str}% | 毛利率={gm_str}%") + + # 成长性 + growth = fundamental_data.get('growth', {}) + revenue_growth = growth.get('revenue_growth') + earnings_growth = growth.get('earnings_growth') + if revenue_growth is not None or earnings_growth is not None: + rg_str = f"{revenue_growth:.1f}" if revenue_growth is not None else "N/A" + eg_str = f"{earnings_growth:.1f}" if earnings_growth is not None else "N/A" + lines.append(f"**成长**: 营收增长={rg_str}% | 盈利增长={eg_str}%") + + # 财务健康 + financial = fundamental_data.get('financial_health', {}) + if financial.get('debt_to_equity'): + debt_to_equity = financial['debt_to_equity'] + current_ratio = financial.get('current_ratio') + cr_str = f"{current_ratio:.2f}" if current_ratio is not None else "N/A" + lines.append(f"**财务**: 债务股本比={debt_to_equity:.2f} | 流动比率={cr_str}") + + # 分析师建议 + analyst = fundamental_data.get('analyst', {}) + if analyst.get('target_price'): + target_price = analyst['target_price'] + recommendation = analyst.get('recommendation', 'N/A') + lines.append(f"**分析师**: 目标价=${target_price:.2f} | 建议={recommendation}") + + return "\n".join(lines) + def _build_data_prompt(self, symbol: str, data: Dict[str, pd.DataFrame], news_text: str = "", position_info: Dict[str, Any] = None, - futures_data: Dict[str, Any] = None) -> str: + futures_data: Dict[str, Any] = None, + fundamental_data: Dict[str, Any] = None, + fundamental_summary: str = "") -> str: """构建数据提示词""" parts = [f"# {symbol} 市场数据分析\n"] parts.append(f"分析时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n") @@ -694,6 +816,14 @@ class LLMSignalAnalyzer: current_price = float(data['5m'].iloc[-1]['close']) parts.append(f"**当前价格**: ${current_price:,.2f}\n") + # === 新增:基本面数据(仅股票) === + if fundamental_data and self.agent_type == 'stock': + parts.append("\n## 基本面分析") + if fundamental_summary: + parts.append(fundamental_summary) + else: + parts.append(self._format_fundamental_data(fundamental_data)) + # === 新增:合约市场数据 === if futures_data and self.agent_type == 'crypto': parts.append(self.binance_service.format_futures_data_for_llm(symbol, futures_data)) diff --git a/backend/app/services/fundamental_service.py b/backend/app/services/fundamental_service.py new file mode 100644 index 0000000..6d2b662 --- /dev/null +++ b/backend/app/services/fundamental_service.py @@ -0,0 +1,520 @@ +""" +基本面因子数据服务 +获取美股和港股的基本面数据,包括估值、盈利能力、成长性等指标 +""" +from typing import Dict, Any, Optional, List +from datetime import datetime +import pandas as pd + +try: + import yfinance as yf + YFINANCE_AVAILABLE = True +except ImportError: + YFINANCE_AVAILABLE = False + +from app.utils.logger import logger + + +class FundamentalService: + """基本面因子数据服务""" + + def __init__(self): + """初始化服务""" + if not YFINANCE_AVAILABLE: + logger.warning("yfinance 未安装,基本面数据功能将不可用") + return + + self._cache = {} # 数据缓存 + self._cache_time = {} # 缓存时间 + self._cache_ttl = 3600 # 缓存有效期1小时 + + logger.info("基本面数据服务初始化成功") + + def get_fundamental_data(self, symbol: str) -> Optional[Dict[str, Any]]: + """ + 获取股票的基本面数据 + + Args: + symbol: 股票代码,如 'AAPL', '0700.HK' + + Returns: + 基本面数据字典,包含估值、盈利、成长等指标 + """ + if not YFINANCE_AVAILABLE: + return None + + try: + ticker = yf.Ticker(symbol) + + # 获取股票信息 + info = ticker.info + + if not info: + logger.warning(f"无法获取 {symbol} 的基本面数据") + return None + + # 提取关键指标 + fundamental_data = { + 'symbol': symbol, + 'timestamp': datetime.now().isoformat(), + + # 基本信息 + 'company_name': info.get('longName', info.get('shortName', 'N/A')), + 'sector': info.get('sector', 'N/A'), + 'industry': info.get('industry', 'N/A'), + 'market_cap': info.get('marketCap'), + 'shares_outstanding': info.get('sharesOutstanding'), + + # 估值指标 + 'valuation': self._extract_valuation_metrics(info), + + # 盈利能力 + 'profitability': self._extract_profitability_metrics(info), + + # 成长性 + 'growth': self._extract_growth_metrics(info), + + # 财务健康 + 'financial_health': self._extract_financial_health_metrics(info), + + # 股票回报 + 'returns': self._extract_return_metrics(info), + + # 分析师建议 + 'analyst': self._extract_analyst_metrics(info), + } + + # 计算综合评分 + fundamental_data['score'] = self._calculate_fundamental_score(fundamental_data) + + # 输出基本面关键指标 + score = fundamental_data.get('score', {}) + logger.info(f"✓ {symbol} 基本面数据获取成功") + logger.info(f" 【公司】{fundamental_data.get('company_name', 'N/A')} | {fundamental_data.get('sector', 'N/A')}") + logger.info(f" 【评分】总分: {score.get('total', 0):.0f}/100 ({score.get('rating', 'N/A')}级) | " + f"估值:{score.get('valuation', 0)} 盈利:{score.get('profitability', 0)} " + f"成长:{score.get('growth', 0)} 财务:{score.get('financial_health', 0)}") + + # 估值指标 + val = fundamental_data.get('valuation', {}) + if val.get('pe_ratio'): + pe = val['pe_ratio'] + pb = val.get('pb_ratio') + ps = val.get('ps_ratio') + peg = val.get('peg_ratio') + pb_str = f"{pb:.2f}" if pb is not None else "N/A" + ps_str = f"{ps:.2f}" if ps is not None else "N/A" + peg_str = f"{peg:.2f}" if peg is not None else "N/A" + logger.info(f" 【估值】PE:{pe:.2f} | PB:{pb_str} | PS:{ps_str} | PEG:{peg_str}") + + # 盈利能力 + prof = fundamental_data.get('profitability', {}) + if prof.get('return_on_equity'): + roe = prof['return_on_equity'] + pm = prof.get('profit_margin') + gm = prof.get('gross_margin') + pm_str = f"{pm:.1f}" if pm is not None else "N/A" + gm_str = f"{gm:.1f}" if gm is not None else "N/A" + logger.info(f" 【盈利】ROE:{roe:.2f}% | 净利率:{pm_str}% | 毛利率:{gm_str}%") + + # 成长性 + growth = fundamental_data.get('growth', {}) + rg = growth.get('revenue_growth') + eg = growth.get('earnings_growth') + if rg is not None or eg is not None: + rg_str = f"{rg:.1f}" if rg is not None else "N/A" + eg_str = f"{eg:.1f}" if eg is not None else "N/A" + logger.info(f" 【成长】营收增长:{rg_str}% | 盈利增长:{eg_str}%") + + # 财务健康 + fin = fundamental_data.get('financial_health', {}) + if fin.get('debt_to_equity'): + de = fin['debt_to_equity'] + cr = fin.get('current_ratio') + cr_str = f"{cr:.2f}" if cr is not None else "N/A" + logger.info(f" 【财务】债务股本比:{de:.2f} | 流动比率:{cr_str}") + + # 分析师建议 + analyst = fundamental_data.get('analyst', {}) + tp = analyst.get('target_price') + if tp: + rec = analyst.get('recommendation', 'N/A') + logger.info(f" 【分析师】目标价:${tp:.2f} | 评级:{rec}") + + return fundamental_data + + except Exception as e: + logger.error(f"获取 {symbol} 基本面数据失败: {e}") + return None + + def _extract_valuation_metrics(self, info: Dict) -> Dict[str, Any]: + """提取估值指标""" + return { + 'pe_ratio': info.get('trailingPE'), # 市盈率 + 'forward_pe': info.get('forwardPE'), # 远期市盈率 + 'peg_ratio': info.get('pegRatio'), # PEG + 'pb_ratio': info.get('priceToBook'), # 市净率 + 'ps_ratio': info.get('priceToSalesTrailing12M'), # 市销率 + 'ev_to_ebitda': info.get('enterpriseToEbitda'), # EV/EBITDA + 'enterprise_value': info.get('enterpriseValue'), # 企业价值 + } + + def _extract_profitability_metrics(self, info: Dict) -> Dict[str, Any]: + """提取盈利能力指标""" + return { + 'eps': info.get('trailingEps'), # 每股收益 + 'forward_eps': info.get('forwardEps'), # 预期每股收益 + 'revenue': info.get('totalRevenue'), # 总收入 + 'net_income': info.get('netIncomeToCommon'), # 净收入 + 'profit_margin': info.get('profitMargins'), # 利润率 + 'operating_margin': info.get('operatingMargins'), # 营业利润率 + 'gross_margin': info.get('grossMargins'), # 毛利率 + 'ebitda': info.get('ebitda'), # EBITDA + 'ebitda_margins': info.get('ebitdaMargins'), # EBITDA利润率 + } + + def _extract_growth_metrics(self, info: Dict) -> Dict[str, Any]: + """提取成长性指标""" + return { + 'revenue_growth': info.get('revenueGrowth'), # 营收增长率 + 'earnings_growth': info.get('earningsGrowth'), # 盈利增长 + 'earnings_quarterly_growth': info.get('earningsQuarterlyGrowth'), # 季度盈利增长 + 'revenue_quarterly_growth': info.get('revenueQuarterlyGrowth'), # 季度营收增长 + } + + def _extract_financial_health_metrics(self, info: Dict) -> Dict[str, Any]: + """提取财务健康指标""" + return { + 'debt_to_equity': info.get('debtToEquity'), # 债务股本比 + 'current_ratio': info.get('currentRatio'), # 流动比率 + 'quick_ratio': info.get('quickRatio'), # 速动比率 + 'total_cash': info.get('totalCash'), # 总现金 + 'total_debt': info.get('totalDebt'), # 总债务 + 'operating_cashflow': info.get('operatingCashflow'), # 经营现金流 + 'free_cashflow': info.get('freeCashflow'), # 自由现金流 + } + + def _extract_return_metrics(self, info: Dict) -> Dict[str, Any]: + """提取股票回报指标""" + return { + 'dividend_rate': info.get('dividendRate'), # 股息率 + 'dividend_yield': info.get('dividendYield'), # 股息收益率 + 'payout_ratio': info.get('payoutRatio'), # 派息比率 + 'five_year_avg_dividend_yield': info.get('fiveYearAvgDividendYield'), # 5年平均股息率 + 'return_on_equity': info.get('returnOnEquity'), # ROE + 'return_on_assets': info.get('returnOnAssets'), # ROA + } + + def _extract_analyst_metrics(self, info: Dict) -> Dict[str, Any]: + """提取分析师建议""" + return { + 'target_price': info.get('targetMeanPrice'), # 目标价 + 'target_high': info.get('targetHighPrice'), # 目标价上限 + 'target_low': info.get('targetLowPrice'), # 目标价下限 + 'recommendation': info.get('recommendationKey'), # 分析师建议 + 'number_of_analysts': info.get('numberOfAnalystOpinions'), # 分析师数量 + } + + def _calculate_fundamental_score(self, data: Dict[str, Any]) -> Dict[str, Any]: + """ + 计算基本面综合评分(0-100分) + + 评分维度: + 1. 估值合理性 (0-25分) + 2. 盈利能力 (0-25分) + 3. 成长性 (0-25分) + 4. 财务健康 (0-25分) + """ + scores = { + 'valuation': 0, + 'profitability': 0, + 'growth': 0, + 'financial_health': 0, + 'total': 0 + } + + try: + # 1. 估值评分 (0-25分) + valuation = data.get('valuation', {}) + if valuation.get('pe_ratio'): + pe = valuation['pe_ratio'] + # PE < 15: 优秀,15-25: 良好,25-40: 一般,>40: 偏高 + if pe < 15: + scores['valuation'] = 25 + elif pe < 25: + scores['valuation'] = 20 + elif pe < 40: + scores['valuation'] = 10 + else: + scores['valuation'] = 5 + + # 2. 盈利能力评分 (0-25分) + profitability = data.get('profitability', {}) + roe = profitability.get('return_on_equity') + profit_margin = profitability.get('profit_margin') + + # 处理 None 值 + if roe is None: + roe = 0 + if profit_margin is None: + profit_margin = 0 + + if roe > 0: + # ROE > 20%: 优秀,15-20%: 良好,10-15%: 一般,< 10%: 较差 + if roe > 20: + scores['profitability'] += 15 + elif roe > 15: + scores['profitability'] += 12 + elif roe > 10: + scores['profitability'] += 8 + else: + scores['profitability'] += 4 + + if profit_margin > 0: + # 净利率 > 20%: 优秀,10-20%: 良好,5-10%: 一般 + if profit_margin > 20: + scores['profitability'] += 10 + elif profit_margin > 10: + scores['profitability'] += 7 + else: + scores['profitability'] += 4 + + # 3. 成长性评分 (0-25分) + growth = data.get('growth', {}) + revenue_growth = growth.get('revenue_growth') + earnings_growth = growth.get('earnings_growth') + + # 处理 None 值 + if revenue_growth is None: + revenue_growth = 0 + if earnings_growth is None: + earnings_growth = 0 + + if revenue_growth > 0: + # 营收增长 > 30%: 优秀,20-30%: 良好,10-20%: 一般,< 10%: 较差 + if revenue_growth > 30: + scores['growth'] += 12 + elif revenue_growth > 20: + scores['growth'] += 10 + elif revenue_growth > 10: + scores['growth'] += 6 + else: + scores['growth'] += 3 + + if earnings_growth > 0: + # 盈利增长 > 30%: 优秀,20-30%: 良好,10-20%: 一般 + if earnings_growth > 30: + scores['growth'] += 13 + elif earnings_growth > 20: + scores['growth'] += 10 + elif earnings_growth > 10: + scores['growth'] += 6 + else: + scores['growth'] += 3 + + # 4. 财务健康评分 (0-25分) + financial = data.get('financial_health', {}) + debt_to_equity = financial.get('debt_to_equity') + current_ratio = financial.get('current_ratio') + + # 处理 None 值 + if debt_to_equity is None: + debt_to_equity = 0 + if current_ratio is None: + current_ratio = 0 + + # 债务股本比 < 1: 优秀,1-2: 良好,2-3: 一般,> 3: 风险高 + if debt_to_equity < 1: + scores['financial_health'] += 12 + elif debt_to_equity < 2: + scores['financial_health'] += 10 + elif debt_to_equity < 3: + scores['financial_health'] += 5 + else: + scores['financial_health'] += 2 + + # 流动比率 > 2: 优秀,1.5-2: 良好,1-1.5: 一般,< 1: 风险 + if current_ratio > 2: + scores['financial_health'] += 13 + elif current_ratio > 1.5: + scores['financial_health'] += 10 + elif current_ratio > 1: + scores['financial_health'] += 5 + else: + scores['financial_health'] += 0 + + # 现金流评分 + fc = financial.get('free_cashflow') + if fc is not None and fc > 0: + scores['financial_health'] += 0 # 已在盈利能力中考虑 + + # 计算总分 + scores['total'] = sum([scores['valuation'], scores['profitability'], + scores['growth'], scores['financial_health']]) + + # 添加评级 + if scores['total'] >= 80: + scores['rating'] = 'A' + elif scores['total'] >= 60: + scores['rating'] = 'B' + elif scores['total'] >= 40: + scores['rating'] = 'C' + else: + scores['rating'] = 'D' + + except Exception as e: + logger.error(f"计算基本面评分失败: {e}") + + return scores + + def get_fundamental_summary(self, symbol: str, data: Dict[str, Any] = None) -> str: + """ + 生成基本面数据摘要文本,用于 LLM 分析 + + Args: + symbol: 股票代码 + data: 可选,已获取的基本面数据。如果为None,则自动获取 + + Returns: + 基本面摘要文本 + """ + if data is None: + data = self.get_fundamental_data(symbol) + if not data: + return f"{symbol}: 暂无基本面数据" + + summary_parts = [] + + # 基本信息 + summary_parts.append(f"【公司信息】{data.get('company_name', 'N/A')} | " + f"行业: {data.get('sector', 'N/A')}") + + # 估值情况 + val = data.get('valuation', {}) + if val.get('pe_ratio'): + summary_parts.append(f"【估值】PE: {val['pe_ratio']:.2f} | " + f"PB: {val.get('pb_ratio', 'N/A')} | " + f"PS: {val.get('ps_ratio', 'N/A')}") + + # 盈利能力 + prof = data.get('profitability', {}) + if prof.get('return_on_equity'): + pm = prof.get('profit_margin') + gm = prof.get('gross_margin') + pm_str = f"{pm:.1f}" if pm is not None else "N/A" + gm_str = f"{gm:.1f}" if gm is not None else "N/A" + summary_parts.append(f"【盈利】ROE: {prof['return_on_equity']:.2f}% | " + f"净利率: {pm_str}% | " + f"毛利率: {gm_str}%") + + # 成长性 + growth = data.get('growth', {}) + rg = growth.get('revenue_growth') + eg = growth.get('earnings_growth') + if rg is not None or eg is not None: + rg_str = f"{rg:.1f}" if rg is not None else "N/A" + eg_str = f"{eg:.1f}" if eg is not None else "N/A" + summary_parts.append(f"【成长】营收增长: {rg_str}% | " + f"盈利增长: {eg_str}%") + + # 财务健康 + fin = data.get('financial_health', {}) + if fin.get('debt_to_equity'): + cr = fin.get('current_ratio') + cr_str = f"{cr:.2f}" if cr is not None else "N/A" + summary_parts.append(f"【财务】债务股本比: {fin['debt_to_equity']:.2f} | " + f"流动比率: {cr_str}") + + # 分析师建议 + analyst = data.get('analyst', {}) + if analyst.get('target_price'): + summary_parts.append(f"【分析师建议】目标价: ${analyst['target_price']:.2f} | " + f"评级: {analyst.get('recommendation', 'N/A')}") + + # 基本面评分 + score = data.get('score', {}) + summary_parts.append(f"【基本面评分】{score.get('total', 0):.0f}/100 ({score.get('rating', 'N/A')}级)") + + return "\n".join(summary_parts) + + def batch_get_fundamentals(self, symbols: List[str]) -> Dict[str, Dict[str, Any]]: + """ + 批量获取多只股票的基本面数据 + + Args: + symbols: 股票代码列表 + + Returns: + 股票代码到基本面数据的映射 + """ + results = {} + for symbol in symbols: + data = self.get_fundamental_data(symbol) + if data: + results[symbol] = data + + logger.info(f"批量获取基本面数据完成: {len(results)}/{len(symbols)} 只股票") + return results + + def compare_stocks(self, symbols: List[str]) -> Dict[str, Any]: + """ + 比较多只股票的基本面指标 + + Args: + symbols: 股票代码列表 + + Returns: + 比较结果 + """ + fundamentals = self.batch_get_fundamentals(symbols) + + comparison = { + 'symbols': symbols, + 'metrics': {} + } + + # 提取可比较的指标 + metrics_to_compare = [ + ('valuation', ['pe_ratio', 'pb_ratio']), + ('profitability', ['return_on_equity', 'profit_margin']), + ('growth', ['revenue_growth', 'earnings_growth']), + ('financial_health', ['debt_to_equity', 'current_ratio']), + ] + + for category, metric_names in metrics_to_compare: + comparison['metrics'][category] = {} + for metric in metric_names: + values = {} + for symbol in symbols: + if symbol in fundamentals: + category_data = fundamentals[symbol].get(category, {}) + value = category_data.get(metric) + if value is not None: + values[symbol] = value + + if values: + comparison['metrics'][category][metric] = values + + # 计算排名 + comparison['rankings'] = {} + if 'valuation' in comparison['metrics']: + pe_ratios = {s: v.get('valuation', {}).get('pe_ratio') + for s, v in fundamentals.items() if v.get('valuation', {}).get('pe_ratio')} + if pe_ratios: + # PE 越低越好 + sorted_pe = sorted(pe_ratios.items(), key=lambda x: x[1]) + comparison['rankings']['pe_low_to_high'] = [s[0] for s in sorted_pe] + + return comparison + + +# 全局单例 +_fundamental_service: Optional[FundamentalService] = None + + +def get_fundamental_service() -> FundamentalService: + """获取基本面数据服务单例""" + global _fundamental_service + if _fundamental_service is None: + _fundamental_service = FundamentalService() + return _fundamental_service diff --git a/backend/app/stock_agent/stock_agent.py b/backend/app/stock_agent/stock_agent.py index 4a84fa2..e3ec701 100644 --- a/backend/app/stock_agent/stock_agent.py +++ b/backend/app/stock_agent/stock_agent.py @@ -13,6 +13,7 @@ from app.services.yfinance_service import get_yfinance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service from app.services.signal_database_service import get_signal_db_service +from app.services.fundamental_service import get_fundamental_service from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer @@ -87,6 +88,7 @@ class StockAgent: self.telegram = get_telegram_service() self.llm_analyzer = LLMSignalAnalyzer(agent_type="stock") # 指定使用 stock 模型配置 self.signal_db = get_signal_db_service() # 信号数据库服务 + self.fundamental = get_fundamental_service() # 基本面数据服务 # 状态管理 self.last_signals: Dict[str, Dict[str, Any]] = {} @@ -366,12 +368,29 @@ class StockAgent: logger.info(f"📊 分析 {symbol_display} @ ${current_price:,.2f}") logger.info(f"{'='*60}") - # 4. LLM 分析 + # 4. 获取基本面数据 + logger.info(f"\n📈 【基本面分析】") + fundamental_data = None + fundamental_summary = "" + try: + fundamental_data = self.fundamental.get_fundamental_data(symbol) + if fundamental_data: + # 传递已获取的数据,避免重复调用 + fundamental_summary = self.fundamental.get_fundamental_summary(symbol, fundamental_data) + # 基本面评分已经在 fundamental_service 中输出 + else: + logger.warning(f" ⚠️ 无法获取基本面数据") + except Exception as e: + logger.warning(f" ⚠️ 获取基本面数据失败: {e}") + + # 5. LLM 分析 logger.info(f"\n🤖 【LLM 分析中...】") analysis = await self.llm_analyzer.analyze( symbol, data, symbols=self.symbols, - position_info=None # 美股不跟踪持仓 + position_info=None, # 美股不跟踪持仓 + fundamental_data=fundamental_data, # 传递基本面数据 + fundamental_summary=fundamental_summary # 传递基本面摘要 ) # 输出分析摘要 @@ -542,10 +561,23 @@ class StockAgent: if not self._validate_data(data): return {'error': '数据不完整'} + # 获取基本面数据 + fundamental_data = None + fundamental_summary = "" + try: + fundamental_data = self.fundamental.get_fundamental_data(symbol) + if fundamental_data: + # 传递已获取的数据,避免重复调用 + fundamental_summary = self.fundamental.get_fundamental_summary(symbol, fundamental_data) + except Exception as e: + logger.warning(f"获取基本面数据失败: {e}") + result = await self.llm_analyzer.analyze( symbol, data, symbols=self.symbols, - position_info=None + position_info=None, + fundamental_data=fundamental_data, + fundamental_summary=fundamental_summary ) return result diff --git a/scripts/test_stock.py b/scripts/test_stock.py index b270e46..5677a46 100755 --- a/scripts/test_stock.py +++ b/scripts/test_stock.py @@ -19,6 +19,7 @@ import asyncio from app.services.yfinance_service import get_yfinance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service +from app.services.fundamental_service import get_fundamental_service from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer from app.config import get_settings from app.utils.logger import logger @@ -60,6 +61,7 @@ async def analyze(symbol: str, send_notification: bool = True): # 获取服务 yf_service = get_yfinance_service() llm = LLMSignalAnalyzer(agent_type="stock") # 指定使用 stock 模型配置 + fundamental = get_fundamental_service() # 基本面服务 feishu = get_feishu_service() telegram = get_telegram_service() @@ -86,9 +88,89 @@ async def analyze(symbol: str, send_notification: bool = True): print(f"时间周期: {', '.join(data.keys())}") + # 获取基本面数据 + print(f"\n📈 基本面分析中...") + fundamental_data = None + fundamental_summary = "" + try: + fundamental_data = fundamental.get_fundamental_data(symbol) + if fundamental_data: + # 传递已获取的数据,避免重复调用 + fundamental_summary = fundamental.get_fundamental_summary(symbol, fundamental_data) + # 输出基本面详细信息 + score = fundamental_data.get('score', {}) + print(f" ✓ 基本面数据获取成功") + + # 公司信息 + company = fundamental_data.get('company_name', 'N/A') + sector = fundamental_data.get('sector', 'N/A') + print(f" 【公司】{company} | {sector}") + + # 评分 + print(f" 【评分】总分: {score.get('total', 0):.0f}/100 ({score.get('rating', 'N/A')}级) | " + f"估值:{score.get('valuation', 0)} 盈利:{score.get('profitability', 0)} " + f"成长:{score.get('growth', 0)} 财务:{score.get('financial_health', 0)}") + + # 估值指标 + val = fundamental_data.get('valuation', {}) + if val.get('pe_ratio'): + pe = val['pe_ratio'] + pb = val.get('pb_ratio') + ps = val.get('ps_ratio') + peg = val.get('peg_ratio') + pb_str = f"{pb:.2f}" if pb is not None else "N/A" + ps_str = f"{ps:.2f}" if ps is not None else "N/A" + peg_str = f"{peg:.2f}" if peg is not None else "N/A" + print(f" 【估值】PE:{pe:.2f} | PB:{pb_str} | PS:{ps_str} | PEG:{peg_str}") + + # 盈利能力 + prof = fundamental_data.get('profitability', {}) + if prof.get('return_on_equity'): + roe = prof['return_on_equity'] + pm = prof.get('profit_margin') + gm = prof.get('gross_margin') + pm_str = f"{pm:.1f}" if pm is not None else "N/A" + gm_str = f"{gm:.1f}" if gm is not None else "N/A" + print(f" 【盈利】ROE:{roe:.2f}% | 净利率:{pm_str}% | 毛利率:{gm_str}%") + + # 成长性 + growth = fundamental_data.get('growth', {}) + rg = growth.get('revenue_growth') + eg = growth.get('earnings_growth') + if rg is not None or eg is not None: + rg_str = f"{rg:.1f}" if rg is not None else "N/A" + eg_str = f"{eg:.1f}" if eg is not None else "N/A" + print(f" 【成长】营收增长:{rg_str}% | 盈利增长:{eg_str}%") + + # 财务健康 + fin = fundamental_data.get('financial_health', {}) + if fin.get('debt_to_equity'): + de = fin['debt_to_equity'] + cr = fin.get('current_ratio') + cr_str = f"{cr:.2f}" if cr is not None else "N/A" + print(f" 【财务】债务股本比:{de:.2f} | 流动比率:{cr_str}") + + # 分析师建议 + analyst = fundamental_data.get('analyst', {}) + tp = analyst.get('target_price') + if tp: + rec = analyst.get('recommendation', 'N/A') + print(f" 【分析师】目标价:${tp:.2f} | 评级:{rec}") + + else: + print(f" ⚠️ 无法获取基本面数据") + except Exception as e: + print(f" ⚠️ 获取基本面数据失败: {e}") + # LLM分析 print(f"\n🤖 LLM分析中...\n") - analysis = await llm.analyze(symbol, data, symbols=[symbol], position_info=None) + analysis = await llm.analyze( + symbol, data, + symbols=[symbol], + position_info=None, + fundamental_data=fundamental_data, + fundamental_summary=fundamental_summary + ) # 输出结果 summary = analysis.get('analysis_summary', '')