528 lines
19 KiB
Python
528 lines
19 KiB
Python
"""东方财富数据客户端
|
||
|
||
通过 push2.eastmoney.com / push2his.eastmoney.com 获取实时数据:
|
||
- 板块实时涨跌幅排名(行业/概念板块)
|
||
- A 股分钟级 K 线数据(盘中量能分布分析)
|
||
|
||
免费接口,无需认证,注意限频。
|
||
"""
|
||
|
||
import logging
|
||
import httpx
|
||
import pandas as pd
|
||
from datetime import datetime
|
||
|
||
from app.data.cache import cache
|
||
from app.config import settings
|
||
from app.db.error_logger import log_error
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 东方财富接口
|
||
EASTMONEY_KLINE_URL = "https://push2his.eastmoney.com/api/qt/stock/kline/get"
|
||
SECTOR_LIST_URL = "https://push2.eastmoney.com/api/qt/clist/get"
|
||
HEADERS = {
|
||
"Referer": "https://finance.eastmoney.com",
|
||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
|
||
}
|
||
SECTOR_HEADERS = {
|
||
"Referer": "https://data.eastmoney.com",
|
||
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
|
||
}
|
||
|
||
|
||
def _describe_exception(exc: Exception) -> str:
|
||
text = str(exc).strip()
|
||
if text:
|
||
return f"{exc.__class__.__name__}: {text}"
|
||
return exc.__class__.__name__
|
||
|
||
|
||
def _ts_code_to_eastmoney(ts_code: str) -> str:
|
||
"""600519.SH -> 1.600519 (上海=1, 深圳=0)"""
|
||
code, market = ts_code.split(".")
|
||
prefix = "1" if market == "SH" else "0"
|
||
return f"{prefix}.{code}"
|
||
|
||
|
||
async def get_sector_realtime_ranking(
|
||
fs: str = "m:90+t:2",
|
||
sort_by: str = "f3",
|
||
descending: bool = True,
|
||
page_size: int = 100,
|
||
notify: bool = True,
|
||
) -> list[dict]:
|
||
"""获取东方财富板块实时涨跌幅排名
|
||
|
||
一次请求获取所有板块的实时数据,盘中和盘后均可使用。
|
||
替代通过腾讯批量获取成分股行情再手动计算的方式。
|
||
|
||
Args:
|
||
fs: 板块类型
|
||
- "m:90+t:2": 行业板块(东方财富分类,约496个)
|
||
- "m:90+t:3": 概念板块(约493个)
|
||
- "m:90+t:1": 地域板块(约31个)
|
||
sort_by: 排序字段,默认 f3=涨跌幅
|
||
descending: True=降序(涨幅从高到低)
|
||
page_size: 返回板块数量
|
||
|
||
Returns:
|
||
list[dict],每个 dict 包含:
|
||
- sector_code: 东方财富板块代码 (如 "BK0428")
|
||
- sector_name: 板块名称 (如 "酿酒行业")
|
||
- pct_change: 实时涨跌幅 %
|
||
- amount: 成交额(元)
|
||
- turnover_rate: 换手率 %
|
||
- up_count: 上涨家数
|
||
- down_count: 下跌家数
|
||
- leading_stock_name: 领涨股名称 (f128)
|
||
- leading_stock_code: 领涨股代码 (f140)
|
||
- leading_stock_pct: 领涨股涨跌幅 % (f141)
|
||
"""
|
||
cache_key = f"sector_rt:{fs}:{sort_by}"
|
||
cached = cache.get(cache_key)
|
||
if cached is not None:
|
||
return cached
|
||
|
||
params = {
|
||
"pn": "1",
|
||
"pz": str(page_size),
|
||
# 东方财富列表接口里,po=1 对应降序,po=0 对应升序。
|
||
# 之前这里写反后,会把“涨幅榜”实际拿成跌幅靠前列表。
|
||
"po": "1" if descending else "0",
|
||
"np": "1",
|
||
"ut": "b1f8f8f8",
|
||
"fltt": "2",
|
||
"invt": "2",
|
||
"fid": sort_by,
|
||
"fs": fs,
|
||
"fields": "f2,f3,f4,f6,f8,f12,f14,f104,f105,f128,f140,f141",
|
||
}
|
||
|
||
try:
|
||
board_type = "industry" if fs == "m:90+t:2" else "concept" if fs == "m:90+t:3" else "region" if fs == "m:90+t:1" else "unknown"
|
||
async with httpx.AsyncClient() as client:
|
||
resp = await client.get(
|
||
SECTOR_LIST_URL,
|
||
params=params,
|
||
headers=SECTOR_HEADERS,
|
||
timeout=10,
|
||
follow_redirects=True,
|
||
)
|
||
data = _parse_eastmoney_json(resp, "板块实时排名")
|
||
|
||
items = data.get("data", {}).get("diff", [])
|
||
if not items:
|
||
logger.debug("东方财富板块实时排名无数据")
|
||
return []
|
||
|
||
result = []
|
||
for item in items:
|
||
# f3 可能是 "-"(停牌等异常情况)
|
||
pct = item.get("f3")
|
||
if pct == "-" or pct is None:
|
||
pct = 0.0
|
||
result.append({
|
||
"sector_code": item.get("f12", ""),
|
||
"sector_name": item.get("f14", ""),
|
||
"board_type": board_type,
|
||
"pct_change": float(pct),
|
||
"amount": float(item.get("f6", 0) or 0),
|
||
"turnover_rate": float(item.get("f8", 0) or 0),
|
||
"up_count": int(item.get("f104", 0) or 0),
|
||
"down_count": int(item.get("f105", 0) or 0),
|
||
"leading_stock_name": item.get("f128", ""),
|
||
"leading_stock_code": item.get("f140", ""),
|
||
"leading_stock_pct": float(item.get("f141", 0) or 0),
|
||
})
|
||
|
||
# 缓存:盘中 60 秒,盘后 300 秒
|
||
ttl = 60 if _is_trading_hours() else 300
|
||
cache.set(cache_key, result, ttl)
|
||
logger.info(f"东方财富板块实时排名: 获取 {len(result)} 个板块")
|
||
return result
|
||
|
||
except Exception as e:
|
||
logger.error(f"东方财富板块实时排名获取失败: {e}")
|
||
if notify:
|
||
await log_error(
|
||
"eastmoney",
|
||
f"东方财富板块实时排名获取失败: {e}",
|
||
detail=f"fs={fs}, sort_by={sort_by}, page_size={page_size}",
|
||
)
|
||
return []
|
||
|
||
|
||
async def get_a_share_realtime_ranking(
|
||
sort_by: str = "f3",
|
||
descending: bool = True,
|
||
page_size: int = 6000,
|
||
) -> list[dict]:
|
||
"""获取 A 股实时行情列表,用于市场温度和实时候选召回。"""
|
||
cache_key = f"ashare_rt:{sort_by}:{descending}:{page_size}"
|
||
cached = cache.get(cache_key)
|
||
if cached is not None:
|
||
return cached
|
||
|
||
fs = "m:0+t:6,m:0+t:80,m:0+t:81+s:2048,m:1+t:2,m:1+t:23"
|
||
fields = "f2,f3,f6,f8,f9,f12,f14,f20,f21,f23,f62"
|
||
|
||
try:
|
||
result = []
|
||
page = 1
|
||
page_size_per_request = min(max(page_size, 200), 500)
|
||
async with httpx.AsyncClient() as client:
|
||
while len(result) < page_size:
|
||
params = {
|
||
"pn": str(page),
|
||
"pz": str(page_size_per_request),
|
||
"po": "1" if descending else "0",
|
||
"np": "1",
|
||
"ut": "b1f8f8f8",
|
||
"fltt": "2",
|
||
"invt": "2",
|
||
"fid": sort_by,
|
||
"fs": fs,
|
||
"fields": fields,
|
||
}
|
||
resp = await client.get(
|
||
SECTOR_LIST_URL,
|
||
params=params,
|
||
headers=SECTOR_HEADERS,
|
||
timeout=12,
|
||
follow_redirects=True,
|
||
)
|
||
data = _parse_eastmoney_json(resp, f"A股实时行情 第{page}页")
|
||
items = data.get("data", {}).get("diff", [])
|
||
if not items:
|
||
break
|
||
|
||
before_count = len(result)
|
||
for item in items:
|
||
pct = item.get("f3")
|
||
price = item.get("f2")
|
||
code = str(item.get("f12", "") or "")
|
||
if not code or pct == "-" or price == "-" or pct is None or price is None:
|
||
continue
|
||
result.append({
|
||
"ts_code": _eastmoney_code_to_ts(code),
|
||
"name": item.get("f14", ""),
|
||
"price": float(price or 0),
|
||
"pct_chg": float(pct or 0),
|
||
"amount": float(item.get("f6", 0) or 0),
|
||
"turnover_rate": float(item.get("f8", 0) or 0),
|
||
"pe": _safe_float(item.get("f9")),
|
||
"pb": _safe_float(item.get("f23")),
|
||
"total_mv": _safe_float(item.get("f20")),
|
||
"circ_mv": _safe_float(item.get("f21")),
|
||
"main_net_inflow": _safe_float(item.get("f62")) or 0,
|
||
"source": "eastmoney",
|
||
})
|
||
if len(result) == before_count or len(items) < page_size_per_request:
|
||
break
|
||
page += 1
|
||
|
||
# 去重,避免跨页或接口异常重复
|
||
deduped = {}
|
||
for item in result:
|
||
deduped[item["ts_code"]] = item
|
||
result = list(deduped.values())[:page_size]
|
||
|
||
ttl = 60 if _is_trading_hours() else 300
|
||
cache.set(cache_key, result, ttl)
|
||
logger.info("东方财富A股实时行情: 获取 %s 只", len(result))
|
||
return result
|
||
except Exception as e:
|
||
reason = _describe_exception(e)
|
||
logger.warning("东方财富A股实时行情获取失败,尝试腾讯兜底: %s", reason)
|
||
fallback = await _get_a_share_realtime_ranking_from_tencent(
|
||
sort_by=sort_by,
|
||
descending=descending,
|
||
page_size=page_size,
|
||
)
|
||
if fallback:
|
||
ttl = 60 if _is_trading_hours() else 300
|
||
cache.set(cache_key, fallback, ttl)
|
||
logger.warning("东方财富A股实时行情获取失败,已切换腾讯兜底: %s", reason)
|
||
return fallback
|
||
|
||
logger.error("A股实时行情双源获取失败: %s", reason)
|
||
await log_error(
|
||
"market_data",
|
||
f"A股实时行情双源获取失败: {reason}",
|
||
detail=f"primary=eastmoney, fallback=tencent, sort_by={sort_by}, page_size={page_size}",
|
||
)
|
||
return []
|
||
|
||
|
||
async def _get_a_share_realtime_ranking_from_tencent(
|
||
sort_by: str,
|
||
descending: bool,
|
||
page_size: int,
|
||
) -> list[dict]:
|
||
"""东方财富失败时,用腾讯批量行情兜底全市场快照。"""
|
||
try:
|
||
from app.data.tushare_client import tushare_client
|
||
from app.data.tencent_client import get_realtime_quotes_batch
|
||
|
||
stock_basic = tushare_client.get_stock_basic()
|
||
if stock_basic.empty:
|
||
return []
|
||
|
||
ts_codes = stock_basic["ts_code"].dropna().astype(str).tolist()
|
||
quotes = await get_realtime_quotes_batch(ts_codes)
|
||
if not quotes:
|
||
return []
|
||
|
||
rows = []
|
||
for ts_code, quote in quotes.items():
|
||
if quote.price <= 0:
|
||
continue
|
||
rows.append({
|
||
"ts_code": ts_code,
|
||
"name": quote.name,
|
||
"price": quote.price,
|
||
"pct_chg": quote.pct_chg,
|
||
"amount": quote.amount,
|
||
"turnover_rate": quote.turnover_rate,
|
||
"pe": quote.pe,
|
||
"pb": quote.pb,
|
||
"total_mv": quote.total_mv,
|
||
"circ_mv": quote.circ_mv,
|
||
"main_net_inflow": 0,
|
||
"source": "tencent_fallback",
|
||
})
|
||
|
||
sort_key_map = {
|
||
"f3": "pct_chg",
|
||
"f6": "amount",
|
||
"f8": "turnover_rate",
|
||
"f20": "total_mv",
|
||
"f21": "circ_mv",
|
||
}
|
||
sort_key = sort_key_map.get(sort_by, "pct_chg")
|
||
rows.sort(key=lambda item: float(item.get(sort_key, 0) or 0), reverse=descending)
|
||
logger.info("腾讯兜底A股实时行情: 获取 %s 只", len(rows))
|
||
return rows[:page_size]
|
||
except Exception as e:
|
||
logger.warning("腾讯兜底A股实时行情失败: %s", _describe_exception(e))
|
||
return []
|
||
|
||
|
||
def _eastmoney_code_to_ts(code: str) -> str:
|
||
if code.startswith("6"):
|
||
return f"{code}.SH"
|
||
return f"{code}.SZ"
|
||
|
||
|
||
def _safe_float(value) -> float | None:
|
||
if value in (None, "", "-"):
|
||
return None
|
||
try:
|
||
return float(value)
|
||
except (TypeError, ValueError):
|
||
return None
|
||
|
||
|
||
async def get_min_kline(
|
||
ts_code: str,
|
||
period: str = "5",
|
||
count: int = 48,
|
||
) -> pd.DataFrame:
|
||
"""获取分钟级 K 线数据
|
||
|
||
Args:
|
||
ts_code: Tushare 格式代码,如 600519.SH
|
||
period: 分钟周期 ("1", "5", "15", "30", "60")
|
||
count: 请求条数(5分钟×48=4小时≈一个交易日)
|
||
|
||
Returns:
|
||
DataFrame with columns: time, open, close, high, low, volume, amount
|
||
空 DataFrame if failed
|
||
"""
|
||
cache_key = f"min_kline:{ts_code}:{period}"
|
||
cached = cache.get(cache_key)
|
||
if cached is not None:
|
||
return cached
|
||
|
||
secid = _ts_code_to_eastmoney(ts_code)
|
||
|
||
params = {
|
||
"secid": secid,
|
||
"fields1": "f1,f2,f3,f4,f5,f6",
|
||
"fields2": "f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61",
|
||
"klt": period, # 分钟周期
|
||
"fqt": "1", # 前复权
|
||
"beg": "0",
|
||
"end": "20500101",
|
||
"lmt": str(count), # 返回条数限制
|
||
}
|
||
|
||
try:
|
||
async with httpx.AsyncClient() as client:
|
||
resp = await client.get(
|
||
EASTMONEY_KLINE_URL,
|
||
params=params,
|
||
headers=HEADERS,
|
||
timeout=10,
|
||
follow_redirects=True,
|
||
)
|
||
data = _parse_eastmoney_json(resp, f"分钟K线 {ts_code}")
|
||
|
||
klines = data.get("data", {}).get("klines", [])
|
||
if not klines:
|
||
logger.debug(f"东方财富分钟K线 {ts_code} 无数据")
|
||
return pd.DataFrame()
|
||
|
||
# 解析:格式 "2026-04-15 09:30,5.71,5.73,5.74,5.70,12345,70000.00"
|
||
rows = []
|
||
for line in klines:
|
||
parts = line.split(",")
|
||
if len(parts) < 7:
|
||
continue
|
||
rows.append({
|
||
"time": parts[0],
|
||
"open": float(parts[1]),
|
||
"close": float(parts[2]),
|
||
"high": float(parts[3]),
|
||
"low": float(parts[4]),
|
||
"volume": float(parts[5]),
|
||
"amount": float(parts[6]),
|
||
})
|
||
|
||
df = pd.DataFrame(rows)
|
||
if df.empty:
|
||
return df
|
||
|
||
# 缓存(分钟数据盘中3分钟过期,盘后30分钟过期)
|
||
ttl = 180 if _is_trading_hours() else 1800
|
||
cache.set(cache_key, df, ttl)
|
||
return df
|
||
|
||
except Exception as e:
|
||
logger.warning(f"东方财富分钟K线获取失败 {ts_code}: {e},尝试新浪兜底")
|
||
try:
|
||
from app.data.sina_client import get_min_kline as get_sina_min_kline
|
||
fallback_df = await get_sina_min_kline(ts_code, period=period, count=count)
|
||
if not fallback_df.empty:
|
||
logger.info("新浪分钟K线兜底成功 %s: %s 条", ts_code, len(fallback_df))
|
||
return fallback_df
|
||
except Exception as fallback_error:
|
||
logger.warning("新浪分钟K线兜底异常 %s: %s", ts_code, fallback_error)
|
||
|
||
logger.error(f"分钟K线双源获取失败 {ts_code}: 东方财富={e}")
|
||
await log_error(
|
||
"market_data",
|
||
f"分钟K线双源获取失败 {ts_code}: {e}",
|
||
detail=f"primary=eastmoney, fallback=sina, period={period}, count={count}",
|
||
)
|
||
return pd.DataFrame()
|
||
|
||
|
||
def _parse_eastmoney_json(resp: httpx.Response, label: str) -> dict:
|
||
"""解析东方财富 JSON 响应,遇到 302/HTML 等非 JSON 情况给出更清晰日志。"""
|
||
resp.raise_for_status()
|
||
content_type = resp.headers.get("content-type", "")
|
||
text_preview = (resp.text or "")[:160].replace("\n", " ").replace("\r", " ")
|
||
if "json" not in content_type.lower() and not resp.text.strip().startswith("{"):
|
||
raise ValueError(
|
||
f"{label} 返回非JSON响应(status={resp.status_code}, content_type={content_type}, body={text_preview})"
|
||
)
|
||
try:
|
||
return resp.json()
|
||
except Exception as e:
|
||
raise ValueError(
|
||
f"{label} JSON解析失败(status={resp.status_code}, content_type={content_type}, body={text_preview})"
|
||
) from e
|
||
|
||
|
||
def analyze_intraday_volume_distribution(min_df: pd.DataFrame) -> dict:
|
||
"""分析盘中量能分布(基于5分钟K线)
|
||
|
||
Returns:
|
||
{
|
||
"morning_volume_ratio": float, # 上午成交额占比
|
||
"afternoon_volume_ratio": float, # 下午成交额占比
|
||
"opening_strength": float, # 开盘30分钟成交额/全天占比
|
||
"closing_strength": float, # 尾盘30分钟成交额/全天占比
|
||
"volume_trend": str, # "morning_dominant" / "afternoon_dominant" / "balanced"
|
||
"key_periods": list[str], # 关键放量时段描述
|
||
}
|
||
"""
|
||
if min_df.empty or len(min_df) < 4:
|
||
return {
|
||
"morning_volume_ratio": 0.5,
|
||
"afternoon_volume_ratio": 0.5,
|
||
"opening_strength": 0.25,
|
||
"closing_strength": 0.25,
|
||
"volume_trend": "balanced",
|
||
"key_periods": [],
|
||
}
|
||
|
||
total_amount = min_df["amount"].sum()
|
||
if total_amount == 0:
|
||
return {
|
||
"morning_volume_ratio": 0.5,
|
||
"afternoon_volume_ratio": 0.5,
|
||
"opening_strength": 0.25,
|
||
"closing_strength": 0.25,
|
||
"volume_trend": "balanced",
|
||
"key_periods": [],
|
||
}
|
||
|
||
# 按时间段分割
|
||
morning = min_df[min_df["time"].str.contains("09:|10:|11:0", na=False)]
|
||
afternoon = min_df[min_df["time"].str.contains("13:|14:0|14:1|14:2|14:3|14:4|14:5", na=False)]
|
||
|
||
morning_ratio = morning["amount"].sum() / total_amount if not morning.empty else 0.5
|
||
afternoon_ratio = afternoon["amount"].sum() / total_amount if not afternoon.empty else 0.5
|
||
|
||
# 开盘30分钟(9:30-10:00)
|
||
opening_30 = min_df[min_df["time"].str.contains("09:3|09:4|09:5|10:0", na=False)]
|
||
opening_strength = opening_30["amount"].sum() / total_amount if not opening_30.empty else 0.25
|
||
|
||
# 尾盘30分钟(14:30-15:00)
|
||
closing_30 = min_df[min_df["time"].str.contains("14:3|14:4|14:5|15:0", na=False)]
|
||
closing_strength = closing_30["amount"].sum() / total_amount if not closing_30.empty else 0.25
|
||
|
||
# 量能趋势判断
|
||
if morning_ratio > 0.6:
|
||
volume_trend = "morning_dominant"
|
||
elif afternoon_ratio > 0.6:
|
||
volume_trend = "afternoon_dominant"
|
||
else:
|
||
volume_trend = "balanced"
|
||
|
||
# 关键放量时段
|
||
key_periods = []
|
||
avg_amount = min_df["amount"].mean()
|
||
hot_bars = min_df[min_df["amount"] > avg_amount * 2]
|
||
for _, bar in hot_bars.iterrows():
|
||
time_str = bar["time"].split(" ")[1] if " " in bar["time"] else bar["time"]
|
||
pct = bar["amount"] / total_amount * 100
|
||
key_periods.append(f"{time_str}放量(占比{pct:.0f}%)")
|
||
|
||
return {
|
||
"morning_volume_ratio": round(morning_ratio, 2),
|
||
"afternoon_volume_ratio": round(afternoon_ratio, 2),
|
||
"opening_strength": round(opening_strength, 2),
|
||
"closing_strength": round(closing_strength, 2),
|
||
"volume_trend": volume_trend,
|
||
"key_periods": key_periods[:3],
|
||
}
|
||
|
||
|
||
def _is_trading_hours() -> bool:
|
||
"""简单判断是否在交易时间"""
|
||
now = datetime.now()
|
||
if now.weekday() >= 5: # 周六周日
|
||
return False
|
||
hour = now.hour
|
||
minute = now.minute
|
||
# 9:30 - 11:30 或 13:00 - 15:00
|
||
if (hour == 9 and minute >= 30) or (hour == 10) or (hour == 11 and minute <= 30):
|
||
return True
|
||
if (hour == 13) or (hour == 14) or (hour == 15 and minute == 0):
|
||
return True
|
||
return False
|