diff --git a/app/cli.py b/app/cli.py index 0f968ed..82cbdc4 100644 --- a/app/cli.py +++ b/app/cli.py @@ -3,7 +3,7 @@ import argparse import sys -from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, onchain_monitor, paper_trader, price_streamer, price_tracker, review_engine, sentiment_monitor +from app.services import altcoin_confirm, altcoin_screener, event_driven_screener, market_overview, onchain_monitor, paper_trader, price_streamer, price_tracker, review_engine, sentiment_monitor def build_parser(): @@ -22,6 +22,7 @@ def build_parser(): paper.add_argument("--limit", type=int, default=100, help="本轮最多处理的可执行推荐数量") subparsers.add_parser("price-streamer", help="运行 websocket 实时价格流") + subparsers.add_parser("market", help="采集全市场快照") review = subparsers.add_parser("review", help="运行复盘") review.add_argument("--compact", action="store_true", help="输出紧凑 JSON") @@ -59,6 +60,10 @@ def main(): return paper_trader.main(limit=args.limit) if args.command == "price-streamer": return price_streamer.main() + if args.command == "market": + result = market_overview.collect_market_snapshot() + print(sentiment_monitor.json.dumps(result, ensure_ascii=False, indent=2)) + return result if args.command == "review": return review_engine.run_review(push_enabled=not args.no_push, compact=args.compact) if args.command == "event": diff --git a/app/db/market_db.py b/app/db/market_db.py new file mode 100644 index 0000000..69f4cdd --- /dev/null +++ b/app/db/market_db.py @@ -0,0 +1,88 @@ +"""Market overview snapshot persistence.""" + +import json +from datetime import datetime + +from app.db.schema import get_conn + + +SNAPSHOT_TYPE = "crypto_market" + + +def _now(): + return datetime.now().isoformat(timespec="seconds") + + +def save_market_snapshot(data, *, status="success", error_message=""): + payload = data if isinstance(data, dict) else {} + snapshot_time = str(payload.get("updated_at") or _now()) + source = str(payload.get("source") or "binance_spot_usdt_market") + conn = get_conn() + try: + row = conn.execute( + """ + INSERT INTO market_snapshots ( + snapshot_type, source, snapshot_time, data_json, status, error_message, created_at + ) VALUES (%s, %s, %s, %s, %s, %s, %s) + RETURNING id + """, + ( + SNAPSHOT_TYPE, + source, + snapshot_time, + json.dumps(payload, ensure_ascii=False, default=str), + status, + str(error_message or "")[:1000], + _now(), + ), + ).fetchone() + conn.commit() + return int(row["id"] if row else 0) + except Exception: + conn.rollback() + raise + finally: + conn.close() + + +def get_latest_market_snapshot(max_age_seconds=None): + conn = get_conn() + try: + row = conn.execute( + """ + SELECT * + FROM market_snapshots + WHERE snapshot_type=%s AND status='success' + ORDER BY snapshot_time DESC, id DESC + LIMIT 1 + """, + (SNAPSHOT_TYPE,), + ).fetchone() + finally: + conn.close() + if not row: + return None + item = dict(row) + try: + data = json.loads(item.get("data_json") or "{}") + except Exception: + data = {} + if not isinstance(data, dict): + data = {} + data.setdefault("updated_at", item.get("snapshot_time") or "") + data.setdefault("source", item.get("source") or "binance_spot_usdt_market") + if max_age_seconds: + try: + ts = datetime.fromisoformat(str(item.get("snapshot_time") or data.get("updated_at"))) + data["snapshot_age_seconds"] = max(0, round((datetime.now() - ts).total_seconds())) + data["snapshot_stale"] = data["snapshot_age_seconds"] > int(max_age_seconds) + except Exception: + data["snapshot_age_seconds"] = None + data["snapshot_stale"] = True + return data + + +def save_market_error(error_message, data=None): + payload = data if isinstance(data, dict) else {"updated_at": _now(), "source": "binance_spot_usdt_market"} + return save_market_snapshot(payload, status="error", error_message=error_message) + diff --git a/app/db/migrations/0010_market_snapshots.sql b/app/db/migrations/0010_market_snapshots.sql new file mode 100644 index 0000000..d9572f0 --- /dev/null +++ b/app/db/migrations/0010_market_snapshots.sql @@ -0,0 +1,14 @@ +CREATE TABLE IF NOT EXISTS market_snapshots ( + id BIGSERIAL PRIMARY KEY, + snapshot_type TEXT NOT NULL DEFAULT 'crypto_market', + source TEXT NOT NULL DEFAULT 'binance_spot_usdt_market', + snapshot_time TEXT NOT NULL, + data_json TEXT NOT NULL DEFAULT '{}', + status TEXT NOT NULL DEFAULT 'success', + error_message TEXT DEFAULT '', + created_at TEXT NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_market_snapshots_type_time + ON market_snapshots(snapshot_type, snapshot_time DESC, id DESC); + diff --git a/app/db/scheduler_db.py b/app/db/scheduler_db.py index 66a8148..2e6afac 100644 --- a/app/db/scheduler_db.py +++ b/app/db/scheduler_db.py @@ -48,6 +48,16 @@ DEFAULT_JOBS = [ "description": "模拟交易账本同步", "sort_order": 25, }, + { + "job_name": "market", + "command": "market", + "args": [], + "every_seconds": 180, + "initial_delay": 35, + "lock_group": "market_write", + "description": "全市场快照采集", + "sort_order": 28, + }, { "job_name": "confirm", "command": "confirm", diff --git a/app/services/altcoin_screener.py b/app/services/altcoin_screener.py index 60ec765..b30087a 100644 --- a/app/services/altcoin_screener.py +++ b/app/services/altcoin_screener.py @@ -88,12 +88,29 @@ def get_dynamic_weights(): def fetch_all_tickers(): tickers = exchange.fetch_tickers() + try: + markets = exchange.markets or exchange.load_markets() + except Exception: + markets = {} usdt_pairs = {} universe_exclusions = [] for symbol, info in tickers.items(): if "/USDT" in symbol: base = symbol.split("/")[0] vol_usd = info.get("quoteVolume", 0) or 0 + market = markets.get(symbol) or {} + if market and (market.get("spot") is False or market.get("active") is False): + universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "inactive_market", "reason_label": "非活跃现货交易对"}) + continue + ticker_dt = info.get("datetime") + if ticker_dt: + try: + ticker_time = datetime.fromisoformat(str(ticker_dt).replace("Z", "+00:00")).replace(tzinfo=None) + if datetime.utcnow() - ticker_time > timedelta(hours=36): + universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, "reason_code": "stale_ticker", "reason_label": "行情数据过旧"}) + continue + except Exception: + pass if base in STABLECOINS or base in WRAPPED or base in GOLD_METAL: reason = universe_gate_reason(base, vol_usd, 0, symbol=symbol) or {"reason_code": "excluded_base", "reason_label": "排除基础资产"} universe_exclusions.append({"symbol": symbol, "base": base, "price": info.get("last", 0) or 0, "volume_24h": vol_usd, **reason}) diff --git a/app/services/market_overview.py b/app/services/market_overview.py index 9e4524d..d2b1635 100644 --- a/app/services/market_overview.py +++ b/app/services/market_overview.py @@ -11,6 +11,7 @@ from datetime import datetime import requests +from app.db.market_db import get_latest_market_snapshot, save_market_error, save_market_snapshot from app.services import altcoin_screener @@ -139,7 +140,7 @@ def _funding_overview(universe_symbols=None): } -def get_crypto_market_overview(): +def compute_crypto_market_overview(): pairs = altcoin_screener.fetch_all_tickers() benchmarks = _benchmark_overview() items = [] @@ -193,4 +194,51 @@ def get_crypto_market_overview(): return overview -__all__ = ["get_crypto_market_overview"] +def collect_market_snapshot(): + try: + overview = compute_crypto_market_overview() + snapshot_id = save_market_snapshot(overview) + return { + "status": "success", + "snapshot_id": snapshot_id, + "updated_at": overview.get("updated_at"), + "sample_count": overview.get("sample_count", 0), + "top_gainers": [x.get("symbol") for x in overview.get("top_gainers", [])[:8]], + } + except Exception as exc: + try: + save_market_error(str(exc)) + except Exception: + pass + raise + + +def get_crypto_market_overview(*, allow_live_fallback=False): + snapshot = get_latest_market_snapshot(max_age_seconds=300) + if snapshot: + snapshot["snapshot_source"] = "database" + return snapshot + if not allow_live_fallback: + return { + "updated_at": "", + "source": "binance_spot_usdt_market", + "snapshot_source": "database", + "snapshot_missing": True, + "state": { + "label": "等待市场快照", + "tone": "neutral", + "summary": "市场采集任务尚未写入快照,请等待定时任务或在调度中心手动触发 market。", + }, + "benchmarks": {}, + "sample_count": 0, + "top_gainers": [], + "top_losers": [], + "top_volume": [], + "funding": {"sample_count": 0}, + } + overview = compute_crypto_market_overview() + overview["snapshot_source"] = "live_fallback" + return overview + + +__all__ = ["collect_market_snapshot", "compute_crypto_market_overview", "get_crypto_market_overview"] diff --git a/app/web/routes_market.py b/app/web/routes_market.py index 9e507aa..0bce2a1 100644 --- a/app/web/routes_market.py +++ b/app/web/routes_market.py @@ -1,4 +1,5 @@ from fastapi import APIRouter, Cookie +from fastapi.responses import JSONResponse from app.db.onchain_db import get_onchain_overview from app.services.market_overview import get_crypto_market_overview @@ -41,7 +42,7 @@ async def api_market_overview(hours: int = 24, altcoin_session: str = Cookie(def } except Exception: ai_analysis = {} - return { + payload = { "hours": int(hours or 24), "updated_at": crypto_market.get("updated_at"), "market": { @@ -52,3 +53,11 @@ async def api_market_overview(hours: int = 24, altcoin_session: str = Cookie(def "ai_analysis": ai_analysis, }, } + return JSONResponse( + payload, + headers={ + "Cache-Control": "no-cache, no-store, must-revalidate", + "Pragma": "no-cache", + "Expires": "0", + }, + ) diff --git a/static/market.html b/static/market.html index 03b429a..82b7609 100644 --- a/static/market.html +++ b/static/market.html @@ -2,7 +2,7 @@ {% block title %}AlphaX Agent — 市场总览{% endblock %} {% block extra_head_css %} {% endblock %} {% block content %} @@ -18,6 +18,7 @@ + 等待刷新 @@ -61,15 +62,16 @@ {% endblock %} diff --git a/tests/test_market_overview_api.py b/tests/test_market_overview_api.py index 8d14056..22483ca 100644 --- a/tests/test_market_overview_api.py +++ b/tests/test_market_overview_api.py @@ -30,7 +30,7 @@ def test_crypto_market_overview_uses_full_market_inputs(monkeypatch): "ETH/USDT": {"last": 5_000, "percentage": 2.1, "quoteVolume": 800_000_000}, }) - overview = market_overview.get_crypto_market_overview() + overview = market_overview.compute_crypto_market_overview() assert overview["source"] == "binance_spot_usdt_market" assert overview["sample_count"] == 4 @@ -68,3 +68,4 @@ def test_market_overview_api_returns_crypto_market_not_candidate_stats(monkeypat assert data["market"]["crypto_market"]["source"] == "binance_spot_usdt_market" assert "stats" not in data["market"] assert "market_context_overview" not in data["market"] + assert resp.headers["cache-control"] == "no-cache, no-store, must-revalidate" diff --git a/tests/test_screener_optimizations.py b/tests/test_screener_optimizations.py index 41cd670..eedce0c 100644 --- a/tests/test_screener_optimizations.py +++ b/tests/test_screener_optimizations.py @@ -1,5 +1,6 @@ import os import sys +from datetime import datetime import pandas as pd @@ -56,6 +57,37 @@ def test_fetch_all_tickers_filters_stable_and_fiat_suffixes(monkeypatch): assert "AUD/USDT" not in pairs +def test_fetch_all_tickers_filters_inactive_and_stale_markets(monkeypatch): + fresh_time = datetime.utcnow().isoformat(timespec="seconds") + "Z" + monkeypatch.setattr( + altcoin_screener.exchange, + "fetch_tickers", + lambda: { + "CREAM/USDT": {"last": 2.1, "percentage": 65, "quoteVolume": 200000, "datetime": "2026-05-08T15:24:40.529Z"}, + "PNT/USDT": {"last": 0.03, "percentage": 45, "quoteVolume": 200000, "datetime": fresh_time}, + "FIDA/USDT": {"last": 0.02, "percentage": 35, "quoteVolume": 2000000, "datetime": fresh_time}, + }, + ) + monkeypatch.setattr( + altcoin_screener.exchange, + "load_markets", + lambda: { + "CREAM/USDT": {"spot": True, "active": False}, + "PNT/USDT": {"spot": True, "active": False}, + "FIDA/USDT": {"spot": True, "active": True}, + }, + ) + altcoin_screener.exchange.markets = None + + pairs = altcoin_screener.fetch_all_tickers() + + assert "CREAM/USDT" not in pairs + assert "PNT/USDT" not in pairs + assert "FIDA/USDT" in pairs + exclusions = getattr(altcoin_screener.fetch_all_tickers, "last_universe_exclusions", []) + assert any(x["symbol"] == "CREAM/USDT" and x["reason_code"] == "inactive_market" for x in exclusions) + + def _mock_weights(): return { "量价齐飞": 5,