stock-ai-agent/backend/app/crypto_agent/market_signal_analyzer.py
2026-04-27 11:47:27 +08:00

2864 lines
128 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
市场信号分析器 - 纯市场分析,不包含任何仓位信息
职责:
1. 分析K线、量价、技术指标
2. 分析新闻舆情
3. 输出纯市场信号 (buy/sell/hold + confidence + reasoning)
不负责:
- 仓位管理
- 风险控制
- 具体下单决策
"""
import json
import re
import asyncio
import numpy as np
import pandas as pd
from typing import Dict, Any, Optional, List
from datetime import datetime
from app.utils.logger import logger
from app.services.llm_service import llm_service
from app.services.news_service import get_news_service
from app.services.bitget_service import bitget_service
from app.crypto_agent.feature_engine import FeatureEngine
from app.crypto_agent.regime_engine import RegimeEngine
from app.crypto_agent.setup_policy import SetupPolicy
class MarketSignalAnalyzer:
"""市场信号分析器 - 只关注市场,输出客观信号"""
# Sampling temperatures passed to the LLM for the intraday and trend lanes.
INTRADAY_ANALYSIS_TEMPERATURE = 0.12
TREND_ANALYSIS_TEMPERATURE = 0.08
# max_tokens budget shared by both lane requests.
ANALYSIS_MAX_TOKENS = 1200
# Per-lane floors keyed by signal type ('short_term' / 'medium_term').
# The same numbers are quoted in the two prompt strings of this class.
# NOTE(review): none of the LANE_MIN_* tables is read in the visible part of
# the file; presumably they are enforced where signals are validated - confirm.
# Minimum confidence for a signal.
LANE_MIN_CONFIDENCE = {
'short_term': 70,
'medium_term': 70,
}
# Minimum reward-to-risk ratio (the prompts phrase it as 1:1.6 and 1:2.0).
LANE_MIN_RISK_REWARD = {
'short_term': 1.6,
'medium_term': 2.0,
}
# Minimum stop-loss distance, in percent (the prompts quote 0.7% / 1.5%).
LANE_MIN_STOP_LOSS_PCT = {
'short_term': 0.7,
'medium_term': 1.5,
}
# Minimum take-profit distance, in percent (the prompts quote 1.2% / 3.0%).
LANE_MIN_TAKE_PROFIT_PCT = {
'short_term': 1.2,
'medium_term': 3.0,
}
# Minimum number of bars between the two pivots of a Fib swing
# (checked in _select_recent_fib_swing).
FIB_MIN_PIVOT_SEPARATION_BARS = 4
# Number of bars before a pivot used as the volume baseline
# (read in _pivot_volume_score).
FIB_PIVOT_VOLUME_LOOKBACK = 20
# System prompt for the intraday lane (short_term signals); analyze() sends it
# as the "system" message of the intraday request.
# The numeric floors quoted in the text (risk/reward 1:1.6, stop 0.7%,
# target 1.2%, confidence 70) repeat the 'short_term' entries of the
# LANE_MIN_* constants - keep both places in sync when editing.
# NOTE(review): grade B is given as 70-79 while grade C is 70-71, so the two
# bands overlap - confirm the intended ranges.
# NOTE(review): several clauses read as if punctuation was lost (for example
# "regimetrending" and "输出 JSON禁止输出解释性正文") - verify against the
# upstream file before relying on the exact wording.
INTRADAY_ANALYSIS_PROMPT = """你是一位专业的加密货币日内交易员,只负责生成 short_term 信号。
你的任务是基于 5m / 15m、当日开盘、VWAP、开盘区间、关键位、Fib 回撤位和衍生品拥挤度,判断未来 30 分钟到 4 小时内是否存在可执行的合约日内 setup。
执行原则:
1. 先判断日内 regimetrending / ranging / neutral。
2. 趋势日内只做顺势回调或突破后的回踩确认,不追涨杀跌。
3. 震荡日内只做区间边界附近的反转,不在区间中部开仓。
4. 技术指标只做辅助优先看价格结构、供需区、量价是否匹配、VWAP 偏离和位置优势。
5. 优先使用“优先支撑 / 优先阻力”和“可交易多头区 / 可交易空头区”,普通支撑阻力只作补充。
6. 没有清晰止损、止盈和盈亏比就不交易。
7. 本次分析独立进行,不参考任何上一轮信号。
8. 硬性禁止:
- 如果多周期特征已确认上升趋势HH+HL 结构,或突破震荡区间向上),禁止输出 sell 信号。
- 如果多周期特征已确认下降趋势LL+LH 结构,或跌破震荡区间向下),禁止输出 buy 信号。
- 逆势信号只允许在 trend_direction=neutral 且有明确区间边界反转结构时输出。
信号要求:
1. 只允许输出 0 或 1 个 short_term 信号。
2. 盈亏比至少 1:1.6。
3. 如果价格处于加速延伸、远离优先交易区、或衍生品同向拥挤,优先返回空信号。
4. 如果价格位于区间中部、离关键位太远、止损过宽或方向证据冲突,必须返回空信号。
5. 做多时entry 应尽量靠近优先支撑或多头共振区做空时entry 应尽量靠近优先阻力或空头共振区。
6. 只有在 setup 足够清晰时才允许输出信号;宁可空仓,不要勉强给单。
7. entry_type:
- 价格已回到关键位并出现确认,可用 market
- 仍需等待回踩/反抽,使用 limit
8. grade / confidence 约束:
- A: 80-100结构、位置、量价、时机都对齐
- B: 70-79条件较完整但仍有一项次优
- C: 70-71只有轻仓试错级别
- 70 以下不要输出交易信号
9. 止损止盈距离下限:
- short_term 止损距离至少 0.7%
- short_term 止盈距离至少 1.2%
10. reasoning 必须覆盖四点中的至少三点:结构、位置、量价关系、衍生品拥挤度。
11. 如果数据明确显示 `market_location=middle_of_range` 或 `far_from_trade_zone`,必须返回空信号。
12. 如果突破没有得到量价确认,或回调不是缩量回调,必须显著降低做单积极性,必要时直接返回空信号。
输出 JSON禁止输出解释性正文
```json
{
"market_state": "ranging/trending/neutral",
"trend_direction": "uptrend/downtrend/neutral",
"trend_strength": "strong/medium/weak",
"analysis_summary": "20字以内总结日内状态",
"key_levels": {
"support": [数字, 数字],
"resistance": [数字, 数字]
},
"signals": [
{
"type": "short_term",
"action": "buy/sell",
"entry_type": "market/limit",
"confidence": 0,
"grade": "A/B/C",
"entry_price": 0,
"stop_loss": 0,
"take_profit": 0,
"reasoning": "结构+位置+量价/波动+拥挤度"
}
]
}
```
额外约束:
1. `analysis_summary` 控制在 20 个中文字符以内。
2. `reasoning` 只写一条简洁证据链,不要写仓位建议。
3. `entry_price` / `stop_loss` / `take_profit` 必须是纯数字。
4. 做多必须满足 `stop_loss < entry_price < take_profit`;做空必须满足 `take_profit < entry_price < stop_loss`。
5. 没有 setup 时必须返回 `signals: []`。
"""
# System prompt for the trend lane (medium_term signals); analyze() sends it
# as the "system" message of the trend request.
# The numeric floors quoted in the text (risk/reward 1:2.0, stop 1.5%,
# target 3.0%, confidence 70) repeat the 'medium_term' entries of the
# LANE_MIN_* constants - keep both places in sync when editing.
# NOTE(review): several clauses read as if punctuation was lost (for example
# "输出 JSON禁止输出解释性正文") - verify against the upstream file before
# relying on the exact wording.
TREND_ANALYSIS_PROMPT = """你是一位专业的加密货币趋势交易员,只负责生成 medium_term 信号。
你的任务是基于 1h / 4h / 1d、关键位、Fib 回撤/扩展位、趋势阶段、反转检测、衍生品拥挤度和新闻催化,判断未来 4 小时到 1 周内是否存在可执行的合约趋势 setup。
执行原则:
1. 4h/1d 决定大方向1h 决定节奏与入场位置。
2. 只做两类交易:
- 趋势延续4h/1d 趋势明确1h 回踩关键位后确认继续
- 趋势反转4h/1d 结构和 1h 动能同时改善,且反转证据充分
3. 禁止仅凭 15m 噪音逆 4h 开仓。
4. 趋势晚期、资金费率过热、价格过度偏离供需区、量价失配、或衍生品顺向拥挤时,要显著降低开仓积极性。
5. 没有清晰位置优势就不交易。
6. 本次分析独立进行,不参考任何上一轮信号。
7. 优先使用“优先支撑 / 优先阻力”和“可交易多头区 / 可交易空头区”,普通关键位只作补充。
8. 趋势单的核心不是猜方向,而是等待大级别方向明确后,在有位置优势的回踩/反抽处开仓。
信号要求:
1. 只允许输出 0 或 1 个 medium_term 信号。
2. 盈亏比至少 1:2.0。
3. 如果 4h/1d 与 1h 明显冲突,优先返回空信号。
4. 反转信号必须比延续信号更严格。
5. 如果趋势处于晚期且没有回踩确认,或反转证据不足,必须返回空信号。
6. 只有在位置优势和方向一致性都充分时才允许开仓。
7. 趋势延续单的 entry 应优先靠近优先支撑/阻力或对应共振区,不在远离关键位的位置追价。
8. grade / confidence 约束:
- A: 82-1004h/1d/1h 同向且位置优
- B: 72-81趋势或反转证据较完整
- C: 70-71仅限早期确认不足的轻仓趋势尝试
- 70 以下不要输出交易信号
9. 止损止盈距离下限:
- medium_term 止损距离至少 1.5%
- medium_term 止盈距离至少 3.0%
10. reasoning 必须明确大级别方向、1h 入场节奏、位置优势、量价关系/拥挤度风险。
11. 如果价格已经远离优先交易区,或趋势方向虽对但没有回踩/反抽确认,必须返回空信号。
12. 趋势延续单必须尽量体现“推进放量、回调缩量、关键位再接受”中的至少两项;否则优先空仓。
输出 JSON禁止输出解释性正文
```json
{
"market_state": "ranging/trending/neutral",
"trend_direction": "uptrend/downtrend/neutral",
"trend_strength": "strong/medium/weak",
"analysis_summary": "20字以内总结趋势状态",
"key_levels": {
"support": [数字, 数字],
"resistance": [数字, 数字]
},
"signals": [
{
"type": "medium_term",
"action": "buy/sell",
"entry_type": "market/limit",
"confidence": 0,
"grade": "A/B/C",
"entry_price": 0,
"stop_loss": 0,
"take_profit": 0,
"reasoning": "4h/1d方向+1h节奏+位置优势+拥挤度"
}
]
}
```
额外约束:
1. `analysis_summary` 控制在 20 个中文字符以内。
2. `reasoning` 只写一条简洁证据链,不要写仓位建议。
3. `entry_price` / `stop_loss` / `take_profit` 必须是纯数字。
4. 做多必须满足 `stop_loss < entry_price < take_profit`;做空必须满足 `take_profit < entry_price < stop_loss`。
5. 没有 setup 时必须返回 `signals: []`。
"""
def __init__(self) -> None:
    """Attach the services and helper engines the analyzer works with."""
    # News service obtained from the project-level factory function.
    self.news_service = get_news_service()
    # Module-level Bitget service object, stored as the exchange handle.
    self.exchange = bitget_service
    # Helper engines, each constructed without arguments.
    self.feature_engine = FeatureEngine()
    self.regime_engine = RegimeEngine()
    self.setup_policy = SetupPolicy()
async def analyze(self, symbol: str, data: Dict[str, Any],
                  symbols: List[str] = None,
                  lanes: Optional[List[str]] = None,
                  cached_lane_results: Optional[Dict[str, Dict[str, Any]]] = None) -> Dict[str, Any]:
    """
    Query the requested LLM lanes for one symbol and merge them into a signal.

    Args:
        symbol: Trading pair to analyse.
        data: Multi-timeframe candle data, passed on to
            ``_prepare_market_context``.
        symbols: All monitored pairs, passed on to
            ``_prepare_market_context``.
        lanes: Lanes to query fresh ("intraday" and/or "trend"). A falsy
            value (None or an empty list) requests both.
        cached_lane_results: Earlier per-lane results, used for any lane
            that is not queried in this call; a lane with neither a fresh
            reply nor a cache entry falls back to ``_get_empty_signal``.

    Returns:
        The merged signal after ``apply_regime_policy``. If anything in the
        pipeline raises, the error is logged and ``_get_empty_signal(symbol)``
        is returned instead.
    """
    known_lanes = ("intraday", "trend")
    try:
        # Shared inputs: market text, news, and futures data.
        market_context = self._prepare_market_context(symbol, data, symbols)
        news_context = await self._get_news_context(symbol)
        futures_context, futures_market_data = await self._get_futures_context(symbol)

        lanes_to_run = set(lanes or known_lanes)
        cached_lane_results = cached_lane_results or {}
        lane_config = {
            "intraday": (self.INTRADAY_ANALYSIS_PROMPT, self.INTRADAY_ANALYSIS_TEMPERATURE),
            "trend": (self.TREND_ANALYSIS_PROMPT, self.TREND_ANALYSIS_TEMPERATURE),
        }

        # Build one pending LLM request per requested lane (intraday first).
        pending = {}
        for lane in known_lanes:
            if lane not in lanes_to_run:
                continue
            system_prompt, temperature = lane_config[lane]
            user_prompt = self._build_analysis_prompt(
                symbol=symbol,
                lane=lane,
                market_context=market_context,
                news_context=news_context,
                futures_context=futures_context,
                futures_market_data=futures_market_data,
            )
            pending[lane] = llm_service.achat(
                [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_prompt},
                ],
                temperature=temperature,
                max_tokens=self.ANALYSIS_MAX_TOKENS,
            )

        # Run the lane requests concurrently and map replies back to lanes.
        lane_responses = {}
        if pending:
            replies = await asyncio.gather(*pending.values())
            lane_responses = dict(zip(pending.keys(), replies))

        # Fresh reply -> parse it; otherwise copy the cached result (or an
        # empty signal when nothing is cached).
        lane_results = {}
        for lane in known_lanes:
            if lane in lane_responses:
                lane_results[lane] = self._parse_llm_response(lane_responses.get(lane) or "", symbol)
            else:
                lane_results[lane] = dict(
                    cached_lane_results.get(lane) or self._get_empty_signal(symbol)
                )
        for lane in known_lanes:
            lane_results[lane]['_lane_source'] = 'fresh' if lane in lane_responses else 'cache'

        result = self._merge_lane_results(symbol, lane_results["intraday"], lane_results["trend"])
        result['llm_lanes'] = {
            'requested': sorted(lanes_to_run),
            'fresh': sorted(lane_responses.keys()),
            'cached': sorted(set(known_lanes) - set(lane_responses.keys())),
        }
        result['lane_results'] = {
            'intraday': lane_results["intraday"],
            'trend': lane_results["trend"],
        }

        # Carry the quantified regime data along for the execution layer.
        if market_context.get('range_metrics'):
            result['range_metrics'] = market_context['range_metrics']

        # Carry funding-rate data along for execution-side risk control.
        if futures_market_data:
            funding = futures_market_data.get('funding_rate') or {}
            result['funding_rate_data'] = {
                'funding_rate': funding.get('funding_rate'),
                'funding_rate_percent': funding.get('funding_rate_percent', 0),
                'sentiment': funding.get('sentiment'),
                'sentiment_level': funding.get('sentiment_level'),
                'open_interest': futures_market_data.get('open_interest'),
            }

        return self.apply_regime_policy(
            symbol=symbol,
            market_signal=result,
            market_context=market_context,
            futures_market_data=futures_market_data,
        )
    except Exception as e:
        logger.error(f"市场信号分析失败: {e}")
        import traceback
        logger.debug(traceback.format_exc())
        return self._get_empty_signal(symbol)
def _prepare_market_context(self, symbol: str, data: Dict,
                            symbols: Optional[List[str]] = None) -> Dict[str, Any]:
    """Assemble the text sections and structured blocks that feed the prompts.

    Args:
        symbol: Trading pair being analysed.
        data: Candle frames keyed by timeframe. '5m' and '1h' are indexed
            directly and must be present; other timeframes are read with
            ``data.get``.
        symbols: Accepted for interface compatibility; not used here.

    Returns:
        A dict holding the rendered text sections ('snapshot', 'intraday',
        'trend', 'levels', 'range_warning') together with the underlying
        analysis objects ('range_metrics', 'market_location',
        'intraday_structured', 'trend_structured', 'reversal_detection',
        'trend_stage').

    Raises:
        KeyError: If ``data`` has no '5m' or '1h' entry.
    """
    current_price = float(data['5m'].iloc[-1]['close'])
    price_change_24h = self._calculate_price_change_24h(data['1h'])

    engine = self.feature_engine
    day_open = engine.get_session_open(data.get('1h'))
    session_vwap = engine.calculate_session_vwap(data.get('5m'))
    opening_range = engine.calculate_opening_range(data.get('5m'))
    features = {
        timeframe: engine.summarize_timeframe_features(data.get(timeframe), timeframe)
        for timeframe in ('5m', '15m', '1h', '4h', '1d')
    }

    intraday_alignment = self._describe_alignment([features['5m'], features['15m']])
    trend_alignment = self._describe_alignment([features['1h'], features['4h'], features['1d']])
    range_zone = self._detect_range_zone(data)
    reversal_detection = self._detect_trend_reversal(data)
    trend_stage = self._detect_trend_stage(data)
    fib_context = self._build_fibonacci_context(data, current_price)
    key_levels = self._derive_key_levels(data, range_zone, fib_context, current_price)
    market_location = self._build_market_location_summary(current_price, range_zone, key_levels)

    # --- Snapshot section -------------------------------------------------
    snapshot_parts = [
        "## 市场快照",
        f"- 交易对: {symbol}",
        f"- 当前价格: {current_price:.2f}",
        f"- 24h涨跌: {price_change_24h}",
        f"- 当日开盘: {day_open:.2f}" if day_open is not None else "- 当日开盘: N/A",
        f"- 会话VWAP: {session_vwap:.2f}" if session_vwap is not None else "- 会话VWAP: N/A",
    ]
    # Truthiness (not "is not None") on purpose: a zero reference would
    # divide by zero below.
    if day_open:
        snapshot_parts.append(f"- 相对日开盘偏离: {((current_price - day_open) / day_open) * 100:+.2f}%")
    if session_vwap:
        snapshot_parts.append(f"- 相对VWAP偏离: {((current_price - session_vwap) / session_vwap) * 100:+.2f}%")
    if opening_range:
        snapshot_parts.append(
            f"- 开盘区间(前30分钟): 高 {opening_range['high']:.2f} / 低 {opening_range['low']:.2f}"
        )
    snapshot_parts.append(f"- 市场位置: {market_location['summary']}")

    # --- Intraday / trend feature sections --------------------------------
    intraday_parts = [
        "## 日内特征",
        self._format_feature_line(features['5m']),
        self._format_feature_line(features['15m']),
        f"- 日内级别一致性: {intraday_alignment}",
    ]
    trend_parts = [
        "## 趋势特征",
        self._format_feature_line(features['1h']),
        self._format_feature_line(features['4h']),
        self._format_feature_line(features['1d']),
        f"- 趋势级别一致性: {trend_alignment}",
    ]
    if trend_stage.get('stage') != 'unknown':
        stage_map = {'early': '早期', 'middle': '中期', 'late': '晚期'}
        stage_label = stage_map.get(trend_stage['stage'], trend_stage['stage'])
        trend_parts.append(
            f"- 趋势阶段: {stage_label} ({trend_stage['confidence']}%) | {trend_stage['analysis']}"
        )

    # --- Key level section --------------------------------------------------
    levels_parts = [
        "## 关键位",
        f"- 支撑位: {self._format_levels(key_levels.get('support'))}",
        f"- 阻力位: {self._format_levels(key_levels.get('resistance'))}",
    ]
    if key_levels.get('support_priority'):
        levels_parts.append(f"- 优先支撑: {self._format_level_priority(key_levels['support_priority'])}")
    if key_levels.get('resistance_priority'):
        levels_parts.append(f"- 优先阻力: {self._format_level_priority(key_levels['resistance_priority'])}")
    if key_levels.get('best_long_zone'):
        levels_parts.append(f"- 可交易多头区: {self._format_trade_zone(key_levels['best_long_zone'])}")
    if key_levels.get('best_short_zone'):
        levels_parts.append(f"- 可交易空头区: {self._format_trade_zone(key_levels['best_short_zone'])}")

    if fib_context.get('intraday'):
        intraday_parts.append(f"- 日内Fib: {fib_context['intraday']}")
    if fib_context.get('trend'):
        trend_parts.append(f"- 趋势Fib: {fib_context['trend']}")

    if range_zone.get('is_ranging'):
        intraday_parts.append(
            f"- 区间判断: 是 ({range_zone['confidence']}%) | 宽度 {range_zone.get('range_width_pct', 0):.2f}% | {range_zone.get('analysis', '')}"
        )
    else:
        intraday_parts.append("- 区间判断: 否")

    if reversal_detection.get('is_reversing'):
        reversal_type = "bullish" if reversal_detection.get('reversal_type') == 'bullish_reversal' else "bearish"
        signal_desc = ", ".join(sig['desc'] for sig in reversal_detection.get('signals', [])[:3])
        trend_parts.append(
            f"- 反转检测: {reversal_type} ({reversal_detection['confidence']}%) | {signal_desc}"
        )
    else:
        trend_parts.append("- 反转检测: 无显著反转信号")

    # --- Quantified ranging state -----------------------------------------
    range_metrics = self._quantify_ranging_state(data)
    regime = range_metrics['regime']
    range_warning = ""
    if regime in ('ranging', 'transitional'):
        advice = '避免趋势策略,只做区间边界反转' if regime == 'ranging' else '谨慎开仓,降低仓位'
        range_warning = (
            f"\n## 震荡市场警告\n"
            f"- 当前市场状态: {regime} (score {range_metrics['regime_score']}/100)\n"
            f"- ATR(1h): {range_metrics['atr_pct']:.3f}%\n"
            f"- EMA收敛度: {range_metrics['ema_convergence_pct']:.3f}%\n"
            f"- 方向效率: {range_metrics['range_efficiency']:.2f} (0=纯震荡, 1=纯趋势)\n"
            f"- ADX: {range_metrics['adx']:.1f}\n"
            f"- 建议: {advice}\n"
        )
        intraday_parts.append(
            f"- ⚠️ 震荡市场: regime={regime} (score {range_metrics['regime_score']}) | "
            f"ATR={range_metrics['atr_pct']:.2f}% | EMA收敛={range_metrics['ema_convergence_pct']:.2f}% | "
            f"效率={range_metrics['range_efficiency']:.2f} | ADX={range_metrics['adx']:.1f}"
        )
    trend_parts.append(
        f"- 市场结构: {regime} | ADX={range_metrics['adx']:.1f} | "
        f"{'BB收口' if range_metrics['bb_squeeze'] else 'BB正常'}"
    )

    # --- Structured blocks: both lanes share everything but the lane name
    # and the set of timeframes they look at. ------------------------------
    shared_block_kwargs = dict(
        symbol=symbol,
        current_price=current_price,
        day_open=day_open,
        session_vwap=session_vwap,
        opening_range=opening_range,
        intraday_alignment=intraday_alignment,
        trend_alignment=trend_alignment,
        range_zone=range_zone,
        range_metrics=range_metrics,
        reversal_detection=reversal_detection,
        trend_stage=trend_stage,
        fib_context=fib_context,
        key_levels=key_levels,
        market_location=market_location,
    )
    intraday_structured = self._build_market_context_block(
        lane='intraday',
        feature_map={tf: features[tf] for tf in ('5m', '15m', '1h', '4h')},
        **shared_block_kwargs,
    )
    trend_structured = self._build_market_context_block(
        lane='trend',
        feature_map={tf: features[tf] for tf in ('15m', '1h', '4h', '1d')},
        **shared_block_kwargs,
    )

    return {
        'snapshot': "\n".join(snapshot_parts),
        'intraday': "\n".join(intraday_parts),
        'trend': "\n".join(trend_parts),
        'levels': "\n".join(levels_parts),
        'range_warning': range_warning,
        'range_metrics': range_metrics,
        'market_location': market_location,
        'intraday_structured': intraday_structured,
        'trend_structured': trend_structured,
        'reversal_detection': reversal_detection,
        'trend_stage': trend_stage,
    }
def _format_feature_line(self, feature: Dict[str, Any]) -> str:
"""格式化单周期特征摘要"""
if not feature.get('available'):
return f"- {feature.get('timeframe')}: 数据不足"
def fmt(value: Optional[float], digits: int = 2) -> str:
return "N/A" if value is None else f"{value:+.{digits}f}%"
def safe(value: Optional[float], digits: int = 1) -> str:
return "N/A" if value is None else f"{value:.{digits}f}"
adx_part = f" | ADX={safe(feature.get('adx'), 1)}({feature['trend_strength_adx']})" if feature.get('adx') else ""
return (
f"- {feature['timeframe']}: 结构={feature['structure']} | EMA={feature['ema_alignment']} | "
f"3bar={fmt(feature['momentum_3'])} | 12bar={fmt(feature['momentum_12'])} | "
f"RSI={safe(feature['rsi'], 1)} | ATR={safe(feature['atr_pct'], 2)}% | "
f"量比={safe(feature['volume_ratio'], 2)} | "
f"量价={feature.get('volume_price_state', 'neutral')} | "
f"突破={feature.get('breakout_quality', 'none')} | "
f"回调={feature.get('pullback_quality', 'neutral')} | "
f"拒绝={feature.get('rejection_signal', 'none')} | "
f"高潮={feature.get('exhaustion_risk', 'low')} | "
f"距EMA20={fmt(feature['distance_to_ema20'])} | "
f"距20bar高点={fmt(feature['distance_to_recent_high'])} | "
f"距20bar低点={fmt(feature['distance_to_recent_low'])} | "
f"加速={'' if feature['is_accelerating'] else ''}"
f"{adx_part}"
)
def _describe_alignment(self, features: List[Dict[str, Any]]) -> str:
"""描述多周期方向一致性"""
directions = []
for feature in features:
if not feature.get('available'):
continue
direction = feature.get('ema_alignment')
if direction == 'mixed':
direction = 'neutral'
directions.append(direction)
if not directions:
return "数据不足"
if all(direction == 'bull' for direction in directions):
return "多头一致"
if all(direction == 'bear' for direction in directions):
return "空头一致"
if all(direction == 'neutral' for direction in directions):
return "全部中性"
return "存在分歧"
def _derive_key_levels(self, data: Dict[str, pd.DataFrame], range_zone: Dict[str, Any],
fib_context: Optional[Dict[str, Any]] = None,
current_price: Optional[float] = None) -> Dict[str, Any]:
"""提炼高价值支撑阻力位"""
support_candidates: List[Dict[str, Any]] = []
resistance_candidates: List[Dict[str, Any]] = []
if range_zone.get('support_level'):
support_candidates.append(self._make_level_candidate(float(range_zone['support_level']), 1.2, "区间下沿"))
if range_zone.get('resistance_level'):
resistance_candidates.append(self._make_level_candidate(float(range_zone['resistance_level']), 1.2, "区间上沿"))
for timeframe, count, tf_weight in [('15m', 20, 0.95), ('1h', 20, 1.15), ('4h', 12, 1.3), ('1d', 10, 1.5)]:
df = data.get(timeframe)
if df is None or len(df) < count:
continue
window = df.iloc[-count:]
support_candidates.append(self._make_level_candidate(float(window['low'].min()), tf_weight, f"{timeframe}低点"))
resistance_candidates.append(self._make_level_candidate(float(window['high'].max()), tf_weight, f"{timeframe}高点"))
ema20 = df['ema20'].iloc[-1] if 'ema20' in df.columns else None
if pd.notna(ema20):
if float(ema20) < float(df['close'].iloc[-1]):
support_candidates.append(self._make_level_candidate(float(ema20), tf_weight * 0.9, f"{timeframe}EMA20"))
else:
resistance_candidates.append(self._make_level_candidate(float(ema20), tf_weight * 0.9, f"{timeframe}EMA20"))
fib_levels = fib_context or {}
for detail in fib_levels.get('support_details', []):
support_candidates.append(
self._make_level_candidate(
float(detail['price']),
self._score_fib_level(detail),
f"Fib{detail['ratio']:.3f}"
)
)
for detail in fib_levels.get('resistance_details', []):
resistance_candidates.append(
self._make_level_candidate(
float(detail['price']),
self._score_fib_level(detail),
f"Fib{detail['ratio']:.3f}"
)
)
ranked_supports = self._rank_level_candidates(support_candidates, current_price, reverse=True)
ranked_resistances = self._rank_level_candidates(resistance_candidates, current_price, reverse=False)
best_long_zone = self._build_trade_zone(ranked_supports, current_price, "buy")
best_short_zone = self._build_trade_zone(ranked_resistances, current_price, "sell")
return {
'support': self._sort_level_prices([item['price'] for item in ranked_supports[:3]], reverse=True),
'resistance': self._sort_level_prices([item['price'] for item in ranked_resistances[:3]], reverse=False),
'support_priority': ranked_supports[:3],
'resistance_priority': ranked_resistances[:3],
'best_long_zone': best_long_zone,
'best_short_zone': best_short_zone,
}
def _build_fibonacci_context(self, data: Dict[str, pd.DataFrame], current_price: float) -> Dict[str, Any]:
"""提炼日内与趋势的 Fib 关键位,只保留对当前价格最有意义的层级"""
contexts = {
'intraday': '',
'trend': '',
'support_levels': [],
'resistance_levels': [],
'support_details': [],
'resistance_details': [],
}
fib_specs = [
('15m', 48, 'intraday', '日内'),
('4h', 60, 'trend', '趋势'),
('1d', 30, 'trend', '日线'),
]
for timeframe, lookback, key, label in fib_specs:
fib_result = self._calculate_fibonacci_levels(data.get(timeframe), current_price, lookback)
if not fib_result:
continue
summary = (
f"{label}波段 {fib_result['swing_low']:.2f}->{fib_result['swing_high']:.2f} | "
f"方向={fib_result['direction']} | "
f"支撑={self._format_fib_levels(fib_result.get('support_details'))} | "
f"阻力={self._format_fib_levels(fib_result.get('resistance_details'))}"
)
if fib_result.get('confluence'):
summary += f" | 共振={fib_result['confluence']}"
if fib_result.get('trade_zone'):
summary += f" | 可交易区={fib_result['trade_zone']}"
if key in contexts and contexts[key]:
contexts[key] += f"\n{summary}"
else:
contexts[key] = summary
contexts['support_levels'].extend(fib_result.get('support_levels', []))
contexts['resistance_levels'].extend(fib_result.get('resistance_levels', []))
contexts['support_details'].extend(fib_result.get('support_details', []))
contexts['resistance_details'].extend(fib_result.get('resistance_details', []))
contexts['support_levels'] = self._dedupe_levels(contexts['support_levels'], reverse=True)
contexts['resistance_levels'] = self._dedupe_levels(contexts['resistance_levels'], reverse=False)
return contexts
def _calculate_fibonacci_levels(self, df: Optional[pd.DataFrame], current_price: float,
lookback: int = 60) -> Optional[Dict[str, Any]]:
"""基于最近确认波段计算 Fib 回撤和扩展位"""
if df is None or df.empty or len(df) < max(lookback // 2, 20):
return None
window = df.iloc[-lookback:].copy()
swing = self._select_recent_fib_swing(window, current_price)
if swing:
pivot_high = float(swing['high'])
pivot_low = float(swing['low'])
is_upswing = swing['direction'] == 'up'
else:
pivot_high = float(window['high'].max())
pivot_low = float(window['low'].min())
high_idx = window['high'].idxmax()
low_idx = window['low'].idxmin()
is_upswing = low_idx < high_idx
span = pivot_high - pivot_low
if span <= 0 or pivot_low <= 0:
return None
direction = 'up' if is_upswing else 'down'
retracement_ratios = [0.382, 0.5, 0.618, 0.786]
extension_ratios = [1.272, 1.618]
levels: List[Dict[str, Any]] = []
if is_upswing:
for ratio in retracement_ratios:
price = pivot_high - span * ratio
levels.append({'kind': 'retracement', 'ratio': ratio, 'price': price})
for ratio in extension_ratios:
price = pivot_low + span * ratio
levels.append({'kind': 'extension', 'ratio': ratio, 'price': price})
else:
for ratio in retracement_ratios:
price = pivot_low + span * ratio
levels.append({'kind': 'retracement', 'ratio': ratio, 'price': price})
for ratio in extension_ratios:
price = pivot_high - span * ratio
levels.append({'kind': 'extension', 'ratio': ratio, 'price': price})
support_candidates = sorted(
[level for level in levels if level['price'] < current_price],
key=lambda item: item['price'],
reverse=True
)
resistance_candidates = sorted(
[level for level in levels if level['price'] > current_price],
key=lambda item: item['price']
)
if not support_candidates and not resistance_candidates:
nearest = sorted(levels, key=lambda item: abs(item['price'] - current_price))[:2]
support_candidates = sorted(
[level for level in nearest if level['price'] <= current_price],
key=lambda item: item['price'],
reverse=True
)
resistance_candidates = sorted(
[level for level in nearest if level['price'] > current_price],
key=lambda item: item['price']
)
support_details = self._serialize_fib_levels(support_candidates[:2], current_price)
resistance_details = self._serialize_fib_levels(resistance_candidates[:2], current_price)
support_levels = [level['price'] for level in support_details]
resistance_levels = [level['price'] for level in resistance_details]
confluence_parts = []
ema20 = window['ema20'].iloc[-1] if 'ema20' in window.columns else None
if pd.notna(ema20):
nearest_fib = min(levels, key=lambda item: abs(item['price'] - float(ema20)))
distance = abs(nearest_fib['price'] - float(ema20)) / float(ema20)
if distance <= 0.004:
confluence_parts.append(f"Fib{nearest_fib['ratio']:.3f}+EMA20")
recent_high = float(window['high'].iloc[-12:].max())
recent_low = float(window['low'].iloc[-12:].min())
for ref_price, ref_name in [(recent_high, '前高'), (recent_low, '前低')]:
nearest_fib = min(levels, key=lambda item: abs(item['price'] - ref_price))
distance = abs(nearest_fib['price'] - ref_price) / ref_price if ref_price > 0 else 1
if distance <= 0.004:
confluence_parts.append(f"Fib{nearest_fib['ratio']:.3f}+{ref_name}")
return {
'direction': direction,
'swing_high': pivot_high,
'swing_low': pivot_low,
'support_levels': support_levels,
'resistance_levels': resistance_levels,
'support_details': support_details,
'resistance_details': resistance_details,
'confluence': " / ".join(dict.fromkeys(confluence_parts)),
'trade_zone': self._describe_fib_trade_zone(direction, support_details, resistance_details),
}
def _select_recent_fib_swing(self, window: pd.DataFrame, current_price: float) -> Optional[Dict[str, Any]]:
"""从最近确认 pivot 中选择一个有效波段用于 Fib"""
pivots = self._find_confirmed_pivots(window)
if len(pivots) < 2:
return None
atr_pct = self._estimate_window_atr_pct(window, current_price)
min_span_pct = max(0.01 if current_price >= 100 else 0.015, atr_pct * 1.8)
ema20 = window['ema20'].iloc[-1] if 'ema20' in window.columns else None
current_close = float(window['close'].iloc[-1])
best_candidate = None
best_score = float('-inf')
for end_idx in range(len(pivots) - 1, 0, -1):
end_pivot = pivots[end_idx]
for start_idx in range(end_idx - 1, -1, -1):
start_pivot = pivots[start_idx]
if start_pivot['type'] == end_pivot['type']:
continue
low_price = min(start_pivot['price'], end_pivot['price'])
high_price = max(start_pivot['price'], end_pivot['price'])
if low_price <= 0:
continue
separation_bars = end_pivot['pos'] - start_pivot['pos']
if separation_bars < self.FIB_MIN_PIVOT_SEPARATION_BARS:
continue
span_pct = (high_price - low_price) / low_price
if span_pct < min_span_pct:
continue
direction = 'up' if start_pivot['type'] == 'low' else 'down'
volume_score = self._pivot_volume_score(window, start_pivot['pos']) + self._pivot_volume_score(window, end_pivot['pos'])
direction_score = 0.0
if pd.notna(ema20):
if direction == 'up' and current_close >= float(ema20):
direction_score = 0.4
elif direction == 'down' and current_close <= float(ema20):
direction_score = 0.4
score = (
(end_pivot['pos'] / max(len(window), 1)) * 1.8 +
min(span_pct / max(min_span_pct, 1e-6), 3.0) +
min(separation_bars / 12, 1.2) +
volume_score +
direction_score
)
candidate = {
'low': low_price,
'high': high_price,
'direction': direction,
'start_pos': start_pivot['pos'],
'end_pos': end_pivot['pos'],
'score': score,
}
if score > best_score:
best_candidate = candidate
best_score = score
return best_candidate
def _find_confirmed_pivots(self, window: pd.DataFrame, left_bars: int = 2, right_bars: int = 2) -> List[Dict[str, Any]]:
"""寻找确认过的 pivot high / low"""
pivots: List[Dict[str, Any]] = []
if window is None or len(window) < left_bars + right_bars + 1:
return pivots
highs = window['high'].tolist()
lows = window['low'].tolist()
for pos in range(left_bars, len(window) - right_bars):
high = float(highs[pos])
low = float(lows[pos])
left_highs = highs[pos - left_bars:pos]
right_highs = highs[pos + 1:pos + 1 + right_bars]
left_lows = lows[pos - left_bars:pos]
right_lows = lows[pos + 1:pos + 1 + right_bars]
if all(high > value for value in left_highs) and all(high >= value for value in right_highs):
pivots.append({'pos': pos, 'type': 'high', 'price': high})
if all(low < value for value in left_lows) and all(low <= value for value in right_lows):
pivots.append({'pos': pos, 'type': 'low', 'price': low})
pivots.sort(key=lambda item: item['pos'])
return self._compress_adjacent_pivots(pivots)
def _compress_adjacent_pivots(self, pivots: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""压缩相邻同类型 pivot保留更极端的那个"""
compressed: List[Dict[str, Any]] = []
for pivot in pivots:
if not compressed or compressed[-1]['type'] != pivot['type']:
compressed.append(pivot)
continue
previous = compressed[-1]
if pivot['type'] == 'high' and pivot['price'] >= previous['price']:
compressed[-1] = pivot
elif pivot['type'] == 'low' and pivot['price'] <= previous['price']:
compressed[-1] = pivot
return compressed
def _estimate_window_atr_pct(self, window: pd.DataFrame, current_price: float) -> float:
"""估算当前窗口 ATR 百分比,用于过滤过小波段"""
if 'atr' in window.columns:
atr_series = window['atr'].dropna()
if not atr_series.empty and current_price > 0:
return float(atr_series.iloc[-1]) / current_price
return 0.006
def _pivot_volume_score(self, window: pd.DataFrame, pos: int) -> float:
"""估算 pivot 附近的成交量强度"""
if 'volume' not in window.columns or pos >= len(window):
return 0.0
baseline_start = max(0, pos - self.FIB_PIVOT_VOLUME_LOOKBACK)
baseline = float(window['volume'].iloc[baseline_start:pos + 1].mean()) if pos > baseline_start else 0.0
if baseline <= 0:
return 0.0
pivot_volume = float(window['volume'].iloc[pos])
ratio = pivot_volume / baseline
if ratio >= 1.8:
return 0.5
if ratio >= 1.3:
return 0.25
return 0.0
def _serialize_fib_levels(self, levels: List[Dict[str, Any]], current_price: float) -> List[Dict[str, Any]]:
"""把 Fib 层级转成带距离说明的结构"""
serialized: List[Dict[str, Any]] = []
for level in levels:
price = float(level['price'])
distance_pct = abs(price - current_price) / current_price * 100 if current_price > 0 else 0
serialized.append({
'ratio': float(level['ratio']),
'price': price,
'distance_pct': distance_pct,
'kind': level.get('kind', 'retracement'),
})
return serialized
def _format_fib_levels(self, levels: Optional[List[Dict[str, Any]]]) -> str:
if not levels:
return "N/A"
return ", ".join(
f"{level['price']:.2f}({self._fib_kind_label(level.get('kind'))}Fib{level['ratio']:.3f}, {level['distance_pct']:.1f}%)"
for level in levels[:2]
)
def _fib_kind_label(self, kind: Optional[str]) -> str:
if kind == 'extension':
return "扩展"
return "回撤"
def _describe_fib_trade_zone(self, direction: str,
support_details: List[Dict[str, Any]],
resistance_details: List[Dict[str, Any]]) -> str:
if direction == 'up' and support_details:
return f"顺势回踩 {support_details[0]['price']:.2f} 附近"
if direction == 'down' and resistance_details:
return f"顺势反抽 {resistance_details[0]['price']:.2f} 附近"
nearest = support_details[:1] + resistance_details[:1]
if nearest:
return f"{nearest[0]['price']:.2f} 附近观察反应"
return ""
def _make_level_candidate(self, price: float, score: float, source: str) -> Dict[str, Any]:
return {
'price': float(price),
'score': float(score),
'sources': [source],
}
def _score_fib_level(self, detail: Dict[str, Any]) -> float:
ratio = round(float(detail.get('ratio', 0)), 3)
kind = detail.get('kind', 'retracement')
ratio_score = {
0.382: 0.95,
0.5: 1.05,
0.618: 1.25,
0.786: 1.1,
1.272: 0.9,
1.618: 0.95,
}.get(ratio, 0.85)
if kind == 'extension':
ratio_score *= 0.95
distance_pct = float(detail.get('distance_pct', 0))
if distance_pct <= 0.5:
ratio_score += 0.25
elif distance_pct <= 1.0:
ratio_score += 0.15
return ratio_score
def _rank_level_candidates(self, candidates: List[Dict[str, Any]],
current_price: Optional[float],
reverse: bool) -> List[Dict[str, Any]]:
"""把多来源关键位聚合成带优先级的层级"""
if not candidates:
return []
sorted_candidates = sorted(candidates, key=lambda item: item['price'], reverse=reverse)
clusters: List[Dict[str, Any]] = []
tolerance = 0.0035
for candidate in sorted_candidates:
price = candidate['price']
matched_cluster = None
for cluster in clusters:
base_price = cluster['price']
if base_price <= 0:
continue
if abs(price - base_price) / base_price <= tolerance:
matched_cluster = cluster
break
if matched_cluster is None:
clusters.append({
'price': price,
'score': candidate['score'],
'sources': list(candidate['sources']),
'count': 1,
})
continue
total_score = matched_cluster['score'] + candidate['score']
matched_cluster['price'] = (
matched_cluster['price'] * matched_cluster['score'] + price * candidate['score']
) / total_score
matched_cluster['score'] = total_score
matched_cluster['count'] += 1
matched_cluster['sources'].extend(candidate['sources'])
ranked = []
for cluster in clusters:
unique_sources = list(dict.fromkeys(cluster['sources']))
confluence_bonus = 0.35 * max(0, len(unique_sources) - 1)
distance_pct = abs(cluster['price'] - current_price) / current_price * 100 if current_price and current_price > 0 else 0
ranked.append({
'price': float(cluster['price']),
'score': round(cluster['score'] + confluence_bonus, 2),
'sources': unique_sources[:3],
'distance_pct': distance_pct,
})
ranked.sort(key=lambda item: (-item['score'], item['distance_pct']))
return ranked
def _format_level_priority(self, levels: List[Dict[str, Any]]) -> str:
if not levels:
return "N/A"
return " | ".join(
f"{level['price']:.2f}(强度{level['score']:.2f}, {'+'.join(level['sources'])})"
for level in levels[:3]
)
def _build_trade_zone(self, levels: List[Dict[str, Any]], current_price: Optional[float],
action: str) -> Optional[Dict[str, Any]]:
if not levels:
return None
level = levels[0]
price = float(level['price'])
reference = current_price if current_price and current_price > 0 else price
raw_distance = float(level.get('distance_pct', 0) or 0)
band_pct = max(0.25, min(0.8, raw_distance * 0.35 if raw_distance > 0 else 0.4))
half_band = price * band_pct / 100
return {
'action': action,
'center': price,
'low': price - half_band,
'high': price + half_band,
'distance_pct': abs(price - reference) / reference * 100 if reference > 0 else 0,
'score': level.get('score', 0),
'sources': level.get('sources', []),
}
def _format_trade_zone(self, zone: Optional[Dict[str, Any]]) -> str:
if not zone:
return "N/A"
return (
f"{zone['low']:.2f}-{zone['high']:.2f} "
f"(中心 {zone['center']:.2f}, 强度{zone['score']:.2f}, {'+'.join(zone.get('sources', []))})"
)
def _sort_level_prices(self, prices: List[float], reverse: bool) -> List[float]:
return sorted([float(price) for price in prices if isinstance(price, (int, float))], reverse=reverse)
def _dedupe_levels(self, levels: List[float], reverse: bool) -> List[float]:
"""对价位去重,避免同类水平位过密"""
cleaned = sorted([float(level) for level in levels if level], reverse=reverse)
deduped: List[float] = []
for level in cleaned:
if not deduped:
deduped.append(level)
continue
if abs(level - deduped[-1]) / deduped[-1] > 0.003:
deduped.append(level)
if len(deduped) >= 3:
break
return deduped
def _format_levels(self, levels: Optional[List[float]]) -> str:
if not levels:
return "N/A"
return ", ".join(f"{level:.2f}" for level in levels[:3])
def _build_market_location_summary(self,
current_price: float,
range_zone: Dict[str, Any],
key_levels: Dict[str, Any]) -> Dict[str, Any]:
"""量化当前价格相对区间和优先交易区的位置"""
summary = {
'location_tag': 'unknown',
'relative_to_range': 'unknown',
'distance_to_best_long_zone_pct': None,
'distance_to_best_short_zone_pct': None,
'summary': '未知',
}
best_long_zone = key_levels.get('best_long_zone')
best_short_zone = key_levels.get('best_short_zone')
if best_long_zone and current_price > 0:
summary['distance_to_best_long_zone_pct'] = round(
abs(current_price - float(best_long_zone['center'])) / current_price * 100, 2
)
if best_short_zone and current_price > 0:
summary['distance_to_best_short_zone_pct'] = round(
abs(current_price - float(best_short_zone['center'])) / current_price * 100, 2
)
if range_zone.get('is_ranging') and range_zone.get('support_level') and range_zone.get('resistance_level'):
low = float(range_zone['support_level'])
high = float(range_zone['resistance_level'])
width = high - low
if width > 0:
position = (current_price - low) / width
if position <= 0.25:
summary['relative_to_range'] = 'near_range_support'
elif position >= 0.75:
summary['relative_to_range'] = 'near_range_resistance'
else:
summary['relative_to_range'] = 'middle_of_range'
long_dist = summary['distance_to_best_long_zone_pct']
short_dist = summary['distance_to_best_short_zone_pct']
candidates = [(long_dist, 'near_long_zone'), (short_dist, 'near_short_zone')]
valid_candidates = [(dist, tag) for dist, tag in candidates if dist is not None]
if valid_candidates:
nearest_dist, nearest_tag = min(valid_candidates, key=lambda item: item[0])
if nearest_dist <= 0.6:
summary['location_tag'] = nearest_tag
elif nearest_dist >= 2.0:
summary['location_tag'] = 'far_from_trade_zone'
else:
summary['location_tag'] = 'between_trade_zones'
if summary['relative_to_range'] == 'middle_of_range':
summary['location_tag'] = 'middle_of_range'
summary['summary'] = (
f"location={summary['location_tag']} | range={summary['relative_to_range']} | "
f"dist_long={summary['distance_to_best_long_zone_pct']}% | "
f"dist_short={summary['distance_to_best_short_zone_pct']}%"
)
return summary
def _serialize_feature_block(self, feature: Dict[str, Any]) -> Dict[str, Any]:
"""把单周期特征压成稳定字段,供 prompt 直接消费"""
if not feature.get('available'):
return {'available': False}
def rounded(value: Optional[float], digits: int = 2) -> Optional[float]:
if value is None:
return None
return round(float(value), digits)
return {
'available': True,
'structure': feature.get('structure'),
'ema_alignment': feature.get('ema_alignment'),
'momentum_3_pct': rounded(feature.get('momentum_3')),
'momentum_12_pct': rounded(feature.get('momentum_12')),
'rsi': rounded(feature.get('rsi'), 1),
'atr_pct': rounded(feature.get('atr_pct')),
'volume_ratio': rounded(feature.get('volume_ratio')),
'body_ratio': rounded(feature.get('body_ratio')),
'close_position_in_bar': rounded(feature.get('close_position_in_bar')),
'upper_wick_ratio': rounded(feature.get('upper_wick_ratio')),
'lower_wick_ratio': rounded(feature.get('lower_wick_ratio')),
'range_expansion_ratio': rounded(feature.get('range_expansion_ratio')),
'pressure_bias': feature.get('pressure_bias'),
'volume_price_state': feature.get('volume_price_state'),
'breakout_quality': feature.get('breakout_quality'),
'pullback_quality': feature.get('pullback_quality'),
'rejection_signal': feature.get('rejection_signal'),
'exhaustion_risk': feature.get('exhaustion_risk'),
'distance_to_ema20_pct': rounded(feature.get('distance_to_ema20')),
'distance_to_recent_high_pct': rounded(feature.get('distance_to_recent_high')),
'distance_to_recent_low_pct': rounded(feature.get('distance_to_recent_low')),
'is_accelerating': bool(feature.get('is_accelerating')),
'adx': rounded(feature.get('adx'), 1),
'trend_strength_adx': feature.get('trend_strength_adx'),
}
def _build_market_context_block(self,
lane: str,
symbol: str,
current_price: float,
day_open: Optional[float],
session_vwap: Optional[float],
opening_range: Optional[Dict[str, float]],
intraday_alignment: str,
trend_alignment: str,
feature_map: Dict[str, Dict[str, Any]],
range_zone: Dict[str, Any],
range_metrics: Dict[str, Any],
reversal_detection: Dict[str, Any],
trend_stage: Dict[str, Any],
fib_context: Dict[str, Any],
key_levels: Dict[str, Any],
market_location: Dict[str, Any]) -> str:
"""构建给 LLM 的结构化行情上下文"""
block = {
'symbol': symbol,
'lane': lane,
'current_price': round(current_price, 4),
'day_open': round(float(day_open), 4) if day_open else None,
'session_vwap': round(float(session_vwap), 4) if session_vwap else None,
'opening_range': (
{
'high': round(float(opening_range['high']), 4),
'low': round(float(opening_range['low']), 4),
} if opening_range else None
),
'alignment': {
'intraday': intraday_alignment,
'trend': trend_alignment,
},
'market_location': market_location,
'range_state': {
'is_ranging': bool(range_zone.get('is_ranging')),
'support_level': round(float(range_zone.get('support_level')), 4) if range_zone.get('support_level') else None,
'resistance_level': round(float(range_zone.get('resistance_level')), 4) if range_zone.get('resistance_level') else None,
'range_width_pct': round(float(range_zone.get('range_width_pct', 0) or 0), 2),
'confidence': int(range_zone.get('confidence', 0) or 0),
'regime': range_metrics.get('regime'),
'regime_score': int(range_metrics.get('regime_score', 0) or 0),
'efficiency': round(float(range_metrics.get('range_efficiency', 0) or 0), 2),
'adx': round(float(range_metrics.get('adx', 0) or 0), 1),
},
'trend_stage': {
'stage': trend_stage.get('stage', 'unknown'),
'confidence': int(trend_stage.get('confidence', 0) or 0),
},
'reversal_detection': {
'is_reversing': bool(reversal_detection.get('is_reversing')),
'type': reversal_detection.get('reversal_type'),
'confidence': int(reversal_detection.get('confidence', 0) or 0),
},
'timeframes': {
timeframe: self._serialize_feature_block(feature)
for timeframe, feature in feature_map.items()
},
'levels': {
'support': [round(float(level), 4) for level in key_levels.get('support', [])[:3]],
'resistance': [round(float(level), 4) for level in key_levels.get('resistance', [])[:3]],
'priority_support': [
{
'price': round(float(level['price']), 4),
'score': round(float(level['score']), 2),
'distance_pct': round(float(level.get('distance_pct', 0) or 0), 2),
'sources': level.get('sources', [])[:3],
}
for level in key_levels.get('support_priority', [])[:2]
],
'priority_resistance': [
{
'price': round(float(level['price']), 4),
'score': round(float(level['score']), 2),
'distance_pct': round(float(level.get('distance_pct', 0) or 0), 2),
'sources': level.get('sources', [])[:3],
}
for level in key_levels.get('resistance_priority', [])[:2]
],
'best_long_zone': self._serialize_trade_zone(key_levels.get('best_long_zone')),
'best_short_zone': self._serialize_trade_zone(key_levels.get('best_short_zone')),
},
'fib_context': {
'intraday': fib_context.get('intraday') if lane == 'intraday' else None,
'trend': fib_context.get('trend') if lane == 'trend' else None,
},
}
return "```json\n" + json.dumps(block, ensure_ascii=False, indent=2) + "\n```"
def _serialize_trade_zone(self, zone: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
if not zone:
return None
return {
'action': zone.get('action'),
'center': round(float(zone['center']), 4),
'low': round(float(zone['low']), 4),
'high': round(float(zone['high']), 4),
'distance_pct': round(float(zone.get('distance_pct', 0) or 0), 2),
'score': round(float(zone.get('score', 0) or 0), 2),
'sources': zone.get('sources', [])[:3],
}
async def _get_news_context(self, symbol: str) -> str:
"""获取新闻舆情上下文"""
try:
news_result = await self.news_service.get_crypto_news(symbol)
if not news_result or not news_result.get('articles'):
return "无最新新闻"
articles = news_result['articles'][:5] # 只取前5条
context_parts = ["\n## 最新新闻"]
for article in articles:
title = article.get('title', '')
source = article.get('source', '')
published_at = article.get('publishedAt', '')
time_str = published_at.split('T')[1][:5] if 'T' in published_at else ''
context_parts.append(f"- [{time_str}] {title} ({source})")
return "\n".join(context_parts)
except Exception as e:
logger.warning(f"获取新闻失败: {e}")
return "新闻获取失败"
async def _get_futures_context(self, symbol: str) -> tuple:
"""获取合约市场数据(资金费率、持仓量、溢价率)
Returns:
(formatted_str, raw_market_data) - 格式化文本和原始数据
"""
try:
loop = asyncio.get_event_loop()
market_data = await loop.run_in_executor(
None,
self.exchange.get_futures_market_data,
symbol
)
if not market_data:
return "", None
return self._format_futures_context(symbol, market_data), market_data
except Exception as e:
logger.warning(f"获取 {symbol} 合约数据失败: {e}")
return "", None
def _format_futures_context(self, symbol: str, market_data: Dict[str, Any]) -> str:
"""格式化高价值合约特征,避免大段说明性文本"""
funding = market_data.get('funding_rate', {})
oi = market_data.get('open_interest', {})
premium = market_data.get('premium_rate')
derivatives_state = self._summarize_derivatives_state(market_data)
lines = [
f"## 衍生品特征",
f"- 交易对: {symbol}",
]
if funding:
lines.append(
f"- 资金费率: {funding.get('funding_rate_percent', 0):+.4f}% | 情绪: {funding.get('sentiment', 'unknown')}"
)
lines.append(
f"- 标记价 vs 指数价: {market_data.get('mark_price', 0):.2f} vs {market_data.get('index_price', 0):.2f}"
)
if oi:
lines.append(f"- 持仓量: {oi.get('open_interest', 0):,.0f}")
if premium is not None:
lines.append(f"- 溢价率: {premium:+.2f}%")
if derivatives_state.get('summary'):
lines.append(f"- 拥挤度结论: {derivatives_state['summary']}")
return "\n".join(lines)
def _summarize_derivatives_state(self, market_data: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""把资金费率/持仓/溢价压缩成更适合 LLM 判断的拥挤度特征"""
summary = {
'crowding_bias': 'neutral',
'crowding_score': 0,
'oi_regime': 'stable',
'premium_regime': 'neutral',
'summary': '中性',
}
if not market_data:
return summary
funding = market_data.get('funding_rate') or {}
oi_change_pct = float(market_data.get('oi_change_percent_24h', 0) or 0)
premium_rate = float(market_data.get('premium_rate', 0) or 0)
funding_pct = float(funding.get('funding_rate_percent', 0) or 0)
score = 0
bias = 'neutral'
if funding_pct >= 0.03:
score += 20
bias = 'long_crowded'
elif funding_pct <= -0.03:
score += 20
bias = 'short_crowded'
if abs(oi_change_pct) >= 8:
score += 20
summary['oi_regime'] = 'expanding_fast'
elif abs(oi_change_pct) >= 3:
score += 10
summary['oi_regime'] = 'expanding'
elif abs(oi_change_pct) <= 1:
summary['oi_regime'] = 'flat'
if premium_rate >= 0.25:
score += 10
summary['premium_regime'] = 'rich'
if bias == 'neutral':
bias = 'long_crowded'
elif premium_rate <= -0.25:
score += 10
summary['premium_regime'] = 'discount'
if bias == 'neutral':
bias = 'short_crowded'
if score >= 40:
regime = 'high'
elif score >= 20:
regime = 'medium'
else:
regime = 'low'
summary['crowding_bias'] = bias
summary['crowding_score'] = score
summary['crowding_regime'] = regime
summary['summary'] = (
f"{bias} | score={score} | oi={summary['oi_regime']} | premium={summary['premium_regime']}"
)
return summary
def _build_futures_context_block(self, market_data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""为 LLM 构建稳定的衍生品结构化输入"""
if not market_data:
return None
funding = market_data.get('funding_rate') or {}
oi = market_data.get('open_interest') or {}
state = self._summarize_derivatives_state(market_data)
return {
'funding_rate_percent': round(float(funding.get('funding_rate_percent', 0) or 0), 4),
'funding_sentiment': funding.get('sentiment_level') or funding.get('sentiment') or 'neutral',
'open_interest': round(float(oi.get('open_interest', 0) or 0), 2),
'oi_change_percent_24h': round(float(market_data.get('oi_change_percent_24h', 0) or 0), 2),
'premium_rate_percent': round(float(market_data.get('premium_rate', 0) or 0), 4),
'mark_vs_index_basis_percent': round(
float(market_data.get('premium_rate', 0) or 0),
4
),
'crowding_bias': state.get('crowding_bias', 'neutral'),
'crowding_regime': state.get('crowding_regime', 'low'),
'crowding_score': state.get('crowding_score', 0),
'oi_regime': state.get('oi_regime', 'stable'),
'premium_regime': state.get('premium_regime', 'neutral'),
'price_change_24h_pct': round(float(market_data.get('price_change_24h_pct', 0) or 0), 2),
'range_position_24h': round(float(market_data.get('range_position_24h', 0.5) or 0.5), 2),
'bid_ask_spread_pct': round(float(market_data.get('bid_ask_spread_pct', 0) or 0), 4),
'quote_volume_24h': round(float(market_data.get('quote_volume_24h', 0) or 0), 2),
}
def _build_analysis_prompt(self, symbol: str, lane: str,
market_context: Dict[str, str],
news_context: str,
futures_context: str = "",
futures_market_data: Optional[Dict[str, Any]] = None) -> str:
"""构建分析提示词"""
lane_text = "日内交易分析" if lane == "intraday" else "趋势交易分析"
lane_scope = (
[
"只根据下面提供的日内结构化特征做判断,不要脑补未提供的数据。",
"先看 JSON 结构块,再用后面的说明性摘要做交叉验证。",
"重点判断是否存在位置优势,而不是只判断方向。",
"优先参考 priority_support / priority_resistance / best_long_zone / best_short_zone。",
"先明确当前属于哪一种 setup区间边界反转、突破确认、突破后回踩不要混淆。",
]
if lane == "intraday"
else [
"只根据下面提供的趋势结构化特征做判断,不要脑补未提供的数据。",
"先看 JSON 结构块,再用后面的说明性摘要做交叉验证。",
"趋势单必须同时回答四个问题大方向是否清晰、1h 节奏是否支持、位置是否优、拥挤是否可接受。",
"优先参考 priority_support / priority_resistance / best_long_zone / best_short_zone不接受远离关键位追价。",
"先明确当前属于哪一种 setup趋势延续回调、深回踩延续、趋势反转不要只给方向。",
]
)
structured_market_context = (
market_context.get('intraday_structured', '')
if lane == "intraday"
else market_context.get('trend_structured', '')
)
futures_block = self._build_futures_context_block(futures_market_data)
selected_sections = [
market_context.get('snapshot', ''),
structured_market_context,
market_context.get('intraday', '') if lane == "intraday" else market_context.get('trend', ''),
market_context.get('levels', ''),
]
prompt_parts = [
f"请对 {symbol} 进行{lane_text}",
*lane_scope,
]
if futures_block:
prompt_parts.extend([
"",
"## 衍生品结构化特征",
"```json",
json.dumps(futures_block, ensure_ascii=False, indent=2),
"```",
])
for section in selected_sections:
if section:
prompt_parts.append("")
prompt_parts.append(section)
if news_context and news_context not in {"无最新新闻", "新闻获取失败"}:
prompt_parts.append("")
prompt_parts.append(news_context)
if futures_context:
prompt_parts.append("")
prompt_parts.append(futures_context)
if market_context.get('range_warning'):
prompt_parts.append("")
prompt_parts.append(market_context['range_warning'])
prompt_parts.append("")
prompt_parts.append("判断时必须优先看这些约束:")
prompt_parts.append("1. 没有位置优势,不交易。")
prompt_parts.append("2. 方向正确但拥挤过热,也可以不交易。")
prompt_parts.append("3. 远离优先交易区、处于区间中部、或已经加速延伸,优先空仓。")
prompt_parts.append("4. 输出的是可执行 setup不是主观行情评论。")
prompt_parts.append("5. setup 必须说得清是反转、确认、回踩还是延续;说不清就空仓。")
prompt_parts.append("")
prompt_parts.append("输出要求:只返回 system prompt 定义的 JSON 对象。没有高质量 setup 就返回 signals: []。")
return "\n".join(prompt_parts)
def _merge_lane_results(self, symbol: str,
                        intraday_result: Dict[str, Any],
                        trend_result: Dict[str, Any]) -> Dict[str, Any]:
    """Merge the intraday and trend LLM results into one signal payload.

    Steps:
        1. Pick the trend direction: the trend lane wins, falling back to
           the intraday lane when it is missing or neutral.
        2. Normalize each lane's signals (``short_term`` / ``medium_term``).
        3. Drop signals that oppose the chosen direction.
        4. Keep the two highest-confidence signals overall.
    Key levels from both lanes are pooled and de-duplicated, and the
    remaining summary fields are derived from the two lane results.

    Args:
        symbol: Trading pair, used to create the empty result skeleton.
        intraday_result: Parsed result of the intraday lane.
        trend_result: Parsed result of the trend lane.

    Returns:
        The merged result dict.
    """
    merged = self._get_empty_signal(symbol)
    merged['raw_response'] = {
        'intraday': intraday_result.get('raw_response', ''),
        'trend': trend_result.get('raw_response', '')
    }

    # 1. Trend direction: trend lane first, intraday lane as fallback.
    direction = trend_result.get('trend_direction')
    if direction in (None, 'neutral'):
        direction = intraday_result.get('trend_direction', 'neutral')
    direction = direction or 'neutral'
    merged['trend_direction'] = direction

    # 2. Normalize both lanes, 3. then filter counter-trend signals.
    short_lane = self._normalize_lane_signals(intraday_result.get('signals', []), 'short_term')
    medium_lane = self._normalize_lane_signals(trend_result.get('signals', []), 'medium_term')
    short_lane = self._filter_counter_trend_signals(short_lane, direction)
    medium_lane = self._filter_counter_trend_signals(medium_lane, direction)

    # 4. Top two by confidence; the sort is stable, so intraday wins ties.
    pool = [*short_lane, *medium_lane]
    pool.sort(key=lambda signal: signal.get('confidence', 0), reverse=True)
    top_signals = pool[:2]
    merged['signals'] = top_signals
    merged['pre_regime_trade_signal_count'] = len(top_signals)
    merged['pre_regime_lane_signal_counts'] = {
        'short_term': len(short_lane),
        'medium_term': len(medium_lane),
    }

    def pooled_levels(side: str, descending: bool) -> List[float]:
        intraday_levels = (intraday_result.get('key_levels', {}) or {}).get(side, [])
        trend_levels = (trend_result.get('key_levels', {}) or {}).get(side, [])
        return self._dedupe_levels(intraday_levels + trend_levels, reverse=descending)

    merged['key_levels'] = {
        'support': pooled_levels('support', True),
        'resistance': pooled_levels('resistance', False),
    }

    strength = trend_result.get('trend_strength')
    if strength in (None, 'weak') and merged['trend_direction'] == 'neutral':
        strength = intraday_result.get('trend_strength', 'weak')
    merged['trend_strength'] = strength or 'weak'

    intraday_state = intraday_result.get('market_state')
    trend_state = trend_result.get('market_state')
    if trend_state == 'trending':
        market_state = '趋势市'
    elif intraday_state == 'ranging':
        market_state = '震荡市'
    elif intraday_state == 'trending':
        market_state = '日内趋势'
    else:
        market_state = intraday_state or trend_state or '中性'
    merged['market_state'] = market_state

    intraday_summary = intraday_result.get('analysis_summary', '')
    trend_summary = trend_result.get('analysis_summary', '')
    merged['analysis_summary'] = f"日内:{intraday_summary} | 趋势:{trend_summary}"

    # Simplified trend field kept alongside trend_direction.
    final_direction = merged['trend_direction']
    if final_direction == 'uptrend':
        merged['trend'] = 'up'
    elif final_direction == 'downtrend':
        merged['trend'] = 'down'
    else:
        merged['trend'] = 'sideways'

    merged['timestamp'] = datetime.now().isoformat()
    return merged
def apply_regime_policy(
    self,
    symbol: str,
    market_signal: Dict[str, Any],
    market_context: Dict[str, Any],
    futures_market_data: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Apply regime-based hard constraints to a market signal.

    The regime engine classifies the current regime from the range
    metrics, market location, trend, derivatives state, reversal detection
    and trend stage. Every signal is enriched with regime and volume/price
    context and then passed through the setup policy filter.

    Args:
        symbol: Trading pair; used only to build an empty result.
        market_signal: Merged market signal; an empty one yields an empty
            signal.
        market_context: Reads ``range_metrics``, ``market_location``,
            ``reversal_detection``, ``trend_stage`` and the structured
            context blocks.
        futures_market_data: Raw derivatives data, optional.

    Returns:
        A shallow copy of ``market_signal`` with ``signals`` replaced by the
        filtered list, plus ``regime_profile``, ``blocked_reasons`` (at
        most six), ``blocked_reason_counts`` and
        ``post_regime_lane_signal_counts``. When everything was blocked
        the ``analysis_summary`` is replaced by the regime summary.
    """
    if not market_signal:
        return self._get_empty_signal(symbol)

    range_metrics = market_context.get("range_metrics") or {}
    market_location = market_context.get("market_location") or {}
    reversal_detection = market_context.get("reversal_detection") or {}
    trend_stage = market_context.get("trend_stage") or {}
    derivatives_state = self._summarize_derivatives_state(futures_market_data)

    regime_profile = self.regime_engine.classify(
        range_metrics=range_metrics,
        market_location=market_location,
        trend_direction=market_signal.get("trend_direction", "neutral"),
        trend_strength=market_signal.get("trend_strength", "weak"),
        derivatives_state=derivatives_state,
        reversal_detection=reversal_detection,
        trend_stage=trend_stage,
    )

    def lane_of(signal: Dict[str, Any]) -> Any:
        return signal.get("timeframe") or signal.get("type")

    enriched_signals = []
    for signal in market_signal.get("signals", []) or []:
        lane = lane_of(signal) or "short_term"
        context_lane = "trend" if lane == "medium_term" else "intraday"
        volume_price_context = self._select_volume_price_context(
            market_context=market_context,
            lane=context_lane,
        )
        quality = volume_price_context or {}
        enriched = dict(signal)
        enriched.update({
            "regime": range_metrics.get("regime", ""),
            "market_location": market_location,
            "trend_stage": trend_stage,
            "volume_price_context": volume_price_context,
            "breakout_quality": quality.get("breakout_quality"),
            "pullback_quality": quality.get("pullback_quality"),
            "rejection_signal": quality.get("rejection_signal"),
            "volume_price_state": quality.get("volume_price_state"),
        })
        enriched_signals.append(enriched)

    filtered_signals, blocked_reasons, blocked_reason_counts = self.setup_policy.filter_signals(
        enriched_signals, regime_profile
    )

    normalized = dict(market_signal)
    normalized["signals"] = filtered_signals
    normalized["regime_profile"] = regime_profile
    normalized["blocked_reasons"] = blocked_reasons[:6]
    normalized["blocked_reason_counts"] = blocked_reason_counts
    normalized["post_regime_lane_signal_counts"] = {
        lane_name: sum(1 for signal in filtered_signals if lane_of(signal) == lane_name)
        for lane_name in ('short_term', 'medium_term')
    }
    if not filtered_signals and blocked_reasons:
        # Everything was blocked: surface the regime verdict as summary.
        normalized["analysis_summary"] = self._truncate_summary(
            regime_profile.get("summary") or "当前状态不交易"
        )
    return normalized
def _select_volume_price_context(self, market_context: Dict[str, Any], lane: str) -> Dict[str, Any]:
structured_key = 'trend_structured' if lane == 'trend' else 'intraday_structured'
structured = market_context.get(structured_key)
if not structured:
return {}
try:
payload = structured.replace("```json", "").replace("```", "").strip()
block = json.loads(payload)
except Exception:
return {}
preferred_timeframes = ['1h', '4h', '1d'] if lane == 'trend' else ['5m', '15m', '1h']
timeframes = block.get('timeframes') or {}
for timeframe in preferred_timeframes:
feature = timeframes.get(timeframe) or {}
if feature.get('available'):
return {
'timeframe': timeframe,
'volume_price_state': feature.get('volume_price_state'),
'breakout_quality': feature.get('breakout_quality'),
'pullback_quality': feature.get('pullback_quality'),
'rejection_signal': feature.get('rejection_signal'),
'exhaustion_risk': feature.get('exhaustion_risk'),
'pressure_bias': feature.get('pressure_bias'),
}
return {}
def _normalize_lane_signals(self, signals: List[Dict[str, Any]], lane_type: str) -> List[Dict[str, Any]]:
"""统一信号时间框架标识"""
normalized = []
for signal in sorted(signals, key=lambda item: item.get('confidence', 0), reverse=True):
if signal.get('action') not in ['buy', 'sell']:
continue
signal = dict(signal)
signal['confidence'] = max(0, min(float(signal.get('confidence', 0) or 0), 100))
min_confidence = self.LANE_MIN_CONFIDENCE.get(lane_type, 60)
if signal['confidence'] < min_confidence:
continue
signal['entry_type'] = signal.get('entry_type', 'market')
if signal['entry_type'] not in {'market', 'limit'}:
signal['entry_type'] = 'market'
if not self._is_signal_price_structure_valid(signal):
continue
if not self._meets_min_price_distance(signal, lane_type):
continue
if not self._meets_min_risk_reward(signal, lane_type):
continue
signal['grade'] = self._infer_signal_grade(signal['confidence'], lane_type)
if not signal.get('reasoning'):
signal['reasoning'] = '结构与关键位共振'
signal['timeframe'] = lane_type
signal['type'] = lane_type
normalized.append(signal)
return normalized[:1]
def _filter_counter_trend_signals(self, signals: List[Dict[str, Any]],
trend_direction: str) -> List[Dict[str, Any]]:
"""
过滤掉与确认趋势方向矛盾的信号。
- uptrend → 丢弃 sell 信号
- downtrend → 丢弃 buy 信号
- neutral → 不过滤
"""
if trend_direction not in ('uptrend', 'downtrend'):
return signals
forbidden = 'sell' if trend_direction == 'uptrend' else 'buy'
kept = []
for s in signals:
if s.get('action') == forbidden:
lane = s.get('timeframe') or s.get('type', 'unknown')
logger.info(
f" [TrendFilter] 丢弃逆势 {forbidden} 信号 "
f"({lane}, confidence={s.get('confidence')}) "
f"因为 trend_direction={trend_direction}"
)
else:
kept.append(s)
return kept
def _infer_signal_grade(self, confidence: float, lane_type: str) -> str:
"""根据 lane 规则统一 grade避免模型随意给等级"""
if lane_type == 'medium_term':
if confidence >= 82:
return 'A'
if confidence >= 72:
return 'B'
return 'C'
if confidence >= 80:
return 'A'
if confidence >= 70:
return 'B'
return 'C'
def _is_signal_price_structure_valid(self, signal: Dict[str, Any]) -> bool:
"""验证信号价格结构是否合法"""
entry_price = signal.get('entry_price')
stop_loss = signal.get('stop_loss')
take_profit = signal.get('take_profit')
action = signal.get('action')
if not all(isinstance(price, (int, float)) and price > 0 for price in [entry_price, stop_loss, take_profit]):
return False
if action == 'buy':
return stop_loss < entry_price < take_profit
if action == 'sell':
return take_profit < entry_price < stop_loss
return False
def _meets_min_risk_reward(self, signal: Dict[str, Any], lane_type: str) -> bool:
"""验证信号是否达到最小盈亏比要求"""
entry_price = signal.get('entry_price')
stop_loss = signal.get('stop_loss')
take_profit = signal.get('take_profit')
action = signal.get('action')
if not all(isinstance(price, (int, float)) and price > 0 for price in [entry_price, stop_loss, take_profit]):
return False
if action == 'buy':
risk = entry_price - stop_loss
reward = take_profit - entry_price
elif action == 'sell':
risk = stop_loss - entry_price
reward = entry_price - take_profit
else:
return False
if risk <= 0 or reward <= 0:
return False
rr = reward / risk
return rr >= self.LANE_MIN_RISK_REWARD.get(lane_type, 1.5)
def _meets_min_price_distance(self, signal: Dict[str, Any], lane_type: str) -> bool:
"""验证止损和止盈的绝对距离是否符合对应周期"""
entry_price = signal.get('entry_price')
stop_loss = signal.get('stop_loss')
take_profit = signal.get('take_profit')
action = signal.get('action')
if not all(isinstance(price, (int, float)) and price > 0 for price in [entry_price, stop_loss, take_profit]):
return False
if action == 'buy':
stop_distance_pct = (entry_price - stop_loss) / entry_price * 100
take_distance_pct = (take_profit - entry_price) / entry_price * 100
elif action == 'sell':
stop_distance_pct = (stop_loss - entry_price) / entry_price * 100
take_distance_pct = (entry_price - take_profit) / entry_price * 100
else:
return False
min_stop_pct = self.LANE_MIN_STOP_LOSS_PCT.get(lane_type, 0.6)
min_take_pct = self.LANE_MIN_TAKE_PROFIT_PCT.get(lane_type, 1.0)
return stop_distance_pct >= min_stop_pct and take_distance_pct >= min_take_pct
def _parse_llm_response(self, response: str, symbol: str) -> Dict[str, Any]:
    """Parse the LLM's reply into a normalized signal dict.

    The JSON payload is taken from a fenced ``json`` block when present,
    otherwise from the outermost ``{...}`` span. It is cleaned, parsed,
    price fields are converted to floats and the schema is normalized.
    Each signal is then made compatible with the legacy format
    (``type`` -> ``timeframe``/``action``, default ``action`` and
    ``grade``), and ``market_state`` / ``trend`` are derived from the
    strongest signal.

    Confidence values that are ``None`` or non-numeric are read as 0 when
    grading and ranking signals, so one malformed field does not discard
    an otherwise usable response.

    Args:
        response: Raw text returned by the LLM.
        symbol: Trading pair stored on the result.

    Returns:
        The parsed result, or ``self._get_empty_signal(symbol)`` when
        anything in the pipeline raises.
    """
    def confidence_of(sig: Dict[str, Any]) -> float:
        try:
            return float(sig.get('confidence') or 0)
        except (TypeError, ValueError):
            return 0.0

    try:
        # Prefer a fenced json block; fall back to the outermost braces.
        json_match = re.search(r'```json\s*([\s\S]*?)\s*```', response)
        if json_match:
            json_str = json_match.group(1)
        else:
            json_match = re.search(r'\{[\s\S]*\}', response)
            if not json_match:
                raise ValueError("无法找到 JSON 响应")
            json_str = json_match.group(0)

        # Remove comments / trailing commas that would break json.loads.
        json_str = self._clean_json_string(json_str)
        logger.debug(f"解析的 JSON 字符串: {json_str[:500]}...")
        result = json.loads(json_str)

        # Convert price fields to float, then normalize the schema.
        result = self._clean_price_fields(result)
        result = self._normalize_response_schema(result)

        # Metadata.
        result['symbol'] = symbol
        result['timestamp'] = datetime.now().isoformat()
        result['raw_response'] = response

        # Compatibility: align signal fields with the legacy format.
        if 'signals' in result:
            for sig in result['signals']:
                # The LLM may put either the timeframe or the action
                # into "type"; move it to the field it belongs to.
                if 'type' in sig:
                    if sig['type'] in ['short_term', 'medium_term', 'long_term']:
                        sig['timeframe'] = sig.pop('type')
                    elif sig['type'] in ['buy', 'sell', 'wait']:
                        sig['action'] = sig.pop('type')
                if 'action' not in sig and 'timeframe' in sig:
                    sig['action'] = 'wait'
                if 'grade' not in sig:
                    confidence = confidence_of(sig)
                    if confidence >= 80:
                        sig['grade'] = 'A'
                    elif confidence >= 60:
                        sig['grade'] = 'B'
                    elif confidence >= 40:
                        sig['grade'] = 'C'
                    else:
                        sig['grade'] = 'D'

        # Infer trend fields from the strongest signal when absent.
        # NOTE(review): the schema normalization above appears to always
        # set both keys for dict payloads, so this may only apply to its
        # fallback result; confirm before removing.
        if 'trend_direction' not in result or 'trend_strength' not in result:
            if 'signals' in result and result['signals']:
                best_signal = max(result['signals'], key=confidence_of)
                action = best_signal.get('action', 'wait')
                confidence = confidence_of(best_signal)
                if 'trend_direction' not in result:
                    if action == 'buy':
                        result['trend_direction'] = 'uptrend'
                    elif action == 'sell':
                        result['trend_direction'] = 'downtrend'
                    else:
                        result['trend_direction'] = 'neutral'
                if 'trend_strength' not in result:
                    result['trend_strength'] = 'strong' if confidence >= 70 else 'medium' if confidence >= 50 else 'weak'

        # Derive market_state (and trend when absent) for backward
        # compatibility. market_state is assigned on every path here, so
        # any value supplied by the LLM is replaced.
        if 'signals' in result and result['signals']:
            best_signal = max(result['signals'], key=confidence_of)
            confidence = confidence_of(best_signal)
            trend_direction = result.get('trend_direction', 'neutral')
            if confidence >= 70 and trend_direction != 'neutral':
                if trend_direction == 'uptrend':
                    result['market_state'] = '强势上涨'
                elif trend_direction == 'downtrend':
                    result['market_state'] = '强势下跌'
                else:
                    result['market_state'] = '震荡整理'
            else:
                result['market_state'] = '震荡整理'
            if 'trend' not in result:
                if trend_direction == 'uptrend':
                    result['trend'] = 'up'
                elif trend_direction == 'downtrend':
                    result['trend'] = 'down'
                else:
                    result['trend'] = 'sideways'
        else:
            result['market_state'] = '无明确信号'
            if 'trend' not in result:
                result['trend'] = 'sideways'

        logger.info(f"✅ 市场信号分析完成: {symbol}")
        logger.debug(f"市场信号: {json.dumps(result, ensure_ascii=False, indent=2)}")
        return result
    except Exception as e:
        logger.warning(f"解析 LLM 响应失败: {e}")
        # str() keeps this handler from raising when response is not a
        # string (for example None).
        logger.warning(f"原始响应: {str(response)[:1000]}...")
        return self._get_empty_signal(symbol)
def _clean_json_string(self, json_str: str) -> str:
"""清理 JSON 字符串,移除可能导致解析错误的内容"""
# 移除单行注释 // ...
json_str = re.sub(r'//.*?(?=\n|$)', '', json_str)
# 移除多行注释 /* ... */
json_str = re.sub(r'/\*[\s\S]*?\*/', '', json_str)
# 移除尾随逗号(例如 {"a": 1,} -> {"a": 1}
json_str = re.sub(r',\s*([}\]])', r'\1', json_str)
return json_str
def _normalize_response_schema(self, result: Dict[str, Any]) -> Dict[str, Any]:
"""归一化 LLM 输出结构,避免下游依赖脏数据"""
if not isinstance(result, dict):
return self._get_empty_signal("")
normalized = dict(result)
normalized['market_state'] = str(normalized.get('market_state', 'neutral') or 'neutral')
normalized['trend_direction'] = str(normalized.get('trend_direction', 'neutral') or 'neutral')
normalized['trend_strength'] = str(normalized.get('trend_strength', 'weak') or 'weak')
normalized['analysis_summary'] = self._truncate_summary(normalized.get('analysis_summary', ''))
normalized['key_levels'] = self._normalize_key_levels(normalized.get('key_levels'))
normalized['signals'] = normalized.get('signals') if isinstance(normalized.get('signals'), list) else []
return normalized
def _normalize_key_levels(self, key_levels: Any) -> Dict[str, List[float]]:
"""归一化关键位结构"""
if not isinstance(key_levels, dict):
return {'support': [], 'resistance': []}
support = [float(level) for level in key_levels.get('support', []) if isinstance(level, (int, float))]
resistance = [float(level) for level in key_levels.get('resistance', []) if isinstance(level, (int, float))]
return {
'support': self._dedupe_levels(support, reverse=True),
'resistance': self._dedupe_levels(resistance, reverse=False),
}
def _truncate_summary(self, summary: Any, max_length: int = 20) -> str:
text = str(summary or '').strip()
return text[:max_length]
def _clean_price_fields(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""清理价格字段,转换为 float"""
def clean_price(price_value):
if price_value is None:
return None
if isinstance(price_value, (int, float)):
return float(price_value)
if isinstance(price_value, str):
# 移除 $ 符号和逗号
cleaned = price_value.replace('$', '').replace(',', '').strip()
if cleaned:
try:
return float(cleaned)
except ValueError:
return None
return None
# 清理 key_levels 中的支撑位和阻力位
if 'key_levels' in data and data['key_levels']:
key_levels = data['key_levels']
if 'support' in key_levels:
data['key_levels']['support'] = [clean_price(s) for s in key_levels['support']]
if 'resistance' in key_levels:
data['key_levels']['resistance'] = [clean_price(r) for r in key_levels['resistance']]
# 清理 signals 中的价格字段
if 'signals' in data:
for sig in data['signals']:
price_fields = ['entry_price', 'stop_loss', 'take_profit']
for field in price_fields:
if field in sig:
sig[field] = clean_price(sig[field])
# 验证止损止盈价格的合理性
entry_price = sig.get('entry_price')
stop_loss = sig.get('stop_loss')
take_profit = sig.get('take_profit')
action = sig.get('action', '')
if entry_price and entry_price > 0:
MAX_REASONABLE_DEVIATION = 0.50 # 50%
has_invalid_price = False
# 检查止损
if stop_loss is not None:
deviation = abs(stop_loss - entry_price) / entry_price
if deviation > MAX_REASONABLE_DEVIATION:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止损价格不合理: entry={entry_price}, stop_loss={stop_loss}, 偏离={deviation*100:.1f}%")
has_invalid_price = True
elif action == 'buy' and stop_loss >= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 < entry")
has_invalid_price = True
elif action == 'sell' and stop_loss <= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止损错误: entry={entry_price}, stop_loss={stop_loss} 应该 > entry")
has_invalid_price = True
# 检查止盈
if take_profit is not None:
deviation = abs(take_profit - entry_price) / entry_price
if deviation > MAX_REASONABLE_DEVIATION:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 信号止盈价格不合理: entry={entry_price}, take_profit={take_profit}, 偏离={deviation*100:.1f}%")
has_invalid_price = True
elif action == 'buy' and take_profit <= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做多止盈错误: entry={entry_price}, take_profit={take_profit} 应该 > entry")
has_invalid_price = True
elif action == 'sell' and take_profit >= entry_price:
logger.warning(f"⚠️ [{data.get('symbol', '')}] 做空止盈错误: entry={entry_price}, take_profit={take_profit} 应该 < entry")
has_invalid_price = True
# 如果价格不合理,降低等级为 D 或移除信号
if has_invalid_price:
original_grade = sig.get('grade', 'C')
sig['grade'] = 'D'
sig['confidence'] = 0
# 添加错误说明
if 'reasoning' in sig:
sig['reasoning'] = f"[价格异常] {sig['reasoning']}"
logger.error(f"❌ [{data.get('symbol', '')}] 信号价格异常,等级从 {original_grade} 降为 D止损止盈已清空")
# 清空不合理的价格
sig['stop_loss'] = None
sig['take_profit'] = None
return data
def _calculate_price_change_24h(self, df) -> str:
"""计算24小时涨跌幅"""
try:
if df is None or len(df) < 24:
return "N/A"
current_price = float(df['close'].iloc[-1])
price_24h_ago = float(df['close'].iloc[-24])
change = ((current_price - price_24h_ago) / price_24h_ago) * 100
sign = "+" if change >= 0 else ""
return f"{sign}{change:.2f}%"
except Exception as e:
logger.debug(f"计算24h涨跌失败: {e}")
return "N/A"
def _get_empty_signal(self, symbol: str) -> Dict[str, Any]:
"""返回空信号"""
return {
'symbol': symbol,
'trend_direction': 'neutral',
'trend_strength': 'weak',
'analysis_summary': 'unknown',
'volume_analysis': '分析失败',
'news_sentiment': 'neutral',
'news_impact': '',
'market_state': '分析失败',
'trend': 'sideways',
'signals': [],
'key_levels': {},
'timestamp': datetime.now().isoformat(),
'error': '信号分析失败'
}
def _analyze_volatility(self, data: Dict[str, pd.DataFrame]) -> str:
"""分析波动率变化(使用 1h 作为日内主周期)"""
df = data.get('1h')
if df is None or len(df) < 12 or 'atr' not in df.columns:
return ""
lines = []
# ATR 变化趋势
recent_atr = df['atr'].iloc[-6:].mean() # 最近 6 根6小时
older_atr = df['atr'].iloc[-12:-6].mean() # 之前 6 根6小时
if pd.isna(recent_atr) or pd.isna(older_atr) or older_atr == 0:
return ""
atr_change = (recent_atr - older_atr) / older_atr * 100
current_atr = float(df['atr'].iloc[-1])
current_price = float(df['close'].iloc[-1])
atr_percent = current_atr / current_price * 100
lines.append(f"当前 ATR (1h): ${current_atr:.2f} ({atr_percent:.2f}%)")
if atr_change > 20:
lines.append(f"**波动率扩张**: ATR 上升 {atr_change:.0f}%,日内趋势可能启动")
elif atr_change < -20:
lines.append(f"**波动率收缩**: ATR 下降 {abs(atr_change):.0f}%,可能即将变盘")
else:
lines.append(f"波动率稳定: ATR 变化 {atr_change:+.0f}%")
# 布林带宽度
if 'bb_upper' in df.columns and 'bb_lower' in df.columns:
bb_width = (float(df['bb_upper'].iloc[-1]) - float(df['bb_lower'].iloc[-1])) / current_price * 100
bb_width_prev = (float(df['bb_upper'].iloc[-6]) - float(df['bb_lower'].iloc[-6])) / float(df['close'].iloc[-6]) * 100
if bb_width < bb_width_prev * 0.8:
lines.append(f"**布林带收口**: 宽度 {bb_width:.1f}%,变盘信号")
elif bb_width > bb_width_prev * 1.2:
lines.append(f"**布林带开口**: 宽度 {bb_width:.1f}%,趋势延续")
return "\n".join(lines)
def _quantify_ranging_state(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""
使用价格行为与量价关系量化当前更像震荡、过渡还是趋势环境。
Returns:
包含压缩、结构切换、方向效率、量价接受度等状态
"""
result = {
'atr_pct': 0.0,
'atr_ratio_trend': 0.0,
'ema_convergence_pct': 0.0,
'range_efficiency': 0.0,
'bb_squeeze': False,
'adx': 0.0,
'compression_pct': 0.0,
'swing_flip_count': 0,
'directional_conviction': 0.0,
'volume_price_balance': 'neutral',
'regime': 'unknown',
'regime_score': 0,
}
try:
df_1h = data.get('1h')
if df_1h is None or len(df_1h) < 20:
return result
price = float(df_1h['close'].iloc[-1])
feature_1h = self.feature_engine.summarize_timeframe_features(df_1h, '1h')
recent = df_1h.iloc[-12:].copy()
if 'atr' in df_1h.columns:
atr = float(df_1h['atr'].iloc[-1])
result['atr_pct'] = atr / price * 100 if price > 0 else 0
if len(df_1h) >= 18:
atr_recent = df_1h['atr'].iloc[-6:].mean()
atr_older = df_1h['atr'].iloc[-18:-6].mean()
if atr_older > 0:
result['atr_ratio_trend'] = float((atr_recent - atr_older) / atr_older)
if all(col in df_1h.columns for col in ['ema5', 'ema10', 'ema20']):
ema_short = float(df_1h['ema5'].iloc[-1])
ema_mid = float(df_1h['ema10'].iloc[-1])
ema_long = float(df_1h['ema20'].iloc[-1])
ema_spread = (max(ema_short, ema_mid, ema_long) - min(ema_short, ema_mid, ema_long))
result['ema_convergence_pct'] = ema_spread / price * 100 if price > 0 else 0
lookback = 14
if len(df_1h) >= lookback:
high_low_range = df_1h['high'].iloc[-lookback:].max() - df_1h['low'].iloc[-lookback:].min()
directional_move = abs(df_1h['close'].iloc[-1] - df_1h['close'].iloc[-lookback])
result['range_efficiency'] = float(directional_move / high_low_range) if high_low_range > 0 else 0
if all(col in df_1h.columns for col in ['bb_upper', 'bb_lower']) and len(df_1h) >= 20:
bb_width_current = (df_1h['bb_upper'].iloc[-1] - df_1h['bb_lower'].iloc[-1]) / price * 100
bb_width_ma = (df_1h['bb_upper'].iloc[-20:] - df_1h['bb_lower'].iloc[-20:]).mean() / \
df_1h['close'].iloc[-20:].mean() * 100
result['bb_squeeze'] = bb_width_ma > 0 and bb_width_current < bb_width_ma * 0.7
if 'adx' in df_1h.columns:
adx_val = df_1h['adx'].iloc[-1]
if pd.notna(adx_val):
result['adx'] = float(adx_val)
recent_high = float(recent['high'].max())
recent_low = float(recent['low'].min())
recent_range_pct = ((recent_high - recent_low) / price * 100) if price > 0 else 0
result['compression_pct'] = recent_range_pct
close_changes = np.sign(recent['close'].diff().fillna(0).values)
swing_flips = 0
previous = 0
for change in close_changes:
if change == 0:
continue
if previous != 0 and change != previous:
swing_flips += 1
previous = change
result['swing_flip_count'] = int(swing_flips)
momentum_12 = abs(float(feature_1h.get('momentum_12') or 0))
body_ratio = float(feature_1h.get('body_ratio') or 0)
close_pos = float(feature_1h.get('close_position_in_bar') or 0.5)
pressure_bias = feature_1h.get('pressure_bias', 'neutral')
volume_state = feature_1h.get('volume_price_state', 'neutral')
directional_conviction = momentum_12
if pressure_bias != 'neutral':
directional_conviction += 1.0
if volume_state in {'bullish_acceptance', 'bearish_acceptance', 'bullish_continuation', 'bearish_continuation'}:
directional_conviction += 1.2
if body_ratio >= 0.6:
directional_conviction += 0.6
if close_pos >= 0.75 or close_pos <= 0.25:
directional_conviction += 0.4
result['directional_conviction'] = round(directional_conviction, 2)
result['volume_price_balance'] = volume_state
score = 0
if recent_range_pct <= 4.0:
score += 22
elif recent_range_pct <= 6.0:
score += 12
if result['range_efficiency'] < 0.22:
score += 22
elif result['range_efficiency'] < 0.38:
score += 12
if swing_flips >= 6:
score += 20
elif swing_flips >= 4:
score += 10
if pressure_bias == 'neutral':
score += 10
if volume_state in {'neutral', 'pullback_on_light_volume', 'high_volume_churn'}:
score += 10
if body_ratio <= 0.35:
score += 8
if result['atr_ratio_trend'] < -0.15:
score += 8
result['regime_score'] = int(score)
if score >= 72:
result['regime'] = 'ranging'
elif score >= 46:
result['regime'] = 'transitional'
elif directional_conviction >= 2.8 and result['range_efficiency'] >= 0.38:
result['regime'] = 'strong_trend'
else:
result['regime'] = 'weak_trend'
except Exception as e:
logger.warning(f"震荡市场量化失败: {e}")
return result
def _detect_range_zone(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
    """
    Detect a trading range on the 1h timeframe.

    Builds high/low price channels over 12, 18 and 24 bars, picks the one
    whose width is closest to the average width, locates the highest-volume
    price bin of the last 24 bars, and accumulates a confidence score from
    range width, price position, direction flips, range efficiency,
    volume/price neutrality, volume-node location and boundary reactions.

    Args:
        data: Mapping of timeframe name to DataFrame. Only ``'1h'`` is read
            here (it is also forwarded to ``_quantify_ranging_state``);
            at least 24 rows are required.

    Returns:
        Dict with ``is_ranging`` (confidence >= 60), ``support_level``,
        ``resistance_level``, ``range_width_pct``, ``confidence``,
        ``volume_profile_support``, ``volume_profile_resistance`` and a
        textual ``analysis``. The untouched defaults are returned when data
        is insufficient or an exception is raised (which is logged).
    """
    result = {
        'is_ranging': False,
        'support_level': None,
        'resistance_level': None,
        'range_width_pct': None,
        'confidence': 0,
        'volume_profile_support': None,
        'volume_profile_resistance': None,
        'analysis': ''
    }
    try:
        df_1h = data.get('1h')
        if df_1h is None or len(df_1h) < 24:  # need at least 24 candles (24 hours on 1h data)
            return result
        current_price = float(df_1h['close'].iloc[-1])
        feature_1h = self.feature_engine.summarize_timeframe_features(df_1h, '1h')
        range_state = self._quantify_ranging_state(data)
        pressure_bias = feature_1h.get('pressure_bias', 'neutral')
        volume_state = feature_1h.get('volume_price_state', 'neutral')
        # High/low channel for each lookback window.
        lookback_periods = [12, 18, 24]
        price_channels = []
        for period in lookback_periods:
            if len(df_1h) >= period:
                period_data = df_1h.iloc[-period:]
                high = period_data['high'].max()
                low = period_data['low'].min()
                price_channels.append({'high': high, 'low': low, 'width': high - low})
        if price_channels:
            # Use the channel whose width is closest to the average width.
            avg_width = sum(pc['width'] for pc in price_channels) / len(price_channels)
            selected_channel = min(price_channels,
                                   key=lambda pc: abs(pc['width'] - avg_width))
            support = selected_channel['low']
            resistance = selected_channel['high']
            range_width = resistance - support
            range_width_pct = (range_width / current_price) * 100
            is_narrow_range = range_width_pct < 5.0
            # Price sits strictly between 30% and 70% of the channel.
            price_in_middle = (current_price - support) / range_width > 0.3 and \
                (current_price - support) / range_width < 0.7
            volume_profile_support = None
            volume_profile_resistance = None
            volume_profile_mid = None
            if len(df_1h) >= 24:
                # Volume profile: typical price (H+L+C)/3 of the last 24 bars
                # split into 10 bins; take the midpoint of the bin with the
                # largest summed volume.
                df_1h_copy = df_1h.iloc[-24:].copy()
                df_1h_copy['avg_price'] = (df_1h_copy['high'] + df_1h_copy['low'] + df_1h_copy['close']) / 3
                price_bins = pd.cut(df_1h_copy['avg_price'], bins=10)
                volume_by_price = df_1h_copy.groupby(price_bins, observed=True)['volume'].sum()
                if len(volume_by_price) > 0:
                    max_vol_bin = volume_by_price.idxmax()
                    if max_vol_bin is not None:
                        vp_level = (max_vol_bin.left + max_vol_bin.right) / 2
                        volume_profile_mid = float(vp_level)
                        # The node is support when below price, resistance when above.
                        if vp_level < current_price:
                            volume_profile_support = float(vp_level)
                        if vp_level > current_price:
                            volume_profile_resistance = float(vp_level)
            support_candidates = []
            resistance_candidates = []
            if support:
                support_candidates.append(support)
            if volume_profile_support:
                support_candidates.append(volume_profile_support)
            if resistance:
                resistance_candidates.append(resistance)
            if volume_profile_resistance:
                resistance_candidates.append(volume_profile_resistance)
            # Median of the channel boundary and (if present) the volume node.
            final_support = np.median(support_candidates) if support_candidates else None
            final_resistance = np.median(resistance_candidates) if resistance_candidates else None
            # Accumulate confidence; each satisfied condition adds a fixed
            # weight and a human-readable reason.
            confidence = 0
            reasons = []
            if is_narrow_range:
                confidence += 24
                reasons.append(f"区间窄({range_width_pct:.1f}%)")
            if price_in_middle:
                confidence += 20
                reasons.append("价格在中部")
            if range_state.get('swing_flip_count', 0) >= 5:
                confidence += 18
                reasons.append(f"方向切换频繁({range_state.get('swing_flip_count')}次)")
            if range_state.get('range_efficiency', 1) < 0.3:
                confidence += 16
                reasons.append("方向效率低")
            if pressure_bias == 'neutral' or volume_state in {'neutral', 'high_volume_churn'}:
                confidence += 12
                reasons.append("量价中性/换手")
            if volume_profile_mid and final_support and final_resistance:
                if final_support < volume_profile_mid < final_resistance:
                    confidence += 10
                    reasons.append("成交密集区位于区间内部")
            if final_support and final_resistance:
                # Count boundary reactions in the last 12 bars: a low within
                # 0.5% of support on an up-close bar, or a high within 0.5%
                # of resistance on a down-close bar.
                bounce_count = 0
                for i in range(-12, 0):
                    if i >= -len(df_1h):
                        row = df_1h.iloc[i]
                        if abs(row['low'] - final_support) / final_support < 0.005 and row['close'] > row['open']:
                            bounce_count += 1
                        if abs(row['high'] - final_resistance) / final_resistance < 0.005 and row['close'] < row['open']:
                            bounce_count += 1
                if bounce_count >= 2:
                    confidence += 12
                    reasons.append(f"边界反弹{bounce_count}")
            result.update({
                'is_ranging': confidence >= 60,
                'support_level': float(final_support) if final_support else None,
                'resistance_level': float(final_resistance) if final_resistance else None,
                'range_width_pct': range_width_pct,
                'confidence': confidence,
                'volume_profile_support': volume_profile_support,
                'volume_profile_resistance': volume_profile_resistance,
                'analysis': f"震荡判断: {confidence}% ({', '.join(reasons) if reasons else ''})"
            })
    except Exception as e:
        logger.warning(f"震荡区间检测失败: {e}")
        import traceback
        logger.debug(traceback.format_exc())
    return result
def _detect_trend_reversal(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
"""
检测趋势反转信号
综合多个指标判断趋势是否可能反转:
1. RSI 背离价格创新高但RSI不创新高 / 价格创新低但RSI不创新低
2. MACD 柱状图缩短/背离
3. 量价背离(价格上涨但成交量下降)
4. 关键K线形态吞没、锤子线、十字星等
5. 多周期趋势不一致
"""
result = {
'is_reversing': False,
'reversal_type': None, # 'bullish_reversal' or 'bearish_reversal'
'confidence': 0,
'signals': [],
'analysis': ''
}
try:
df_15m = data.get('15m')
df_1h = data.get('1h')
if df_15m is None or len(df_15m) < 30:
return result
reversal_signals = []
bullish_signals = 0
bearish_signals = 0
# ========== 1. RSI 背离检测 ==========
if 'rsi' in df_15m.columns and len(df_15m) >= 20:
recent_5 = df_15m.iloc[-5:]
prev_5 = df_15m.iloc[-15:-10]
# 顶背离(看跌反转)
recent_high = recent_5['high'].max()
recent_rsi_at_high = recent_5.loc[recent_5['high'] == recent_high, 'rsi'].values[0]
prev_high = prev_5['high'].max()
prev_rsi_at_high = prev_5.loc[prev_5['high'] == prev_high, 'rsi'].values[0]
if recent_high > prev_high and recent_rsi_at_high < prev_rsi_at_high:
bearish_signals += 2
reversal_signals.append({
'type': 'rsi_divergence',
'direction': 'bearish',
'weight': 2,
'desc': 'RSI顶背离价格创新高但RSI不创新高'
})
# 底背离(看涨反转)
recent_low = recent_5['low'].min()
recent_rsi_at_low = recent_5.loc[recent_5['low'] == recent_low, 'rsi'].values[0]
prev_low = prev_5['low'].min()
prev_rsi_at_low = prev_5.loc[prev_5['low'] == prev_low, 'rsi'].values[0]
if recent_low < prev_low and recent_rsi_at_low > prev_rsi_at_low:
bullish_signals += 2
reversal_signals.append({
'type': 'rsi_divergence',
'direction': 'bullish',
'weight': 2,
'desc': 'RSI底背离价格创新低但RSI不创新低'
})
# ========== 2. MACD 柱状图分析 ==========
if 'macd_hist' in df_15m.columns and len(df_15m) >= 10:
hist_recent = df_15m['macd_hist'].iloc[-3:].values
hist_prev = df_15m['macd_hist'].iloc[-6:-3].values
# MACD 柱状图缩短 = 动能衰竭
if all(h > 0 for h in hist_prev): # 之前是正向
if hist_recent[2] < hist_recent[1] < hist_recent[0]: # 持续缩短
bearish_signals += 1
reversal_signals.append({
'type': 'macd_histogram',
'direction': 'bearish',
'weight': 1,
'desc': 'MACD柱状图持续缩短上涨动能衰竭'
})
if all(h < 0 for h in hist_prev): # 之前是负向
if hist_recent[2] > hist_recent[1] > hist_recent[0]: # 持续收窄
bullish_signals += 1
reversal_signals.append({
'type': 'macd_histogram',
'direction': 'bullish',
'weight': 1,
'desc': 'MACD柱状图持续收窄下跌动能衰竭'
})
# MACD 金叉/死叉
if len(df_15m) >= 2:
macd_current = df_15m['macd'].iloc[-1]
signal_current = df_15m['macd_signal'].iloc[-1]
macd_prev = df_15m['macd'].iloc[-2]
signal_prev = df_15m['macd_signal'].iloc[-2]
# 金叉
if macd_prev <= signal_prev and macd_current > signal_current:
bullish_signals += 1
reversal_signals.append({
'type': 'macd_cross',
'direction': 'bullish',
'weight': 1,
'desc': 'MACD金叉'
})
# 死叉
if macd_prev >= signal_prev and macd_current < signal_current:
bearish_signals += 1
reversal_signals.append({
'type': 'macd_cross',
'direction': 'bearish',
'weight': 1,
'desc': 'MACD死叉'
})
# ========== 3. 量价背离检测 ==========
if 'volume' in df_15m.columns and len(df_15m) >= 10:
recent_price_change = (df_15m['close'].iloc[-1] - df_15m['close'].iloc[-5]) / df_15m['close'].iloc[-5]
recent_volume = df_15m['volume'].iloc[-5:].mean()
older_volume = df_15m['volume'].iloc[-10:-5].mean()
# 价格上涨但成交量下降(量价背离)
if recent_price_change > 0.01 and recent_volume < older_volume * 0.8:
bearish_signals += 1
reversal_signals.append({
'type': 'volume_divergence',
'direction': 'bearish',
'weight': 1,
'desc': '量价背离:价格上涨但成交量萎缩'
})
# 价格下跌但成交量下降(可能见底)
if recent_price_change < -0.01 and recent_volume < older_volume * 0.7:
bullish_signals += 1
reversal_signals.append({
'type': 'volume_divergence',
'direction': 'bullish',
'weight': 1,
'desc': '下跌缩量:抛压枯竭,可能见底'
})
# ========== 4. 关键K线形态检测 ==========
if len(df_15m) >= 3:
latest = df_15m.iloc[-1]
prev = df_15m.iloc[-2]
# 吞没形态
open_latest, close_latest = latest['open'], latest['close']
open_prev, close_prev = prev['open'], prev['close']
# 阳包阴(看涨)
if (close_latest > open_latest and # 当前是阳线
close_prev < open_prev and # 前一个是阴线
open_latest <= close_prev and # 开盘价低于前一个收盘价
close_latest >= open_prev): # 收盘价高于前一个开盘价
bullish_signals += 2
reversal_signals.append({
'type': 'candlestick',
'direction': 'bullish',
'weight': 2,
'desc': '阳包阴吞没形态(强反转信号)'
})
# 阴包阳(看跌)
if (close_latest < open_latest and # 当前是阴线
close_prev > open_prev and # 前一个是阳线
open_latest >= close_prev and # 开盘价高于前一个收盘价
close_latest <= open_prev): # 收盘价低于前一个开盘价
bearish_signals += 2
reversal_signals.append({
'type': 'candlestick',
'direction': 'bearish',
'weight': 2,
'desc': '阴包阳吞没形态(强反转信号)'
})
# 锤子线/倒锤子
body_size = abs(close_latest - open_latest)
upper_shadow = df_15m['high'].iloc[-1] - max(open_latest, close_latest)
lower_shadow = min(open_latest, close_latest) - df_15m['low'].iloc[-1]
# 锤子线(看涨)
if lower_shadow >= body_size * 2 and upper_shadow < body_size * 0.5:
bullish_signals += 1
reversal_signals.append({
'type': 'candlestick',
'direction': 'bullish',
'weight': 1,
'desc': '锤子线(底部反转信号)'
})
# 倒锤子(看跌)
if upper_shadow >= body_size * 2 and lower_shadow < body_size * 0.5:
bearish_signals += 1
reversal_signals.append({
'type': 'candlestick',
'direction': 'bearish',
'weight': 1,
'desc': '倒锤子线(顶部反转信号)'
})
# ========== 5. 多周期趋势不一致 ==========
trend_15m = self._get_trend_direction(df_15m)
trend_1h = self._get_trend_direction(df_1h)
# 小周期反转但大周期未反应
if trend_15m and trend_1h and trend_15m != trend_1h:
if trend_15m == 'bull' and trend_1h == 'bear':
bullish_signals += 1
reversal_signals.append({
'type': 'timeframe_divergence',
'direction': 'bullish',
'weight': 1,
'desc': '15分钟转多但1小时仍看空潜在反转'
})
elif trend_15m == 'bear' and trend_1h == 'bull':
bearish_signals += 1
reversal_signals.append({
'type': 'timeframe_divergence',
'direction': 'bearish',
'weight': 1,
'desc': '15分钟转空但1小时仍看多潜在反转'
})
# ========== 计算反转信号强度 ==========
total_signals = len(reversal_signals)
if total_signals >= 3:
# 至少3个反转信号才认为可能反转
if bullish_signals >= bearish_signals + 2:
result['is_reversing'] = True
result['reversal_type'] = 'bullish_reversal'
result['confidence'] = min(90, bullish_signals * 15)
result['signals'] = [s for s in reversal_signals if s['direction'] == 'bullish']
elif bearish_signals >= bullish_signals + 2:
result['is_reversing'] = True
result['reversal_type'] = 'bearish_reversal'
result['confidence'] = min(90, bearish_signals * 15)
result['signals'] = [s for s in reversal_signals if s['direction'] == 'bearish']
if reversal_signals:
result['analysis'] = f"检测到 {len(reversal_signals)} 个反转信号"
else:
result['analysis'] = "无反转信号"
except Exception as e:
logger.warning(f"趋势反转检测失败: {e}")
import traceback
logger.debug(traceback.format_exc())
return result
def _get_trend_direction(self, df: pd.DataFrame) -> str:
"""获取趋势方向bull/bear/neutral"""
if df is None or len(df) < 10:
return 'neutral'
try:
# 使用EMA判断
ma5 = df['ma5'].iloc[-1] if 'ma5' in df.columns else None
ma10 = df['ma10'].iloc[-1] if 'ma10' in df.columns else None
ma20 = df['ma20'].iloc[-1] if 'ma20' in df.columns else None
if ma5 and ma10 and ma20:
if ma5 > ma10 > ma20:
return 'bull'
elif ma5 < ma10 < ma20:
return 'bear'
# 使用MACD判断
if 'macd' in df.columns and 'macd_signal' in df.columns:
macd = df['macd'].iloc[-1]
signal = df['macd_signal'].iloc[-1]
if macd > signal and macd > 0:
return 'bull'
elif macd < signal and macd < 0:
return 'bear'
except Exception as e:
logger.debug(f"趋势方向判断失败: {e}")
return 'neutral'
def _detect_trend_stage(self, data: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
    """
    Classify the current trend stage (early / middle / late) from price
    action and volume/price features.

    Reads the 1h feature summary (and the 4h one when at least 20 rows of
    4h data exist) from ``self.feature_engine`` and accumulates three
    independent scores; the final stage is chosen from those scores.

    Args:
        data: Mapping of timeframe name to DataFrame. ``'1h'`` needs at
            least 24 rows; ``'4h'`` is optional.

    Returns:
        Dict with ``stage`` (``'early'``, ``'middle'``, ``'late'`` or
        ``'unknown'``), ``confidence``, the collected ``signals`` texts and
        an ``analysis`` summary built from the first three signals.
        Defaults are returned when data is insufficient or an exception is
        raised (which is logged).
    """
    result = {
        'stage': 'unknown',  # 'early', 'middle', 'late'
        'confidence': 0,
        'signals': [],
        'analysis': ''
    }
    try:
        df_1h = data.get('1h')
        df_4h = data.get('4h')
        if df_1h is None or len(df_1h) < 24:
            return result
        feature_1h = self.feature_engine.summarize_timeframe_features(df_1h, '1h')
        feature_4h = self.feature_engine.summarize_timeframe_features(df_4h, '4h') if df_4h is not None and len(df_4h) >= 20 else {}
        stage_signals = []
        early_score = 0
        middle_score = 0
        late_score = 0
        structure_1h = feature_1h.get('structure')
        structure_4h = feature_4h.get('structure')
        volume_state = feature_1h.get('volume_price_state', 'neutral')
        breakout_quality = feature_1h.get('breakout_quality', 'none')
        pullback_quality = feature_1h.get('pullback_quality', 'neutral')
        rejection_signal = feature_1h.get('rejection_signal', 'none')
        exhaustion_risk = feature_1h.get('exhaustion_risk', 'low')
        body_ratio = float(feature_1h.get('body_ratio') or 0)
        close_pos = float(feature_1h.get('close_position_in_bar') or 0.5)
        dist_high = abs(float(feature_1h.get('distance_to_recent_high') or 0))
        dist_low = abs(float(feature_1h.get('distance_to_recent_low') or 0))
        # Extension = distance to whichever recent extreme is nearer.
        extension_pct = min(dist_high, dist_low)
        momentum_12 = float(feature_1h.get('momentum_12') or 0)
        # An accepted breakout scores as early; otherwise continuation with
        # |momentum_12| >= 2.0 scores as middle (mutually exclusive branches).
        if breakout_quality in {'acceptance_breakout_up', 'acceptance_breakout_down'}:
            early_score += 28
            stage_signals.append("突破后被市场接受")
        elif volume_state in {'bullish_continuation', 'bearish_continuation'} and abs(momentum_12) >= 2.0:
            middle_score += 24
            stage_signals.append("趋势推进与量价延续同步")
        # Pullback quality: healthy -> middle, heavy counter-volume -> late.
        if pullback_quality == 'healthy_pullback':
            middle_score += 18
            stage_signals.append("回调缩量,趋势结构健康")
        elif pullback_quality in {'heavy_sell_pullback', 'heavy_buy_pullback'}:
            late_score += 16
            stage_signals.append("回调放量,对趋势不利")
        # Structure agreement between 1h and 4h.
        if structure_1h in {'HH/HL', 'LH/LL'} and structure_4h == structure_1h:
            middle_score += 18
            stage_signals.append("1h 与 4h 结构同向")
        elif structure_1h in {'HH/HL', 'LH/LL'} and structure_4h and structure_4h != structure_1h:
            early_score += 12
            stage_signals.append("1h 先动4h 尚未完全跟随")
        if rejection_signal in {'bullish_rejection', 'bearish_rejection'}:
            early_score += 10
            stage_signals.append("关键位置出现拒绝信号")
        # Exhaustion risk feeds the late score.
        if exhaustion_risk in {'upside_climax', 'downside_climax'}:
            late_score += 26
            stage_signals.append("单边推进出现高潮风险")
        elif exhaustion_risk == 'high_volume_churn':
            late_score += 16
            stage_signals.append("高成交换手,趋势延续性存疑")
        # Small extension supports early and middle; large extension is late.
        if extension_pct <= 0.6:
            early_score += 8
            middle_score += 8
            stage_signals.append("价格仍贴近可持续推进区")
        elif extension_pct >= 2.5:
            late_score += 18
            stage_signals.append(f"价格相对近端结构已延伸 {extension_pct:.1f}%")
        # Large body closing near the bar's extreme.
        if body_ratio >= 0.65 and (close_pos >= 0.78 or close_pos <= 0.22):
            early_score += 8
            middle_score += 8
            stage_signals.append("实体推进强,收盘靠近极值")
        scores = {
            'early': early_score,
            'middle': middle_score,
            'late': late_score
        }
        max_score = max(scores.values())
        # Classification order: unknown (< 20), late (leading and >= 40),
        # early (leading and >= 30), otherwise middle.
        # NOTE(review): the final else also catches a leading late/early
        # score that is below its threshold, labelling it 'middle' with
        # middle_score (possibly 0) as confidence -- confirm this is intended.
        if max_score < 20:
            result['stage'] = 'unknown'
            result['analysis'] = "趋势阶段不明确"
        elif max_score == late_score and late_score >= 40:
            result['stage'] = 'late'
            result['confidence'] = min(95, late_score)
            result['signals'] = stage_signals
            result['analysis'] = f"⚠️ 趋势晚期({late_score}分)- " + "; ".join(stage_signals[:3])
        elif max_score == early_score and early_score >= 30:
            result['stage'] = 'early'
            result['confidence'] = min(90, early_score)
            result['signals'] = stage_signals
            result['analysis'] = f"趋势早期({early_score}分)- " + "; ".join(stage_signals[:3])
        else:
            result['stage'] = 'middle'
            result['confidence'] = min(85, middle_score)
            result['signals'] = stage_signals
            result['analysis'] = f"趋势中期({middle_score}分)- " + "; ".join(stage_signals[:3])
    except Exception as e:
        logger.warning(f"趋势阶段检测失败: {e}")
        import traceback
        logger.debug(traceback.format_exc())
    return result