stock-ai-agent/backend/app/astock_agent/stock_selector.py
2026-02-27 09:54:17 +08:00

193 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
龙头股筛选
从异动板块中筛选出龙头股
"""
import pandas as pd
from typing import Dict, List
from datetime import datetime
from app.utils.logger import logger
from .akshare_client import get_akshare_client
class StockSelector:
"""龙头股筛选器"""
def __init__(self, top_n: int = 3):
"""
初始化筛选器
Args:
top_n: 返回前 N 只龙头股
"""
self.top_n = top_n
self.akshare = get_akshare_client()
def select_leading_stocks(self, sector_name: str) -> List[Dict]:
"""
筛选板块龙头股
Args:
sector_name: 板块名称
Returns:
龙头股列表(已排序)
"""
try:
# 获取成分股
stocks_df = self.akshare.get_concept_stocks(sector_name)
if stocks_df.empty:
logger.warning(f"获取板块 {sector_name} 成分股失败")
return []
# 获取实时行情
spot_df = self.akshare.get_stock_spot()
if spot_df.empty:
logger.warning("获取实时行情失败")
return []
# 合并数据
merged = pd.merge(
stocks_df[['代码', '名称']],
spot_df,
on='代码',
how='inner'
)
if merged.empty:
return []
# 数据类型转换
merged['最新价'] = pd.to_numeric(merged['最新价'], errors='coerce')
merged['涨跌幅'] = pd.to_numeric(merged['涨跌幅'], errors='coerce')
merged['涨跌额'] = pd.to_numeric(merged['涨跌额'], errors='coerce')
merged['成交量'] = pd.to_numeric(merged['成交量'], errors='coerce')
merged['成交额'] = pd.to_numeric(merged['成交额'], errors='coerce')
merged['换手率'] = pd.to_numeric(merged['换手率'], errors='coerce')
merged['振幅'] = pd.to_numeric(merged['振幅'], errors='coerce')
merged['量比'] = pd.to_numeric(merged['量比'], errors='coerce')
# 过滤:只保留有成交额的股票
merged = merged[merged['成交额'] > 0].copy()
if merged.empty:
return []
# 计算综合评分
merged['score'] = merged.apply(self._calculate_score, axis=1)
# 排序:按综合得分
merged = merged.sort_values('score', ascending=False)
# 取前 N 只
top_stocks = merged.head(self.top_n)
# 转换结果
results = []
for _, row in top_stocks.iterrows():
# 计算涨速等级
change_pct = row['涨跌幅']
if change_pct >= 5:
speed_level = "⚡⚡⚡ 极快"
elif change_pct >= 3:
speed_level = "⚡⚡ 快速"
elif change_pct >= 1:
speed_level = "⚡ 较快"
else:
speed_level = "🐌 平稳"
results.append({
'code': row['代码'],
'name': row['名称'],
'price': float(row['最新价']),
'change_pct': float(row['涨跌幅']),
'change_amount': float(row['涨跌额']),
'amount': float(row['成交额']),
'turnover': float(row['换手率']),
'volume_ratio': float(row.get('量比', 1)),
'amplitude': float(row.get('振幅', 0)),
'score': float(row['score']),
'speed_level': speed_level,
})
logger.info(f"板块 {sector_name} 龙头股筛选完成Top {len(results)}")
return results
except Exception as e:
logger.error(f"筛选龙头股失败 {sector_name}: {e}")
return []
def _calculate_score(self, row: pd.Series) -> float:
"""
计算综合得分
评分维度:
- 涨跌幅 (40%)
- 成交额 (30%)
- 涨速 (20%)
- 换手率 (10%)
Args:
row: 股票数据行
Returns:
综合得分
"""
score = 0.0
# 1. 涨跌幅得分 (40分) - 涨幅越高得分越高
change_pct = row['涨跌幅']
if change_pct >= 7:
score += 40 # 涨停级别
elif change_pct >= 5:
score += 35
elif change_pct >= 3:
score += 30
elif change_pct >= 2:
score += 25
elif change_pct >= 1:
score += 20
elif change_pct > 0:
score += 15
else:
score += max(0, 10 + change_pct * 5) # 下跌也有基础分
# 2. 成交额得分 (30分) - 成交额越大得分越高
amount = row['成交额']
if amount >= 100000: # 10亿以上
score += 30
elif amount >= 50000: # 5亿以上
score += 25
elif amount >= 10000: # 1亿以上
score += 20
elif amount >= 5000: # 5000万以上
score += 15
elif amount >= 1000: # 1000万以上
score += 10
else:
score += 5
# 3. 涨速得分 (20分) - 简化用涨幅代替
if change_pct >= 5:
score += 20
elif change_pct >= 3:
score += 15
elif change_pct >= 1:
score += 10
else:
score += 5
# 4. 换手率得分 (10分) - 适中换手率加分
turnover = row['换手率']
if 5 <= turnover <= 15:
score += 10 # 适中换手率
elif 15 < turnover <= 25:
score += 8 # 活跃但不过热
elif turnover > 25:
score += 5 # 过热可能回调
elif turnover > 0:
score += 3 # 有成交即可
else:
score += 0
return score