""" 股票分析工具模块 提供各种辅助计算功能: - ATR计算 - 多周期共振分析 - 基本面评分 - 支撑阻力位识别 """ import pandas as pd import numpy as np from typing import Dict, Any, List, Tuple, Optional from app.utils.logger import logger class StockAnalysisTools: """股票分析工具类""" @staticmethod def calculate_atr(df: pd.DataFrame, period: int = 14) -> float: """ 计算ATR (真实波动幅度) Args: df: 包含 high, low, close 的数据 period: ATR周期 Returns: ATR值 """ if df is None or len(df) < period + 1: return 0.0 try: high = df['high'].values low = df['low'].values close = df['close'].values tr_list = [] for i in range(1, len(df)): tr1 = high[i] - low[i] tr2 = abs(high[i] - close[i-1]) tr3 = abs(low[i] - close[i-1]) tr = max(tr1, tr2, tr3) tr_list.append(tr) if len(tr_list) < period: return 0.0 atr = pd.Series(tr_list).rolling(window=period).mean().iloc[-1] return float(atr) if not np.isnan(atr) else 0.0 except Exception as e: logger.warning(f"ATR计算失败: {e}") return 0.0 @staticmethod def calculate_volume_ratio(df: pd.DataFrame, period: int = 20) -> float: """ 计算量比(当前成交量 / 过去N周期平均成交量) Args: df: 包含 volume 的数据 period: 均量周期 Returns: 量比值 """ if df is None or len(df) < period + 1: return 1.0 try: current_vol = df['volume'].iloc[-1] avg_vol = df['volume'].iloc[-period-1:-1].mean() if avg_vol > 0: return float(current_vol / avg_vol) return 1.0 except Exception as e: logger.warning(f"量比计算失败: {e}") return 1.0 @staticmethod def detect_trend(df: pd.DataFrame) -> Dict[str, Any]: """ 检测趋势方向和强度 使用EMA系统判断趋势 Returns: { 'direction': 'uptrend'/'downtrend'/'neutral', 'strength': 'strong'/'medium'/'weak', 'ema_alignment': 'bullish'/'bearish'/'mixed', 'price_vs_ema20': float, # 百分比 'signals': List[str] } """ if df is None or len(df) < 200: return { 'direction': 'neutral', 'strength': 'weak', 'ema_alignment': 'mixed', 'price_vs_ema20': 0.0, 'signals': [] } try: # 计算EMA close = df['close'] ema20 = close.ewm(span=20, adjust=False).mean().iloc[-1] ema50 = close.ewm(span=50, adjust=False).mean().iloc[-1] ema200 = close.ewm(span=200, adjust=False).mean().iloc[-1] current_price = close.iloc[-1] # EMA排列判断 if ema20 > ema50 > ema200: ema_alignment = 'bullish' direction = 'uptrend' elif ema20 < ema50 < ema200: ema_alignment = 'bearish' direction = 'downtrend' else: ema_alignment = 'mixed' direction = 'neutral' # 价格与EMA20的关系 price_vs_ema20 = ((current_price - ema20) / ema20 * 100) if ema20 > 0 else 0 # 趋势强度判断 strength = 'weak' signals = [] if ema_alignment == 'bullish': if price_vs_ema20 > 1: strength = 'strong' signals.append("强势上涨:价格站稳 EMA20 之上") elif price_vs_ema20 > 0: strength = 'medium' signals.append("上涨趋势:价格在 EMA20 附近") else: strength = 'weak' signals.append("上涨趋势减弱:价格跌破 EMA20") elif ema_alignment == 'bearish': if price_vs_ema20 < -1: strength = 'strong' signals.append("强势下跌:价格跌破 EMA20 之下") elif price_vs_ema20 < 0: strength = 'medium' signals.append("下跌趋势:价格在 EMA20 附近") else: strength = 'weak' signals.append("下跌趋势减弱:价格站上 EMA20") else: signals.append("震荡市:EMA 交织") return { 'direction': direction, 'strength': strength, 'ema_alignment': ema_alignment, 'price_vs_ema20': round(price_vs_ema20, 2), 'signals': signals } except Exception as e: logger.warning(f"趋势检测失败: {e}") return { 'direction': 'neutral', 'strength': 'weak', 'ema_alignment': 'mixed', 'price_vs_ema20': 0.0, 'signals': [] } @staticmethod def calculate_multi_timeframe_resonance(data: Dict[str, pd.DataFrame]) -> Dict[str, Any]: """ 计算多周期共振强度 Args: data: 包含多个周期的数据 {'1w': df, '1d': df, '1h': df} Returns: { 'score': 0-100, 'level': 'strong'/'medium'/'weak', 'weekly_trend': str, 'daily_trend': str, 'hourly_trend': str, 'resonance_type': str, 'analysis': str } """ trends = {} # 获取各周期趋势 for tf_name in ['1w', '1d', '1h']: df = data.get(tf_name) if df is not None and len(df) > 0: trend_info = StockAnalysisTools.detect_trend(df) trends[tf_name] = trend_info['direction'] else: trends[tf_name] = 'neutral' weekly_trend = trends.get('1w', 'neutral') daily_trend = trends.get('1d', 'neutral') hourly_trend = trends.get('1h', 'neutral') # 计算共振得分 score = 0 analysis_parts = [] # 大周期共振(周线+日线) if weekly_trend == daily_trend and weekly_trend != 'neutral': score += 40 analysis_parts.append(f"✅ 大周期共振({weekly_trend})") elif weekly_trend != 'neutral' and daily_trend != 'neutral': analysis_parts.append(f"⚠️ 大周期分歧(周线{weekly_trend} vs 日线{daily_trend})") else: analysis_parts.append(f"➖ 大周期不明确") # 主周期共振(日线+1h) if daily_trend == hourly_trend and daily_trend != 'neutral': score += 35 analysis_parts.append(f"✅ 主周期共振({daily_trend})") elif daily_trend != 'neutral' and hourly_trend != 'neutral': analysis_parts.append(f"⚠️ 主周期分歧(日线{daily_trend} vs 1h{hourly_trend})") else: analysis_parts.append(f"➖ 主周期不明确") # 全周期共振 if weekly_trend == daily_trend == hourly_trend and weekly_trend != 'neutral': score += 25 analysis_parts.append(f"🔥 全周期共振({weekly_trend})") # 确定共振等级 if score >= 70: level = 'strong' resonance_type = 'all_timeframe_aligned' if score >= 90 else 'strong' elif score >= 40: level = 'medium' resonance_type = 'large_timeframe_aligned' elif score >= 20: level = 'weak' resonance_type = 'partial_alignment' else: level = 'weak' resonance_type = 'no_resonance' return { 'score': score, 'level': level, 'weekly_trend': weekly_trend, 'daily_trend': daily_trend, 'hourly_trend': hourly_trend, 'resonance_type': resonance_type, 'analysis': ' | '.join(analysis_parts) } @staticmethod def calculate_fundamental_score(data: Dict[str, Any]) -> Dict[str, Any]: """ 计算基本面评分(0-100) Args: data: 基本面数据 Returns: { 'score': 0-100, 'grade': 'A'/'B'/'C'/'D', 'valuation': {'score': 0-25, 'grade': 'A'/'B'/'C'/'D'}, 'profitability': {'score': 0-35, 'grade': 'A'/'B'/'C'/'D'}, 'growth': {'score': 0-25, 'grade': 'A'/'B'/'C'/'D'}, 'financial_health': {'score': 0-15, 'grade': 'A'/'B'/'C'/'D'}, 'summary': str } """ if not data: return { 'score': 0, 'grade': 'D', 'summary': '无基本面数据' } score = 0 breakdown = {} # 估值评分 (25分) val_score = 0 pe = data.get('pe_ratio', 0) pb = data.get('pb_ratio', 0) peg = data.get('peg_ratio', 0) if 0 < pe < 15: val_score += 10 elif 15 <= pe <= 25: val_score += 7 elif pe > 40: val_score += 0 else: val_score += 5 if 0 < pb < 1: val_score += 10 elif 1 <= pb <= 3: val_score += 7 elif pb > 5: val_score += 0 else: val_score += 5 if peg and 0 < peg < 1: val_score += 5 elif peg and 1 <= peg <= 2: val_score += 3 elif peg and peg > 2: val_score += 0 else: val_score += 2 breakdown['valuation'] = { 'score': val_score, 'grade': 'A' if val_score >= 20 else 'B' if val_score >= 15 else 'C' if val_score >= 10 else 'D' } score += val_score # 盈利能力评分 (35分) prof_score = 0 roe = data.get('roe', 0) net_margin = data.get('profit_margin', 0) if roe > 20: prof_score += 20 elif roe > 15: prof_score += 15 elif roe > 10: prof_score += 10 elif roe > 0: prof_score += 5 if net_margin > 20: prof_score += 15 elif net_margin > 10: prof_score += 10 elif net_margin > 5: prof_score += 5 elif net_margin > 0: prof_score += 2 breakdown['profitability'] = { 'score': prof_score, 'grade': 'A' if prof_score >= 30 else 'B' if prof_score >= 20 else 'C' if prof_score >= 10 else 'D' } score += prof_score # 成长性评分 (25分) growth_score = 0 revenue_growth = data.get('revenue_growth', 0) earnings_growth = data.get('earnings_growth', 0) if revenue_growth > 30: growth_score += 13 elif revenue_growth > 20: growth_score += 10 elif revenue_growth > 10: growth_score += 5 elif revenue_growth > 0: growth_score += 2 if earnings_growth > 30: growth_score += 12 elif earnings_growth > 20: growth_score += 8 elif earnings_growth > 10: growth_score += 5 elif earnings_growth > 0: growth_score += 2 elif earnings_growth < 0: growth_score -= 5 # 负增长扣分 breakdown['growth'] = { 'score': max(0, growth_score), 'grade': 'A' if growth_score >= 20 else 'B' if growth_score >= 15 else 'C' if growth_score >= 10 else 'D' } score += max(0, growth_score) # 财务健康评分 (15分) health_score = 0 debt_ratio = data.get('debt_to_equity', 0) current_ratio = data.get('current_ratio', 0) if debt_ratio < 1: health_score += 8 elif debt_ratio < 2: health_score += 5 elif debt_ratio < 3: health_score += 2 else: health_score += 0 if current_ratio > 2: health_score += 7 elif current_ratio > 1.5: health_score += 5 elif current_ratio > 1: health_score += 2 else: health_score += 0 breakdown['financial_health'] = { 'score': health_score, 'grade': 'A' if health_score >= 12 else 'B' if health_score >= 8 else 'C' if health_score >= 5 else 'D' } score += health_score # 确定总等级 grade = 'A' if score >= 80 else 'B' if score >= 60 else 'C' if score >= 40 else 'D' # 生成摘要 summary_parts = [] if breakdown['valuation']['grade'] == 'A': summary_parts.append("估值低估") elif breakdown['valuation']['grade'] == 'D': summary_parts.append("估值高估") if breakdown['profitability']['grade'] == 'A': summary_parts.append("盈利优秀") elif breakdown['profitability']['grade'] == 'D': summary_parts.append("盈利较差") if breakdown['growth']['grade'] == 'A': summary_parts.append("高成长") elif breakdown['growth']['grade'] == 'D': summary_parts.append("低成长") if breakdown['financial_health']['grade'] == 'A': summary_parts.append("财务健康") elif breakdown['financial_health']['grade'] == 'D': summary_parts.append("财务风险") return { 'score': score, 'grade': grade, 'breakdown': breakdown, 'summary': ' | '.join(summary_parts) if summary_parts else '一般' } @staticmethod def identify_key_levels(df: pd.DataFrame, lookback: int = 60) -> Dict[str, List[float]]: """ 识别关键支撑位和阻力位 Args: df: K线数据 lookback: 回看周期 Returns: { 'support': [支撑位1, 支撑位2, ...], 'resistance': [阻力位1, 阻力位2, ...] } """ if df is None or len(df) < lookback: return {'support': [], 'resistance': []} try: recent = df.tail(lookback) highs = recent['high'].values lows = recent['low'].values # 找局部高点和低点 from scipy.signal import argrelextrema from numpy import array high_indices = argrelextrema(array(highs), np.greater, order=5) low_indices = argrelextrema(array(lows), np.less, order=5) resistance_levels = sorted([highs[i] for i in high_indices], reverse=True)[:3] support_levels = sorted([lows[i] for i in low_indices])[:3] return { 'resistance': resistance_levels, 'support': support_levels } except Exception as e: logger.warning(f"关键位识别失败: {e}") return {'support': [], 'resistance': []} @staticmethod def calculate_stop_loss_take_profit( entry_price: float, atr: float, direction: str, key_levels: Dict[str, List[float]] = None ) -> Dict[str, Any]: """ 计算止损止盈价格 Args: entry_price: 入场价格 atr: ATR值 direction: 'long'/'short' key_levels: 支撑阻力位 Returns: { 'stop_loss': float, 'take_profit': float, 'method': str, 'risk_reward_ratio': float } """ if direction == 'long': # 做多 # 止损:入场价 - 1.5×ATR,或设在前支撑位下方 sl_atr = entry_price - 1.5 * atr sl_support = None if key_levels and key_levels.get('support'): closest_support = [s for s in key_levels['support'] if s < entry_price] if closest_support: sl_support = min(closest_support) * 0.995 # 支撑位下方0.5% # 选择更保守的止损(更远的) if sl_support and sl_support < sl_atr: stop_loss = sl_atr else: stop_loss = sl_atr # 止盈:入场价 + 3×ATR(风险收益比1:2) take_profit = entry_price + 3 * atr else: # 做空 # 止损:入场价 + 1.5×ATR,或设在前阻力位上方 sl_atr = entry_price + 1.5 * atr sl_resistance = None if key_levels and key_levels.get('resistance'): closest_resistance = [r for r in key_levels['resistance'] if r > entry_price] if closest_resistance: sl_resistance = min(closest_resistance) * 1.005 # 阻力位上方0.5% # 选择更保守的止损 if sl_resistance and sl_resistance > sl_atr: stop_loss = sl_atr else: stop_loss = sl_atr # 止盈:入场价 - 3×ATR take_profit = entry_price - 3 * atr # 计算风险收益比 if direction == 'long': risk = entry_price - stop_loss reward = take_profit - entry_price else: risk = stop_loss - entry_price reward = entry_price - take_profit risk_reward_ratio = reward / risk if risk > 0 else 0 return { 'stop_loss': round(stop_loss, 2), 'take_profit': round(take_profit, 2), 'method': 'ATR-based (1:2 risk-reward)', 'risk_reward_ratio': round(risk_reward_ratio, 2) }