alphax/app/db/coin_state_queries.py
2026-05-20 00:57:46 +08:00

101 lines
3.9 KiB
Python

"""Legacy coin_state compatibility queries."""
import json
from datetime import datetime, timedelta
from app.config.config_loader import confirm_state_cooldown_hours
from app.db.schema import get_conn
def update_state(symbol, new_state, score=0, anomaly_type="", sector="",
leader_status="", detail=None):
"""Update legacy coin_state row used by screener/confirm compatibility flow."""
conn = get_conn()
row = conn.execute("SELECT * FROM coin_state WHERE symbol=%s", (symbol,)).fetchone()
if row:
old_state = row["state"]
old_score = row["score"]
last_alert_time = row["last_alert_time"] or ""
state_order = {"过期": 0, "蓄力": 1, "加速": 2, "爆发": 3}
should_alert = False
alert_level = "low"
if state_order.get(new_state, 0) > state_order.get(old_state, 0):
should_alert = True
alert_level = "high" if new_state == "爆发" else "medium" if new_state == "加速" else "low"
elif new_state == old_state:
cooldown_hours = confirm_state_cooldown_hours()
if last_alert_time:
try:
last_dt = datetime.fromisoformat(last_alert_time)
hours_since = (datetime.now() - last_dt).total_seconds() / 3600
if hours_since < cooldown_hours:
conn.close()
return {"should_alert": False, "alert_level": "none", "reason": f"状态冷却中({hours_since:.1f}h<{cooldown_hours}h)"}
except Exception:
pass
if score > old_score:
if not last_alert_time:
should_alert = True
alert_level = "medium"
else:
try:
last_dt = datetime.fromisoformat(last_alert_time)
hours_since = (datetime.now() - last_dt).total_seconds() / 3600
if hours_since >= 12:
should_alert = True
alert_level = "medium"
except Exception:
should_alert = True
alert_level = "medium"
conn.execute("""
UPDATE coin_state SET state=%s, score=%s, anomaly_type=%s, sector=%s,
leader_status=%s, detected_at=%s, detail_json=%s
WHERE symbol=%s
""", (
new_state, score, anomaly_type, sector, leader_status,
datetime.now().isoformat(),
json.dumps(detail, ensure_ascii=False, default=str) if detail else "{}",
symbol,
))
if should_alert:
conn.execute("""
UPDATE coin_state SET last_alert_time=%s, last_alert_level=%s WHERE symbol=%s
""", (datetime.now().isoformat(), alert_level, symbol))
conn.commit()
conn.close()
return {"should_alert": should_alert, "alert_level": alert_level}
conn.execute("""
INSERT INTO coin_state (symbol, state, score, anomaly_type, sector, leader_status, detected_at, last_alert_time, last_alert_level, detail_json)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
symbol, new_state, score, anomaly_type, sector, leader_status,
datetime.now().isoformat(), datetime.now().isoformat(), "low",
json.dumps(detail, ensure_ascii=False, default=str) if detail else "{}",
))
conn.commit()
conn.close()
return {"should_alert": True, "alert_level": "low"}
def get_all_active():
conn = get_conn()
rows = conn.execute("SELECT * FROM coin_state WHERE state != '过期'").fetchall()
conn.close()
return [dict(r) for r in rows]
def expire_old_states(hours=24):
conn = get_conn()
conn.execute("""
UPDATE coin_state SET state='过期' WHERE state != '过期'
AND detected_at < %s
""", ((datetime.now() - timedelta(hours=float(hours or 24))).isoformat(),))
conn.commit()
conn.close()