""" 测试A股板块异动监控 运行一次检查并打印结果,不发送钉钉通知 """ import asyncio import sys import os # 添加项目根目录到 Python 路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.astock_agent import SectorMonitor from app.utils.logger import logger async def test_sector_monitor(): """测试板块异动监控""" print("=" * 60) print("A股板块异动监控测试") print("=" * 60) print() # 配置参数 change_threshold = 2.0 # 涨跌幅阈值 2% top_n = 3 # 每个板块返回前3只龙头股 print(f"配置参数:") print(f" - 涨跌幅阈值: {change_threshold}%") print(f" - 龙头股数量: Top{top_n}") print(f" - 钉钉通知: 已禁用(测试模式)") print() # 创建监控器(不启用通知) monitor = SectorMonitor( change_threshold=change_threshold, top_n=top_n, enable_notifier=False # 测试时不发送通知 ) print("开始检查板块异动...") print("-" * 60) print() # 执行一次检查 result = await monitor.check_once() # 打印结果 print() print("-" * 60) print("检查结果:") print(f" - 异动板块数: {result['hot_sectors']}") print(f" - 龙头股总数: {result['stocks']}") print() # 显示详细信息 if 'results' in result and result['results']: print("=" * 60) print("异动板块详情:") print("=" * 60) print() for idx, item in enumerate(result['results'], 1): sector = item['sector'] stocks = item['stocks'] reason = item['reason'] # 板块信息 change_icon = "📈" if sector['change_pct'] > 0 else "📉" print(f"{idx}. 【{sector['name']}】{change_icon} {sector['change_pct']:+.2f}%") print(f" - 涨跌额: {sector['change_amount']:+.2f}") print(f" - 上涨家数: {sector['ups']}") print(f" - 下跌家数: {sector['downs']}") print(f" - 异动原因: {reason}") print() # 龙头股信息 print(f" 🏆 龙头股 Top {len(stocks)}:") for s_idx, stock in enumerate(stocks, 1): change_pct = stock['change_pct'] if change_pct >= 5: speed_icon = "⚡⚡⚡" elif change_pct >= 3: speed_icon = "⚡⚡" elif change_pct >= 1: speed_icon = "⚡" else: speed_icon = "🐌" # 成交额格式化 amount = stock['amount'] if amount >= 100000: amount_str = f"{amount/100000:.1f}亿" elif amount >= 10000: amount_str = f"{amount/10000:.1f}万" else: amount_str = f"{amount:.0f}元" print(f" {s_idx}. {stock['name']} ({stock['code']})") print(f" 价格: ¥{stock['price']:.2f} | 涨跌幅: {change_pct:+.2f}% {speed_icon}") print(f" 成交额: {amount_str} | 换手率: {stock['turnover']:.2f}%") print(f" 评分: {stock['score']:.1f} | 涨速: {stock['speed_level']}") print() # 显示统计信息 print("=" * 60) print("统计信息:") print("=" * 60) stats = monitor.get_stats() print(f" - 总检查次数: {stats['total_checks']}") print(f" - 总异动板块: {stats['total_hot_sectors']}") print(f" - 总龙头股数: {stats['total_stocks']}") if stats['total_checks'] > 0: print(f" - 平均龙头股: {stats['avg_stocks_per_check']:.1f}") print(f" - 最后检查时间: {stats['last_check_time']}") print() print("=" * 60) print("测试完成!") print("=" * 60) if __name__ == "__main__": try: asyncio.run(test_sector_monitor()) except KeyboardInterrupt: print("\n\n测试被用户中断") except Exception as e: print(f"\n\n测试失败: {e}") import traceback traceback.print_exc()