diff --git a/app/cli.py b/app/cli.py index abb4079..08a1b6d 100644 --- a/app/cli.py +++ b/app/cli.py @@ -30,6 +30,8 @@ def build_parser(): event = subparsers.add_parser("event", help="运行事件驱动筛选") event.add_argument("--no-process-existing", action="store_true", help="只处理本轮新采集事件") + event.add_argument("--limit", type=int, default=None, help="单轮最多处理事件数") + event.add_argument("--max-seconds", type=int, default=None, help="单轮最大运行秒数") sentiment = subparsers.add_parser("sentiment", help="运行舆情任务") sentiment.add_argument("--collect", action="store_true", help="采集并存储") @@ -73,7 +75,11 @@ def main(): if args.command == "review": return review_engine.run_review(push_enabled=not args.no_push, compact=args.compact) if args.command == "event": - result = event_driven_screener.run_once(process_existing=not args.no_process_existing) + result = event_driven_screener.run_once( + process_existing=not args.no_process_existing, + limit=args.limit, + max_seconds=args.max_seconds, + ) print(event_driven_screener.json.dumps(result, ensure_ascii=False, indent=2, default=str)) return result if args.command == "sentiment": diff --git a/app/db/scheduler_db.py b/app/db/scheduler_db.py index 0614dd6..e2d0d63 100644 --- a/app/db/scheduler_db.py +++ b/app/db/scheduler_db.py @@ -21,7 +21,7 @@ DEFAULT_JOBS = [ { "job_name": "event", "command": "event", - "args": [], + "args": ["--limit", "4", "--max-seconds", "50"], "every_seconds": 60, "initial_delay": 5, "lock_group": "recommendation_write", diff --git a/app/services/altcoin_screener.py b/app/services/altcoin_screener.py index 6e61ec7..ffa8843 100644 --- a/app/services/altcoin_screener.py +++ b/app/services/altcoin_screener.py @@ -102,9 +102,21 @@ def _fetch_spot_24h_tickers(): load_markets(), which is exactly the endpoint most likely to be IP-banned. The public spot 24h endpoint is enough for our broad universe scan. """ - resp = requests.get(f"{BINANCE_SPOT_BASE_URL}/api/v3/ticker/24hr", timeout=15) - resp.raise_for_status() - data = resp.json() + try: + resp = requests.get(f"{BINANCE_SPOT_BASE_URL}/api/v3/ticker/24hr", timeout=15) + resp.raise_for_status() + data = resp.json() + _write_spot_24h_ticker_cache(data) + except Exception as exc: + data = _read_spot_24h_ticker_cache(max_age_seconds=0) + if data is None: + print(f"Binance 24h ticker 拉取失败且无缓存,跳过本轮行情快照: {exc}") + _fetch_spot_24h_tickers.last_raw_count = 0 + _fetch_spot_24h_tickers.last_usdt_count = 0 + _fetch_spot_24h_tickers.last_error = str(exc)[:500] + return {} + print(f"Binance 24h ticker 拉取失败,使用本地缓存兜底: {exc}") + _fetch_spot_24h_tickers.last_error = str(exc)[:500] if not isinstance(data, list): return {} tickers = {} @@ -137,6 +149,38 @@ def _fetch_spot_24h_tickers(): return tickers +def _spot_24h_ticker_cache_path(): + return EXCHANGE_CACHE_DIR / "binance_spot_24h_tickers.json" + + +def _read_spot_24h_ticker_cache(max_age_seconds=300): + path = _spot_24h_ticker_cache_path() + try: + if not path.exists(): + return None + payload = json.loads(path.read_text(encoding="utf-8")) + fetched_at = float(payload.get("fetched_at") or 0) + if max_age_seconds and time.time() - fetched_at > max_age_seconds: + return None + data = payload.get("data") + return data if isinstance(data, list) else None + except Exception: + return None + + +def _write_spot_24h_ticker_cache(data): + try: + if not isinstance(data, list): + return + EXCHANGE_CACHE_DIR.mkdir(parents=True, exist_ok=True) + _spot_24h_ticker_cache_path().write_text( + json.dumps({"fetched_at": time.time(), "data": data}, ensure_ascii=False), + encoding="utf-8", + ) + except Exception: + pass + + def _spot_exchange_info_cache_path(): return EXCHANGE_CACHE_DIR / "binance_spot_exchange_info.json" diff --git a/app/services/event_driven_screener.py b/app/services/event_driven_screener.py index 0b26cda..8bcb66d 100644 --- a/app/services/event_driven_screener.py +++ b/app/services/event_driven_screener.py @@ -42,7 +42,10 @@ from app.services.altcoin_screener import ( from app.services.altcoin_confirm import fetch_derivatives_context from app.core.pa_engine import full_pa_analysis, calc_atr -exchange = ccxt.binance({"enableRateLimit": True}) +exchange = ccxt.binance({ + "enableRateLimit": True, + "timeout": int(os.getenv("ALPHAX_EVENT_CCXT_TIMEOUT_MS", "8000")), +}) LEVEL_RANK = {"S": 4, "A": 3, "B": 2, "C": 1, "D": 0, "RISK": 5} @@ -60,6 +63,22 @@ def _cfg(): return load_rules(force_reload=True).get("event_driven", {}) +def _positive_int(value, default): + try: + n = int(value) + return n if n > 0 else default + except Exception: + return default + + +def _process_limit(override=None): + return _positive_int(override, _positive_int(_cfg().get("max_process_events_per_run"), 4)) + + +def _max_run_seconds(override=None): + return _positive_int(override, _positive_int(_cfg().get("max_run_seconds"), 50)) + + def _parse_binance_time(ms): try: return datetime.fromtimestamp(int(ms) / 1000) @@ -249,19 +268,27 @@ def _event_copy_for_symbol(event, symbol, theme_name, theme, expanded=True): def expand_theme_events(events): """重大生态/主题事件扩散到同生态币,解决 TON/DOGS 这类联动行情漏选。""" - if not _theme_cfg().get("enabled", False): + theme_cfg = _theme_cfg() + if not theme_cfg.get("enabled", False): return events expanded = list(events) seen = {(e.get("source"), e.get("title"), e.get("symbol")) for e in expanded} + max_expanded = _positive_int(theme_cfg.get("max_expanded_symbols"), 12) for e in events: + expanded_for_event = 0 for theme_name, theme in _matched_themes(e.get("title", ""), e.get("symbol", "")): + if expanded_for_event >= max_expanded: + break direct = _event_copy_for_symbol(e, e.get("symbol"), theme_name, theme, expanded=False) key = (direct.get("source"), direct.get("title"), direct.get("symbol")) if direct.get("symbol") and key not in seen and _tradable_symbol(direct.get("symbol")): expanded.append(direct) seen.add(key) + expanded_for_event += 1 for base in theme.get("symbols", []): + if expanded_for_event >= max_expanded: + break symbol = f"{str(base).upper()}/USDT" if symbol == e.get("symbol") or not _tradable_symbol(symbol): continue @@ -270,6 +297,7 @@ def expand_theme_events(events): if key not in seen: expanded.append(child) seen.add(key) + expanded_for_event += 1 return expanded @@ -866,15 +894,25 @@ def load_unprocessed_events(limit=20): return events -def run_once(process_existing=True): +def run_once(process_existing=True, limit=None, max_seconds=None): started = _now() init_db() init_event_tables() + process_limit = _process_limit(limit) + runtime_limit = _max_run_seconds(max_seconds) collected = collect_events() stored = store_events(collected) - to_process = stored if stored else (load_unprocessed_events() if process_existing else []) + if stored: + to_process = stored[:process_limit] + else: + to_process = load_unprocessed_events(limit=process_limit) if process_existing else [] processed = [] + skipped_due_to_limit = max(len(stored) - len(to_process), 0) if stored else 0 + stopped_by_runtime = False for e in to_process: + if (_now() - started).total_seconds() >= runtime_limit: + stopped_by_runtime = True + break if isinstance(e.get("published_at"), str): e["published_at"] = datetime.fromisoformat(e["published_at"]) processed.append(process_event(e)) @@ -884,6 +922,10 @@ def run_once(process_existing=True): "collected_count": len(collected), "stored_count": len(stored), "processed_count": len(processed), + "skipped_due_to_limit": skipped_due_to_limit, + "stopped_by_runtime": stopped_by_runtime, + "process_limit": process_limit, + "max_run_seconds": runtime_limit, "decisions": {k: sum(1 for p in processed if p["result"]["decision"] == k) for k in ["recommend", "observe", "risk", "ignore"]}, "events": [ { @@ -908,7 +950,15 @@ def run_once(process_existing=True): started_at=started.isoformat(), finished_at=_now().isoformat(), duration_ms=int((_now() - started).total_seconds() * 1000), - summary={"stored_count": len(stored), "processed_count": len(processed), "decisions": output["decisions"]}, + summary={ + "stored_count": len(stored), + "processed_count": len(processed), + "skipped_due_to_limit": skipped_due_to_limit, + "stopped_by_runtime": stopped_by_runtime, + "process_limit": process_limit, + "max_run_seconds": runtime_limit, + "decisions": output["decisions"], + }, error_message="", ) return output @@ -919,8 +969,14 @@ def main(): parser = argparse.ArgumentParser(description="事件驱动舆情触发选币") parser.add_argument("--once", action="store_true") parser.add_argument("--no-process-existing", action="store_true") + parser.add_argument("--limit", type=int, default=None, help="单轮最多处理事件数") + parser.add_argument("--max-seconds", type=int, default=None, help="单轮最大运行秒数") args = parser.parse_args() - out = run_once(process_existing=not args.no_process_existing) + out = run_once( + process_existing=not args.no_process_existing, + limit=args.limit, + max_seconds=args.max_seconds, + ) print(json.dumps(out, ensure_ascii=False, indent=2, default=str)) diff --git a/docker/scheduler.py b/docker/scheduler.py index c57511c..644b116 100755 --- a/docker/scheduler.py +++ b/docker/scheduler.py @@ -214,8 +214,10 @@ def finish_running_jobs(running: dict[str, RunningJob]) -> None: proc = item.proc timeout = max(item.job.every_seconds * 2, 600) elapsed = time.time() - item.started_at + killed_by_timeout = False if proc.poll() is None and elapsed > timeout: proc.kill() + killed_by_timeout = True print(f"[{now_str()}] [scheduler] timeout {name} after {elapsed:.1f}s", flush=True) if proc.poll() is None: continue @@ -241,14 +243,30 @@ def finish_running_jobs(running: dict[str, RunningJob]) -> None: if output_tail: print(output_tail, flush=True) if exit_code != 0: + timed_out = killed_by_timeout or elapsed >= timeout + error_type = f"{name}_exit_{exit_code}" + message = f"scheduler job {name} failed with exit={exit_code}" + if timed_out and exit_code == -9: + error_type = f"{name}_timeout_killed" + message = f"scheduler job {name} exceeded timeout={int(timeout)}s and was killed" + elif exit_code == -9: + error_type = f"{name}_sigkill" + message = f"scheduler job {name} received SIGKILL before scheduler timeout; check container resources and external API stalls" record_system_error( source="scheduler", level="error", - error_type=f"{name}_exit_{exit_code}", - message=f"scheduler job {name} failed with exit={exit_code}", + error_type=error_type, + message=message, stack_trace=output_tail, status_code=exit_code, - context={"job_name": name, "run_kind": item.run_kind, "trigger_id": item.trigger_id}, + context={ + "job_name": name, + "run_kind": item.run_kind, + "trigger_id": item.trigger_id, + "duration_ms": duration_ms, + "timeout_seconds": int(timeout), + "killed_by_timeout": timed_out, + }, ) update_runtime( name, diff --git a/rules.yaml b/rules.yaml index 567f253..f505afe 100644 --- a/rules.yaml +++ b/rules.yaml @@ -303,6 +303,8 @@ event_driven: enabled: true poll_interval_min: 1 decision_target_seconds: 60 + max_process_events_per_run: 4 + max_run_seconds: 50 news_time_window_hours: 3 max_event_age_hours: 6 dedup_window_hours: 24 diff --git a/tests/test_event_driven_screener.py b/tests/test_event_driven_screener.py index da0e7dd..c2ba473 100644 --- a/tests/test_event_driven_screener.py +++ b/tests/test_event_driven_screener.py @@ -90,6 +90,28 @@ def test_theme_expansion_spreads_ton_news_to_ecosystem_symbols(): assert "主题扩散:ton_ecosystem" in by_symbol["DOGS/USDT"]["title"] +def test_theme_expansion_respects_max_expanded_symbols(monkeypatch): + event = { + "source": "coingecko_trending", + "symbol": "TON/USDT", + "title": "Telegram becomes the main driver of the TON ecosystem and cuts TON fees", + "url": "https://example.com/ton", + "published_at": datetime.now(), + "importance": "B", + "event_type": "market_heat", + "raw": {}, + } + + original_cfg = ed._cfg() + limited_cfg = json.loads(json.dumps(original_cfg)) + limited_cfg["theme_expansion"]["max_expanded_symbols"] = 2 + monkeypatch.setattr(ed, "_cfg", lambda: limited_cfg) + + expanded = ed.expand_theme_events([event]) + + assert len(expanded) <= 3 # 原始事件 + 最多 2 个扩散事件 + + def test_wublock_atom_feed_events_are_parsed_and_symbolized(monkeypatch): xml = """ @@ -186,3 +208,39 @@ def test_theme_static_accumulation_bonus_can_upgrade_to_recommend(): assert result["score"] >= 6 assert result["decision"] == "recommend" assert any("生态主题+强静K蓄力升权" in s for s in result["signals"]) + + +def test_run_once_caps_processed_events(monkeypatch): + events = [ + { + "source": "unit", + "symbol": f"T{i}/USDT", + "title": f"event {i}", + "url": "", + "published_at": datetime.now(), + "importance": "A", + "event_type": "news", + "event_hash": f"h{i}", + "raw": {}, + } + for i in range(6) + ] + processed = [] + + monkeypatch.setattr(ed, "init_db", lambda: None) + monkeypatch.setattr(ed, "init_event_tables", lambda: None) + monkeypatch.setattr(ed, "collect_events", lambda: events) + monkeypatch.setattr(ed, "store_events", lambda collected: collected) + monkeypatch.setattr(ed, "process_event", lambda event: processed.append(event["symbol"]) or { + "event": event, + "result": {"decision": "ignore", "score": 0, "reason": ""}, + "rec_id": 0, + "pushed": False, + }) + monkeypatch.setattr(ed, "log_cron_run", lambda **kwargs: None) + + result = ed.run_once(limit=2, max_seconds=30) + + assert result["processed_count"] == 2 + assert result["skipped_due_to_limit"] == 4 + assert processed == ["T0/USDT", "T1/USDT"] diff --git a/tests/test_screener_optimizations.py b/tests/test_screener_optimizations.py index 7d84d56..db70746 100644 --- a/tests/test_screener_optimizations.py +++ b/tests/test_screener_optimizations.py @@ -3,6 +3,7 @@ import sys from datetime import datetime import pandas as pd +import requests PROJECT_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) if PROJECT_DIR not in sys.path: @@ -119,6 +120,33 @@ def test_fetch_all_tickers_filters_inactive_and_stale_markets(monkeypatch): assert any(x["symbol"] == "DEAD/USDT" and x["reason_code"] == "inactive_market" for x in exclusions) +def test_fetch_spot_24h_tickers_uses_cache_when_dns_fails(monkeypatch, tmp_path): + monkeypatch.setattr(altcoin_screener, "EXCHANGE_CACHE_DIR", tmp_path) + cached_data = [ + { + "symbol": "AIUSDT", + "lastPrice": "1.23", + "priceChangePercent": "8.5", + "quoteVolume": "1234567", + "highPrice": "1.4", + "lowPrice": "1.0", + "closeTime": 1770000000000, + } + ] + altcoin_screener._write_spot_24h_ticker_cache(cached_data) + + def fail_get(*args, **kwargs): + raise requests.exceptions.ConnectionError("dns failed") + + monkeypatch.setattr(altcoin_screener.requests, "get", fail_get) + + tickers = altcoin_screener._fetch_spot_24h_tickers() + + assert tickers["AI/USDT"]["last"] == 1.23 + assert tickers["AI/USDT"]["percentage"] == 8.5 + assert tickers["AI/USDT"]["quoteVolume"] == 1234567 + + def _mock_weights(): return { "量价齐飞": 5,