tradusai/scheduler.py
2025-12-04 01:27:58 +08:00

218 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Signal Generation Scheduler - 定时生成交易信号
每隔指定时间间隔自动运行量化分析和LLM决策
"""
import asyncio
import logging
import signal
import sys
from datetime import datetime
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent))
from config.settings import settings
from analysis.engine import MarketAnalysisEngine
from signals.quantitative import QuantitativeSignalGenerator
from signals.llm_decision import LLMDecisionMaker
from signals.aggregator import SignalAggregator
from notifiers.dingtalk import DingTalkNotifier
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class SignalScheduler:
"""定时信号生成调度器"""
def __init__(self, interval_minutes: int = 5):
"""
Args:
interval_minutes: 生成信号的时间间隔(分钟)
"""
self.interval_minutes = interval_minutes
self.is_running = False
# Initialize components
self.engine = MarketAnalysisEngine()
self.quant_generator = QuantitativeSignalGenerator()
# Initialize LLM decision maker
self.llm_maker = LLMDecisionMaker(provider='openai')
# Initialize DingTalk notifier
import os
dingtalk_webhook = os.getenv('DINGTALK_WEBHOOK')
dingtalk_secret = os.getenv('DINGTALK_SECRET')
self.dingtalk = DingTalkNotifier(
webhook_url=dingtalk_webhook,
secret=dingtalk_secret,
enabled=bool(dingtalk_webhook)
)
logger.info(f"Signal Scheduler 初始化完成 - 每{interval_minutes}分钟生成一次信号")
async def generate_signal_once(self) -> dict:
"""执行一次信号生成"""
try:
logger.info("=" * 80)
logger.info(f"开始生成交易信号 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info("=" * 80)
# Step 1: Market analysis
analysis = self.engine.analyze_current_market(timeframe='5m')
if 'error' in analysis:
logger.warning(f"市场分析失败: {analysis['error']}")
return None
logger.info(f"市场分析完成 - 价格: ${analysis['current_price']:,.2f}, 趋势: {analysis['trend_analysis'].get('direction')}")
# Step 2: Quantitative signal
quant_signal = self.quant_generator.generate_signal(analysis)
logger.info(f"量化信号: {quant_signal['signal_type']} (得分: {quant_signal['composite_score']:.1f})")
# Step 3: LLM decision
llm_signal = None
llm_context = self.engine.get_llm_context(format='full')
llm_signal = self.llm_maker.generate_decision(llm_context, analysis)
if llm_signal.get('enabled', True):
logger.info(f"LLM信号: {llm_signal['signal_type']} (置信度: {llm_signal.get('confidence', 0):.2%})")
else:
logger.info("LLM未启用 (无API key)")
# Step 4: Aggregate signals
aggregated = SignalAggregator.aggregate_signals(quant_signal, llm_signal)
logger.info(f"最终信号: {aggregated['final_signal']} (置信度: {aggregated['final_confidence']:.2%})")
# Step 5: Save to file
output_file = Path(__file__).parent / 'output' / 'latest_signal.json'
output_file.parent.mkdir(exist_ok=True)
import json
output_data = {
'timestamp': datetime.now().isoformat(),
'aggregated_signal': aggregated,
'market_analysis': {
'price': analysis['current_price'],
'trend': analysis['trend_analysis'],
'momentum': analysis['momentum'],
},
'quantitative_signal': quant_signal,
'llm_signal': llm_signal if llm_signal and llm_signal.get('enabled', True) else None,
}
with open(output_file, 'w') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
logger.info(f"信号已保存到: {output_file}")
# Step 6: Send DingTalk notification
try:
final_signal = aggregated.get('final_signal', 'HOLD')
should_notify = False
notify_reason = ""
if final_signal in ['BUY', 'SELL']:
should_notify = True
notify_reason = f"明确{final_signal}信号"
elif final_signal == 'HOLD':
# 检查是否有日内机会
llm_signal = aggregated.get('llm_signal')
if llm_signal and isinstance(llm_signal, dict):
opportunities = llm_signal.get('opportunities', {})
short_term = opportunities.get('short_term_5m_15m_1h', {})
if short_term.get('exists', False):
should_notify = True
direction = short_term.get('direction', 'N/A')
notify_reason = f"HOLD信号但存在短期{direction}机会"
if should_notify:
logger.info(f"发送钉钉通知 - {notify_reason}")
sent = self.dingtalk.send_signal(aggregated)
if sent:
logger.info(f"钉钉通知发送成功")
else:
logger.warning(f"钉钉通知发送失败或未配置")
else:
logger.info(f"HOLD信号且无日内机会跳过钉钉通知")
except Exception as e:
logger.error(f"钉钉通知发送异常: {e}", exc_info=True)
logger.info("=" * 80)
return aggregated
except Exception as e:
logger.error(f"信号生成失败: {e}", exc_info=True)
return None
async def run(self):
"""启动调度器主循环"""
self.is_running = True
logger.info(f"Signal Scheduler 启动 - 每{self.interval_minutes}分钟生成信号")
# 立即生成一次
await self.generate_signal_once()
# 定时循环
while self.is_running:
try:
# 等待指定时间间隔
await asyncio.sleep(self.interval_minutes * 60)
# 生成信号
await self.generate_signal_once()
except asyncio.CancelledError:
logger.info("调度器收到取消信号")
break
except Exception as e:
logger.error(f"调度器错误: {e}", exc_info=True)
await asyncio.sleep(60) # 错误后等待1分钟再继续
logger.info("Signal Scheduler 已停止")
def stop(self):
"""停止调度器"""
self.is_running = False
async def main():
"""主入口"""
# 从环境变量或默认值获取间隔
import os
interval = int(os.getenv('SIGNAL_INTERVAL_MINUTES', '5'))
scheduler = SignalScheduler(interval_minutes=interval)
# Setup signal handlers for graceful shutdown
def signal_handler(sig, _frame):
logger.info(f"收到信号 {sig},正在关闭...")
scheduler.stop()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Start scheduler
await scheduler.run()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("用户中断")
except Exception as e:
logger.error(f"致命错误: {e}", exc_info=True)
sys.exit(1)