251 lines
7.7 KiB
Python
251 lines
7.7 KiB
Python
"""
|
||
板块异动监控主程序
|
||
协调各个模块,实现监控流程
|
||
"""
|
||
import asyncio
|
||
from typing import List, Dict, Optional
|
||
from datetime import datetime
|
||
from app.utils.logger import logger
|
||
from app.config import get_settings
|
||
from .tushare_client import get_tushare_client
|
||
from .tushare_sector_analyzer import TushareSectorAnalyzer
|
||
from .tushare_stock_selector import TushareStockSelector
|
||
from .notifier import get_dingtalk_notifier
|
||
|
||
|
||
class SectorMonitor:
|
||
"""板块异动监控器"""
|
||
|
||
def __init__(
|
||
self,
|
||
change_threshold: float = 2.0,
|
||
top_n: int = 3,
|
||
enable_notifier: bool = True
|
||
):
|
||
"""
|
||
初始化监控器
|
||
|
||
Args:
|
||
change_threshold: 涨跌幅阈值(%)
|
||
top_n: 每个板块返回前N只龙头股
|
||
enable_notifier: 是否启用钉钉通知
|
||
"""
|
||
self.change_threshold = change_threshold
|
||
self.top_n = top_n
|
||
self.enable_notifier = enable_notifier
|
||
|
||
# 获取 Tushare 客户端
|
||
settings = get_settings()
|
||
ts_client = get_tushare_client(settings.tushare_token)
|
||
if not ts_client:
|
||
logger.warning("Tushare token 未配置,板块监控可能无法正常工作")
|
||
|
||
# 初始化各个模块
|
||
self.analyzer = TushareSectorAnalyzer(ts_client, change_threshold=change_threshold)
|
||
self.selector = TushareStockSelector(ts_client, top_n=top_n)
|
||
self.notifier = get_dingtalk_notifier() if enable_notifier else None
|
||
|
||
# 统计信息
|
||
self.stats = {
|
||
'total_checks': 0,
|
||
'total_hot_sectors': 0,
|
||
'total_stocks': 0,
|
||
'last_check_time': None,
|
||
'last_hot_count': 0
|
||
}
|
||
|
||
async def check_once(self) -> Dict:
|
||
"""
|
||
执行一次检查
|
||
|
||
Returns:
|
||
检查结果统计
|
||
"""
|
||
try:
|
||
logger.info("开始板块异动检查...")
|
||
start_time = datetime.now()
|
||
|
||
# 1. 检测异动板块
|
||
hot_sectors = self.analyzer.detect_sector_changes()
|
||
|
||
if not hot_sectors:
|
||
logger.info("未检测到异动板块")
|
||
self.stats['total_checks'] += 1
|
||
self.stats['last_check_time'] = datetime.now()
|
||
self.stats['last_hot_count'] = 0
|
||
return {
|
||
'hot_sectors': 0,
|
||
'stocks': 0,
|
||
'notified': 0
|
||
}
|
||
|
||
logger.info(f"检测到 {len(hot_sectors)} 个异动板块")
|
||
|
||
# 2. 对每个异动板块进行深度分析
|
||
results = []
|
||
total_stocks = 0
|
||
|
||
for sector in hot_sectors:
|
||
sector_name = sector['name']
|
||
ts_code = sector['ts_code']
|
||
|
||
# 筛选龙头股(Tushare 版本需要 ts_code)
|
||
top_stocks = self.selector.select_leading_stocks(ts_code, sector_name)
|
||
|
||
if not top_stocks:
|
||
logger.warning(f"板块 {sector_name} 未找到龙头股")
|
||
continue
|
||
|
||
# 分析异动原因
|
||
reason = self.analyzer.get_hot_reason(sector_name, top_stocks)
|
||
|
||
# 发送钉钉通知
|
||
notified = False
|
||
if self.notifier:
|
||
notified = self.notifier.send_sector_alert(
|
||
sector_data=sector,
|
||
top_stocks=top_stocks,
|
||
reason=reason
|
||
)
|
||
|
||
results.append({
|
||
'sector': sector,
|
||
'stocks': top_stocks,
|
||
'reason': reason,
|
||
'notified': notified
|
||
})
|
||
|
||
total_stocks += len(top_stocks)
|
||
logger.info(
|
||
f"板块 {sector_name}: {len(top_stocks)} 只龙头股, "
|
||
f"原因: {reason}, 通知: {'成功' if notified else '失败'}"
|
||
)
|
||
|
||
# 更新统计
|
||
self.stats['total_checks'] += 1
|
||
self.stats['total_hot_sectors'] += len(hot_sectors)
|
||
self.stats['total_stocks'] += total_stocks
|
||
self.stats['last_check_time'] = datetime.now()
|
||
self.stats['last_hot_count'] = len(hot_sectors)
|
||
|
||
elapsed = (datetime.now() - start_time).total_seconds()
|
||
logger.info(
|
||
f"检查完成: {len(hot_sectors)} 个异动板块, "
|
||
f"{total_stocks} 只龙头股, 耗时 {elapsed:.2f}秒"
|
||
)
|
||
|
||
return {
|
||
'hot_sectors': len(hot_sectors),
|
||
'stocks': total_stocks,
|
||
'notified': sum(1 for r in results if r['notified']),
|
||
'results': results
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"板块异动检查失败: {e}")
|
||
# 发送错误通知
|
||
if self.notifier:
|
||
self.notifier.send_error(str(e))
|
||
return {
|
||
'hot_sectors': 0,
|
||
'stocks': 0,
|
||
'notified': 0,
|
||
'error': str(e)
|
||
}
|
||
|
||
async def run_periodic(self, interval_minutes: int = 30, max_runs: int = None):
|
||
"""
|
||
周期性运行监控
|
||
|
||
Args:
|
||
interval_minutes: 检查间隔(分钟)
|
||
max_runs: 最大运行次数(None表示无限运行)
|
||
"""
|
||
logger.info(
|
||
f"启动周期性监控: 间隔 {interval_minutes}分钟, "
|
||
f"阈值 {self.change_threshold}%, Top{self.top_n}"
|
||
)
|
||
|
||
run_count = 0
|
||
|
||
try:
|
||
while True:
|
||
# 检查是否达到最大运行次数
|
||
if max_runs and run_count >= max_runs:
|
||
logger.info(f"已达到最大运行次数 {max_runs},停止监控")
|
||
break
|
||
|
||
# 执行检查
|
||
await self.check_once()
|
||
run_count += 1
|
||
|
||
# 等待下一次检查
|
||
if interval_minutes > 0:
|
||
logger.info(f"等待 {interval_minutes} 分钟后进行下次检查...")
|
||
await asyncio.sleep(interval_minutes * 60)
|
||
else:
|
||
break
|
||
|
||
except asyncio.CancelledError:
|
||
logger.info("监控任务被取消")
|
||
|
||
except Exception as e:
|
||
logger.error(f"周期性监控异常: {e}")
|
||
if self.notifier:
|
||
self.notifier.send_error(f"周期性监控异常: {e}")
|
||
|
||
def get_stats(self) -> Dict:
|
||
"""
|
||
获取统计信息
|
||
|
||
Returns:
|
||
统计信息字典
|
||
"""
|
||
return {
|
||
**self.stats,
|
||
'avg_stocks_per_check': (
|
||
self.stats['total_stocks'] / self.stats['total_checks']
|
||
if self.stats['total_checks'] > 0 else 0
|
||
)
|
||
}
|
||
|
||
def send_summary_report(self) -> bool:
|
||
"""
|
||
发送汇总报告
|
||
|
||
Returns:
|
||
是否发送成功
|
||
"""
|
||
if not self.notifier:
|
||
return False
|
||
|
||
return self.notifier.send_summary(
|
||
total_sectors=self.stats['total_hot_sectors'],
|
||
total_stocks=self.stats['total_stocks']
|
||
)
|
||
|
||
|
||
# 快捷函数
|
||
async def quick_check(
|
||
change_threshold: float = 2.0,
|
||
top_n: int = 3,
|
||
enable_notifier: bool = True
|
||
) -> Dict:
|
||
"""
|
||
快捷检查函数
|
||
|
||
Args:
|
||
change_threshold: 涨跌幅阈值(%)
|
||
top_n: 每个板块返回前N只龙头股
|
||
enable_notifier: 是否启用钉钉通知
|
||
|
||
Returns:
|
||
检查结果
|
||
"""
|
||
monitor = SectorMonitor(
|
||
change_threshold=change_threshold,
|
||
top_n=top_n,
|
||
enable_notifier=enable_notifier
|
||
)
|
||
return await monitor.check_once()
|