""" 测试A股板块异动监控 - Tushare 版本(详细调试) """ import sys import os sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.config import get_settings def test_tushare_detailed(): """详细测试 Tushare 板块数据""" print("=" * 60) print("Tushare 板块数据详细测试") print("=" * 60) print() settings = get_settings() token = settings.tushare_token from app.astock_agent.tushare_client import TushareClient ts_client = TushareClient(token=token) # 1. 测试获取概念板块列表 print("1. 获取概念板块列表...") sectors_df = ts_client.get_concept_sectors() print(f" ✅ 获取到 {len(sectors_df)} 个概念板块") print(f" 示例板块:") for idx, row in sectors_df.head(5).iterrows(): print(f" - {row['name']}: {row['ts_code']}") print() # 2. 测试获取单个板块行情 print("2. 获取单个板块行情(以人工智能为例)...") ai_sectors = sectors_df[sectors_df['name'].str.contains('人工智能', na=False)] if not ai_sectors.empty: ai_code = ai_sectors.iloc[0]['ts_code'] print(f" 找到人工智能板块: {ai_code}") daily_df = ts_client.get_sector_daily(ai_code) if not daily_df.empty: print(f" ✅ 获取到 {len(daily_df)} 天行情数据") latest = daily_df.sort_values('trade_date').iloc[-1] print(f" 最新行情 ({latest['trade_date']}):") print(f" - 收盘价: {latest['close']:.2f}") print(f" - 涨跌幅: {latest.get('pct_chg', 'N/A')}%") else: print(f" ❌ 行情数据为空") else: print(f" ⚠️ 未找到人工智能板块") print() # 3. 测试获取板块成分股 print("3. 获取板块成分股...") if not ai_sectors.empty: members_df = ts_client.get_sector_members(ai_code) if not members_df.empty: print(f" ✅ 获取到 {len(members_df)} 个成分股") print(f" 前5个成分股:") for idx, row in members_df.head(5).iterrows(): print(f" - {row.get('name', row.get('member_name', 'N/A'))}: {row['ts_code']}") else: print(f" ❌ 成分股数据为空") print() # 4. 测试获取异动板块(降低阈值) print("4. 检测异动板块(阈值 0%)...") hot_df = ts_client.get_hot_sectors(threshold=0.0) if hot_df.empty: print(" ⚠️ 未检测到上涨的板块") else: print(f" ✅ 检测到 {len(hot_df)} 个上涨板块") print(f" 涨幅最高的 5 个板块:") for idx, row in hot_df.head(5).iterrows(): print(f" {idx}. {row['name']}: {row['change_pct']:+.2f}%") print() print("=" * 60) print("测试完成!") print("=" * 60) if __name__ == "__main__": test_tushare_detailed()