#!/usr/bin/env python3 """ 数据采集服务 定时运行策略扫描 """ import time import schedule import sys from pathlib import Path # 添加项目根目录到路径 current_dir = Path(__file__).parent project_root = current_dir.parent sys.path.insert(0, str(project_root)) from src.strategy.kline_pattern_strategy import KLinePatternStrategy from src.utils.config_loader import ConfigLoader from src.data.data_fetcher import ADataFetcher from src.utils.notification import NotificationManager from src.database.database_manager import DatabaseManager from loguru import logger def run_strategy(): """运行策略扫描""" try: logger.info('开始运行策略扫描...') # 初始化各个组件 config_loader = ConfigLoader() config = config_loader.config # 初始化数据获取器 data_fetcher = ADataFetcher(config) # 初始化通知管理器 notification_manager = NotificationManager(config) # 初始化数据库管理器 db_manager = DatabaseManager() # 初始化策略 strategy = KLinePatternStrategy( data_fetcher=data_fetcher, notification_manager=notification_manager, config=config, db_manager=db_manager ) # 这里可以添加具体的策略运行逻辑 # strategy.scan_all_stocks() logger.info('策略扫描完成') except Exception as e: logger.error(f'策略运行失败: {e}') def main(): """主函数""" logger.info('数据采集服务已启动') # 每天9点和15点运行策略 schedule.every().day.at('09:00').do(run_strategy) schedule.every().day.at('15:00').do(run_strategy) # 立即运行一次(用于测试) logger.info('立即执行一次策略扫描...') run_strategy() # 主循环 while True: schedule.run_pending() time.sleep(60) if __name__ == '__main__': main()