""" 测试A股板块异动监控 - Tushare 版本 运行一次检查并打印结果并发送钉钉通知 """ import asyncio import sys import os # 添加项目根目录到 Python 路径 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.config import get_settings from app.utils.logger import logger def test_tushare_sector_monitor(): """测试板块异动监控(Tushare 版本)""" print("=" * 60) print("A股板块异动监控测试 - Tushare 版本") print("=" * 60) print() # 获取配置 settings = get_settings() token = settings.tushare_token if not token: print("❌ Tushare token 未配置!") print("请在 .env 文件中设置 TUSHARE_TOKEN") return print(f"✅ Tushare token 已配置: {token[:10]}...") print() # 导入 Tushare 客户端和通知器 from app.astock_agent.tushare_client import TushareClient from app.astock_agent.tushare_sector_analyzer import TushareSectorAnalyzer from app.astock_agent.tushare_stock_selector import TushareStockSelector from app.astock_agent.notifier import DingTalkNotifier # 检查钉钉配置 webhook = settings.dingtalk_astock_webhook or settings.dingtalk_webhook_url secret = settings.dingtalk_astock_secret or settings.dingtalk_secret notifier = None if webhook: notifier = DingTalkNotifier(webhook, secret) print(f"✅ 钉钉通知已配置") else: print("⚠️ 钉钉通知未配置,仅打印结果") print() # 配置参数 change_threshold = 2.0 # 涨跌幅阈值 2% top_n = 3 # 每个板块返回前3只龙头股 print(f"配置参数:") print(f" - 涨跌幅阈值: {change_threshold}%") print(f" - 龙头股数量: Top{top_n}") print() # 创建 Tushare 客户端 print("初始化 Tushare 客户端...") ts_client = TushareClient(token=token) print("✅ Tushare 客户端初始化成功") print() # 创建分析器 print("-" * 60) print("开始检查板块异动...") print("-" * 60) print() analyzer = TushareSectorAnalyzer(ts_client, change_threshold=change_threshold) # 执行检查 hot_sectors = analyzer.detect_sector_changes() if not hot_sectors: print("未检测到异动板块") return print(f"检测到 {len(hot_sectors)} 个异动板块:") print() # 显示异动板块详情 notified_count = 0 for idx, sector in enumerate(hot_sectors, 1): print(f"{idx}. 【{sector['name']}】") print(f" 代码: {sector['ts_code']}") print(f" 涨跌幅: {sector['change_pct']:+.2f}%") print(f" 收盘价: {sector['close']:.2f}") print(f" 成交额: {sector['amount_str']}") print(f" 交易日期: {sector['trade_date']}") print() # 获取龙头股 print(f" 正在获取龙头股...") selector = TushareStockSelector(ts_client, top_n=top_n) top_stocks = selector.select_leading_stocks(sector['ts_code'], sector['name']) if top_stocks: print(f" 🏆 龙头股 Top {len(top_stocks)}:") for stock in top_stocks: # 成交额格式化 amount = stock['amount'] if amount >= 100000000: amount_str = f"{amount/100000000:.2f}亿" elif amount >= 10000: amount_str = f"{amount/10000:.2f}万" else: amount_str = f"{amount:.0f}元" print(f" {stock['name']} ({stock['code']})") print(f" 价格: {stock['price']:.2f} | 涨跌幅: {stock['change_pct']:+.2f}% {stock['speed_level']}") print(f" 成交额: {amount_str} | 评分: {stock['score']:.1f}") else: print(f" ⚠️ 未找到龙头股") print() # 发送钉钉通知 if notifier and top_stocks: # 使用实际的板块数据 sector_for_notify = { 'name': sector['name'], 'change_pct': sector['change_pct'], 'change_amount': sector.get('change', 0), # 涨跌额 'amount': sector['amount'], 'leading_stock': top_stocks[0]['name'] if top_stocks else '', 'ts_code': sector['ts_code'], } # 分析异动原因 reason = analyzer.get_hot_reason(sector['name'], top_stocks) # 发送通知 print(f" 📲 发送钉钉通知...") success = notifier.send_sector_alert( sector_data=sector_for_notify, top_stocks=top_stocks, reason=reason ) if success: print(f" ✅ 通知发送成功") notified_count += 1 else: print(f" ❌ 通知发送失败") print() print("=" * 60) print("测试完成!") print(f"检测到 {len(hot_sectors)} 个异动板块") if notifier: print(f"发送通知 {notified_count}/{len(hot_sectors)}") print("=" * 60) if __name__ == "__main__": try: test_tushare_sector_monitor() except KeyboardInterrupt: print("\n\n测试被用户中断") except Exception as e: print(f"\n\n测试失败: {e}") import traceback traceback.print_exc()