201 lines
6.9 KiB
Python
201 lines
6.9 KiB
Python
"""
|
||
板块异动分析
|
||
检测板块涨跌幅、量能、资金流向异动
|
||
"""
|
||
import pandas as pd
|
||
from typing import Dict, List, Optional, Tuple
|
||
from datetime import datetime
|
||
from app.utils.logger import logger
|
||
from app.utils.error_handler import notify_error
|
||
from .akshare_client import get_akshare_client
|
||
|
||
|
||
class SectorChangeAnalyzer:
|
||
"""板块异动分析器"""
|
||
|
||
def __init__(self, change_threshold: float = 2.0):
|
||
"""
|
||
初始化异动分析器
|
||
|
||
Args:
|
||
change_threshold: 涨跌幅阈值(%)
|
||
"""
|
||
self.change_threshold = change_threshold
|
||
self.akshare = get_akshare_client()
|
||
|
||
def detect_sector_changes(self) -> List[Dict]:
|
||
"""
|
||
检测异动板块(使用概念板块)
|
||
|
||
Returns:
|
||
异动板块列表
|
||
"""
|
||
try:
|
||
# 获取概念板块行情
|
||
df = self.akshare.get_concept_spot()
|
||
if df.empty:
|
||
logger.warning("概念板块行情数据为空")
|
||
return []
|
||
|
||
# 转换数据类型(概念板块返回的列名)
|
||
df['涨跌幅'] = pd.to_numeric(df['涨跌幅'], errors='coerce')
|
||
df['涨跌额'] = pd.to_numeric(df['涨跌额'], errors='coerce')
|
||
df['最新价'] = pd.to_numeric(df['最新价'], errors='coerce')
|
||
df['成交额'] = pd.to_numeric(df['成交额'], errors='coerce')
|
||
|
||
# 筛选异动板块
|
||
hot_sectors = df[df['涨跌幅'] >= self.change_threshold].copy()
|
||
|
||
if hot_sectors.empty:
|
||
return []
|
||
|
||
# 排序:涨幅优先,然后成交额
|
||
hot_sectors = hot_sectors.sort_values(
|
||
by=['涨跌幅', '成交额'],
|
||
ascending=[False, False]
|
||
)
|
||
|
||
# 转换为结果列表
|
||
results = []
|
||
for _, row in hot_sectors.iterrows():
|
||
results.append({
|
||
'name': row['板块名称'],
|
||
'change_pct': float(row['涨跌幅']),
|
||
'change_amount': float(row.get('涨跌额', 0)),
|
||
'volume': float(row.get('成交量', 0)) if '成交量' in row else 0.0,
|
||
'amount': float(row.get('成交额', 0)),
|
||
'leading_stock': row.get('领涨股', ''),
|
||
'ups': int(row.get('上涨家数', 0)),
|
||
'downs': int(row.get('下跌家数', 0)),
|
||
'timestamp': datetime.now()
|
||
})
|
||
|
||
logger.info(f"检测到 {len(results)} 个异动概念板块")
|
||
return results
|
||
|
||
except Exception as e:
|
||
error_msg = f"检测板块异动失败: {e}"
|
||
logger.error(error_msg)
|
||
|
||
# 如果是连接错误,发送通知
|
||
if 'Connection' in str(e) or 'RemoteDisconnected' in str(e):
|
||
notify_error(
|
||
title="A股板块监控 - 数据源连接失败",
|
||
message=f"akshare 概念板块 API 连接失败\n\n错误: {e}\n\n可能原因:\n- eastmoney API 服务不稳定\n- 网络连接问题\n- 建议:稍后自动重试或考虑使用 tushare",
|
||
level="warning"
|
||
)
|
||
|
||
return []
|
||
|
||
def analyze_sector_momentum(self, sector_name: str) -> Dict:
|
||
"""
|
||
分析板块动能
|
||
|
||
Args:
|
||
sector_name: 板块名称
|
||
|
||
Returns:
|
||
板块动能分析结果
|
||
"""
|
||
try:
|
||
# 获取板块成分股
|
||
stocks_df = self.akshare.get_concept_stocks(sector_name)
|
||
if stocks_df.empty:
|
||
return {}
|
||
|
||
# 获取实时行情
|
||
spot_df = self.akshare.get_stock_spot()
|
||
if spot_df.empty:
|
||
return {}
|
||
|
||
# 合并数据
|
||
merged = pd.merge(
|
||
stocks_df,
|
||
spot_df,
|
||
on='代码',
|
||
how='inner'
|
||
)
|
||
|
||
if merged.empty:
|
||
return {}
|
||
|
||
# 计算统计
|
||
total_stocks = len(merged)
|
||
up_stocks = len(merged[merged['涨跌幅'] > 0])
|
||
down_stocks = len(merged[merged['涨跌幅'] < 0])
|
||
avg_change = merged['涨跌幅'].mean()
|
||
max_change = merged['涨跌幅'].max()
|
||
|
||
# 计算总成交额
|
||
total_amount = merged['成交额'].sum() if '成交额' in merged.columns else 0
|
||
|
||
# 找出涨幅最大的股票
|
||
if not merged.empty:
|
||
top_stock = merged.loc[merged['涨跌幅'].idxmax()]
|
||
else:
|
||
top_stock = None
|
||
|
||
return {
|
||
'sector_name': sector_name,
|
||
'total_stocks': total_stocks,
|
||
'up_stocks': up_stocks,
|
||
'down_stocks': down_stocks,
|
||
'up_down_ratio': f"{up_stocks}:{down_stocks}",
|
||
'avg_change': float(avg_change) if pd.notna(avg_change) else 0,
|
||
'max_change': float(max_change) if pd.notna(max_change) else 0,
|
||
'total_amount': float(total_amount),
|
||
'top_stock': {
|
||
'code': top_stock['代码'] if top_stock is not None else '',
|
||
'name': top_stock['名称'] if top_stock is not None else '',
|
||
'change': float(top_stock['涨跌幅']) if top_stock is not None else 0,
|
||
} if top_stock is not None else None
|
||
}
|
||
|
||
except Exception as e:
|
||
logger.error(f"分析板块动能失败 {sector_name}: {e}")
|
||
return {}
|
||
|
||
def get_hot_reason(self, sector_name: str, top_stocks: List[Dict]) -> str:
|
||
"""
|
||
推测异动原因(基于龙头股分析)
|
||
|
||
Args:
|
||
sector_name: 板块名称
|
||
top_stocks: 龙头股列表
|
||
|
||
Returns:
|
||
异动原因描述
|
||
"""
|
||
try:
|
||
if not top_stocks:
|
||
return "板块整体异动"
|
||
|
||
# 简单的原因分析
|
||
reasons = []
|
||
|
||
# 检查是否有涨停股
|
||
limit_up_count = sum(1 for s in top_stocks if s.get('change_pct', 0) >= 9.9)
|
||
if limit_up_count > 0:
|
||
reasons.append(f"{limit_up_count}只个股涨停")
|
||
|
||
# 检查平均涨幅
|
||
avg_change = sum(s.get('change_pct', 0) for s in top_stocks) / len(top_stocks)
|
||
if avg_change >= 7:
|
||
reasons.append("板块全线爆发")
|
||
|
||
# 检查是否集中在某个龙头
|
||
if len(top_stocks) >= 2:
|
||
top1_change = top_stocks[0].get('change_pct', 0)
|
||
top2_change = top_stocks[1].get('change_pct', 0)
|
||
if top1_change - top2_change > 3:
|
||
reasons.append(f"{top_stocks[0].get('name', '')}龙头领涨")
|
||
|
||
if reasons:
|
||
return ",".join(reasons)
|
||
else:
|
||
return "资金集中流入"
|
||
|
||
except Exception as e:
|
||
logger.error(f"推测异动原因失败: {e}")
|
||
return "板块异动"
|