""" 板块异动分析(Tushare 版本) 检测板块涨跌幅、量能、资金流向异动 """ import pandas as pd from typing import Dict, List, Optional, Tuple from datetime import datetime from app.utils.logger import logger from app.utils.error_handler import notify_error class TushareSectorAnalyzer: """板块异动分析器(使用 Tushare 同花顺接口)""" def __init__(self, tushare_client, change_threshold: float = 2.0): """ 初始化异动分析器 Args: tushare_client: TushareClient 实例 change_threshold: 涨跌幅阈值(%) """ self.change_threshold = change_threshold self.ts_client = tushare_client def detect_sector_changes(self) -> List[Dict]: """ 检测异动板块 Returns: 异动板块列表 """ try: # 使用 tushare 获取异动板块(一次性获取) df = self.ts_client.get_hot_sectors(threshold=self.change_threshold) if df.empty: logger.info("未检测到异动板块") return [] # 转换为结果列表 results = [] for _, row in df.iterrows(): # 成交额转换为万元 amount_wan = row['amount'] / 10000 if row['amount'] > 0 else 0 # 格式化成交额显示 if amount_wan >= 100000: amount_str = f"{amount_wan/100000:.1f}亿" elif amount_wan >= 10000: amount_str = f"{amount_wan/10000:.1f}万" else: amount_str = f"{amount_wan:.0f}元" results.append({ 'name': row['name'], 'ts_code': row['ts_code'], 'change_pct': float(row['change_pct']), 'change': float(row.get('change', 0)), # 涨跌额 'close': float(row['close']), 'amount': float(row['amount']), 'amount_str': amount_str, 'volume': float(row['volume']), 'turnover_rate': float(row.get('turnover_rate', 0)), # 换手率 'trade_date': row['trade_date'], 'timestamp': datetime.now() }) logger.info(f"检测到 {len(results)} 个异动概念板块(Tushare)") return results except Exception as e: error_msg = f"Tushare 检测板块异动失败: {e}" logger.error(error_msg) # 发送通知 notify_error( title="A股板块监控 - Tushare 数据获取失败", message=f"错误: {e}\n\n可能原因:\n- Tushare token 未配置或无效\n- API 频率限制\n- 网络连接问题", level="warning" ) return [] def get_sector_stocks(self, ts_code: str, sector_name: str) -> List[Dict]: """ 获取板块成分股 Args: ts_code: 板块指数代码 sector_name: 板块名称 Returns: 成分股列表 """ try: # 获取成分股 members_df = self.ts_client.get_sector_members(ts_code) if members_df.empty: logger.warning(f"板块 {sector_name} 成分股数据为空") return [] # 获取成分股的行情数据 stock_codes = members_df['ts_code'].tolist() # 限制数量,避免请求过多 if len(stock_codes) > 50: stock_codes = stock_codes[:50] # 获取实时行情 realtime_df = self.ts_client.get_realtime_data(stock_codes) if realtime_df.empty: logger.warning(f"板块 {sector_name} 成分股行情为空") return [] # 合并数据 merged = pd.merge( members_df, realtime_df, on='ts_code', how='inner' ) if merged.empty: return [] # 转换结果 results = [] for _, row in merged.iterrows(): results.append({ 'code': row['ts_code'], 'name': row.get('name', row.get('member_name', '')), 'price': float(row.get('close', 0)), 'change_pct': float(row.get('pct_chg', 0)), 'change_amount': float(row.get('change', 0)), 'amount': float(row.get('amount', 0)), 'volume': float(row.get('vol', 0)), }) return results except Exception as e: logger.error(f"获取板块 {sector_name} 成分股失败: {e}") return [] def get_hot_reason(self, sector_name: str, top_stocks: List[Dict]) -> str: """ 推测异动原因(基于龙头股分析) Args: sector_name: 板块名称 top_stocks: 龙头股列表 Returns: 异动原因描述 """ try: if not top_stocks: return "板块整体异动" reasons = [] # 检查是否有涨停股 limit_up_count = sum(1 for s in top_stocks if s.get('change_pct', 0) >= 9.9) if limit_up_count > 0: reasons.append(f"{limit_up_count}只个股涨停") # 检查平均涨幅 avg_change = sum(s.get('change_pct', 0) for s in top_stocks) / len(top_stocks) if avg_change >= 7: reasons.append("板块全线爆发") # 检查是否集中在某个龙头 if len(top_stocks) >= 2: top1_change = top_stocks[0].get('change_pct', 0) top2_change = top_stocks[1].get('change_pct', 0) if top1_change - top2_change > 3: reasons.append(f"{top_stocks[0].get('name', '')}龙头领涨") if reasons: return ",".join(reasons) else: return "资金集中流入" except Exception as e: logger.error(f"推测异动原因失败: {e}") return "板块异动"