This commit is contained in:
aaron 2026-05-18 15:40:59 +08:00
parent c2e5a73aba
commit 7304e41440
10 changed files with 234 additions and 8 deletions

View File

@ -3,7 +3,7 @@
import argparse
import sys
from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, onchain_monitor, paper_trader, price_streamer, price_tracker, review_engine, sentiment_monitor
from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, market_overview, onchain_monitor, paper_trader, price_streamer, price_tracker, review_engine, sentiment_monitor
def build_parser():
@ -22,6 +22,7 @@ def build_parser():
paper.add_argument("--limit", type=int, default=100, help="本轮最多处理的可执行推荐数量")
subparsers.add_parser("price-streamer", help="运行 websocket 实时价格流")
subparsers.add_parser("market", help="采集全市场快照")
review = subparsers.add_parser("review", help="运行复盘")
review.add_argument("--compact", action="store_true", help="输出紧凑 JSON")
@ -59,6 +60,10 @@ def main():
return paper_trader.main(limit=args.limit)
if args.command == "price-streamer":
return price_streamer.main()
if args.command == "market":
result = market_overview.collect_market_snapshot()
print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2))
return result
if args.command == "review":
return review_engine.run_review(push_enabled=not args.no_push, compact=args.compact)
if args.command == "event":

88
app/db/market_db.py Normal file
View File

@ -0,0 +1,88 @@
"""Market overview snapshot persistence."""
import json
from datetime import datetime
from app.db.schema import get_conn
SNAPSHOT_TYPE = "crypto_market"
def _now():
return datetime.now().isoformat(timespec="seconds")
def save_market_snapshot(data, *, status="success", error_message=""):
payload = data if isinstance(data, dict) else {}
snapshot_time = str(payload.get("updated_at") or _now())
source = str(payload.get("source") or "binance_spot_usdt_market")
conn = get_conn()
try:
row = conn.execute(
"""
INSERT INTO market_snapshots (
snapshot_type, source, snapshot_time, data_json, status, error_message, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
SNAPSHOT_TYPE,
source,
snapshot_time,
json.dumps(payload, ensure_ascii=False, default=str),
status,
str(error_message or "")[:1000],
_now(),
),
).fetchone()
conn.commit()
return int(row["id"] if row else 0)
except Exception:
conn.rollback()
raise
finally:
conn.close()
def get_latest_market_snapshot(max_age_seconds=None):
conn = get_conn()
try:
row = conn.execute(
"""
SELECT *
FROM market_snapshots
WHERE snapshot_type=%s AND status='success'
ORDER BY snapshot_time DESC, id DESC
LIMIT 1
""",
(SNAPSHOT_TYPE,),
).fetchone()
finally:
conn.close()
if not row:
return None
item = dict(row)
try:
data = json.loads(item.get("data_json") or "{}")
except Exception:
data = {}
if not isinstance(data, dict):
data = {}
data.setdefault("updated_at", item.get("snapshot_time") or "")
data.setdefault("source", item.get("source") or "binance_spot_usdt_market")
if max_age_seconds:
try:
ts = datetime.fromisoformat(str(item.get("snapshot_time") or data.get("updated_at")))
data["snapshot_age_seconds"] = max(0, round((datetime.now() - ts).total_seconds()))
data["snapshot_stale"] = data["snapshot_age_seconds"] > int(max_age_seconds)
except Exception:
data["snapshot_age_seconds"] = None
data["snapshot_stale"] = True
return data
def save_market_error(error_message, data=None):
payload = data if isinstance(data, dict) else {"updated_at": _now(), "source": "binance_spot_usdt_market"}
return save_market_snapshot(payload, status="error", error_message=error_message)

View File

@ -0,0 +1,14 @@
CREATE TABLE IF NOT EXISTS market_snapshots (
id BIGSERIAL PRIMARY KEY,
snapshot_type TEXT NOT NULL DEFAULT 'crypto_market',
source TEXT NOT NULL DEFAULT 'binance_spot_usdt_market',
snapshot_time TEXT NOT NULL,
data_json TEXT NOT NULL DEFAULT '{}',
status TEXT NOT NULL DEFAULT 'success',
error_message TEXT DEFAULT '',
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_market_snapshots_type_time
ON market_snapshots(snapshot_type, snapshot_time DESC, id DESC);

View File

@ -48,6 +48,16 @@ DEFAULT_JOBS = [
"description": "模拟交易账本同步",
"sort_order": 25,
},
{
"job_name": "market",
"command": "market",
"args": [],
"every_seconds": 180,
"initial_delay": 35,
"lock_group": "market_write",
"description": "全市场快照采集",
"sort_order": 28,
},
{
"job_name": "confirm",
"command": "confirm",

View File

@ -88,12 +88,29 @@ def get_dynamic_weights():
def fetch_all_tickers():
tickers = exchange.fetch_tickers()
try:
markets = exchange.markets or exchange.load_markets()
except Exception:
markets = {}
usdt_pairs = {}
universe_exclusions = []
for symbol, info in tickers.items():
if "/USDT" in symbol:
base = symbol.split("/")[0]
vol_usd = info.get("quoteVolume", 0) or 0
market = markets.get(symbol) or {}
if market and (market.get("spot") is False or market.get("active") is False):
universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "inactive_market", "reason_label": "非活跃现货交易对"})
continue
ticker_dt = info.get("datetime")
if ticker_dt:
try:
ticker_time = datetime.fromisoformat(str(ticker_dt).replace("Z", "+00:00")).replace(tzinfo=None)
if datetime.utcnow() - ticker_time > timedelta(hours=36):
universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "stale_ticker", "reason_label": "行情数据过旧"})
continue
except Exception:
pass
if base in STABLECOINS or base in WRAPPED or base in GOLD_METAL:
reason = universe_gate_reason(base, vol_usd, 0, symbol=symbol) or {"reason_code": "excluded_base", "reason_label": "排除基础资产"}
universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, **reason})

View File

@ -11,6 +11,7 @@ from datetime import datetime
import requests
from app.db.market_db import get_latest_market_snapshot, save_market_error, save_market_snapshot
from app.services import altcoin_screener
@ -139,7 +140,7 @@ def _funding_overview(universe_symbols=None):
}
def get_crypto_market_overview():
def compute_crypto_market_overview():
pairs = altcoin_screener.fetch_all_tickers()
benchmarks = _benchmark_overview()
items = []
@ -193,4 +194,51 @@ def get_crypto_market_overview():
return overview
__all__ = ["get_crypto_market_overview"]
def collect_market_snapshot():
try:
overview = compute_crypto_market_overview()
snapshot_id = save_market_snapshot(overview)
return {
"status": "success",
"snapshot_id": snapshot_id,
"updated_at": overview.get("updated_at"),
"sample_count": overview.get("sample_count", 0),
"top_gainers": [x.get("symbol") for x in overview.get("top_gainers", [])[:8]],
}
except Exception as exc:
try:
save_market_error(str(exc))
except Exception:
pass
raise
def get_crypto_market_overview(*, allow_live_fallback=False):
snapshot = get_latest_market_snapshot(max_age_seconds=300)
if snapshot:
snapshot["snapshot_source"] = "database"
return snapshot
if not allow_live_fallback:
return {
"updated_at": "",
"source": "binance_spot_usdt_market",
"snapshot_source": "database",
"snapshot_missing": True,
"state": {
"label": "等待市场快照",
"tone": "neutral",
"summary": "市场采集任务尚未写入快照,请等待定时任务或在调度中心手动触发 market。",
},
"benchmarks": {},
"sample_count": 0,
"top_gainers": [],
"top_losers": [],
"top_volume": [],
"funding": {"sample_count": 0},
}
overview = compute_crypto_market_overview()
overview["snapshot_source"] = "live_fallback"
return overview
__all__ = ["collect_market_snapshot", "compute_crypto_market_overview", "get_crypto_market_overview"]

View File

@ -1,4 +1,5 @@
from fastapi import APIRouter, Cookie
from fastapi.responses import JSONResponse
from app.db.onchain_db import get_onchain_overview
from app.services.market_overview import get_crypto_market_overview
@ -41,7 +42,7 @@ async def api_market_overview(hours: int = 24, altcoin_session: str = Cookie(def
}
except Exception:
ai_analysis = {}
return {
payload = {
"hours": int(hours or 24),
"updated_at": crypto_market.get("updated_at"),
"market": {
@ -52,3 +53,11 @@ async def api_market_overview(hours: int = 24, altcoin_session: str = Cookie(def
"ai_analysis": ai_analysis,
},
}
return JSONResponse(
payload,
headers={
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
},
)

File diff suppressed because one or more lines are too long

View File

@ -30,7 +30,7 @@ def test_crypto_market_overview_uses_full_market_inputs(monkeypatch):
"ETH/USDT": {"last": 5_000, "percentage": 2.1, "quoteVolume": 800_000_000},
})
overview = market_overview.get_crypto_market_overview()
overview = market_overview.compute_crypto_market_overview()
assert overview["source"] == "binance_spot_usdt_market"
assert overview["sample_count"] == 4
@ -68,3 +68,4 @@ def test_market_overview_api_returns_crypto_market_not_candidate_stats(monkeypat
assert data["market"]["crypto_market"]["source"] == "binance_spot_usdt_market"
assert "stats" not in data["market"]
assert "market_context_overview" not in data["market"]
assert resp.headers["cache-control"] == "no-cache, no-store, must-revalidate"

View File

@ -1,5 +1,6 @@
import os
import sys
from datetime import datetime
import pandas as pd
@ -56,6 +57,37 @@ def test_fetch_all_tickers_filters_stable_and_fiat_suffixes(monkeypatch):
assert "AUD/USDT" not in pairs
def test_fetch_all_tickers_filters_inactive_and_stale_markets(monkeypatch):
fresh_time = datetime.utcnow().isoformat(timespec="seconds") + "Z"
monkeypatch.setattr(
altcoin_screener.exchange,
"fetch_tickers",
lambda: {
"CREAM/USDT": {"last": 2.1, "percentage": 65, "quoteVolume": 200000, "datetime": "2026-05-08T15:24:40.529Z"},
"PNT/USDT": {"last": 0.03, "percentage": 45, "quoteVolume": 200000, "datetime": fresh_time},
"FIDA/USDT": {"last": 0.02, "percentage": 35, "quoteVolume": 2000000, "datetime": fresh_time},
},
)
monkeypatch.setattr(
altcoin_screener.exchange,
"load_markets",
lambda: {
"CREAM/USDT": {"spot": True, "active": False},
"PNT/USDT": {"spot": True, "active": False},
"FIDA/USDT": {"spot": True, "active": True},
},
)
altcoin_screener.exchange.markets = None
pairs = altcoin_screener.fetch_all_tickers()
assert "CREAM/USDT" not in pairs
assert "PNT/USDT" not in pairs
assert "FIDA/USDT" in pairs
exclusions = getattr(altcoin_screener.fetch_all_tickers, "last_universe_exclusions", [])
assert any(x["symbol"] == "CREAM/USDT" and x["reason_code"] == "inactive_market" for x in exclusions)
def _mock_weights():
return {
"量价齐飞": 5,