#!/usr/bin/env python3 """ A股量化交易 - 策略测试入口 简化版本,专注于策略测试和执行 """ import sys from pathlib import Path # 将src目录添加到Python路径 current_dir = Path(__file__).parent src_dir = current_dir / "src" sys.path.insert(0, str(src_dir)) from loguru import logger from src.utils.config_loader import config_loader from src.data.tushare_fetcher import TushareFetcher from src.data.stock_pool_manager import StockPoolManager from src.strategy.kline_pattern_strategy import KLinePatternStrategy from src.execution.strategy_executor import StrategyExecutor from src.execution.task_scheduler import TaskScheduler from src.utils.notification import NotificationManager def setup_logging(): """设置简化的日志配置""" logger.remove() logger.add( sys.stdout, level="INFO", format="{time:HH:mm:ss} | {level} | {message}" ) def create_strategy_system(): """创建策略系统的所有组件""" # 数据层 fetcher = TushareFetcher() pool_manager = StockPoolManager(fetcher) # 通知层 notification_config = config_loader.get('notification', {}) notification_manager = NotificationManager(notification_config) # 策略层 strategy_config = config_loader.get('strategy', {}).get('kline_pattern', { 'min_entity_ratio': 0.55, 'final_yang_min_ratio': 0.40, 'max_turnover_ratio': 40.0, 'timeframes': ['daily'], 'pullback_tolerance': 0.02, 'monitor_days': 30, 'pullback_confirmation_days': 7 }) # 数据库层 from src.database.mysql_database_manager import MySQLDatabaseManager db_manager = MySQLDatabaseManager() kline_strategy = KLinePatternStrategy( data_fetcher=fetcher, notification_manager=notification_manager, config=strategy_config, db_manager=db_manager ) # 执行层 executor = StrategyExecutor(pool_manager, notification_manager) scheduler = TaskScheduler() # 注册策略 executor.register_strategy("kline_pattern", kline_strategy) return { 'fetcher': fetcher, 'pool_manager': pool_manager, 'notification_manager': notification_manager, 'kline_strategy': kline_strategy, 'executor': executor, 'scheduler': scheduler } def print_help(): """打印帮助信息""" print("\n🚀 策略测试命令:") print("-" * 50) print(" scan <股票代码> - 分析单只股票") print(" market <数量> - 扫描热门股票(默认20只)") print(" pools - 查看可用股票池规则") print(" task <规则> <数量> - 执行策略任务") print(" schedule - 显示定时任务示例") print(" help - 显示帮助") print(" quit - 退出程序") print("-" * 50) def main(): """主函数""" setup_logging() print("=" * 60) print("🎯 A股量化交易 - 策略测试系统") print("=" * 60) try: # 初始化系统 logger.info("正在初始化策略系统...") system = create_strategy_system() logger.info("✅ 策略系统初始化完成") print(f"\n📊 系统组件:") print(f" ✅ 数据获取器: TushareFetcher") print(f" ✅ 股票池管理: StockPoolManager") print(f" ✅ K线策略: KLinePatternStrategy") print(f" ✅ 执行器: StrategyExecutor") print(f" ✅ 调度器: TaskScheduler") print_help() # 命令行交互 while True: try: command = input("\n> ").strip() if not command: continue if command.lower() in ['quit', 'exit']: print("👋 感谢使用策略测试系统!") break elif command.lower() == 'help': print_help() elif command.startswith('scan '): stock_code = command[5:].strip() if stock_code: scan_single_stock(system['kline_strategy'], stock_code) else: print("请提供股票代码,如: scan 000001.SZ") elif command.startswith('market'): parts = command.split() max_stocks = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else 20 scan_market(system['executor'], max_stocks) elif command.lower() == 'pools': show_stock_pools(system['pool_manager']) elif command.startswith('task '): parts = command.split() if len(parts) >= 2: rule = parts[1] max_stocks = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else 10 execute_task(system['executor'], rule, max_stocks) else: print("请提供股票池规则,如: task index_components 100") elif command.lower() == 'schedule': show_schedule_examples(system['scheduler'], system['executor']) else: print("❌ 未知命令,输入 'help' 查看帮助") except KeyboardInterrupt: print("\n\n👋 程序被用户中断") break except Exception as e: logger.error(f"命令执行错误: {e}") print(f"❌ 执行错误: {e}") except Exception as e: logger.error(f"系统启动失败: {e}") print(f"❌ 启动失败: {e}") return 1 return 0 def scan_single_stock(strategy, stock_code): """扫描单只股票""" print(f"\n🔍 分析股票: {stock_code}") print("-" * 40) try: results = strategy.analyze_stock(stock_code) total_signals = 0 for timeframe, result in results.items(): signal_count = result.get_signal_count() total_signals += signal_count if signal_count > 0: print(f"📊 {timeframe.upper()}: 发现 {signal_count} 个信号") for i, signal in enumerate(result.signals, 1): print(f" {i}. {signal['date']} | {signal['signal_type']} | 价格: {signal['price']:.2f}元") else: print(f"📭 {timeframe.upper()}: 无信号") print(f"\n📈 总计: {total_signals} 个信号") except Exception as e: logger.error(f"分析失败: {e}") print(f"❌ 分析失败: {e}") def scan_market(executor, max_stocks): """扫描主力股票池""" print(f"\n🎯 扫描主力股票池 (沪深300+中证500+创业板指,最多{max_stocks}只)") print("-" * 50) try: result = executor.execute_task( task_id=f"market_scan_{max_stocks}", strategy_id="kline_pattern", stock_pool_rule="index_components", # 使用主力股票池 stock_pool_params={}, max_stocks=max_stocks, send_notification=False ) summary = result.get_summary() print(f"📊 扫描结果:") print(f" 股票池: {summary['stock_pool_rule_display']}") print(f" 总扫描: {summary['total_stocks_analyzed']} 只") print(f" 有信号: {summary['stocks_with_signals']} 只") print(f" 信号数: {summary['total_signals_found']} 个") print(f" 耗时: {summary['execution_time']:.2f} 秒") if result.strategy_results: print(f"\n🎯 信号详情:") for stock_code, timeframe_results in result.strategy_results.items(): for timeframe, strategy_result in timeframe_results.items(): if strategy_result.get_signal_count() > 0: stock_name = executor.stock_pool_manager.fetcher.get_stock_name(stock_code) print(f" 📈 {stock_code}({stock_name}): {strategy_result.get_signal_count()} 个信号") except Exception as e: logger.error(f"市场扫描失败: {e}") print(f"❌ 扫描失败: {e}") def show_stock_pools(pool_manager): """显示可用股票池规则""" print(f"\n📋 可用股票池规则:") print("-" * 40) rules = pool_manager.get_available_rules() for rule_id, rule_name in rules.items(): print(f" 🎯 {rule_id}: {rule_name}") print(f"\n💡 使用方法: task <规则名> <股票数量>") print(f" 示例: task index_components 100 # 主力股票池") print(f" 示例: task tushare_hot 50 # 同花顺热榜") def execute_task(executor, rule, max_stocks): """执行策略任务""" print(f"\n⚡ 执行策略任务") print(f"股票池规则: {rule}") print(f"最大股票数: {max_stocks}") print("-" * 40) try: result = executor.execute_task( task_id=f"manual_{rule}_{max_stocks}", strategy_id="kline_pattern", stock_pool_rule=rule, max_stocks=max_stocks, send_notification=False ) summary = result.get_summary() print(f"✅ 任务完成:") print(f" 任务ID: {summary['task_id']}") print(f" 成功: {'是' if summary['success'] else '否'}") print(f" 耗时: {summary['execution_time']:.2f} 秒") print(f" 信号数: {summary['total_signals_found']} 个") if summary['error']: print(f" 错误: {summary['error']}") except Exception as e: logger.error(f"任务执行失败: {e}") print(f"❌ 任务失败: {e}") def show_schedule_examples(scheduler, executor): """显示定时任务配置示例""" print(f"\n⏰ 定时任务配置示例:") print("-" * 50) examples = [ { "name": "开盘前热门股扫描", "rule": "weekdays at 09:00", "desc": "每个工作日9点扫描同花顺热榜" }, { "name": "午间龙头股扫描", "rule": "weekdays at 12:30", "desc": "每个工作日12:30扫描龙头股" }, { "name": "收盘后综合扫描", "rule": "weekdays at 15:30", "desc": "每个工作日15:30综合扫描" }, { "name": "高频监控", "rule": "every 15 minutes", "desc": "每15分钟监控一次" } ] for i, example in enumerate(examples, 1): print(f" {i}. {example['name']}") print(f" 规则: {example['rule']}") print(f" 说明: {example['desc']}") print() print("💡 要启用定时任务,请参考 examples/ 目录下的配置示例") if __name__ == "__main__": sys.exit(main())