stock-ai-agent/backend/app/stock_agent/analysis_tools.py
2026-03-03 19:35:18 +08:00

570 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
股票分析工具模块
提供各种辅助计算功能:
- ATR计算
- 多周期共振分析
- 基本面评分
- 支撑阻力位识别
"""
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Tuple, Optional
from app.utils.logger import logger
class StockAnalysisTools:
"""股票分析工具类"""
@staticmethod
def calculate_atr(df: pd.DataFrame, period: int = 14) -> float:
"""
计算ATR (真实波动幅度)
Args:
df: 包含 high, low, close 的数据
period: ATR周期
Returns:
ATR值
"""
if df is None or len(df) < period + 1:
return 0.0
try:
high = df['high'].values
low = df['low'].values
close = df['close'].values
tr_list = []
for i in range(1, len(df)):
tr1 = high[i] - low[i]
tr2 = abs(high[i] - close[i-1])
tr3 = abs(low[i] - close[i-1])
tr = max(tr1, tr2, tr3)
tr_list.append(tr)
if len(tr_list) < period:
return 0.0
atr = pd.Series(tr_list).rolling(window=period).mean().iloc[-1]
return float(atr) if not np.isnan(atr) else 0.0
except Exception as e:
logger.warning(f"ATR计算失败: {e}")
return 0.0
@staticmethod
def calculate_volume_ratio(df: pd.DataFrame, period: int = 20) -> float:
"""
计算量比(当前成交量 / 过去N周期平均成交量
Args:
df: 包含 volume 的数据
period: 均量周期
Returns:
量比值
"""
if df is None or len(df) < period + 1:
return 1.0
try:
current_vol = df['volume'].iloc[-1]
avg_vol = df['volume'].iloc[-period-1:-1].mean()
if avg_vol > 0:
return float(current_vol / avg_vol)
return 1.0
except Exception as e:
logger.warning(f"量比计算失败: {e}")
return 1.0
@staticmethod
def detect_trend(df: pd.DataFrame) -> Dict[str, Any]:
"""
检测趋势方向和强度
使用EMA系统判断趋势
Returns:
{
'direction': 'uptrend'/'downtrend'/'neutral',
'strength': 'strong'/'medium'/'weak',
'ema_alignment': 'bullish'/'bearish'/'mixed',
'price_vs_ema20': float, # 百分比
'signals': List[str]
}
"""
if df is None or len(df) < 200:
return {
'direction': 'neutral',
'strength': 'weak',
'ema_alignment': 'mixed',
'price_vs_ema20': 0.0,
'signals': []
}
try:
# 计算EMA
close = df['close']
ema20 = close.ewm(span=20, adjust=False).mean().iloc[-1]
ema50 = close.ewm(span=50, adjust=False).mean().iloc[-1]
ema200 = close.ewm(span=200, adjust=False).mean().iloc[-1]
current_price = close.iloc[-1]
# EMA排列判断
if ema20 > ema50 > ema200:
ema_alignment = 'bullish'
direction = 'uptrend'
elif ema20 < ema50 < ema200:
ema_alignment = 'bearish'
direction = 'downtrend'
else:
ema_alignment = 'mixed'
direction = 'neutral'
# 价格与EMA20的关系
price_vs_ema20 = ((current_price - ema20) / ema20 * 100) if ema20 > 0 else 0
# 趋势强度判断
strength = 'weak'
signals = []
if ema_alignment == 'bullish':
if price_vs_ema20 > 1:
strength = 'strong'
signals.append("强势上涨:价格站稳 EMA20 之上")
elif price_vs_ema20 > 0:
strength = 'medium'
signals.append("上涨趋势:价格在 EMA20 附近")
else:
strength = 'weak'
signals.append("上涨趋势减弱:价格跌破 EMA20")
elif ema_alignment == 'bearish':
if price_vs_ema20 < -1:
strength = 'strong'
signals.append("强势下跌:价格跌破 EMA20 之下")
elif price_vs_ema20 < 0:
strength = 'medium'
signals.append("下跌趋势:价格在 EMA20 附近")
else:
strength = 'weak'
signals.append("下跌趋势减弱:价格站上 EMA20")
else:
signals.append("震荡市EMA 交织")
return {
'direction': direction,
'strength': strength,
'ema_alignment': ema_alignment,
'price_vs_ema20': round(price_vs_ema20, 2),
'signals': signals
}
except Exception as e:
logger.warning(f"趋势检测失败: {e}")
return {
'direction': 'neutral',
'strength': 'weak',
'ema_alignment': 'mixed',
'price_vs_ema20': 0.0,
'signals': []
}
@staticmethod
def calculate_multi_timeframe_resonance(data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""
计算多周期共振强度
Args:
data: 包含多个周期的数据 {'1w': df, '1d': df, '1h': df}
Returns:
{
'score': 0-100,
'level': 'strong'/'medium'/'weak',
'weekly_trend': str,
'daily_trend': str,
'hourly_trend': str,
'resonance_type': str,
'analysis': str
}
"""
trends = {}
# 获取各周期趋势
for tf_name in ['1w', '1d', '1h']:
df = data.get(tf_name)
if df is not None and len(df) > 0:
trend_info = StockAnalysisTools.detect_trend(df)
trends[tf_name] = trend_info['direction']
else:
trends[tf_name] = 'neutral'
weekly_trend = trends.get('1w', 'neutral')
daily_trend = trends.get('1d', 'neutral')
hourly_trend = trends.get('1h', 'neutral')
# 计算共振得分
score = 0
analysis_parts = []
# 大周期共振(周线+日线)
if weekly_trend == daily_trend and weekly_trend != 'neutral':
score += 40
analysis_parts.append(f"✅ 大周期共振({weekly_trend}")
elif weekly_trend != 'neutral' and daily_trend != 'neutral':
analysis_parts.append(f"⚠️ 大周期分歧(周线{weekly_trend} vs 日线{daily_trend}")
else:
analysis_parts.append(f" 大周期不明确")
# 主周期共振(日线+1h
if daily_trend == hourly_trend and daily_trend != 'neutral':
score += 35
analysis_parts.append(f"✅ 主周期共振({daily_trend}")
elif daily_trend != 'neutral' and hourly_trend != 'neutral':
analysis_parts.append(f"⚠️ 主周期分歧(日线{daily_trend} vs 1h{hourly_trend}")
else:
analysis_parts.append(f" 主周期不明确")
# 全周期共振
if weekly_trend == daily_trend == hourly_trend and weekly_trend != 'neutral':
score += 25
analysis_parts.append(f"🔥 全周期共振({weekly_trend}")
# 确定共振等级
if score >= 70:
level = 'strong'
resonance_type = 'all_timeframe_aligned' if score >= 90 else 'strong'
elif score >= 40:
level = 'medium'
resonance_type = 'large_timeframe_aligned'
elif score >= 20:
level = 'weak'
resonance_type = 'partial_alignment'
else:
level = 'weak'
resonance_type = 'no_resonance'
return {
'score': score,
'level': level,
'weekly_trend': weekly_trend,
'daily_trend': daily_trend,
'hourly_trend': hourly_trend,
'resonance_type': resonance_type,
'analysis': ' | '.join(analysis_parts)
}
@staticmethod
def calculate_fundamental_score(data: Dict[str, Any]) -> Dict[str, Any]:
"""
计算基本面评分0-100
Args:
data: 基本面数据
Returns:
{
'score': 0-100,
'grade': 'A'/'B'/'C'/'D',
'valuation': {'score': 0-25, 'grade': 'A'/'B'/'C'/'D'},
'profitability': {'score': 0-35, 'grade': 'A'/'B'/'C'/'D'},
'growth': {'score': 0-25, 'grade': 'A'/'B'/'C'/'D'},
'financial_health': {'score': 0-15, 'grade': 'A'/'B'/'C'/'D'},
'summary': str
}
"""
if not data:
return {
'score': 0,
'grade': 'D',
'summary': '无基本面数据'
}
score = 0
breakdown = {}
# 估值评分 (25分)
val_score = 0
pe = data.get('pe_ratio', 0)
pb = data.get('pb_ratio', 0)
peg = data.get('peg_ratio', 0)
if 0 < pe < 15:
val_score += 10
elif 15 <= pe <= 25:
val_score += 7
elif pe > 40:
val_score += 0
else:
val_score += 5
if 0 < pb < 1:
val_score += 10
elif 1 <= pb <= 3:
val_score += 7
elif pb > 5:
val_score += 0
else:
val_score += 5
if peg and 0 < peg < 1:
val_score += 5
elif peg and 1 <= peg <= 2:
val_score += 3
elif peg and peg > 2:
val_score += 0
else:
val_score += 2
breakdown['valuation'] = {
'score': val_score,
'grade': 'A' if val_score >= 20 else 'B' if val_score >= 15 else 'C' if val_score >= 10 else 'D'
}
score += val_score
# 盈利能力评分 (35分)
prof_score = 0
roe = data.get('roe', 0)
net_margin = data.get('profit_margin', 0)
if roe > 20:
prof_score += 20
elif roe > 15:
prof_score += 15
elif roe > 10:
prof_score += 10
elif roe > 0:
prof_score += 5
if net_margin > 20:
prof_score += 15
elif net_margin > 10:
prof_score += 10
elif net_margin > 5:
prof_score += 5
elif net_margin > 0:
prof_score += 2
breakdown['profitability'] = {
'score': prof_score,
'grade': 'A' if prof_score >= 30 else 'B' if prof_score >= 20 else 'C' if prof_score >= 10 else 'D'
}
score += prof_score
# 成长性评分 (25分)
growth_score = 0
revenue_growth = data.get('revenue_growth', 0)
earnings_growth = data.get('earnings_growth', 0)
if revenue_growth > 30:
growth_score += 13
elif revenue_growth > 20:
growth_score += 10
elif revenue_growth > 10:
growth_score += 5
elif revenue_growth > 0:
growth_score += 2
if earnings_growth > 30:
growth_score += 12
elif earnings_growth > 20:
growth_score += 8
elif earnings_growth > 10:
growth_score += 5
elif earnings_growth > 0:
growth_score += 2
elif earnings_growth < 0:
growth_score -= 5 # 负增长扣分
breakdown['growth'] = {
'score': max(0, growth_score),
'grade': 'A' if growth_score >= 20 else 'B' if growth_score >= 15 else 'C' if growth_score >= 10 else 'D'
}
score += max(0, growth_score)
# 财务健康评分 (15分)
health_score = 0
debt_ratio = data.get('debt_to_equity', 0)
current_ratio = data.get('current_ratio', 0)
if debt_ratio < 1:
health_score += 8
elif debt_ratio < 2:
health_score += 5
elif debt_ratio < 3:
health_score += 2
else:
health_score += 0
if current_ratio > 2:
health_score += 7
elif current_ratio > 1.5:
health_score += 5
elif current_ratio > 1:
health_score += 2
else:
health_score += 0
breakdown['financial_health'] = {
'score': health_score,
'grade': 'A' if health_score >= 12 else 'B' if health_score >= 8 else 'C' if health_score >= 5 else 'D'
}
score += health_score
# 确定总等级
grade = 'A' if score >= 80 else 'B' if score >= 60 else 'C' if score >= 40 else 'D'
# 生成摘要
summary_parts = []
if breakdown['valuation']['grade'] == 'A':
summary_parts.append("估值低估")
elif breakdown['valuation']['grade'] == 'D':
summary_parts.append("估值高估")
if breakdown['profitability']['grade'] == 'A':
summary_parts.append("盈利优秀")
elif breakdown['profitability']['grade'] == 'D':
summary_parts.append("盈利较差")
if breakdown['growth']['grade'] == 'A':
summary_parts.append("高成长")
elif breakdown['growth']['grade'] == 'D':
summary_parts.append("低成长")
if breakdown['financial_health']['grade'] == 'A':
summary_parts.append("财务健康")
elif breakdown['financial_health']['grade'] == 'D':
summary_parts.append("财务风险")
return {
'score': score,
'grade': grade,
'breakdown': breakdown,
'summary': ' | '.join(summary_parts) if summary_parts else '一般'
}
@staticmethod
def identify_key_levels(df: pd.DataFrame, lookback: int = 60) -> Dict[str, List[float]]:
"""
识别关键支撑位和阻力位
Args:
df: K线数据
lookback: 回看周期
Returns:
{
'support': [支撑位1, 支撑位2, ...],
'resistance': [阻力位1, 阻力位2, ...]
}
"""
if df is None or len(df) < lookback:
return {'support': [], 'resistance': []}
try:
recent = df.tail(lookback)
highs = recent['high'].values
lows = recent['low'].values
# 找局部高点和低点
from scipy.signal import argrelextrema
from numpy import array
high_indices = argrelextrema(array(highs), np.greater, order=5)
low_indices = argrelextrema(array(lows), np.less, order=5)
resistance_levels = sorted([highs[i] for i in high_indices], reverse=True)[:3]
support_levels = sorted([lows[i] for i in low_indices])[:3]
return {
'resistance': resistance_levels,
'support': support_levels
}
except Exception as e:
logger.warning(f"关键位识别失败: {e}")
return {'support': [], 'resistance': []}
@staticmethod
def calculate_stop_loss_take_profit(
entry_price: float,
atr: float,
direction: str,
key_levels: Dict[str, List[float]] = None
) -> Dict[str, Any]:
"""
计算止损止盈价格
Args:
entry_price: 入场价格
atr: ATR值
direction: 'long'/'short'
key_levels: 支撑阻力位
Returns:
{
'stop_loss': float,
'take_profit': float,
'method': str,
'risk_reward_ratio': float
}
"""
if direction == 'long':
# 做多
# 止损:入场价 - 1.5×ATR或设在前支撑位下方
sl_atr = entry_price - 1.5 * atr
sl_support = None
if key_levels and key_levels.get('support'):
closest_support = [s for s in key_levels['support'] if s < entry_price]
if closest_support:
sl_support = min(closest_support) * 0.995 # 支撑位下方0.5%
# 选择更保守的止损(更远的)
if sl_support and sl_support < sl_atr:
stop_loss = sl_atr
else:
stop_loss = sl_atr
# 止盈:入场价 + 3×ATR风险收益比1:2
take_profit = entry_price + 3 * atr
else:
# 做空
# 止损:入场价 + 1.5×ATR或设在前阻力位上方
sl_atr = entry_price + 1.5 * atr
sl_resistance = None
if key_levels and key_levels.get('resistance'):
closest_resistance = [r for r in key_levels['resistance'] if r > entry_price]
if closest_resistance:
sl_resistance = min(closest_resistance) * 1.005 # 阻力位上方0.5%
# 选择更保守的止损
if sl_resistance and sl_resistance > sl_atr:
stop_loss = sl_atr
else:
stop_loss = sl_atr
# 止盈:入场价 - 3×ATR
take_profit = entry_price - 3 * atr
# 计算风险收益比
if direction == 'long':
risk = entry_price - stop_loss
reward = take_profit - entry_price
else:
risk = stop_loss - entry_price
reward = entry_price - take_profit
risk_reward_ratio = reward / risk if risk > 0 else 0
return {
'stop_loss': round(stop_loss, 2),
'take_profit': round(take_profit, 2),
'method': 'ATR-based (1:2 risk-reward)',
'risk_reward_ratio': round(risk_reward_ratio, 2)
}