astock-agent/backend/app/engine/scheduler.py
2026-06-01 21:29:26 +08:00

162 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""盘中调度器
使用 APScheduler 管理盘前/盘中/盘后定时任务。
"""
import logging
import traceback
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from app.engine.recommender import refresh_recommendations
from app.engine.watchlist import analyze_watchlist_for_all_users
from app.api.websocket import broadcast_update
logger = logging.getLogger(__name__)
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
async def _run_scan(session_name: str):
"""执行一次扫描并推送结果"""
logger.info(f"=== 定时扫描: {session_name} ({datetime.now().strftime('%H:%M:%S')}) ===")
try:
result = await refresh_recommendations(scan_session=session_name)
rec_count = len(result.get("recommendations", []))
logger.info(f"扫描完成: {rec_count} 只推荐股票")
await broadcast_update({
"type": "scan_update",
"session": session_name,
"count": rec_count,
"timestamp": datetime.now().isoformat(),
})
except Exception as e:
logger.error(f"定时扫描失败 ({session_name}): {e}")
from app.db.error_logger import log_error
await log_error("scheduler", f"定时扫描失败 ({session_name}): {e}", detail=traceback.format_exc())
async def _run_news_refresh(session_name: str = "scheduled"):
"""后台采集新闻并更新主题催化分。"""
logger.info("=== 新闻催化刷新: %s ===", session_name)
try:
from app.news.pipeline import refresh_news_catalysts
result = await refresh_news_catalysts()
await broadcast_update({
"type": "news_catalysts_ready",
"session": session_name,
"inserted": result.get("inserted", 0),
"analyzed": result.get("analyzed", 0),
"timestamp": datetime.now().isoformat(),
})
except Exception as e:
logger.error(f"新闻催化刷新失败 ({session_name}): {e}")
from app.db.error_logger import log_error
await log_error("scheduler", f"新闻催化刷新失败 ({session_name}): {e}", detail=traceback.format_exc())
async def _run_watchlist_analysis():
"""收盘后自动分析所有用户自选股。"""
logger.info("=== 开始自选股定时分析 ===")
try:
count = await analyze_watchlist_for_all_users(mode="scheduled")
logger.info(f"自选股定时分析完成: {count}")
except Exception as e:
logger.error(f"自选股定时分析失败: {e}")
from app.db.error_logger import log_error
await log_error("scheduler", f"自选股定时分析失败: {e}", detail=traceback.format_exc())
async def _run_trigger_monitor():
"""盘中买点触发监控 — 每分钟检查埋伏池是否命中条件。"""
try:
from app.engine.trigger_monitor import check_triggers
await check_triggers()
except Exception as e:
logger.debug(f"买点触发监控异常: {e}")
def setup_scheduler():
"""配置所有定时任务(交易日时间)"""
news_schedule = [
("news_pre_market", 8, 50, "pre_market"),
("news_morning", 10, 5, "morning"),
("news_noon", 12, 45, "noon"),
("news_afternoon", 13, 55, "afternoon"),
("news_post_market", 15, 40, "post_market"),
]
for job_id, hour, minute, session_name in news_schedule:
scheduler.add_job(
_run_news_refresh,
CronTrigger(hour=hour, minute=minute, day_of_week="mon-fri"),
args=[session_name],
id=job_id,
replace_existing=True,
)
# 盘前埋伏扫描 09:00
scheduler.add_job(
_run_scan, CronTrigger(hour=9, minute=0, day_of_week="mon-fri"),
args=["pre_market_ambush"], id="pre_market_ambush", replace_existing=True
)
# 盘中扫描
scan_schedule = [
("morning_open_0935", 9, 35, "morning_open"),
("morning_open_0950", 9, 50, "morning_open"),
("morning_mid_1020", 10, 20, "morning_mid"),
("morning_mid_1050", 10, 50, "morning_mid"),
("morning_mid_1120", 11, 20, "morning_mid"),
("afternoon_1310", 13, 10, "afternoon"),
("afternoon_1340", 13, 40, "afternoon"),
("late_1410", 14, 10, "late_session"),
("late_1440", 14, 40, "late_session"),
("close_1500", 15, 0, "late_session"),
]
for job_id, hour, minute, session_name in scan_schedule:
scheduler.add_job(
_run_scan,
CronTrigger(hour=hour, minute=minute, day_of_week="mon-fri"),
args=[session_name],
id=job_id,
replace_existing=True,
)
# 收盘总结 16:00
scheduler.add_job(
_run_scan, CronTrigger(hour=16, minute=0, day_of_week="mon-fri"),
args=["post_market"], id="post_market", replace_existing=True
)
scheduler.add_job(
_run_watchlist_analysis, CronTrigger(hour=16, minute=20, day_of_week="mon-fri"),
id="watchlist_analysis", replace_existing=True
)
# 盘中买点触发监控9:35-15:00 每分钟执行
scheduler.add_job(
_run_trigger_monitor,
IntervalTrigger(minutes=1, start_date="2020-01-01 09:35:00", end_date="2020-01-01 15:00:00"),
id="trigger_monitor",
replace_existing=True,
)
logger.info("盘中调度器已配置完成")
def start_scheduler():
setup_scheduler()
scheduler.start()
logger.info("调度器已启动")
def stop_scheduler():
if scheduler.running:
scheduler.shutdown()
logger.info("调度器已停止")