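"""
Signal Generation Scheduler - generates trading signals on a schedule.

Runs the quantitative analysis and the LLM decision automatically at a fixed
time interval.
Supports multiple symbols: BTC/USDT, ETH/USDT, etc.

Features:
- Scheduled analysis: runs automatically at the configured interval
- Volatility trigger: runs an analysis automatically when price volatility spikes
"""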
|
|
import asyncio
|
|
import logging
|
|
import signal
|
|
import sys
|
|
import os
|
|
import json
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
|
|
# Add parent directory to path
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
|
|
from config.settings import settings
|
|
from analysis.engine import MarketAnalysisEngine
|
|
from analysis.volatility_monitor import VolatilityMonitor
|
|
from signals.quantitative import QuantitativeSignalGenerator
|
|
from signals.llm_decision import LLMDecisionMaker
|
|
from signals.aggregator import SignalAggregator
|
|
from notifiers.dingtalk import DingTalkNotifier
|
|
|
|
# Configure logging once at import time: INFO level, with each record showing
# timestamp, logger name, level and message.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by everything in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SignalScheduler:
    """Periodic trading-signal scheduler with multi-symbol support and an
    optional volatility trigger.

    For every configured symbol the scheduler runs a market analysis, a
    quantitative signal, an LLM decision and an aggregation step, writes the
    result as JSON under the ``output`` directory next to this file, and
    sends a DingTalk notification when the aggregated signal warrants one.

    NOTE(review): the analysis, LLM and notifier calls are invoked without
    ``await`` inside coroutines, so they run synchronously on the event loop.
    If they perform network I/O they will delay the volatility monitor while
    they run -- confirm against those components.
    """

    # Upper bound (seconds) on a single sleep slice in the main loop, so that
    # ``stop()`` is noticed promptly instead of after a full interval.
    _STOP_POLL_SECONDS = 1.0

    def __init__(
        self,
        interval_minutes: int = 5,
        symbols: List[str] = None,
        enable_volatility_trigger: bool = True,
        volatility_threshold: float = 0.5,  # volatility threshold, 0.5%
        volatility_cooldown: int = 3,  # cooldown after a volatility trigger (minutes)
    ):
        """
        Args:
            interval_minutes: Interval between scheduled signal runs (minutes).
            symbols: Trading pairs, e.g. ['BTCUSDT', 'ETHUSDT']. Falls back to
                ``settings.symbols_list`` when omitted or empty.
            enable_volatility_trigger: Whether to create the volatility monitor.
            volatility_threshold: Volatility trigger threshold (percent).
            volatility_cooldown: Cooldown after a volatility trigger (minutes).
        """
        self.interval_minutes = interval_minutes
        self.is_running = False
        self.enable_volatility_trigger = enable_volatility_trigger

        # Multi-symbol support.
        self.symbols = symbols or settings.symbols_list
        logger.info(f"支持的交易对: {', '.join(self.symbols)}")

        # One analysis engine per symbol.
        self.engines: Dict[str, MarketAnalysisEngine] = {
            symbol: MarketAnalysisEngine(symbol=symbol) for symbol in self.symbols
        }

        # Components shared by all symbols.
        self.quant_generator = QuantitativeSignalGenerator()
        self.llm_maker = LLMDecisionMaker(provider='openai')

        # DingTalk notifier; enabled only when a webhook URL is configured.
        dingtalk_webhook = os.getenv('DINGTALK_WEBHOOK')
        dingtalk_secret = os.getenv('DINGTALK_SECRET')
        self.dingtalk = DingTalkNotifier(
            webhook_url=dingtalk_webhook,
            secret=dingtalk_secret,
            enabled=bool(dingtalk_webhook)
        )

        # Volatility monitor (only when the trigger is enabled).
        self.volatility_monitor = None
        if enable_volatility_trigger:
            self.volatility_monitor = VolatilityMonitor(
                symbols=self.symbols,
                on_volatility_spike=self._on_volatility_spike,
                check_interval=5,  # check every 5 seconds
                price_change_threshold=volatility_threshold,
                volatility_multiplier=2.0,
                cooldown_minutes=volatility_cooldown,
            )

        # Tracks which symbols currently have a volatility-triggered analysis
        # in flight; guarded by ``_analysis_lock``.
        self._pending_analysis: Dict[str, bool] = {}
        self._analysis_lock = asyncio.Lock()

        logger.info(
            f"Signal Scheduler 初始化完成 - 定时: 每{interval_minutes}分钟, "
            f"波动率触发: {'启用' if enable_volatility_trigger else '禁用'}"
        )

    @staticmethod
    def _output_dir() -> Path:
        """Return the ``output`` directory next to this file, creating it if missing."""
        output_dir = Path(__file__).parent / 'output'
        output_dir.mkdir(exist_ok=True)
        return output_dir

    @staticmethod
    def _write_json(path: Path, payload: Dict[str, Any]) -> None:
        """Write ``payload`` to ``path`` as UTF-8 JSON, replacing the file atomically.

        The data is dumped with ``ensure_ascii=False``, so the file must be
        opened with an explicit UTF-8 encoding; relying on the platform default
        fails on non-UTF-8 locales as soon as the payload holds non-ASCII text.
        Writing to a sibling temp file and renaming it keeps readers from ever
        observing a half-written file.
        """
        path = Path(path)
        tmp_path = path.with_name(path.name + '.tmp')
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(payload, f, indent=2, ensure_ascii=False)
            os.replace(tmp_path, path)
        finally:
            # The temp file only still exists if the dump or the rename failed.
            if tmp_path.exists():
                tmp_path.unlink()

    @staticmethod
    def _notification_reason(symbol: str, aggregated: Dict[str, Any]) -> str:
        """Return the reason a notification should be sent, or '' for none.

        A BUY/SELL final signal always notifies. A HOLD notifies only when the
        embedded LLM signal reports that a short-term opportunity exists.
        Missing or malformed nested values simply mean "no notification".
        """
        final_signal = aggregated.get('final_signal', 'HOLD')
        if final_signal in ('BUY', 'SELL'):
            return f"[{symbol}] 明确{final_signal}信号"
        if final_signal != 'HOLD':
            return ""

        llm_signal = aggregated.get('llm_signal')
        if not isinstance(llm_signal, dict):
            return ""
        opportunities = llm_signal.get('opportunities')
        if not isinstance(opportunities, dict):
            return ""
        short_term = opportunities.get('short_term_5m_15m_1h')
        if not isinstance(short_term, dict) or not short_term.get('exists', False):
            return ""

        direction = short_term.get('direction', 'N/A')
        return f"[{symbol}] 存在短期{direction}机会"

    async def _sleep_while_running(self, seconds: float) -> None:
        """Sleep up to ``seconds``, returning early once ``is_running`` is False.

        Sleeping in short slices lets ``stop()`` (a plain flag flip, e.g. from
        a signal handler) take effect within about ``_STOP_POLL_SECONDS``
        instead of only after the whole interval has elapsed.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + seconds
        while self.is_running:
            remaining = deadline - loop.time()
            if remaining <= 0:
                break
            await asyncio.sleep(min(remaining, self._STOP_POLL_SECONDS))

    async def generate_signal_for_symbol(self, symbol: str) -> Dict:
        """Generate and persist the signal for a single symbol.

        Returns:
            The dict written to ``output/signal_<symbol>.json``, or ``None``
            when no engine exists for the symbol, the market analysis reports
            an error, or any step raises (the exception is logged).
        """
        try:
            engine = self.engines.get(symbol)
            if not engine:
                logger.error(f"未找到 {symbol} 的分析引擎")
                return None

            logger.info(f"[{symbol}] 开始分析...")

            # Step 1: Market analysis
            analysis = engine.analyze_current_market(timeframe='5m')

            if 'error' in analysis:
                logger.warning(f"[{symbol}] 市场分析失败: {analysis['error']}")
                return None

            logger.info(f"[{symbol}] 价格: ${analysis['current_price']:,.2f}, 趋势: {analysis['trend_analysis'].get('direction')}")

            # Step 2: Quantitative signal
            quant_signal = self.quant_generator.generate_signal(analysis)
            logger.info(f"[{symbol}] 量化信号: {quant_signal['signal_type']} (得分: {quant_signal['composite_score']:.1f})")

            # Step 3: LLM decision
            llm_context = engine.get_llm_context(format='full')
            llm_signal = self.llm_maker.generate_decision(llm_context, analysis)

            # Same "usable LLM signal" test for logging and for the output
            # payload, so a missing/empty decision is handled in both places.
            llm_enabled = bool(llm_signal) and llm_signal.get('enabled', True)
            if llm_enabled:
                logger.info(f"[{symbol}] LLM信号: {llm_signal['signal_type']} (置信度: {llm_signal.get('confidence', 0):.2%})")
            else:
                logger.info(f"[{symbol}] LLM未启用")

            # Step 4: Aggregate signals
            aggregated = SignalAggregator.aggregate_signals(quant_signal, llm_signal)
            aggregated['symbol'] = symbol  # tag the result with its symbol

            logger.info(f"[{symbol}] 最终信号: {aggregated['final_signal']} (置信度: {aggregated['final_confidence']:.2%})")

            # Step 5: Save to file (one file per symbol)
            symbol_file = self._output_dir() / f'signal_{symbol.lower()}.json'
            output_data = {
                'symbol': symbol,
                'timestamp': datetime.now().isoformat(),
                'aggregated_signal': aggregated,
                'market_analysis': {
                    'price': analysis['current_price'],
                    'trend': analysis['trend_analysis'],
                    'momentum': analysis['momentum'],
                },
                'quantitative_signal': quant_signal,
                'llm_signal': llm_signal if llm_enabled else None,
            }
            self._write_json(symbol_file, output_data)

            return output_data

        except Exception as e:
            logger.error(f"[{symbol}] 信号生成失败: {e}", exc_info=True)
            return None

    async def generate_signal_once(self) -> Dict:
        """Generate signals for every configured symbol.

        Writes ``latest_signals.json`` (all symbols) and, for backward
        compatibility, ``latest_signal.json`` (first configured symbol only),
        then sends notifications.

        Returns:
            Mapping of symbol to its output dict; symbols whose generation
            failed are omitted.
        """
        logger.info("=" * 80)
        logger.info(f"开始生成交易信号 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info(f"交易对: {', '.join(self.symbols)}")
        logger.info("=" * 80)

        all_signals = {}

        # Symbols are processed one after another.
        for symbol in self.symbols:
            result = await self.generate_signal_for_symbol(symbol)
            if result:
                all_signals[symbol] = result

        if all_signals:
            output_dir = self._output_dir()

            # Combined file holding every symbol's signal.
            self._write_json(output_dir / 'latest_signals.json', {
                'timestamp': datetime.now().isoformat(),
                'symbols': all_signals,
            })

            # Backward compatibility: latest_signal.json mirrors the first symbol.
            first_symbol = self.symbols[0]
            if first_symbol in all_signals:
                self._write_json(output_dir / 'latest_signal.json', all_signals[first_symbol])

        # Step 6: Send DingTalk notification for signals with opportunities
        for symbol, signal_data in all_signals.items():
            await self._send_notification(symbol, signal_data)

        logger.info("=" * 80)
        return all_signals

    async def _send_notification(self, symbol: str, signal_data: Dict):
        """Send a DingTalk notification for ``signal_data`` when warranted.

        Never raises: any failure is logged and swallowed so that a notifier
        problem cannot interrupt signal generation.
        """
        try:
            aggregated = signal_data.get('aggregated_signal', {})
            notify_reason = self._notification_reason(symbol, aggregated)
            if not notify_reason:
                return

            logger.info(f"发送钉钉通知 - {notify_reason}")
            # Make sure the payload carries its symbol.
            aggregated['symbol'] = symbol
            sent = self.dingtalk.send_signal(aggregated)
            if sent:
                logger.info(f"[{symbol}] 钉钉通知发送成功")
            else:
                logger.warning(f"[{symbol}] 钉钉通知发送失败或未配置")
        except Exception as e:
            logger.error(f"[{symbol}] 钉钉通知发送异常: {e}", exc_info=True)

    async def _on_volatility_spike(self, symbol: str, spike_info: Dict):
        """Callback handed to the volatility monitor; runs an out-of-schedule analysis.

        Args:
            symbol: Symbol whose volatility spiked.
            spike_info: Spike details; ``type`` and ``change_pct`` are read
                directly, ``direction`` is optional.
        """
        logger.info(
            f"🚨 [{symbol}] 波动率触发! "
            f"类型: {spike_info['type']}, "
            f"变化: {spike_info['change_pct']:.2f}%, "
            f"方向: {spike_info.get('direction', 'N/A')}"
        )

        # Skip when a triggered analysis for this symbol is already running.
        async with self._analysis_lock:
            if self._pending_analysis.get(symbol):
                logger.info(f"[{symbol}] 已有分析任务在执行,跳过")
                return
            self._pending_analysis[symbol] = True

        try:
            logger.info(f"🔄 [{symbol}] 波动率触发分析开始...")
            result = await self.generate_signal_for_symbol(symbol)

            if result:
                # The per-symbol file was already written by
                # generate_signal_for_symbol; tag the in-memory signal as
                # volatility-triggered before notifying.
                aggregated = result.get('aggregated_signal', {})
                aggregated['trigger_type'] = 'volatility_spike'
                aggregated['spike_info'] = spike_info
                await self._send_notification(symbol, result)

                logger.info(f"✅ [{symbol}] 波动率触发分析完成")
            else:
                logger.warning(f"[{symbol}] 波动率触发分析失败")

        finally:
            # Always clear the in-flight marker, even on failure.
            async with self._analysis_lock:
                self._pending_analysis[symbol] = False

    async def run(self):
        """Run the scheduler main loop until ``stop()`` is called or the task is cancelled.

        Generates signals immediately, then once per ``interval_minutes``.
        The volatility monitor (if any) runs as a background task for the
        lifetime of the loop and is stopped on exit.
        """
        self.is_running = True
        logger.info(f"Signal Scheduler 启动 - 每{self.interval_minutes}分钟生成信号")

        # Start the volatility monitor (if enabled).
        volatility_task = None
        if self.volatility_monitor:
            volatility_task = asyncio.create_task(self.volatility_monitor.start())
            logger.info("波动率监控器已启动")

        try:
            # Generate once right away.
            await self.generate_signal_once()

            while self.is_running:
                try:
                    # Wait for the next slot, waking early if stop() is called.
                    await self._sleep_while_running(self.interval_minutes * 60)
                    if not self.is_running:
                        break

                    await self.generate_signal_once()

                except asyncio.CancelledError:
                    logger.info("调度器收到取消信号")
                    break
                except Exception as e:
                    logger.error(f"调度器错误: {e}", exc_info=True)
                    # Back off for a minute after an error, still honouring stop().
                    await self._sleep_while_running(60)

        finally:
            # Stop the volatility monitor.
            if self.volatility_monitor:
                self.volatility_monitor.stop()
            if volatility_task:
                volatility_task.cancel()
                try:
                    await volatility_task
                except asyncio.CancelledError:
                    pass
                except Exception as e:
                    # A monitor that had already crashed must not mask the
                    # shutdown path; log it instead of raising from finally.
                    logger.error(f"波动率监控器异常退出: {e}", exc_info=True)

            logger.info("Signal Scheduler 已停止")

    def stop(self):
        """Ask the main loop to exit; takes effect at its next ``is_running`` check."""
        self.is_running = False
|
|
|
|
|
|
async def main():
    """Program entry point: read configuration from the environment and run the scheduler.

    Environment variables (defaults in parentheses):
        SIGNAL_INTERVAL_MINUTES (5), ENABLE_VOLATILITY_TRIGGER (true),
        VOLATILITY_THRESHOLD (1.0), VOLATILITY_COOLDOWN_MINUTES (5).
    """
    env = os.getenv

    # Parse in a fixed order so an invalid value fails at the same point.
    run_every = int(env('SIGNAL_INTERVAL_MINUTES', '5'))
    trigger_on = env('ENABLE_VOLATILITY_TRIGGER', 'true').lower() == 'true'
    spike_threshold = float(env('VOLATILITY_THRESHOLD', '1.0'))  # 1% threshold (the earlier 0.5% was too sensitive)
    spike_cooldown = int(env('VOLATILITY_COOLDOWN_MINUTES', '5'))  # 5-minute cooldown (previously 3 minutes)

    banner = "=" * 60
    logger.info(banner)
    logger.info("Signal Scheduler 配置:")
    logger.info(f" 定时间隔: {run_every} 分钟")
    logger.info(f" 波动率触发: {'启用' if trigger_on else '禁用'}")
    if trigger_on:
        logger.info(f" 波动率阈值: {spike_threshold}%")
        logger.info(f" 触发冷却: {spike_cooldown} 分钟")
    logger.info(banner)

    scheduler = SignalScheduler(
        interval_minutes=run_every,
        enable_volatility_trigger=trigger_on,
        volatility_threshold=spike_threshold,
        volatility_cooldown=spike_cooldown,
    )

    def request_shutdown(sig, _frame):
        """Signal handler: log the signal and ask the scheduler to stop."""
        logger.info(f"收到信号 {sig},正在关闭...")
        scheduler.stop()

    # Graceful shutdown on Ctrl+C and on termination requests.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, request_shutdown)

    await scheduler.run()
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the async entry point; exit with status 1 only on an unexpected error.
    exit_status = 0
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Interrupted by the user: treated as a normal exit.
        logger.info("用户中断")
    except Exception as fatal:
        logger.error(f"致命错误: {fatal}", exc_info=True)
        exit_status = 1

    if exit_status:
        sys.exit(exit_status)
|