trading.ai/examples/task_config_examples.py
2025-10-01 09:58:52 +08:00

304 lines
9.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
任务配置示例
展示如何配置不同类型的定时策略任务
"""
import sys
from pathlib import Path
# 添加src目录到路径
current_dir = Path(__file__).parent
src_dir = current_dir.parent / "src"
sys.path.insert(0, str(src_dir))
from loguru import logger
from src.data.tushare_fetcher import TushareFetcher
from src.data.stock_pool_manager import StockPoolManager
from src.strategy.kline_pattern_strategy import KLinePatternStrategy
from src.execution.strategy_executor import StrategyExecutor
from src.execution.task_scheduler import TaskScheduler
from src.utils.notification import NotificationManager
from src.utils.config_loader import config_loader
def setup_common_tasks():
"""设置常见的策略任务配置"""
print("=" * 80)
print("📋 常见策略任务配置示例")
print("=" * 80)
# 初始化组件
fetcher = TushareFetcher()
pool_manager = StockPoolManager(fetcher)
notification_manager = NotificationManager(config_loader.get('notification', {}))
executor = StrategyExecutor(pool_manager, notification_manager)
scheduler = TaskScheduler()
# 策略配置
strategy_config = {
'min_entity_ratio': 0.55,
'final_yang_min_ratio': 0.40,
'max_turnover_ratio': 40.0,
'timeframes': ['daily'],
'pullback_tolerance': 0.02,
'monitor_days': 30,
'pullback_confirmation_days': 7
}
# 注册策略
kline_strategy = KLinePatternStrategy(
data_fetcher=fetcher,
notification_manager=notification_manager,
config=strategy_config
)
executor.register_strategy("kline_pattern", kline_strategy)
print("🎯 任务配置场景:")
print()
# 场景1: 开盘前扫描热门股票
print("📊 场景1: 开盘前热门股票扫描")
print("-" * 60)
task_1 = executor.create_task_function(
strategy_id="kline_pattern",
stock_pool_rule="tushare_hot",
stock_pool_params={"limit": 50},
max_stocks=30,
send_notification=True
)
scheduler.add_task(
task_id="pre_market_hot_scan",
name="开盘前热门股票K线扫描",
func=task_1,
schedule_rule="weekdays at 09:00",
enabled=False
)
print("✅ 配置完成:")
print(" 任务: 开盘前热门股票K线扫描")
print(" 时间: 每个工作日 09:00")
print(" 股票池: 同花顺热榜前50只")
print(" 分析: 最多30只股票")
print(" 通知: 启用")
print()
# 场景2: 午间龙头股扫描
print("🐲 场景2: 午间龙头股扫描")
print("-" * 60)
task_2 = executor.create_task_function(
strategy_id="kline_pattern",
stock_pool_rule="leading_stocks",
stock_pool_params={
"top_boards": 8,
"stocks_per_board": 3,
"min_score": 70.0
},
max_stocks=20,
send_notification=True
)
scheduler.add_task(
task_id="midday_leading_scan",
name="午间龙头股K线扫描",
func=task_2,
schedule_rule="weekdays at 12:30",
enabled=False
)
print("✅ 配置完成:")
print(" 任务: 午间龙头股K线扫描")
print(" 时间: 每个工作日 12:30")
print(" 股票池: 热门板块前8个每板块前3只评分>70")
print(" 分析: 最多20只龙头股")
print(" 通知: 启用")
print()
# 场景3: 收盘后综合扫描
print("🌅 场景3: 收盘后综合扫描")
print("-" * 60)
task_3 = executor.create_task_function(
strategy_id="kline_pattern",
stock_pool_rule="combined_hot",
stock_pool_params={
"limit_per_source": 30,
"final_limit": 50
},
max_stocks=40,
send_notification=True
)
scheduler.add_task(
task_id="after_market_comprehensive_scan",
name="收盘后综合热门股扫描",
func=task_3,
schedule_rule="weekdays at 15:30",
enabled=False
)
print("✅ 配置完成:")
print(" 任务: 收盘后综合热门股扫描")
print(" 时间: 每个工作日 15:30")
print(" 股票池: 合并热门(同花顺+东财)各取30只合并后50只")
print(" 分析: 最多40只股票")
print(" 通知: 启用")
print()
# 场景4: 高频监控
print("⚡ 场景4: 高频监控扫描")
print("-" * 60)
task_4 = executor.create_task_function(
strategy_id="kline_pattern",
stock_pool_rule="tushare_hot",
stock_pool_params={"limit": 20},
max_stocks=15,
send_notification=False # 高频不通知,避免打扰
)
scheduler.add_task(
task_id="high_freq_monitor",
name="高频热门股监控",
func=task_4,
schedule_rule="every 15 minutes",
enabled=False
)
print("✅ 配置完成:")
print(" 任务: 高频热门股监控")
print(" 时间: 每15分钟执行一次")
print(" 股票池: 同花顺热榜前20只")
print(" 分析: 最多15只股票")
print(" 通知: 关闭(避免频繁打扰)")
print()
# 场景5: 自定义股票池
print("🎯 场景5: 自定义股票池扫描")
print("-" * 60)
# 创建自定义股票池
custom_stocks = [
"000001.SZ", # 平安银行
"000002.SZ", # 万科A
"600000.SH", # 浦发银行
"600036.SH", # 招商银行
"000858.SZ", # 五粮液
"600519.SH", # 贵州茅台
"000725.SZ", # 京东方A
"002415.SZ" # 海康威视
]
pool_manager.create_custom_rule("my_watchlist", custom_stocks)
task_5 = executor.create_task_function(
strategy_id="kline_pattern",
stock_pool_rule="my_watchlist",
stock_pool_params={},
max_stocks=len(custom_stocks),
send_notification=True
)
scheduler.add_task(
task_id="custom_watchlist_scan",
name="自选股K线形态扫描",
func=task_5,
schedule_rule="daily at 21:00",
enabled=False
)
print("✅ 配置完成:")
print(" 任务: 自选股K线形态扫描")
print(" 时间: 每日 21:00")
print(f" 股票池: 自定义股票池({len(custom_stocks)}只)")
print(" 分析: 全部自选股")
print(" 通知: 启用")
print()
# 显示所有任务状态
print("📋 所有配置任务总览:")
print("-" * 60)
task_status = scheduler.get_task_status()
for task_id, status in task_status.items():
print(f"🔹 {status['name']}")
print(f" ID: {task_id}")
print(f" 规则: {status['schedule_rule']}")
print(f" 状态: {status['status']}")
print(f" 启用: {'' if status['enabled'] else ''}")
print()
# 使用指南
print("📖 使用指南:")
print("-" * 60)
print("1. 根据实际需求选择合适的任务配置")
print("2. 调整股票池参数和分析数量")
print("3. 设置合适的执行时间")
print("4. 启用需要的任务: scheduler.enable_task('task_id')")
print("5. 启动调度器: scheduler.start()")
print("6. 立即测试: scheduler.execute_task_now('task_id')")
print()
print("⚙️ 高级配置技巧:")
print("-" * 60)
print("• 开盘前(09:00): 扫描热门股,发现隔夜机会")
print("• 午间时段(12:30): 扫描龙头股,捕捉强势股")
print("• 收盘后(15:30): 综合扫描,总结全天机会")
print("• 高频监控(15分钟): 实时跟踪,但关闭通知")
print("• 晚间复盘(21:00): 扫描自选股,制定明日策略")
print()
return scheduler, executor
def demo_task_execution(scheduler, executor):
"""演示任务执行"""
print("🎬 任务执行演示:")
print("-" * 60)
# 立即执行一个任务进行测试
print("立即执行任务: pre_market_hot_scan")
success = scheduler.execute_task_now("pre_market_hot_scan")
print(f"执行结果: {'成功' if success else '失败'}")
print()
# 显示执行统计
task_status = scheduler.get_task_status()
for task_id, status in task_status.items():
if status['total_executions'] > 0:
print(f"📊 任务: {status['name']}")
print(f" 执行次数: {status['total_executions']}")
print(f" 成功率: {status['success_rate']:.1f}%")
if status['last_execution_time']:
print(f" 最后执行: {status['last_execution_time']}")
print()
print("💡 提示: 在生产环境中启用任务并启动调度器")
print(" scheduler.enable_task('task_id')")
print(" scheduler.start()")
if __name__ == "__main__":
# 设置日志
logger.remove()
logger.add(sys.stdout, level="INFO", format="{time:HH:mm:ss} | {level} | {message}")
try:
scheduler, executor = setup_common_tasks()
demo_task_execution(scheduler, executor)
print("=" * 80)
print("🎉 任务配置示例演示完成!")
print("💼 可根据实际需求调整参数和时间规则")
print("🚀 启用任务并启动调度器即可自动运行")
print("=" * 80)
except Exception as e:
logger.error(f"演示过程中发生错误: {e}")
import traceback
traceback.print_exc()