astock-agent/backend/app/analysis/intraday.py
2026-04-23 17:59:43 +08:00

450 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""盘中实时扫描
盘中 Tushare 的 daily / moneyflow / limit_list_d 等接口尚无当日数据,
因此盘中扫描采用混合策略:
- 板块热度:使用前一日 Tushare 数据作为基线(哪些板块是近期热点)
- 个股筛选:用腾讯实时行情替代 Tushare 的当日 daily_basic / moneyflow
- 技术面:历史 K 线Tushare + 当日实时价格(腾讯)拼接后计算
盘中重点捕捉:
1. 热门板块成分股中,盘中放量拉升的个股
2. 量比 > 1.5、换手率适中、涨幅在 2%-7% 的活跃股
3. 用实时价格更新支撑/压力位和技术信号
"""
import logging
import httpx
import pandas as pd
from datetime import datetime
from app.data.tushare_client import tushare_client
from app.data import tencent_client
from app.data.models import SectorInfo, Recommendation, MarketTemperature, StockQuote
from app.data.eastmoney_client import (
SECTOR_LIST_URL,
SECTOR_HEADERS,
_parse_eastmoney_json,
get_a_share_realtime_ranking,
)
from app.analysis.sector_scanner import scan_hot_sectors
from app.analysis.technical import add_all_indicators
from app.analysis.signals import generate_signals
from app.analysis.capital_flow import _score_valuation
from app.config import settings
logger = logging.getLogger(__name__)
async def intraday_market_temperature(prev_temp: MarketTemperature) -> MarketTemperature:
"""盘中市场温度:用腾讯实时行情计算涨跌数据 + 东方财富统计涨停跌停"""
index_data = await tencent_client.get_index_realtime()
sh_index = index_data.get("000001.SH")
if not sh_index:
return prev_temp
# ── 用腾讯实时行情计算涨跌数量 ──
up_count = prev_temp.up_count
down_count = prev_temp.down_count
limit_up_count = prev_temp.limit_up_count
limit_down_count = prev_temp.limit_down_count
eastmoney_quotes = await get_a_share_realtime_ranking(page_size=6000)
if eastmoney_quotes:
up_count = sum(1 for q in eastmoney_quotes if q.get("pct_chg", 0) > 0)
down_count = sum(1 for q in eastmoney_quotes if q.get("pct_chg", 0) < 0)
limit_up_count = sum(1 for q in eastmoney_quotes if q.get("pct_chg", 0) >= _limit_threshold(q.get("ts_code", "")))
limit_down_count = sum(1 for q in eastmoney_quotes if q.get("pct_chg", 0) <= -_limit_threshold(q.get("ts_code", "")))
logger.info(
"东方财富实时市场温度: 上涨=%s 下跌=%s 涨停=%s 跌停=%s (共%s只)",
up_count,
down_count,
limit_up_count,
limit_down_count,
len(eastmoney_quotes),
)
else:
logger.warning("东方财富全市场实时行情为空,尝试腾讯批量行情补充涨跌家数")
try:
if not eastmoney_quotes:
stock_basic = tushare_client.get_stock_basic()
if stock_basic.empty:
raise ValueError("股票基础列表为空")
all_codes = stock_basic[~stock_basic["name"].str.contains("ST", na=False)]["ts_code"].tolist()
if all_codes:
quotes = await tencent_client.get_realtime_quotes_batch(all_codes)
up_count = sum(1 for q in quotes.values() if q.pct_chg > 0)
down_count = sum(1 for q in quotes.values() if q.pct_chg < 0)
logger.info(
f"盘中实时涨跌统计: 上涨={up_count} 下跌={down_count} (共{len(quotes)}只)"
)
except Exception as e:
logger.warning(f"获取盘中实时涨跌统计失败,使用上一日数据: {e}")
# ── 用东方财富 clist API 统计涨停跌停(比腾讯涨停价字段更可靠) ──
try:
if eastmoney_quotes:
raise RuntimeError("已使用东方财富全市场实时涨跌停统计")
realtime_limit_up_count = 0
realtime_limit_down_count = 0
for fs, threshold in [
("m:0+t:6,m:0+t:80,m:0+t:81+s:2048", 9.9), # 主板 10%
("m:1+t:2,m:1+t:23", 19.9), # 创业板/科创板 20%
]:
async with httpx.AsyncClient(follow_redirects=True) as client:
# 涨停:按涨幅降序取 top 200
params_up = {
"pn": "1", "pz": "200", "po": "1", "np": "1",
"ut": "b1f8f8f8", "fltt": "2", "invt": "2",
"fid": "f3", "fs": fs,
"fields": "f3,f12,f14",
}
resp = await client.get(SECTOR_LIST_URL, params=params_up, headers=SECTOR_HEADERS, timeout=10)
data_up = _parse_eastmoney_json(resp, "涨停统计")
items = data_up.get("data", {}).get("diff", []) if data_up.get("data") else []
for item in items:
pct = item.get("f3")
if pct == "-" or pct is None:
continue
if float(pct) >= threshold:
realtime_limit_up_count += 1
# 跌停:按涨幅升序取 top 200
params_down = {
"pn": "1", "pz": "200", "po": "0", "np": "1",
"ut": "b1f8f8f8", "fltt": "2", "invt": "2",
"fid": "f3", "fs": fs,
"fields": "f3,f12,f14",
}
resp_down = await client.get(SECTOR_LIST_URL, params=params_down, headers=SECTOR_HEADERS, timeout=10)
data_down = _parse_eastmoney_json(resp_down, "跌停统计")
items_down = data_down.get("data", {}).get("diff", []) if data_down.get("data") else []
neg_threshold = -threshold
for item in items_down:
pct = item.get("f3")
if pct == "-" or pct is None:
continue
if float(pct) <= neg_threshold:
realtime_limit_down_count += 1
limit_up_count = realtime_limit_up_count
limit_down_count = realtime_limit_down_count
logger.info(f"东方财富盘中涨跌停: 涨停={limit_up_count} 跌停={limit_down_count}")
except Exception as e:
if not eastmoney_quotes:
logger.warning(f"东方财富涨跌停统计失败,使用基线数据: {e}")
# ── 温度分数:基于实时涨跌比重新计算 ──
ratio = up_count / max(down_count, 1)
temp_from_ratio = min(ratio / 3.0 * 25, 25) # 涨跌比维度 (0-25)
temp_from_limit_up = min(limit_up_count / 2, 25) # 涨停数维度 (0-25)
# 指数方向调整 (0-25)
pct = sh_index.get("pct_chg", 0)
temp_from_index = min(max(pct * 8 + 12.5, 0), 25)
new_temp = round(temp_from_ratio + temp_from_limit_up + temp_from_index + 25, 1)
new_temp = min(max(new_temp, 0), 100)
return MarketTemperature(
trade_date=datetime.now().strftime("%Y%m%d"),
up_count=up_count,
down_count=down_count,
limit_up_count=limit_up_count,
limit_down_count=limit_down_count,
max_streak=prev_temp.max_streak,
broken_rate=prev_temp.broken_rate,
index_above_ma20=pct > 0 if sh_index else prev_temp.index_above_ma20,
temperature=new_temp,
)
def _limit_threshold(ts_code: str) -> float:
code = ts_code.split(".")[0] if ts_code else ""
if code.startswith(("300", "301", "688")):
return 19.8
return 9.8
async def intraday_filter_stocks(
hot_sectors: list[SectorInfo],
) -> list[dict]:
"""盘中个股筛选:从热门板块成分股中用腾讯实时行情筛选
替代 capital_flow.py 中依赖 Tushare 日级数据的逻辑。
盘中无法获取资金流向,改用 量比 + 换手率 + 涨幅 + 市值 筛选。
"""
# 收集热门板块成分股
stock_sectors: dict[str, list[str]] = {}
for sector in hot_sectors:
members = tushare_client.get_ths_members(sector.sector_code)
if members.empty or "con_code" not in members.columns:
continue
for _, m in members.iterrows():
code = m.get("con_code", "")
if not code or "." not in str(code):
continue
if code not in stock_sectors:
stock_sectors[code] = []
stock_sectors[code].append(sector.sector_name)
if not stock_sectors:
logger.warning("盘中筛选: 热门板块成分股为空")
return []
all_codes = list(stock_sectors.keys())
logger.info(f"盘中筛选: {len(all_codes)} 只成分股,开始获取实时行情")
# 过滤 ST 和次新
stock_basic = tushare_client.get_stock_basic()
st_codes = set()
new_codes = set()
if not stock_basic.empty:
st_codes = set(stock_basic[stock_basic["name"].str.contains("ST", na=False)]["ts_code"])
from datetime import timedelta
cutoff = (datetime.now() - timedelta(days=settings.min_list_days)).strftime("%Y%m%d")
new_codes = set(stock_basic[stock_basic["list_date"] > cutoff]["ts_code"])
eligible = [c for c in all_codes if c not in st_codes and c not in new_codes]
# 获取前一日 daily_basicPE/PB 估值数据)
prev_date = tushare_client.get_latest_trade_date()
basic_df = tushare_client.get_daily_basic(prev_date)
# 批量获取腾讯实时行情
quotes = await tencent_client.get_realtime_quotes_batch(eligible)
results = []
for ts_code, quote in quotes.items():
# 硬性条件
if quote.pct_chg <= 0:
continue # 盘中只关注上涨股
if quote.pct_chg > 9.8:
continue # 已涨停,无买入空间
if quote.turnover_rate < settings.min_turnover_rate:
continue
if quote.turnover_rate > settings.max_turnover_rate:
continue
if quote.circ_mv is not None:
if quote.circ_mv < settings.min_circ_mv or quote.circ_mv > settings.max_circ_mv:
continue
# 评分
score = _score_intraday(quote)
# 估值评分(用前一日 PE/PB
pe = None
pb = None
if not basic_df.empty:
b_row = basic_df[basic_df["ts_code"] == ts_code]
if not b_row.empty:
b = b_row.iloc[0]
pe = float(b["pe_ttm"]) if pd.notna(b.get("pe_ttm")) else None
pb = float(b["pb"]) if pd.notna(b.get("pb")) else None
valuation_score = _score_valuation(pe, pb)
# 名称
name = quote.name or ts_code
results.append({
"ts_code": ts_code,
"name": name,
"sector": stock_sectors[ts_code][0],
"sectors": stock_sectors[ts_code],
"price": quote.price,
"pct_chg": quote.pct_chg,
"turnover_rate": quote.turnover_rate,
"volume_ratio": quote.volume_ratio,
"circ_mv": quote.circ_mv,
"capital_score": round(score, 1),
"valuation_score": round(valuation_score, 1),
# 盘中无资金流向数据
"main_net_inflow": 0,
"inflow_ratio": 0,
})
results.sort(key=lambda x: x["capital_score"], reverse=True)
top = results[:settings.top_stock_count]
for r in top:
logger.info(
f"盘中筛选: {r['name']}({r['ts_code']}) 板块={r['sector']} "
f"涨幅={r['pct_chg']}% 量比={r['volume_ratio']} "
f"换手率={r['turnover_rate']}% 评分={r['capital_score']}"
)
return top
async def intraday_active_market_recall(limit: int = 80) -> list[dict]:
"""实时全市场活跃股召回,不依赖 Tushare 板块成分映射。"""
quotes = await get_a_share_realtime_ranking(sort_by="f8", descending=True, page_size=800)
if not quotes:
return []
results = []
for item in quotes:
ts_code = item.get("ts_code", "")
name = item.get("name", "")
if not ts_code or not name or "ST" in name:
continue
pct_chg = float(item.get("pct_chg", 0) or 0)
turnover_rate = float(item.get("turnover_rate", 0) or 0)
circ_mv_raw = item.get("circ_mv")
circ_mv = float(circ_mv_raw or 0) / 100000000 if circ_mv_raw else None
if pct_chg <= 0 or pct_chg >= _limit_threshold(ts_code):
continue
if turnover_rate < max(settings.min_turnover_rate * 0.5, 1.0):
continue
if circ_mv is not None and circ_mv > 0:
if circ_mv < settings.min_circ_mv or circ_mv > settings.max_circ_mv * 1.5:
continue
recall_score = 35
recall_score += min(max(pct_chg, 0), 8) * 4
recall_score += min(turnover_rate, 12) * 2
if item.get("main_net_inflow", 0) > 0:
recall_score += 8
results.append({
"ts_code": ts_code,
"name": name,
"sector": "实时活跃",
"sector_stage": "intraday",
"price": item.get("price"),
"pct_chg": pct_chg,
"turnover_rate": turnover_rate,
"circ_mv": circ_mv,
"pe": item.get("pe"),
"pb": item.get("pb"),
"volume_ratio": None,
"main_net_inflow": float(item.get("main_net_inflow", 0) or 0) / 10000,
"inflow_ratio": 0,
"recall_score": round(recall_score, 1),
"recall_tags": ["realtime_active"],
"stock_role_hint": "今日全市场活跃股",
})
results.sort(key=lambda x: (x["recall_score"], x.get("turnover_rate", 0)), reverse=True)
return results[:limit]
def _score_intraday(quote: StockQuote) -> float:
"""盘中评分逻辑(替代资金流向评分)
盘中无法获取大单/特大单数据,改用以下指标:
- 涨幅区间 (25%): 2%-7% 为活跃区间
- 量比 (30%): > 1.5 说明资金活跃
- 换手率 (20%): 5%-10% 最佳
- 振幅 (10%): 适度振幅说明活跃
- 市值适配 (15%): 100-300亿最佳
"""
score = 0.0
# 涨幅区间 (25分)
pct = quote.pct_chg
if 3 <= pct <= 7:
score += 25
elif 2 <= pct < 3:
score += 18
elif 7 < pct <= 9:
score += 15
elif 0 < pct < 2:
score += 8
# 量比 (30分)
vr = quote.volume_ratio
if vr is not None:
if vr > 3.0:
score += 30
elif vr > 2.0:
score += 24
elif vr > 1.5:
score += 18
elif vr > 1.0:
score += 10
# 换手率 (20分)
tr = quote.turnover_rate
if 5 <= tr <= 10:
score += 20
elif 3 <= tr < 5:
score += 14
elif 10 < tr <= 15:
score += 10
# 振幅 (10分) - 适度振幅
amp = quote.amplitude or 0
if 3 <= amp <= 8:
score += 10
elif 2 <= amp < 3:
score += 6
elif amp > 8:
score += 3
# 市值适配 (15分)
if quote.circ_mv is not None:
mv = quote.circ_mv
if 100 <= mv <= 300:
score += 15
elif settings.min_circ_mv <= mv <= settings.max_circ_mv:
score += 10
elif mv > 0:
score += 3
return score
async def intraday_sector_scan(prev_sectors: list[SectorInfo]) -> list[SectorInfo]:
"""盘中板块热度更新:用东方财富实时板块数据刷新涨幅和涨停数
一次请求替代之前腾讯批量获取数千只成分股的方式。
"""
if not prev_sectors:
return prev_sectors
# 从东方财富获取实时板块排名1次 HTTP 请求)
try:
from app.data.eastmoney_client import get_sector_realtime_ranking
em_sectors = await get_sector_realtime_ranking()
except Exception as e:
logger.warning(f"东方财富板块实时数据获取失败: {e}")
return prev_sectors
if not em_sectors:
return prev_sectors
# 按板块名称匹配更新数据
matched = 0
for sector in prev_sectors:
em_data = None
# 先精确匹配
for em_s in em_sectors:
if em_s["sector_name"] == sector.sector_name:
em_data = em_s
break
# 模糊匹配
if not em_data:
for em_s in em_sectors:
em_name = em_s["sector_name"].rstrip("行业").rstrip("板块").strip()
ts_name = sector.sector_name.rstrip("行业").rstrip("板块").strip()
if em_name == ts_name or (len(em_name) <= len(ts_name) and em_name in ts_name) or (len(ts_name) <= len(em_name) and ts_name in em_name):
em_data = em_s
break
if em_data:
matched += 1
sector.pct_change = em_data["pct_change"]
# 涨停数保留 Tushare 数据(东方财富此字段不可用)
logger.info(
f"盘中板块实时更新: {matched}/{len(prev_sectors)} 匹配成功, "
f"涨幅最高={max(prev_sectors, key=lambda s: s.pct_change).sector_name} "
f"({max(s.pct_change for s in prev_sectors):.1f}%)"
)
return prev_sectors