astock-agent/backend/app/analysis/intraday.py
2026-04-07 20:51:00 +08:00

244 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""盘中实时扫描
盘中 Tushare 的 daily / moneyflow / limit_list_d 等接口尚无当日数据,
因此盘中扫描采用混合策略:
- 板块热度:使用前一日 Tushare 数据作为基线(哪些板块是近期热点)
- 个股筛选:用腾讯实时行情替代 Tushare 的当日 daily_basic / moneyflow
- 技术面:历史 K 线Tushare + 当日实时价格(腾讯)拼接后计算
盘中重点捕捉:
1. 热门板块成分股中,盘中放量拉升的个股
2. 量比 > 1.5、换手率适中、涨幅在 2%-7% 的活跃股
3. 用实时价格更新支撑/压力位和技术信号
"""
import logging
import pandas as pd
from datetime import datetime
from app.data.tushare_client import tushare_client
from app.data import tencent_client
from app.data.models import SectorInfo, Recommendation, MarketTemperature, StockQuote
from app.analysis.sector_scanner import scan_hot_sectors
from app.analysis.technical import add_all_indicators
from app.analysis.signals import generate_signals
from app.analysis.capital_flow import _score_valuation
from app.config import settings
logger = logging.getLogger(__name__)
async def intraday_market_temperature(prev_temp: MarketTemperature) -> MarketTemperature:
"""盘中市场温度:在前一日基线上叠加实时指数涨跌"""
index_data = await tencent_client.get_index_realtime()
sh_index = index_data.get("000001.SH")
if not sh_index:
return prev_temp
# 盘中温度调整:基于上证实时涨跌幅微调
pct = sh_index.get("pct_chg", 0)
adjustment = 0
if pct > 1.5:
adjustment = 10
elif pct > 0.5:
adjustment = 5
elif pct > 0:
adjustment = 2
elif pct > -0.5:
adjustment = -2
elif pct > -1.5:
adjustment = -5
else:
adjustment = -10
new_temp = min(max(prev_temp.temperature + adjustment, 0), 100)
return MarketTemperature(
trade_date=datetime.now().strftime("%Y%m%d"),
up_count=prev_temp.up_count,
down_count=prev_temp.down_count,
limit_up_count=prev_temp.limit_up_count,
limit_down_count=prev_temp.limit_down_count,
max_streak=prev_temp.max_streak,
broken_rate=prev_temp.broken_rate,
index_above_ma20=prev_temp.index_above_ma20,
temperature=round(new_temp, 1),
)
async def intraday_filter_stocks(
hot_sectors: list[SectorInfo],
) -> list[dict]:
"""盘中个股筛选:从热门板块成分股中用腾讯实时行情筛选
替代 capital_flow.py 中依赖 Tushare 日级数据的逻辑。
盘中无法获取资金流向,改用 量比 + 换手率 + 涨幅 + 市值 筛选。
"""
# 收集热门板块成分股
stock_sectors: dict[str, list[str]] = {}
for sector in hot_sectors:
members = tushare_client.get_ths_members(sector.sector_code)
if members.empty or "con_code" not in members.columns:
continue
for _, m in members.iterrows():
code = m.get("con_code", "")
if not code or "." not in str(code):
continue
if code not in stock_sectors:
stock_sectors[code] = []
stock_sectors[code].append(sector.sector_name)
if not stock_sectors:
logger.warning("盘中筛选: 热门板块成分股为空")
return []
all_codes = list(stock_sectors.keys())
logger.info(f"盘中筛选: {len(all_codes)} 只成分股,开始获取实时行情")
# 过滤 ST 和次新
stock_basic = tushare_client.get_stock_basic()
st_codes = set()
new_codes = set()
if not stock_basic.empty:
st_codes = set(stock_basic[stock_basic["name"].str.contains("ST", na=False)]["ts_code"])
from datetime import timedelta
cutoff = (datetime.now() - timedelta(days=settings.min_list_days)).strftime("%Y%m%d")
new_codes = set(stock_basic[stock_basic["list_date"] > cutoff]["ts_code"])
eligible = [c for c in all_codes if c not in st_codes and c not in new_codes]
# 获取前一日 daily_basicPE/PB 估值数据)
prev_date = tushare_client.get_latest_trade_date()
basic_df = tushare_client.get_daily_basic(prev_date)
# 批量获取腾讯实时行情
quotes = await tencent_client.get_realtime_quotes_batch(eligible)
results = []
for ts_code, quote in quotes.items():
# 硬性条件
if quote.pct_chg <= 0:
continue # 盘中只关注上涨股
if quote.pct_chg > 9.8:
continue # 已涨停,无买入空间
if quote.turnover_rate < settings.min_turnover_rate:
continue
if quote.turnover_rate > settings.max_turnover_rate:
continue
if quote.circ_mv is not None:
if quote.circ_mv < settings.min_circ_mv or quote.circ_mv > settings.max_circ_mv:
continue
# 评分
score = _score_intraday(quote)
# 估值评分(用前一日 PE/PB
pe = None
pb = None
if not basic_df.empty:
b_row = basic_df[basic_df["ts_code"] == ts_code]
if not b_row.empty:
b = b_row.iloc[0]
pe = float(b["pe_ttm"]) if pd.notna(b.get("pe_ttm")) else None
pb = float(b["pb"]) if pd.notna(b.get("pb")) else None
valuation_score = _score_valuation(pe, pb)
# 名称
name = quote.name or ts_code
results.append({
"ts_code": ts_code,
"name": name,
"sector": stock_sectors[ts_code][0],
"sectors": stock_sectors[ts_code],
"price": quote.price,
"pct_chg": quote.pct_chg,
"turnover_rate": quote.turnover_rate,
"volume_ratio": quote.volume_ratio,
"circ_mv": quote.circ_mv,
"capital_score": round(score, 1),
"valuation_score": round(valuation_score, 1),
# 盘中无资金流向数据
"main_net_inflow": 0,
"inflow_ratio": 0,
})
results.sort(key=lambda x: x["capital_score"], reverse=True)
top = results[:settings.top_stock_count]
for r in top:
logger.info(
f"盘中筛选: {r['name']}({r['ts_code']}) 板块={r['sector']} "
f"涨幅={r['pct_chg']}% 量比={r['volume_ratio']} "
f"换手率={r['turnover_rate']}% 评分={r['capital_score']}"
)
return top
def _score_intraday(quote: StockQuote) -> float:
"""盘中评分逻辑(替代资金流向评分)
盘中无法获取大单/特大单数据,改用以下指标:
- 涨幅区间 (25%): 2%-7% 为活跃区间
- 量比 (30%): > 1.5 说明资金活跃
- 换手率 (20%): 5%-10% 最佳
- 振幅 (10%): 适度振幅说明活跃
- 市值适配 (15%): 100-300亿最佳
"""
score = 0.0
# 涨幅区间 (25分)
pct = quote.pct_chg
if 3 <= pct <= 7:
score += 25
elif 2 <= pct < 3:
score += 18
elif 7 < pct <= 9:
score += 15
elif 0 < pct < 2:
score += 8
# 量比 (30分)
vr = quote.volume_ratio
if vr is not None:
if vr > 3.0:
score += 30
elif vr > 2.0:
score += 24
elif vr > 1.5:
score += 18
elif vr > 1.0:
score += 10
# 换手率 (20分)
tr = quote.turnover_rate
if 5 <= tr <= 10:
score += 20
elif 3 <= tr < 5:
score += 14
elif 10 < tr <= 15:
score += 10
# 振幅 (10分) - 适度振幅
amp = quote.amplitude or 0
if 3 <= amp <= 8:
score += 10
elif 2 <= amp < 3:
score += 6
elif amp > 8:
score += 3
# 市值适配 (15分)
if quote.circ_mv is not None:
mv = quote.circ_mv
if 100 <= mv <= 300:
score += 15
elif settings.min_circ_mv <= mv <= settings.max_circ_mv:
score += 10
elif mv > 0:
score += 3
return score