"""盘中调度器 使用 APScheduler 管理盘前/盘中/盘后定时任务。 """ import logging import traceback from datetime import datetime from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from app.engine.recommender import refresh_recommendations from app.engine.watchlist import analyze_watchlist_for_all_users from app.api.websocket import broadcast_update logger = logging.getLogger(__name__) scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") async def _run_scan(session_name: str): """执行一次扫描并推送结果""" logger.info(f"=== 定时扫描: {session_name} ({datetime.now().strftime('%H:%M:%S')}) ===") try: result = await refresh_recommendations(scan_session=session_name) rec_count = len(result.get("recommendations", [])) logger.info(f"扫描完成: {rec_count} 只推荐股票") await broadcast_update({ "type": "scan_update", "session": session_name, "count": rec_count, "timestamp": datetime.now().isoformat(), }) except Exception as e: logger.error(f"定时扫描失败 ({session_name}): {e}") from app.db.error_logger import log_error await log_error("scheduler", f"定时扫描失败 ({session_name}): {e}", detail=traceback.format_exc()) async def _run_news_refresh(session_name: str = "scheduled"): """后台采集新闻并更新主题催化分。""" logger.info("=== 新闻催化刷新: %s ===", session_name) try: from app.news.pipeline import refresh_news_catalysts result = await refresh_news_catalysts() await broadcast_update({ "type": "news_catalysts_ready", "session": session_name, "inserted": result.get("inserted", 0), "analyzed": result.get("analyzed", 0), "timestamp": datetime.now().isoformat(), }) except Exception as e: logger.error(f"新闻催化刷新失败 ({session_name}): {e}") from app.db.error_logger import log_error await log_error("scheduler", f"新闻催化刷新失败 ({session_name}): {e}", detail=traceback.format_exc()) async def _run_watchlist_analysis(): """收盘后自动分析所有用户自选股。""" logger.info("=== 开始自选股定时分析 ===") try: count = await analyze_watchlist_for_all_users(mode="scheduled") logger.info(f"自选股定时分析完成: {count} 条") except Exception as e: logger.error(f"自选股定时分析失败: {e}") from app.db.error_logger import log_error await log_error("scheduler", f"自选股定时分析失败: {e}", detail=traceback.format_exc()) async def _run_trigger_monitor(): """盘中买点触发监控 — 每分钟检查埋伏池是否命中条件。""" try: from app.engine.trigger_monitor import check_triggers await check_triggers() except Exception as e: logger.debug(f"买点触发监控异常: {e}") def setup_scheduler(): """配置所有定时任务(交易日时间)""" news_schedule = [ ("news_pre_market", 8, 50, "pre_market"), ("news_morning", 10, 5, "morning"), ("news_noon", 12, 45, "noon"), ("news_afternoon", 13, 55, "afternoon"), ("news_post_market", 15, 40, "post_market"), ] for job_id, hour, minute, session_name in news_schedule: scheduler.add_job( _run_news_refresh, CronTrigger(hour=hour, minute=minute, day_of_week="mon-fri"), args=[session_name], id=job_id, replace_existing=True, ) # 盘前埋伏扫描 09:00 scheduler.add_job( _run_scan, CronTrigger(hour=9, minute=0, day_of_week="mon-fri"), args=["pre_market_ambush"], id="pre_market_ambush", replace_existing=True ) # 盘中扫描 scan_schedule = [ ("morning_open_0935", 9, 35, "morning_open"), ("morning_open_0950", 9, 50, "morning_open"), ("morning_mid_1020", 10, 20, "morning_mid"), ("morning_mid_1050", 10, 50, "morning_mid"), ("morning_mid_1120", 11, 20, "morning_mid"), ("afternoon_1310", 13, 10, "afternoon"), ("afternoon_1340", 13, 40, "afternoon"), ("late_1410", 14, 10, "late_session"), ("late_1440", 14, 40, "late_session"), ("close_1500", 15, 0, "late_session"), ] for job_id, hour, minute, session_name in scan_schedule: scheduler.add_job( _run_scan, CronTrigger(hour=hour, minute=minute, day_of_week="mon-fri"), args=[session_name], id=job_id, replace_existing=True, ) # 收盘总结 16:00 scheduler.add_job( _run_scan, CronTrigger(hour=16, minute=0, day_of_week="mon-fri"), args=["post_market"], id="post_market", replace_existing=True ) scheduler.add_job( _run_watchlist_analysis, CronTrigger(hour=16, minute=20, day_of_week="mon-fri"), id="watchlist_analysis", replace_existing=True ) # 盘中买点触发监控:9:35-15:00 每分钟执行 scheduler.add_job( _run_trigger_monitor, IntervalTrigger(minutes=1, start_date="2020-01-01 09:35:00", end_date="2020-01-01 15:00:00"), id="trigger_monitor", replace_existing=True, ) logger.info("盘中调度器已配置完成") def start_scheduler(): setup_scheduler() scheduler.start() logger.info("调度器已启动") def stop_scheduler(): if scheduler.running: scheduler.shutdown() logger.info("调度器已停止")