"""Legacy coin_state compatibility queries."""

import json
from datetime import datetime, timedelta

from app.config.config_loader import confirm_state_cooldown_hours
from app.db.schema import get_conn

# Rank of each known state; update_state() alerts when the rank increases.
# States missing from this table are ranked 0 by the lookups below.
_STATE_ORDER = {"过期": 0, "蓄力": 1, "加速": 2, "爆发": 3}

# Minimum hours since the last alert before a same-state score increase
# triggers another alert.
_SCORE_REALERT_HOURS = 12


def _hours_since(timestamp):
    """Return hours elapsed since an ISO-format timestamp, or None if unusable.

    None is returned when the value cannot be parsed by
    ``datetime.fromisoformat`` or cannot be subtracted from the naive
    ``datetime.now()`` (e.g. a timezone-aware value).
    """
    try:
        then = datetime.fromisoformat(timestamp)
        return (datetime.now() - then).total_seconds() / 3600
    except (TypeError, ValueError):
        return None


def _dump_detail(detail):
    """Serialize ``detail`` to JSON text; any falsy value becomes ``"{}"``."""
    if not detail:
        return "{}"
    return json.dumps(detail, ensure_ascii=False, default=str)


def update_state(symbol, new_state, score=0, anomaly_type="", sector="",
                 leader_status="", detail=None):
    """Update legacy coin_state row used by screener/confirm compatibility flow.

    Inserts a row when ``symbol`` has none; otherwise updates the existing row
    and decides whether the change deserves an alert.

    Args:
        symbol: Key of the coin_state row.
        new_state: State to store; ranked through ``_STATE_ORDER``.
        score: Score to store; compared with the stored score when the state
            is unchanged.
        anomaly_type: Stored as-is.
        sector: Stored as-is.
        leader_status: Stored as-is.
        detail: Optional JSON-serializable payload stored in ``detail_json``.

    Returns:
        A dict with ``should_alert`` (bool) and ``alert_level`` (str). When an
        unchanged state is still inside the configured cooldown the row is
        left untouched and the dict also carries a ``reason`` string.
    """
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT * FROM coin_state WHERE symbol=%s", (symbol,)
        ).fetchone()

        if not row:
            # First sighting of this symbol: always a low-level alert.
            now_iso = datetime.now().isoformat()
            conn.execute(
                """
                INSERT INTO coin_state
                    (symbol, state, score, anomaly_type, sector, leader_status,
                     detected_at, last_alert_time, last_alert_level, detail_json)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (
                    symbol,
                    new_state,
                    score,
                    anomaly_type,
                    sector,
                    leader_status,
                    now_iso,
                    now_iso,
                    "low",
                    _dump_detail(detail),
                ),
            )
            conn.commit()
            return {"should_alert": True, "alert_level": "low"}

        old_state = row["state"]
        # Treat a NULL stored score as 0 so the comparison below cannot raise.
        old_score = row["score"] or 0
        last_alert_time = row["last_alert_time"] or ""

        should_alert = False
        alert_level = "low"

        if _STATE_ORDER.get(new_state, 0) > _STATE_ORDER.get(old_state, 0):
            # State moved up the ladder: alert, graded by the new state.
            should_alert = True
            if new_state == "爆发":
                alert_level = "high"
            elif new_state == "加速":
                alert_level = "medium"
            else:
                alert_level = "low"
        elif new_state == old_state:
            cooldown_hours = confirm_state_cooldown_hours()
            # None means "never alerted" or "stored timestamp unreadable".
            elapsed = _hours_since(last_alert_time) if last_alert_time else None

            if elapsed is not None and elapsed < cooldown_hours:
                # Still cooling down: skip the row update entirely.
                return {
                    "should_alert": False,
                    "alert_level": "none",
                    "reason": f"状态冷却中({elapsed:.1f}h<{cooldown_hours}h)",
                }

            # NOTE(review): the source formatting was collapsed; this check is
            # assumed to belong to the same-state branch — confirm.
            if score > old_score:
                if elapsed is None or elapsed >= _SCORE_REALERT_HOURS:
                    should_alert = True
                    alert_level = "medium"

        conn.execute(
            """
            UPDATE coin_state
            SET state=%s, score=%s, anomaly_type=%s, sector=%s,
                leader_status=%s, detected_at=%s, detail_json=%s
            WHERE symbol=%s
            """,
            (
                new_state,
                score,
                anomaly_type,
                sector,
                leader_status,
                datetime.now().isoformat(),
                _dump_detail(detail),
                symbol,
            ),
        )
        if should_alert:
            conn.execute(
                """
                UPDATE coin_state
                SET last_alert_time=%s, last_alert_level=%s
                WHERE symbol=%s
                """,
                (datetime.now().isoformat(), alert_level, symbol),
            )
        conn.commit()
        return {"should_alert": should_alert, "alert_level": alert_level}
    finally:
        # Close on every path, including early returns and failed queries.
        conn.close()


def get_all_active():
    """Return every coin_state row whose state is not '过期', as dicts."""
    conn = get_conn()
    try:
        rows = conn.execute(
            "SELECT * FROM coin_state WHERE state != '过期'"
        ).fetchall()
    finally:
        conn.close()
    return [dict(r) for r in rows]


def expire_old_states(hours=24):
    """Set state to '过期' for rows whose detected_at is older than ``hours``.

    Args:
        hours: Age threshold in hours; a falsy value falls back to 24.
    """
    # Compute the cutoff first so a bad ``hours`` value fails before a
    # connection is opened.
    cutoff = (datetime.now() - timedelta(hours=float(hours or 24))).isoformat()
    conn = get_conn()
    try:
        conn.execute(
            """
            UPDATE coin_state SET state='过期'
            WHERE state != '过期' AND detected_at < %s
            """,
            (cutoff,),
        )
        conn.commit()
    finally:
        conn.close()