This commit is contained in:
aaron 2026-03-03 19:35:18 +08:00
parent 75edbdca8d
commit 5cfe9eae83
2 changed files with 671 additions and 102 deletions

View File

@ -0,0 +1,569 @@
"""
股票分析工具模块
提供各种辅助计算功能
- ATR计算
- 多周期共振分析
- 基本面评分
- 支撑阻力位识别
"""
import pandas as pd
import numpy as np
from typing import Dict, Any, List, Tuple, Optional
from app.utils.logger import logger
class StockAnalysisTools:
"""股票分析工具类"""
@staticmethod
def calculate_atr(df: pd.DataFrame, period: int = 14) -> float:
"""
计算ATR (真实波动幅度)
Args:
df: 包含 high, low, close 的数据
period: ATR周期
Returns:
ATR值
"""
if df is None or len(df) < period + 1:
return 0.0
try:
high = df['high'].values
low = df['low'].values
close = df['close'].values
tr_list = []
for i in range(1, len(df)):
tr1 = high[i] - low[i]
tr2 = abs(high[i] - close[i-1])
tr3 = abs(low[i] - close[i-1])
tr = max(tr1, tr2, tr3)
tr_list.append(tr)
if len(tr_list) < period:
return 0.0
atr = pd.Series(tr_list).rolling(window=period).mean().iloc[-1]
return float(atr) if not np.isnan(atr) else 0.0
except Exception as e:
logger.warning(f"ATR计算失败: {e}")
return 0.0
@staticmethod
def calculate_volume_ratio(df: pd.DataFrame, period: int = 20) -> float:
"""
计算量比当前成交量 / 过去N周期平均成交量
Args:
df: 包含 volume 的数据
period: 均量周期
Returns:
量比值
"""
if df is None or len(df) < period + 1:
return 1.0
try:
current_vol = df['volume'].iloc[-1]
avg_vol = df['volume'].iloc[-period-1:-1].mean()
if avg_vol > 0:
return float(current_vol / avg_vol)
return 1.0
except Exception as e:
logger.warning(f"量比计算失败: {e}")
return 1.0
@staticmethod
def detect_trend(df: pd.DataFrame) -> Dict[str, Any]:
"""
检测趋势方向和强度
使用EMA系统判断趋势
Returns:
{
'direction': 'uptrend'/'downtrend'/'neutral',
'strength': 'strong'/'medium'/'weak',
'ema_alignment': 'bullish'/'bearish'/'mixed',
'price_vs_ema20': float, # 百分比
'signals': List[str]
}
"""
if df is None or len(df) < 200:
return {
'direction': 'neutral',
'strength': 'weak',
'ema_alignment': 'mixed',
'price_vs_ema20': 0.0,
'signals': []
}
try:
# 计算EMA
close = df['close']
ema20 = close.ewm(span=20, adjust=False).mean().iloc[-1]
ema50 = close.ewm(span=50, adjust=False).mean().iloc[-1]
ema200 = close.ewm(span=200, adjust=False).mean().iloc[-1]
current_price = close.iloc[-1]
# EMA排列判断
if ema20 > ema50 > ema200:
ema_alignment = 'bullish'
direction = 'uptrend'
elif ema20 < ema50 < ema200:
ema_alignment = 'bearish'
direction = 'downtrend'
else:
ema_alignment = 'mixed'
direction = 'neutral'
# 价格与EMA20的关系
price_vs_ema20 = ((current_price - ema20) / ema20 * 100) if ema20 > 0 else 0
# 趋势强度判断
strength = 'weak'
signals = []
if ema_alignment == 'bullish':
if price_vs_ema20 > 1:
strength = 'strong'
signals.append("强势上涨:价格站稳 EMA20 之上")
elif price_vs_ema20 > 0:
strength = 'medium'
signals.append("上涨趋势:价格在 EMA20 附近")
else:
strength = 'weak'
signals.append("上涨趋势减弱:价格跌破 EMA20")
elif ema_alignment == 'bearish':
if price_vs_ema20 < -1:
strength = 'strong'
signals.append("强势下跌:价格跌破 EMA20 之下")
elif price_vs_ema20 < 0:
strength = 'medium'
signals.append("下跌趋势:价格在 EMA20 附近")
else:
strength = 'weak'
signals.append("下跌趋势减弱:价格站上 EMA20")
else:
signals.append("震荡市EMA 交织")
return {
'direction': direction,
'strength': strength,
'ema_alignment': ema_alignment,
'price_vs_ema20': round(price_vs_ema20, 2),
'signals': signals
}
except Exception as e:
logger.warning(f"趋势检测失败: {e}")
return {
'direction': 'neutral',
'strength': 'weak',
'ema_alignment': 'mixed',
'price_vs_ema20': 0.0,
'signals': []
}
@staticmethod
def calculate_multi_timeframe_resonance(data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""
计算多周期共振强度
Args:
data: 包含多个周期的数据 {'1w': df, '1d': df, '1h': df}
Returns:
{
'score': 0-100,
'level': 'strong'/'medium'/'weak',
'weekly_trend': str,
'daily_trend': str,
'hourly_trend': str,
'resonance_type': str,
'analysis': str
}
"""
trends = {}
# 获取各周期趋势
for tf_name in ['1w', '1d', '1h']:
df = data.get(tf_name)
if df is not None and len(df) > 0:
trend_info = StockAnalysisTools.detect_trend(df)
trends[tf_name] = trend_info['direction']
else:
trends[tf_name] = 'neutral'
weekly_trend = trends.get('1w', 'neutral')
daily_trend = trends.get('1d', 'neutral')
hourly_trend = trends.get('1h', 'neutral')
# 计算共振得分
score = 0
analysis_parts = []
# 大周期共振(周线+日线)
if weekly_trend == daily_trend and weekly_trend != 'neutral':
score += 40
analysis_parts.append(f"✅ 大周期共振({weekly_trend}")
elif weekly_trend != 'neutral' and daily_trend != 'neutral':
analysis_parts.append(f"⚠️ 大周期分歧(周线{weekly_trend} vs 日线{daily_trend}")
else:
analysis_parts.append(f" 大周期不明确")
# 主周期共振(日线+1h
if daily_trend == hourly_trend and daily_trend != 'neutral':
score += 35
analysis_parts.append(f"✅ 主周期共振({daily_trend}")
elif daily_trend != 'neutral' and hourly_trend != 'neutral':
analysis_parts.append(f"⚠️ 主周期分歧(日线{daily_trend} vs 1h{hourly_trend}")
else:
analysis_parts.append(f" 主周期不明确")
# 全周期共振
if weekly_trend == daily_trend == hourly_trend and weekly_trend != 'neutral':
score += 25
analysis_parts.append(f"🔥 全周期共振({weekly_trend}")
# 确定共振等级
if score >= 70:
level = 'strong'
resonance_type = 'all_timeframe_aligned' if score >= 90 else 'strong'
elif score >= 40:
level = 'medium'
resonance_type = 'large_timeframe_aligned'
elif score >= 20:
level = 'weak'
resonance_type = 'partial_alignment'
else:
level = 'weak'
resonance_type = 'no_resonance'
return {
'score': score,
'level': level,
'weekly_trend': weekly_trend,
'daily_trend': daily_trend,
'hourly_trend': hourly_trend,
'resonance_type': resonance_type,
'analysis': ' | '.join(analysis_parts)
}
@staticmethod
def calculate_fundamental_score(data: Dict[str, Any]) -> Dict[str, Any]:
"""
计算基本面评分0-100
Args:
data: 基本面数据
Returns:
{
'score': 0-100,
'grade': 'A'/'B'/'C'/'D',
'valuation': {'score': 0-25, 'grade': 'A'/'B'/'C'/'D'},
'profitability': {'score': 0-35, 'grade': 'A'/'B'/'C'/'D'},
'growth': {'score': 0-25, 'grade': 'A'/'B'/'C'/'D'},
'financial_health': {'score': 0-15, 'grade': 'A'/'B'/'C'/'D'},
'summary': str
}
"""
if not data:
return {
'score': 0,
'grade': 'D',
'summary': '无基本面数据'
}
score = 0
breakdown = {}
# 估值评分 (25分)
val_score = 0
pe = data.get('pe_ratio', 0)
pb = data.get('pb_ratio', 0)
peg = data.get('peg_ratio', 0)
if 0 < pe < 15:
val_score += 10
elif 15 <= pe <= 25:
val_score += 7
elif pe > 40:
val_score += 0
else:
val_score += 5
if 0 < pb < 1:
val_score += 10
elif 1 <= pb <= 3:
val_score += 7
elif pb > 5:
val_score += 0
else:
val_score += 5
if peg and 0 < peg < 1:
val_score += 5
elif peg and 1 <= peg <= 2:
val_score += 3
elif peg and peg > 2:
val_score += 0
else:
val_score += 2
breakdown['valuation'] = {
'score': val_score,
'grade': 'A' if val_score >= 20 else 'B' if val_score >= 15 else 'C' if val_score >= 10 else 'D'
}
score += val_score
# 盈利能力评分 (35分)
prof_score = 0
roe = data.get('roe', 0)
net_margin = data.get('profit_margin', 0)
if roe > 20:
prof_score += 20
elif roe > 15:
prof_score += 15
elif roe > 10:
prof_score += 10
elif roe > 0:
prof_score += 5
if net_margin > 20:
prof_score += 15
elif net_margin > 10:
prof_score += 10
elif net_margin > 5:
prof_score += 5
elif net_margin > 0:
prof_score += 2
breakdown['profitability'] = {
'score': prof_score,
'grade': 'A' if prof_score >= 30 else 'B' if prof_score >= 20 else 'C' if prof_score >= 10 else 'D'
}
score += prof_score
# 成长性评分 (25分)
growth_score = 0
revenue_growth = data.get('revenue_growth', 0)
earnings_growth = data.get('earnings_growth', 0)
if revenue_growth > 30:
growth_score += 13
elif revenue_growth > 20:
growth_score += 10
elif revenue_growth > 10:
growth_score += 5
elif revenue_growth > 0:
growth_score += 2
if earnings_growth > 30:
growth_score += 12
elif earnings_growth > 20:
growth_score += 8
elif earnings_growth > 10:
growth_score += 5
elif earnings_growth > 0:
growth_score += 2
elif earnings_growth < 0:
growth_score -= 5 # 负增长扣分
breakdown['growth'] = {
'score': max(0, growth_score),
'grade': 'A' if growth_score >= 20 else 'B' if growth_score >= 15 else 'C' if growth_score >= 10 else 'D'
}
score += max(0, growth_score)
# 财务健康评分 (15分)
health_score = 0
debt_ratio = data.get('debt_to_equity', 0)
current_ratio = data.get('current_ratio', 0)
if debt_ratio < 1:
health_score += 8
elif debt_ratio < 2:
health_score += 5
elif debt_ratio < 3:
health_score += 2
else:
health_score += 0
if current_ratio > 2:
health_score += 7
elif current_ratio > 1.5:
health_score += 5
elif current_ratio > 1:
health_score += 2
else:
health_score += 0
breakdown['financial_health'] = {
'score': health_score,
'grade': 'A' if health_score >= 12 else 'B' if health_score >= 8 else 'C' if health_score >= 5 else 'D'
}
score += health_score
# 确定总等级
grade = 'A' if score >= 80 else 'B' if score >= 60 else 'C' if score >= 40 else 'D'
# 生成摘要
summary_parts = []
if breakdown['valuation']['grade'] == 'A':
summary_parts.append("估值低估")
elif breakdown['valuation']['grade'] == 'D':
summary_parts.append("估值高估")
if breakdown['profitability']['grade'] == 'A':
summary_parts.append("盈利优秀")
elif breakdown['profitability']['grade'] == 'D':
summary_parts.append("盈利较差")
if breakdown['growth']['grade'] == 'A':
summary_parts.append("高成长")
elif breakdown['growth']['grade'] == 'D':
summary_parts.append("低成长")
if breakdown['financial_health']['grade'] == 'A':
summary_parts.append("财务健康")
elif breakdown['financial_health']['grade'] == 'D':
summary_parts.append("财务风险")
return {
'score': score,
'grade': grade,
'breakdown': breakdown,
'summary': ' | '.join(summary_parts) if summary_parts else '一般'
}
@staticmethod
def identify_key_levels(df: pd.DataFrame, lookback: int = 60) -> Dict[str, List[float]]:
"""
识别关键支撑位和阻力位
Args:
df: K线数据
lookback: 回看周期
Returns:
{
'support': [支撑位1, 支撑位2, ...],
'resistance': [阻力位1, 阻力位2, ...]
}
"""
if df is None or len(df) < lookback:
return {'support': [], 'resistance': []}
try:
recent = df.tail(lookback)
highs = recent['high'].values
lows = recent['low'].values
# 找局部高点和低点
from scipy.signal import argrelextrema
from numpy import array
high_indices = argrelextrema(array(highs), np.greater, order=5)
low_indices = argrelextrema(array(lows), np.less, order=5)
resistance_levels = sorted([highs[i] for i in high_indices], reverse=True)[:3]
support_levels = sorted([lows[i] for i in low_indices])[:3]
return {
'resistance': resistance_levels,
'support': support_levels
}
except Exception as e:
logger.warning(f"关键位识别失败: {e}")
return {'support': [], 'resistance': []}
@staticmethod
def calculate_stop_loss_take_profit(
entry_price: float,
atr: float,
direction: str,
key_levels: Dict[str, List[float]] = None
) -> Dict[str, Any]:
"""
计算止损止盈价格
Args:
entry_price: 入场价格
atr: ATR值
direction: 'long'/'short'
key_levels: 支撑阻力位
Returns:
{
'stop_loss': float,
'take_profit': float,
'method': str,
'risk_reward_ratio': float
}
"""
if direction == 'long':
# 做多
# 止损:入场价 - 1.5×ATR或设在前支撑位下方
sl_atr = entry_price - 1.5 * atr
sl_support = None
if key_levels and key_levels.get('support'):
closest_support = [s for s in key_levels['support'] if s < entry_price]
if closest_support:
sl_support = min(closest_support) * 0.995 # 支撑位下方0.5%
# 选择更保守的止损(更远的)
if sl_support and sl_support < sl_atr:
stop_loss = sl_atr
else:
stop_loss = sl_atr
# 止盈:入场价 + 3×ATR风险收益比1:2
take_profit = entry_price + 3 * atr
else:
# 做空
# 止损:入场价 + 1.5×ATR或设在前阻力位上方
sl_atr = entry_price + 1.5 * atr
sl_resistance = None
if key_levels and key_levels.get('resistance'):
closest_resistance = [r for r in key_levels['resistance'] if r > entry_price]
if closest_resistance:
sl_resistance = min(closest_resistance) * 1.005 # 阻力位上方0.5%
# 选择更保守的止损
if sl_resistance and sl_resistance > sl_atr:
stop_loss = sl_atr
else:
stop_loss = sl_atr
# 止盈:入场价 - 3×ATR
take_profit = entry_price - 3 * atr
# 计算风险收益比
if direction == 'long':
risk = entry_price - stop_loss
reward = take_profit - entry_price
else:
risk = stop_loss - entry_price
reward = entry_price - take_profit
risk_reward_ratio = reward / risk if risk > 0 else 0
return {
'stop_loss': round(stop_loss, 2),
'take_profit': round(take_profit, 2),
'method': 'ATR-based (1:2 risk-reward)',
'risk_reward_ratio': round(risk_reward_ratio, 2)
}

View File

@ -308,6 +308,45 @@ class StockMarketSignalAnalyzer:
- 必须同时输出 `entry_price`建议入场价 `entry_type`入场方式
- 入场方式由你的市场分析判断不是简单的价格距离计算
## 止损止盈计算规则
使用ATR真实波动幅度动态计算止损止盈
**做多**
- 止损 = 入场价 - 1.5 × ATR(14)
- 止盈 = 入场价 + 3 × ATR(14) 风险收益比 1:2
- 如果附近有明显支撑位可将止损设在支撑位下方
**做空**
- 止损 = 入场价 + 1.5 × ATR(14)
- 止盈 = 入场价 - 3 × ATR(14) 风险收益比 1:2
- 如果附近有明显阻力位可将止损设在阻力位上方
## 信号输出条件(严格遵守)
**满足以下条件才输出信号否则 signals 返回空数组**
### 做多信号条件:
1. 趋势EMA20 > EMA50 > EMA200或处于回调中的上升趋势
2. 价格站稳 EMA20 之上或回调到 EMA20 附近
3. 量价放量上涨或缩量回调后重新放量
4. 共振多周期共振得分 >= 40至少大周期一致
5. 基本面评分 >= C级40分以上
6. 新闻无重大负面消息财报暴雷监管处罚等
### 做空信号条件:
1. 趋势EMA20 < EMA50 < EMA200或处于反弹中的下降趋势
2. 价格跌破 EMA20 或反弹到 EMA20 附近
3. 量价放量下跌或缩量反弹后继续下跌
4. 共振多周期共振得分 >= 40至少大周期一致
5. 基本面评分 >= C级40分以上
6. 新闻无重大正面消息
### 禁止输出信号的情况:
- 趋势不明确EMA 交织震荡市
- 多周期方向完全相反
- 基本面差D级<40
- 有重大负面新闻做多时或重大正面新闻做空时
- 技术指标矛盾如RSI超买但MACD死叉
## 输出格式
请严格按照以下 JSON 格式输出
@ -416,128 +455,89 @@ class StockMarketSignalAnalyzer:
symbols: List[str] = None,
fundamental_data: Dict[str, Any] = None,
news_data: List[Dict[str, Any]] = None) -> str:
"""准备市场上下文信息(技术面 + 基本面 + 新闻)"""
"""准备市场上下文信息结构化版本减少token消耗"""
from app.stock_agent.analysis_tools import StockAnalysisTools
context_parts = []
# 当前价格和24h变化(使用日线数据)
# 当前价格和24h变化
df_1d = data.get('1d')
if df_1d is None or len(df_1d) == 0:
df_1h = data.get('1h') # 备用使用1h数据
if df_1d is None or len(df_1d) == 0:
return "" # 没有数据就返回空
return ""
current_price = float(df_1d.iloc[-1]['close'])
price_change_24h = self._calculate_price_change_24h(df_1d)
context_parts.append(f"当前价格: ${current_price:,.2f} ({price_change_24h})")
context_parts.append(f"**当前价格**: ${current_price:,.2f} ({price_change_24h})")
# 多周期数据
for tf_name, df in data.items():
if df is None or len(df) == 0:
continue
# ===== 趋势分析(预计算) =====
context_parts.append(f"\n**## 趋势分析**")
trend_info = StockAnalysisTools.detect_trend(df_1d)
context_parts.append(f"- 方向: {trend_info['direction']} ({trend_info['strength']})")
context_parts.append(f"- EMA排列: {trend_info['ema_alignment']}")
context_parts.append(f"- 价格相对EMA20: {trend_info['price_vs_ema20']:+.2f}%")
latest = df.iloc[-1]
context_parts.append(f"\n## {tf_name} 数据")
context_parts.append(f"开: {latest['open']}, 高: {latest['high']}, 低: {latest['low']}, 收: {latest['close']}")
context_parts.append(f"成交量: {latest.get('volume', 'N/A')}")
# 多周期共振(预计算)
context_parts.append(f"\n**## 多周期共振**")
resonance = StockAnalysisTools.calculate_multi_timeframe_resonance(data)
context_parts.append(f"- 共振等级: {resonance['level'].upper()} (得分: {resonance['score']}/100)")
context_parts.append(f"- 共振分析: {resonance['analysis']}")
context_parts.append(f"- 周线: {resonance['weekly_trend']} | 日线: {resonance['daily_trend']} | 1h: {resonance['hourly_trend']}")
# 技术指标
if 'rsi' in df.columns:
rsi = df['rsi'].iloc[-1]
context_parts.append(f"RSI: {rsi:.2f}")
if 'macd' in df.columns:
macd = df['macd'].iloc[-1]
signal = df['macd_signal'].iloc[-1]
context_parts.append(f"MACD: {macd:.4f}, 信号线: {signal:.4f}")
if 'bb_upper' in df.columns:
bb_upper = df['bb_upper'].iloc[-1]
bb_lower = df['bb_lower'].iloc[-1]
context_parts.append(f"布林带: 上轨 {bb_upper:.2f}, 下轨 {bb_lower:.2f}")
# ===== 技术指标(只显示日线关键指标) =====
context_parts.append(f"\n**## 技术指标(日线)**")
latest = df_1d.iloc[-1]
# 均线系统(使用日线数据)
context_parts.append(f"\n## 均线系统")
df_1d = data.get('1d')
if df_1d is not None and len(df_1d) > 0:
latest = df_1d.iloc[-1]
context_parts.append(f"MA5: {latest.get('ma5', 'N/A')}")
context_parts.append(f"MA10: {latest.get('ma10', 'N/A')}")
context_parts.append(f"MA20: {latest.get('ma20', 'N/A')}")
context_parts.append(f"MA50: {latest.get('ma50', 'N/A')}")
# RSI
if 'rsi' in df_1d.columns:
rsi = df_1d['rsi'].iloc[-1]
context_parts.append(f"- RSI(14): {rsi:.1f} {'(超买⚠️)' if rsi > 70 else '(超卖💡)' if rsi < 30 else '(正常)'}")
# EMA 均线(用于趋势判断)
ema20 = latest.get('ema20', None)
ema50 = latest.get('ema50', None)
ema200 = latest.get('ema200', None)
if ema20 is not None:
context_parts.append(f"EMA20: {ema20:.2f}")
if ema50 is not None:
context_parts.append(f"EMA50: {ema50:.2f}")
if ema200 is not None:
context_parts.append(f"EMA200: {ema200:.2f}")
# MACD
if 'macd' in df_1d.columns:
macd = df_1d['macd'].iloc[-1]
signal = df_1d['macd_signal'].iloc[-1]
context_parts.append(f"- MACD: {macd:.4f} (信号: {signal:.4f})")
# 判断 MA 排列
ma5 = latest.get('ma5', 0)
ma10 = latest.get('ma10', 0)
ma20 = latest.get('ma20', 0)
ma50 = latest.get('ma50', 0)
# 布林带
if 'bb_upper' in df_1d.columns:
bb_upper = df_1d['bb_upper'].iloc[-1]
bb_lower = df_1d['bb_lower'].iloc[-1]
bb_position = (current_price - bb_lower) / (bb_upper - bb_lower) * 100 if bb_upper != bb_lower else 50
context_parts.append(f"- 布林带: [{bb_lower:.2f}, {bb_upper:.2f}] (位置: {bb_position:.0f}%)")
if all([ma5, ma10, ma20, ma50]):
if ma5 > ma10 > ma20 > ma50:
context_parts.append("MA排列: 多头排列 📈")
elif ma5 < ma10 < ma20 < ma50:
context_parts.append("MA排列: 空头排列 📉")
else:
context_parts.append("MA排列: 交织,方向不明")
# 量价分析
volume_ratio = StockAnalysisTools.calculate_volume_ratio(df_1d, 20)
context_parts.append(f"- 量比: {volume_ratio:.2f}x {'放量📊' if volume_ratio > 1.5 else '缩量📉' if volume_ratio < 0.7 else '平量'}")
# EMA 排列(更重要的趋势判断)
if all([ema20, ema50, ema200]):
if ema20 > ema50 > ema200:
context_parts.append("EMA排列: 多头排列 (长期趋势向上) 📈")
elif ema20 < ema50 < ema200:
context_parts.append("EMA排列: 空头排列 (长期趋势向下) 📉")
else:
context_parts.append("EMA排列: 交织 (震荡市) ➡️")
# ATR用于止损止盈
atr = StockAnalysisTools.calculate_atr(df_1d, 14)
atr_pct = (atr / current_price * 100) if current_price > 0 else 0
context_parts.append(f"- ATR(14): ${atr:.2f} ({atr_pct:.2f}%)")
# 价格与 EMA20 的关系(趋势判断关键)
if ema20 is not None:
if latest['close'] > ema20:
context_parts.append("价格位置: 站稳 EMA20 之上 (多头强势)")
elif latest['close'] < ema20:
context_parts.append("价格位置: 跌破 EMA20 (空头强势)")
else:
context_parts.append("价格位置: 接近 EMA20")
# ===== 关键支撑阻力位 =====
context_parts.append(f"\n**## 关键位**")
key_levels = StockAnalysisTools.identify_key_levels(df_1d, lookback=60)
if key_levels['resistance']:
context_parts.append(f"- 阻力位: ${', '.join(f'{r:.2f}' for r in key_levels['resistance'][:3])}")
else:
context_parts.append(f"- 阻力位: 未明确")
if key_levels['support']:
context_parts.append(f"- 支撑位: ${', '.join(f'{s:.2f}' for s in key_levels['support'][:3])}")
else:
context_parts.append(f"- 支撑位: 未明确")
# 量比分析(使用日线数据)
df_1d = data.get('1d')
if df_1d is not None and len(df_1d) >= 20:
vol_latest = df_1d['volume'].iloc[-1]
vol_ma20 = df_1d['volume'].iloc[-20:-1].mean()
volume_ratio = vol_latest / vol_ma20 if vol_ma20 > 0 else 1
context_parts.append(f"\n## 量价分析")
context_parts.append(f"最新成交量: {vol_latest:.0f}")
context_parts.append(f"20周期均量: {vol_ma20:.0f}")
context_parts.append(f"量比: {volume_ratio:.2f}")
if volume_ratio > 1.5:
context_parts.append("量价状态: 放量 📊")
elif volume_ratio < 0.7:
context_parts.append("量价状态: 缩量 📉")
else:
context_parts.append("量价状态: 平量 ")
# 波动率分析
volatility_analysis = self._analyze_volatility(data)
if volatility_analysis:
context_parts.append(f"\n## 波动率分析")
context_parts.append(volatility_analysis)
# 基本面分析
# ===== 基本面分析(评分) =====
if fundamental_data:
context_parts.append(f"\n## 基本面分析")
context_parts.append(self._format_fundamental_data(fundamental_data))
context_parts.append(f"\n**## 基本面分析**")
fund_score = StockAnalysisTools.calculate_fundamental_score(fundamental_data)
context_parts.append(f"- 综合评分: {fund_score['grade']}级 ({fund_score['score']}/100)")
context_parts.append(f"- 估值: {fund_score['breakdown']['valuation']['grade']}级 | 盈利: {fund_score['breakdown']['profitability']['grade']}")
context_parts.append(f"- 成长: {fund_score['breakdown']['growth']['grade']}级 | 财务: {fund_score['breakdown']['financial_health']['grade']}")
context_parts.append(f"- 摘要: {fund_score['summary']}")
# 新闻舆情
# ===== 新闻舆情 =====
if news_data:
context_parts.append(f"\n## 最新新闻")
context_parts.append(f"\n**## 最新新闻**")
context_parts.append(self._format_news_data(news_data))
return "\n".join(context_parts)