tradusai/scheduler.py
2025-12-09 22:46:04 +08:00

269 lines
9.9 KiB
Python

"""
Signal Generation Scheduler - 定时生成交易信号
每隔指定时间间隔自动运行量化分析和LLM决策
支持多币种: BTC/USDT, ETH/USDT 等
"""
import asyncio
import logging
import signal
import sys
import os
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from config.settings import settings
from analysis.engine import MarketAnalysisEngine
from signals.quantitative import QuantitativeSignalGenerator
from signals.llm_decision import LLMDecisionMaker
from signals.aggregator import SignalAggregator
from notifiers.dingtalk import DingTalkNotifier
# Configure root logging once at import time (INFO level, timestamped
# records) and create this module's logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
class SignalScheduler:
    """Periodic trading-signal scheduler with multi-symbol support.

    For every configured symbol one pass runs market analysis, the
    quantitative signal generator and the LLM decision maker, aggregates the
    two signals, writes the result as JSON under ``output/`` next to this
    file, and sends a DingTalk notification when the result is actionable.
    """

    # Granularity (seconds) at which a pending wait re-checks ``is_running``.
    _STOP_POLL_SECONDS = 1.0

    def __init__(self, interval_minutes: int = 5, symbols: List[str] = None):
        """
        Args:
            interval_minutes: Minutes to wait between two generation passes.
            symbols: Trading pairs, e.g. ['BTCUSDT', 'ETHUSDT']. When omitted
                or empty, ``settings.symbols_list`` is used instead.
        """
        self.interval_minutes = interval_minutes
        self.is_running = False

        # Multi-symbol support: fall back to the configured symbol list.
        self.symbols = symbols or settings.symbols_list
        logger.info(f"支持的交易对: {', '.join(self.symbols)}")

        # One analysis engine per symbol.
        self.engines: Dict[str, MarketAnalysisEngine] = {}
        for symbol in self.symbols:
            self.engines[symbol] = MarketAnalysisEngine(symbol=symbol)

        # Components shared by all symbols.
        self.quant_generator = QuantitativeSignalGenerator()
        self.llm_maker = LLMDecisionMaker(provider='openai')

        # DingTalk notifier; enabled only when a webhook URL is configured.
        dingtalk_webhook = os.getenv('DINGTALK_WEBHOOK')
        dingtalk_secret = os.getenv('DINGTALK_SECRET')
        self.dingtalk = DingTalkNotifier(
            webhook_url=dingtalk_webhook,
            secret=dingtalk_secret,
            enabled=bool(dingtalk_webhook)
        )

        logger.info(f"Signal Scheduler 初始化完成 - 每{interval_minutes}分钟生成一次信号")

    @staticmethod
    def _output_dir() -> Path:
        """Return the ``output`` directory next to this file, creating it if needed."""
        output_dir = Path(__file__).parent / 'output'
        output_dir.mkdir(exist_ok=True)
        return output_dir

    @staticmethod
    def _write_json(path: Path, payload: Dict[str, Any]) -> None:
        """Serialize ``payload`` to ``path`` as indented JSON.

        The file is always opened as UTF-8: ``ensure_ascii=False`` emits
        non-ASCII characters verbatim, which would otherwise depend on (and
        may fail under) the platform's default locale encoding.
        """
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(payload, f, indent=2, ensure_ascii=False)

    async def _wait_interval(self, seconds: float) -> None:
        """Sleep up to ``seconds``, returning early once ``is_running`` is False.

        The wait is cut into short slices so that ``stop()`` takes effect
        within about ``_STOP_POLL_SECONDS`` instead of only after the whole
        interval has elapsed. A deadline is used so the slices do not
        accumulate drift.
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + seconds
        while self.is_running:
            remaining = deadline - loop.time()
            if remaining <= 0:
                return
            await asyncio.sleep(min(self._STOP_POLL_SECONDS, remaining))

    async def generate_signal_for_symbol(self, symbol: str) -> Dict:
        """Generate, persist and return the signal for a single symbol.

        Args:
            symbol: Trading pair; must be a key of ``self.engines``.

        Returns:
            The dict written to ``output/signal_<symbol>.json``, or ``None``
            when no engine exists for the symbol, the analysis reports an
            ``'error'`` key, or any exception occurs (it is logged, not raised).
        """
        try:
            engine = self.engines.get(symbol)
            if not engine:
                logger.error(f"未找到 {symbol} 的分析引擎")
                return None

            logger.info(f"[{symbol}] 开始分析...")

            # Step 1: Market analysis
            analysis = engine.analyze_current_market(timeframe='5m')
            if 'error' in analysis:
                logger.warning(f"[{symbol}] 市场分析失败: {analysis['error']}")
                return None
            logger.info(f"[{symbol}] 价格: ${analysis['current_price']:,.2f}, 趋势: {analysis['trend_analysis'].get('direction')}")

            # Step 2: Quantitative signal
            quant_signal = self.quant_generator.generate_signal(analysis)
            logger.info(f"[{symbol}] 量化信号: {quant_signal['signal_type']} (得分: {quant_signal['composite_score']:.1f})")

            # Step 3: LLM decision
            llm_context = engine.get_llm_context(format='full')
            llm_signal = self.llm_maker.generate_decision(llm_context, analysis)
            # The output payload below already tolerates a missing LLM signal;
            # do the same here so logging cannot fail on ``None``.
            if llm_signal is not None and llm_signal.get('enabled', True):
                logger.info(f"[{symbol}] LLM信号: {llm_signal['signal_type']} (置信度: {llm_signal.get('confidence', 0):.2%})")
            else:
                logger.info(f"[{symbol}] LLM未启用")

            # Step 4: Aggregate signals
            aggregated = SignalAggregator.aggregate_signals(quant_signal, llm_signal)
            aggregated['symbol'] = symbol  # tag the result with its symbol
            logger.info(f"[{symbol}] 最终信号: {aggregated['final_signal']} (置信度: {aggregated['final_confidence']:.2%})")

            # Step 5: Save to file (one file per symbol)
            symbol_file = self._output_dir() / f'signal_{symbol.lower()}.json'
            output_data = {
                'symbol': symbol,
                'timestamp': datetime.now().isoformat(),
                'aggregated_signal': aggregated,
                'market_analysis': {
                    'price': analysis['current_price'],
                    'trend': analysis['trend_analysis'],
                    'momentum': analysis['momentum'],
                },
                'quantitative_signal': quant_signal,
                'llm_signal': llm_signal if llm_signal and llm_signal.get('enabled', True) else None,
            }
            self._write_json(symbol_file, output_data)
            return output_data
        except Exception as e:
            logger.error(f"[{symbol}] 信号生成失败: {e}", exc_info=True)
            return None

    async def generate_signal_once(self) -> Dict:
        """Run one generation pass over every configured symbol.

        Writes ``output/latest_signals.json`` (all successful symbols) and,
        for backward compatibility, ``output/latest_signal.json`` holding the
        first configured symbol's result when that symbol succeeded. Then
        hands each result to ``_send_notification``.

        Returns:
            Mapping of symbol to its output dict; symbols that failed are absent.
        """
        logger.info("=" * 80)
        logger.info(f"开始生成交易信号 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        logger.info(f"交易对: {', '.join(self.symbols)}")
        logger.info("=" * 80)

        all_signals = {}
        # Symbols are processed one after another.
        for symbol in self.symbols:
            result = await self.generate_signal_for_symbol(symbol)
            if result:
                all_signals[symbol] = result

        if all_signals:
            output_dir = self._output_dir()

            # Combined file with every successful symbol.
            self._write_json(output_dir / 'latest_signals.json', {
                'timestamp': datetime.now().isoformat(),
                'symbols': all_signals,
            })

            # Backward compatibility: latest_signal.json mirrors the first symbol.
            first_symbol = self.symbols[0]
            if first_symbol in all_signals:
                self._write_json(output_dir / 'latest_signal.json', all_signals[first_symbol])

        # Step 6: Send DingTalk notification for signals with opportunities
        for symbol, signal_data in all_signals.items():
            await self._send_notification(symbol, signal_data)

        logger.info("=" * 80)
        return all_signals

    async def _send_notification(self, symbol: str, signal_data: Dict):
        """Send a DingTalk notification when the signal is actionable.

        A notification is sent when the final signal is BUY or SELL, or when
        it is HOLD but the aggregated LLM signal reports an existing
        opportunity under ``opportunities['short_term_5m_15m_1h']``.
        Exceptions are logged and never propagate to the caller.

        Args:
            symbol: Trading pair the signal belongs to.
            signal_data: Output dict produced by ``generate_signal_for_symbol``.
        """
        try:
            aggregated = signal_data.get('aggregated_signal', {})
            final_signal = aggregated.get('final_signal', 'HOLD')

            should_notify = False
            notify_reason = ""
            if final_signal in ['BUY', 'SELL']:
                should_notify = True
                notify_reason = f"[{symbol}] 明确{final_signal}信号"
            elif final_signal == 'HOLD':
                llm_signal = aggregated.get('llm_signal')
                if llm_signal and isinstance(llm_signal, dict):
                    # ``or {}`` also covers keys that are present but None,
                    # which would otherwise raise AttributeError below.
                    opportunities = llm_signal.get('opportunities') or {}
                    short_term = opportunities.get('short_term_5m_15m_1h') or {}
                    if short_term.get('exists', False):
                        should_notify = True
                        direction = short_term.get('direction', 'N/A')
                        notify_reason = f"[{symbol}] 存在短期{direction}机会"

            if should_notify:
                logger.info(f"发送钉钉通知 - {notify_reason}")
                # Make sure the payload carries the symbol.
                aggregated['symbol'] = symbol
                sent = self.dingtalk.send_signal(aggregated)
                if sent:
                    logger.info(f"[{symbol}] 钉钉通知发送成功")
                else:
                    logger.warning(f"[{symbol}] 钉钉通知发送失败或未配置")
        except Exception as e:
            logger.error(f"[{symbol}] 钉钉通知发送异常: {e}", exc_info=True)

    async def run(self):
        """Run the scheduler loop until ``stop()`` is called or the task is cancelled.

        One pass runs immediately; afterwards a pass runs every
        ``interval_minutes``. An unexpected error inside the loop is logged
        and followed by a one-minute pause. Waits are interruptible, so a
        stop request neither blocks for a full interval nor triggers one
        more generation pass.
        """
        self.is_running = True
        logger.info(f"Signal Scheduler 启动 - 每{self.interval_minutes}分钟生成信号")

        # Generate once right away.
        await self.generate_signal_once()

        while self.is_running:
            try:
                # Wait for the configured interval (returns early on stop).
                await self._wait_interval(self.interval_minutes * 60)
                if not self.is_running:
                    break
                await self.generate_signal_once()
            except asyncio.CancelledError:
                logger.info("调度器收到取消信号")
                break
            except Exception as e:
                logger.error(f"调度器错误: {e}", exc_info=True)
                # Back off for one minute before continuing.
                await self._wait_interval(60)

        logger.info("Signal Scheduler 已停止")

    def stop(self):
        """Request the scheduler loop to exit; ``run()`` notices at its next check."""
        self.is_running = False
async def main():
    """Program entry point: build a scheduler and run it until it stops.

    The interval is read from the ``SIGNAL_INTERVAL_MINUTES`` environment
    variable (default ``'5'``). SIGINT and SIGTERM are routed to
    ``scheduler.stop()`` so the loop can shut down gracefully.
    """
    import os

    minutes = int(os.getenv('SIGNAL_INTERVAL_MINUTES', '5'))
    scheduler = SignalScheduler(interval_minutes=minutes)

    def _request_shutdown(received, _frame):
        # Only flips the scheduler's running flag; the loop exits on its own.
        logger.info(f"收到信号 {received},正在关闭...")
        scheduler.stop()

    # Same handler for both termination signals, registered in the same order.
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, _request_shutdown)

    await scheduler.run()
if __name__ == "__main__":
    # Script entry: run the async main; Ctrl-C is a normal exit, any other
    # exception is logged with its traceback and the process exits with 1.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logger.info("用户中断")
    except Exception as exc:
        logger.exception(f"致命错误: {exc}")
        raise SystemExit(1)