alphax/app/services/market_overview.py
2026-05-24 20:44:22 +08:00

239 lines
8.6 KiB
Python

"""Crypto market-wide overview providers.
This module is intentionally separate from recommendation analytics. The market
overview page should describe the broad crypto environment, not the current
recommendation sample.
"""
from __future__ import annotations

import math
from datetime import datetime

import requests

from app.db.market_db import get_latest_market_snapshot, save_market_error, save_market_snapshot
from app.services import altcoin_screener
def _safe_float(value, default=0.0):
try:
return float(value or 0)
except Exception:
return default
def _percentile(values, pct):
values = sorted([float(v) for v in values if isinstance(v, (int, float))])
if not values:
return 0.0
if len(values) == 1:
return round(values[0], 2)
k = (len(values) - 1) * float(pct)
lower = int(k)
upper = min(lower + 1, len(values) - 1)
weight = k - lower
return round(values[lower] * (1 - weight) + values[upper] * weight, 2)
def _market_state(avg_change, advance_decline_ratio, hot_count, crash_count, benchmarks=None):
    """Classify the market environment from breadth stats and BTC/ETH moves.

    Rules are checked in order and the first match wins:

    1. Risk-off: at least 20 crashing coins, an advance/decline ratio below
       0.65, or weak majors (BTC <= -2% or ETH <= -2.5%) together with a
       negative average change.
    2. Risk-on: average change >= 1%, ratio >= 1.2, at least 20 hot coins,
       and majors not weak.
    3. Selective: both majors up >= 1%, at least 10 hot coins, ratio >= 0.9.
    4. Neutral otherwise.

    Args:
        avg_change: Mean 24h change of the sample, in percent.
        advance_decline_ratio: Advancers divided by decliners.
        hot_count: Number of strongly rising coins.
        crash_count: Number of strongly falling coins.
        benchmarks: Optional mapping with ``"BTC/USDT"`` / ``"ETH/USDT"``
            entries that carry a ``change_24h`` field.

    Returns:
        dict with ``label``, ``tone`` and ``summary`` keys.
    """
    quotes = benchmarks or {}
    btc_move = _safe_float((quotes.get("BTC/USDT") or {}).get("change_24h"))
    eth_move = _safe_float((quotes.get("ETH/USDT") or {}).get("change_24h"))
    majors_weak = btc_move <= -2 or eth_move <= -2.5
    majors_strong = btc_move >= 1 and eth_move >= 1

    breadth_broken = crash_count >= 20 or advance_decline_ratio < 0.65
    if breadth_broken or (majors_weak and avg_change < 0):
        verdict = (
            "全市场偏弱",
            "risk_off",
            "主流币或山寨广度走弱,山寨机会更容易变成反弹噪音,优先控制仓位和追高风险。",
        )
    elif avg_change >= 1.0 and advance_decline_ratio >= 1.2 and hot_count >= 20 and not majors_weak:
        verdict = (
            "全市场偏强",
            "risk_on",
            "上涨覆盖面和强势币数量都不错,可以更积极寻找放量后的确认机会。",
        )
    elif majors_strong and hot_count >= 10 and advance_decline_ratio >= 0.9:
        verdict = (
            "主流带动轮动",
            "selective",
            "BTC/ETH 提供方向支撑,但山寨仍是结构性轮动,适合等待量价和入场窗口共振。",
        )
    else:
        verdict = (
            "结构性行情",
            "neutral",
            "市场不是单边环境,机会更依赖板块轮动和单币确认,适合精选而不是广撒网。",
        )

    label, tone, summary = verdict
    return {"label": label, "tone": tone, "summary": summary}
def _benchmark_overview(tickers=None):
    """Extract BTC/USDT and ETH/USDT reference quotes from a ticker mapping.

    The mapping handed to this function is read elsewhere in this module
    with ``price`` / ``change_24h`` / ``volume_24h`` keys, while this
    function historically read ``last`` / ``percentage`` / ``quoteVolume``.
    Both spellings are accepted so the benchmarks are populated whichever
    schema the feed uses; the first field that is present wins.

    Args:
        tickers: Mapping of symbol to a ticker dict. ``None`` or a missing /
            malformed entry yields an all-zero quote for that symbol.

    Returns:
        dict keyed by ``"BTC/USDT"`` and ``"ETH/USDT"``, each holding
        ``symbol``, ``price``, ``change_24h`` and ``volume_24h``.
    """

    def _pick(info, *keys):
        # Return the first field that exists and is not None.
        for key in keys:
            value = info.get(key)
            if value is not None:
                return value
        return None

    tickers = tickers or {}
    result = {}
    for symbol in ("BTC/USDT", "ETH/USDT"):
        info = tickers.get(symbol)
        if not isinstance(info, dict):
            info = {}
        result[symbol] = {
            "symbol": symbol,
            "price": _safe_float(_pick(info, "last", "price")),
            "change_24h": _safe_float(_pick(info, "percentage", "change_24h")),
            "volume_24h": _safe_float(_pick(info, "quoteVolume", "volume_24h")),
        }
    return result
def _funding_snapshot():
try:
data = altcoin_screener.exchange.fapiPublicGetPremiumIndex()
except Exception:
try:
resp = requests.get("https://fapi.binance.com/fapi/v1/premiumIndex", timeout=8)
data = resp.json() if resp.status_code == 200 else []
except Exception:
data = []
if isinstance(data, dict):
data = [data]
result = {}
for item in data or []:
raw_symbol = str(item.get("symbol") or "")
if not raw_symbol.endswith("USDT"):
continue
rate = item.get("lastFundingRate")
try:
rate = float(rate)
except Exception:
continue
result[raw_symbol.replace("USDT", "/USDT")] = rate
return result
def _funding_overview(universe_symbols=None):
    """Summarise funding rates, optionally restricted to a symbol universe.

    Rates come from ``_funding_snapshot()``; when that yields nothing the
    screener's ``fetch_funding_rates()`` is used instead.

    Args:
        universe_symbols: Optional iterable of symbols to keep. When empty
            or ``None`` every available rate is included.

    Returns:
        dict with ``sample_count``, ``avg_funding_rate`` (rounded to 6
        decimals, ``0`` when there are no samples), the positive / negative
        counts, and the counts of rates at or beyond +/-0.001.
    """
    funding = _funding_snapshot()
    if not funding:
        funding = altcoin_screener.fetch_funding_rates()
    allowed = set(universe_symbols or [])

    samples = []
    for symbol, rate in (funding or {}).items():
        if not isinstance(rate, (int, float)):
            continue
        if allowed and symbol not in allowed:
            continue
        samples.append(float(rate))

    count = len(samples)
    average = round(sum(samples) / count, 6) if count else 0
    return {
        "sample_count": count,
        "avg_funding_rate": average,
        "positive_count": sum(1 for rate in samples if rate > 0),
        "negative_count": sum(1 for rate in samples if rate < 0),
        "extreme_positive_count": sum(1 for rate in samples if rate >= 0.001),
        "extreme_negative_count": sum(1 for rate in samples if rate <= -0.001),
    }
def compute_crypto_market_overview():
    """Build a market-wide overview from the screener's current ticker feed.

    Tickers come from ``altcoin_screener.fetch_all_tickers()``. Entries that
    are not dicts, or whose ``price`` / ``volume_24h`` is not positive, are
    left out of the sample.

    Returns:
        dict: Breadth statistics (up / down / flat counts, advance/decline
        ratio, mean and quartile 24h change, +/-5% mover counts, summed
        24h volume), the BTC/ETH benchmark quotes, the classified market
        ``state``, the top-10 gainers / losers / volume leaders, and a
        funding-rate summary for the sampled symbols. ``updated_at`` is the
        naive local time at which the overview was computed.
    """
    pairs = altcoin_screener.fetch_all_tickers() or {}
    benchmarks = _benchmark_overview(pairs)

    items = []
    for symbol, info in pairs.items():
        if not isinstance(info, dict):
            # A missing or malformed entry must not abort the whole overview.
            continue
        volume = _safe_float(info.get("volume_24h"))
        change = _safe_float(info.get("change_24h"))
        price = _safe_float(info.get("price"))
        if volume <= 0 or price <= 0:
            continue
        items.append({
            "symbol": symbol,
            "price": price,
            "change_24h": change,
            "volume_24h": volume,
        })

    changes = [x["change_24h"] for x in items]
    volumes = [x["volume_24h"] for x in items]
    up = sum(1 for x in changes if x > 0)
    down = sum(1 for x in changes if x < 0)
    hot = [x for x in items if x["change_24h"] >= 5]
    crash = [x for x in items if x["change_24h"] <= -5]
    # With no decliners the ratio degenerates to the advancer count.
    adv_dec = round(up / down, 2) if down else float(up)
    avg_change = round(sum(changes) / len(changes), 2) if changes else 0

    # Zero advancers against zero decliners is "no signal", not a sell-off.
    # Feed the classifier a balanced ratio so an empty or completely flat
    # sample does not trip its weak-breadth threshold. The reported
    # ``advance_decline_ratio`` below is left unchanged.
    breadth_for_state = adv_dec if (up or down) else 1.0
    state = _market_state(avg_change, breadth_for_state, len(hot), len(crash), benchmarks=benchmarks)

    top_gainers = sorted(items, key=lambda x: x["change_24h"], reverse=True)[:10]
    top_losers = sorted(items, key=lambda x: x["change_24h"])[:10]
    top_volume = sorted(items, key=lambda x: x["volume_24h"], reverse=True)[:10]

    overview = {
        "updated_at": datetime.now().isoformat(timespec="seconds"),
        "source": "binance_spot_usdt_market",
        "universe": "Binance spot USDT crypto market, with BTC/ETH as benchmarks and altcoin breadth excluding stables/wrapped/metals/BNB",
        "benchmarks": benchmarks,
        "sample_count": len(items),
        "up_count": up,
        "down_count": down,
        "flat_count": max(0, len(items) - up - down),
        "advance_decline_ratio": adv_dec,
        "avg_change_24h": avg_change,
        "median_change_24h": _percentile(changes, 0.5),
        "p75_change_24h": _percentile(changes, 0.75),
        "p25_change_24h": _percentile(changes, 0.25),
        "hot_count_5pct": len(hot),
        "crash_count_5pct": len(crash),
        "total_quote_volume_24h": round(sum(volumes), 2),
        "state": state,
        "top_gainers": top_gainers,
        "top_losers": top_losers,
        "top_volume": top_volume,
        "funding": _funding_overview({x["symbol"] for x in items}),
    }
    return overview
def collect_market_snapshot():
    """Compute a fresh market overview and persist it as a snapshot.

    Returns:
        dict with ``status`` (``"success"``), the stored ``snapshot_id``,
        the overview's ``updated_at`` and ``sample_count``, and the symbols
        of up to 8 top gainers.

    Raises:
        Exception: Whatever computing or saving the overview raised. Before
            re-raising, the error text is recorded via ``save_market_error``
            on a best-effort basis.
    """
    try:
        market = compute_crypto_market_overview()
        stored_id = save_market_snapshot(market)
        leaders = market.get("top_gainers", [])[:8]
        report = {
            "status": "success",
            "snapshot_id": stored_id,
            "updated_at": market.get("updated_at"),
            "sample_count": market.get("sample_count", 0),
            "top_gainers": [entry.get("symbol") for entry in leaders],
        }
        return report
    except Exception as error:
        # Recording the failure must never mask the original exception, so a
        # failure while saving the error is intentionally ignored.
        try:
            save_market_error(str(error))
        except Exception:
            pass
        raise
def get_crypto_market_overview(*, allow_live_fallback=False):
    """Return the latest market overview, preferring the stored snapshot.

    Args:
        allow_live_fallback: When no snapshot from the last 300 seconds is
            available, compute the overview live instead of returning a
            placeholder.

    Returns:
        dict: The stored snapshot tagged ``snapshot_source="database"``; or a
        live overview tagged ``"live_fallback"``; or, when the fallback is
        disabled, an empty placeholder with ``snapshot_missing=True``.
    """
    cached = get_latest_market_snapshot(max_age_seconds=300)
    if cached:
        cached["snapshot_source"] = "database"
        return cached

    if allow_live_fallback:
        live = compute_crypto_market_overview()
        live["snapshot_source"] = "live_fallback"
        return live

    # No usable snapshot and live computation is not permitted: hand back a
    # placeholder explaining that the collector has not written data yet.
    waiting_state = {
        "label": "等待市场快照",
        "tone": "neutral",
        "summary": "市场采集任务尚未写入快照,请等待定时任务或在调度中心手动触发 market。",
    }
    return {
        "updated_at": "",
        "source": "binance_spot_usdt_market",
        "snapshot_source": "database",
        "snapshot_missing": True,
        "state": waiting_state,
        "benchmarks": {},
        "sample_count": 0,
        "top_gainers": [],
        "top_losers": [],
        "top_volume": [],
        "funding": {"sample_count": 0},
    }
# Public API of this module; the underscore-prefixed functions are internal helpers.
__all__ = ["collect_market_snapshot", "compute_crypto_market_overview", "get_crypto_market_overview"]