#!/usr/bin/env python3 """ 新架构集成示例 展示如何使用重构后的模块化架构进行策略执行和任务调度 """ import sys import time from pathlib import Path from datetime import datetime # 添加src目录到路径 current_dir = Path(__file__).parent src_dir = current_dir.parent / "src" sys.path.insert(0, str(src_dir)) from loguru import logger from src.data.tushare_fetcher import TushareFetcher from src.data.stock_pool_manager import StockPoolManager from src.strategy.kline_pattern_strategy import KLinePatternStrategy from src.execution.strategy_executor import StrategyExecutor from src.execution.task_scheduler import TaskScheduler from src.utils.notification import NotificationManager from src.utils.config_loader import config_loader def demo_new_architecture(): """演示新架构的完整工作流程""" print("=" * 80) print("🚀 新架构演示 - 模块化策略执行系统") print("=" * 80) print("📋 系统架构:") print(" 数据层: TushareFetcher + StockPoolManager") print(" 策略层: BaseStrategy + KLinePatternStrategy") print(" 执行层: TaskScheduler + StrategyExecutor") print(" 通知层: NotificationManager") print() # 1. 初始化所有组件 print("📦 第1步: 初始化所有组件") print("-" * 60) # 数据层 fetcher = TushareFetcher() pool_manager = StockPoolManager(fetcher) # 通知层 notification_config = config_loader.get('notification', {}) notification_manager = NotificationManager(notification_config) # 策略层 strategy_config = { 'min_entity_ratio': 0.55, 'final_yang_min_ratio': 0.40, 'max_turnover_ratio': 40.0, 'timeframes': ['daily'], 'pullback_tolerance': 0.02, 'monitor_days': 30, 'pullback_confirmation_days': 7 } kline_strategy = KLinePatternStrategy( data_fetcher=fetcher, notification_manager=notification_manager, config=strategy_config ) # 执行层 executor = StrategyExecutor(pool_manager, notification_manager) scheduler = TaskScheduler() print("✅ 所有组件初始化完成") print() # 2. 注册策略到执行器 print("📋 第2步: 注册策略") print("-" * 60) executor.register_strategy("kline_pattern", kline_strategy) print("已注册策略:") for strategy_id, strategy_name in executor.get_registered_strategies().items(): print(f" {strategy_id}: {strategy_name}") print() # 3. 展示股票池规则 print("🎯 第3步: 可用股票池规则") print("-" * 60) available_rules = pool_manager.get_available_rules() print("可用规则:") for rule_id, rule_name in available_rules.items(): print(f" {rule_id}: {rule_name}") print() # 4. 手动执行单个任务 print("⚡ 第4步: 手动执行策略任务") print("-" * 60) task_id = f"manual_task_{datetime.now().strftime('%Y%m%d_%H%M%S')}" print(f"执行任务: {task_id}") print("参数:") print(f" 策略: kline_pattern") print(f" 股票池规则: tushare_hot") print(f" 最大股票数: 5") print() result = executor.execute_task( task_id=task_id, strategy_id="kline_pattern", stock_pool_rule="tushare_hot", stock_pool_params={"limit": 10}, max_stocks=5, send_notification=False # 演示时不发送通知 ) # 显示执行结果 print("📊 执行结果摘要:") summary = result.get_summary() for key, value in summary.items(): print(f" {key}: {value}") print() # 5. 设置定时任务 print("⏰ 第5步: 设置定时任务") print("-" * 60) # 创建任务函数 task_function_1 = executor.create_task_function( strategy_id="kline_pattern", stock_pool_rule="tushare_hot", stock_pool_params={"limit": 20}, max_stocks=10, send_notification=False ) task_function_2 = executor.create_task_function( strategy_id="kline_pattern", stock_pool_rule="combined_hot", stock_pool_params={"limit_per_source": 15, "final_limit": 25}, max_stocks=15, send_notification=False ) # 添加定时任务 scheduler.add_task( task_id="hot_stocks_scan", name="同花顺热榜K线形态扫描", func=task_function_1, schedule_rule="every 30 minutes", enabled=False # 演示时不启用 ) scheduler.add_task( task_id="combined_hot_scan", name="合并热门股票K线形态扫描", func=task_function_2, schedule_rule="daily at 09:30", enabled=False # 演示时不启用 ) scheduler.add_task( task_id="leading_stocks_scan", name="龙头股K线形态扫描", func=executor.create_task_function( strategy_id="kline_pattern", stock_pool_rule="leading_stocks", stock_pool_params={"top_boards": 5, "stocks_per_board": 2}, max_stocks=10, send_notification=False ), schedule_rule="weekdays at 14:30", enabled=False # 演示时不启用 ) print("已添加定时任务:") task_status = scheduler.get_task_status() for task_id, status in task_status.items(): print(f" {task_id}:") print(f" 名称: {status['name']}") print(f" 规则: {status['schedule_rule']}") print(f" 状态: {status['status']}") print(f" 启用: {status['enabled']}") print() # 6. 演示立即执行定时任务 print("🎬 第6步: 演示立即执行定时任务") print("-" * 60) print("立即执行任务: hot_stocks_scan") success = scheduler.execute_task_now("hot_stocks_scan") print(f"执行结果: {'成功' if success else '失败'}") print() # 7. 显示任务执行历史 print("📈 第7步: 任务执行历史") print("-" * 60) updated_status = scheduler.get_task_status() for task_id, status in updated_status.items(): if status['total_executions'] > 0: print(f"任务: {status['name']}") print(f" 总执行次数: {status['total_executions']}") print(f" 成功率: {status['success_rate']:.1f}%") print(f" 最后执行: {status['last_execution_time']}") print() # 8. 系统功能总结 print("🎯 第8步: 新架构优势总结") print("-" * 60) advantages = """ ✅ 模块化设计: • 数据获取、股票池管理、策略执行、任务调度完全分离 • 每个模块职责单一,易于维护和扩展 • 支持插件化添加新的股票池规则和策略 ✅ 灵活的股票池管理: • 支持多种数据源:同花顺热榜、合并热门、龙头股 • 可配置参数:股票数量、筛选条件等 • 易于添加新的股票池规则 ✅ 标准化的策略接口: • BaseStrategy抽象基类统一策略接口 • StrategyResult标准化输出格式 • 支持多时间周期分析 ✅ 强大的任务调度: • 支持多种调度规则:间隔时间、每日定时、工作日定时 • 任务执行历史和成功率统计 • 支持立即执行和定时执行 ✅ 统一的执行协调: • StrategyExecutor协调股票池获取→策略分析→结果通知 • 完整的执行结果记录和统计 • 支持并发执行多个策略 ✅ 完善的通知系统: • 支持多种通知方式 • 策略结果汇总推送 • 回踩提醒等特殊通知 """ print(advantages) print("🚀 架构使用示例:") example_usage = """ # 基础用法 - 立即执行 result = executor.execute_task( task_id="my_task", strategy_id="kline_pattern", stock_pool_rule="tushare_hot", max_stocks=20 ) # 定时任务 - 每天9:30执行 scheduler.add_task( task_id="morning_scan", name="晨间形态扫描", func=executor.create_task_function(...), schedule_rule="daily at 09:30" ) # 启动调度器 scheduler.start() """ print(example_usage) print("=" * 80) print("🎉 新架构演示完成!") print("💡 现在可以通过简单配置实现复杂的策略执行和调度") print("🔧 支持灵活的股票池规则和多策略并行执行") print("📱 统一的结果通知和监控体系") print("=" * 80) if __name__ == "__main__": # 设置日志 logger.remove() logger.add(sys.stdout, level="INFO", format="{time:HH:mm:ss} | {level} | {message}") try: demo_new_architecture() except Exception as e: logger.error(f"演示过程中发生错误: {e}") import traceback traceback.print_exc()