alphax/app/db/recommendation_queries.py
2026-05-24 20:44:22 +08:00

307 lines
11 KiB
Python

"""Recommendation and lifecycle-facing DB API."""
from datetime import datetime, timedelta
from app.db.recommendation_commands import apply_recommendation_state_transition
from app.db.recommendation_state import (
classify_recommendation_result as _classify_recommendation_result,
derive_execution_fields as _derive_execution_fields,
is_actionable_execution_status as _is_actionable_execution_status,
)
from app.db.push_queries import get_recommendation_for_push, log_push, should_push
from app.db.schema import get_conn
from app.db.tracking_queries import update_recommendation_tracking
from app.services.llm_insights import attach_recommendation_insights
def _attach_paper_order(item: dict) -> dict:
    """Nest the joined ``paper_order_*`` columns of *item* under ``item["paper_order"]``.

    When ``paper_order_id`` is missing or falsy, ``paper_order`` is set to
    ``None`` and ``paper_order_status`` to ``""``. Otherwise a nested order
    dict is built from the flattened columns, with falsy values replaced by
    per-field defaults, and ``paper_order_status`` mirrors the nested status.

    Args:
        item: Recommendation row as a dict; mutated in place.

    Returns:
        The same *item* object.
    """
    paper_order_id = item.get("paper_order_id")
    if not paper_order_id:
        # No joined order row: expose explicit "empty" markers instead.
        item.update(paper_order=None, paper_order_status="")
        return item

    def joined(column, fallback):
        """Read ``paper_order_<column>`` from the row, or *fallback* when falsy."""
        return item.get("paper_order_" + column) or fallback

    nested = {
        "id": paper_order_id,
        # Identity fields fall back to the recommendation's own values.
        "recommendation_id": joined("recommendation_id", item.get("id")),
        "symbol": joined("symbol", item.get("symbol")),
        "side": joined("side", "long"),
        "order_type": joined("type", "limit"),
        "status": joined("status_raw", ""),
    }
    # Numeric price levels default to 0.
    for numeric in (
        "target_price",
        "current_price_at_create",
        "fill_price",
        "stop_loss",
        "tp1",
        "tp2",
    ):
        nested[numeric] = joined(numeric, 0)
    # Timestamps and the cancel reason default to an empty string.
    for textual in (
        "created_at",
        "updated_at",
        "expires_at",
        "filled_at",
        "canceled_at",
        "cancel_reason",
    ):
        nested[textual] = joined(textual, "")

    item["paper_order"] = nested
    item["paper_order_status"] = nested["status"]
    return item
def get_active_recommendations(actionable_only: bool = False):
    """Return every active recommendation that is not in the history bucket.

    Rows are read newest-first (``rec_time DESC``), converted to dicts and
    passed through ``_derive_execution_fields``. The surviving items are then
    handed to ``_attach_onchain_context``.

    Args:
        actionable_only: When true, drop items whose ``execution_status`` is
            rejected by ``_is_actionable_execution_status``.

    Returns:
        The value returned by ``_attach_onchain_context`` for the filtered
        list of recommendation dicts.
    """
    conn = get_conn()
    try:
        rows = conn.execute(
            """
            SELECT * FROM recommendation
            WHERE status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'
            ORDER BY rec_time DESC
            """
        ).fetchall()
    finally:
        # Release the connection even when the query itself raises.
        conn.close()

    result = []
    for row in rows:
        item = _derive_execution_fields(dict(row))
        if actionable_only and not _is_actionable_execution_status(item.get("execution_status")):
            continue
        result.append(item)
    return _attach_onchain_context(result)
def _attach_onchain_context(items):
    """Attach an ``onchain_context`` summary to each item that has on-chain data.

    For the distinct symbols found in *items*, this loads the most recent
    ``onchain_token_metrics`` row per symbol and the ``onchain_events`` rows
    detected within the last 24 hours (newest first). Every item whose symbol
    has a metric row or at least one event gets an ``onchain_context`` dict;
    other items are left untouched. Items are mutated in place.

    The lookup is best-effort: if the query phase fails for any reason, the
    failure is logged, the connection is closed, and *items* is returned
    without any on-chain context.

    Args:
        items: Recommendation dicts; the ``symbol`` key selects the data.

    Returns:
        The same *items* object that was passed in.
    """
    import logging  # local import keeps this block self-contained

    if not items:
        return items
    symbols = sorted({item.get("symbol") for item in items if item.get("symbol")})
    if not symbols:
        return items

    # Only "%s" markers are interpolated into the SQL text; the symbol values
    # themselves are always sent as bound parameters.
    placeholders = ",".join(["%s"] * len(symbols))
    since = (datetime.now() - timedelta(hours=24)).isoformat()

    try:
        conn = get_conn()
        try:
            rows = conn.execute(
                f"""
                SELECT m.*
                FROM onchain_token_metrics m
                JOIN (
                    SELECT symbol, MAX(metric_time) AS max_time
                    FROM onchain_token_metrics
                    WHERE symbol IN ({placeholders})
                    GROUP BY symbol
                ) latest ON latest.symbol=m.symbol AND latest.max_time=m.metric_time
                """,
                tuple(symbols),
            ).fetchall()
            events = conn.execute(
                f"""
                SELECT *
                FROM onchain_events
                WHERE symbol IN ({placeholders})
                AND detected_at >= %s
                ORDER BY detected_at::timestamp DESC, id DESC
                """,
                (*symbols, since),
            ).fetchall()
        finally:
            # Previously the connection leaked whenever a query raised.
            conn.close()
    except Exception:
        # Deliberate best-effort: on-chain data is optional decoration, so a
        # failure here must not break the recommendation listing itself.
        logging.getLogger(__name__).warning(
            "on-chain context lookup failed; returning items without it",
            exc_info=True,
        )
        return items

    metrics = {row["symbol"]: dict(row) for row in rows}
    by_symbol = {}
    for row in events:
        by_symbol.setdefault(row["symbol"], []).append(dict(row))

    for item in items:
        metric = metrics.get(item.get("symbol")) or {}
        evs = by_symbol.get(item.get("symbol")) or []
        if not metric and not evs:
            continue
        risk_events = [e for e in evs if e.get("direction") == "risk"]
        positive_events = [e for e in evs if e.get("direction") == "positive"]
        # Risk signals take priority over positive ones for the headline;
        # events are already ordered newest-first by the query.
        if risk_events:
            headline = risk_events[0].get("signal_label") or "链上风险升温"
        elif positive_events:
            headline = positive_events[0].get("signal_label") or "链上资金异动"
        else:
            headline = "链上异动"
        item["onchain_context"] = {
            "headline": headline,
            "chain": metric.get("chain") or (evs[0].get("chain") if evs else ""),
            "onchain_score": metric.get("onchain_score") or 0,
            "risk_score": metric.get("risk_score") or 0,
            "dex_volume_usd": metric.get("dex_volume_usd") or 0,
            "liquidity_usd": metric.get("liquidity_usd") or 0,
            "event_count_24h": len(evs),
            "risk_event_count_24h": len(risk_events),
            # Only the three newest events are exposed in detail.
            "top_events": [
                {
                    "signal_code": e.get("signal_code"),
                    "signal_label": e.get("signal_label"),
                    "direction": e.get("direction"),
                    "value_usd": e.get("value_usd") or 0,
                    "detected_at": e.get("detected_at"),
                }
                for e in evs[:3]
            ],
        }
    return items
def _coerce_non_negative_int(value) -> int:
    """Return *value* as an int clamped to ``>= 0``; 0 when it cannot be converted."""
    try:
        return max(0, int(value or 0))
    except (TypeError, ValueError, OverflowError):
        return 0


def _recommendation_is_expired(item: dict, now: datetime, hours: float) -> bool:
    """Return True when *item* must be counted as expired instead of listed."""
    if (
        item.get("execution_status") == "invalid"
        or item.get("status") in ("invalid", "expired", "archived")
        or item.get("display_bucket") == "history"
    ):
        return True
    rec_time = item.get("rec_time")
    if not (hours > 0 and rec_time):
        return False
    try:
        age = now - datetime.fromisoformat(str(rec_time))
    except (TypeError, ValueError):
        # Unparseable or timezone-aware timestamps are treated as not expired.
        return False
    return age.total_seconds() > hours * 3600


def _tally_recommendation_summary(summary: dict, item: dict) -> None:
    """Update the *summary* counters in place for one retained recommendation."""
    if item.get("is_discovery_burst"):
        summary["discovery_burst"] += 1
    if item.get("is_executable_now"):
        summary["executable_now"] += 1
    if item.get("is_watch_pool"):
        summary["watch_pool"] += 1

    status = item.get("execution_status")
    if status == "buy_now":
        summary["buy_now"] += 1
    elif status == "wait_pullback":
        # "planned_entry" is tracked as a second counter for the same status.
        summary["wait_pullback"] += 1
        summary["planned_entry"] += 1
    else:
        summary["observe"] += 1
        if item.get("observe_tier") == "weak":
            summary["observe_weak"] += 1
        else:
            summary["observe_strong"] += 1


def get_active_recommendations_deduped(
    actionable_only: bool = True,
    version: str = "",
    hours: float = 0,
    watch_symbols=None,
    limit: int = 0,
    offset: int = 0,
    with_meta: bool = False,
):
    """Return the newest active recommendation per symbol with derived fields.

    "Newest" means the row with the highest ``id`` per symbol among active,
    non-history recommendations that match the optional filters. Each row is
    joined with ``latest_price_cache`` and ``paper_orders``, then enriched
    with a recommendation result, derived execution fields and a nested
    ``paper_order`` dict.

    Args:
        actionable_only: Drop items whose ``execution_status`` is rejected by
            ``_is_actionable_execution_status``.
        version: If non-blank, only rows with this ``strategy_version``.
        hours: If > 0, only rows whose ``rec_time`` falls within the last
            *hours* hours. Unparseable values disable the filter, as does a
            window too large for ``datetime`` arithmetic.
        watch_symbols: Optional iterable of symbols; entries are stripped and
            upper-cased, blanks ignored. If nothing remains, no symbol filter
            is applied.
        limit: Page size; 0 means "no limit". Only used when *with_meta*.
        offset: Page start index. Only used when *with_meta*.
        with_meta: Selects the return shape (see below).

    Returns:
        * ``with_meta`` false: the value returned by
          ``attach_recommendation_insights`` for the full, unpaginated list.
        * ``with_meta`` true: a dict with ``items`` (the requested page),
          ``total``, ``limit``, ``offset``, ``has_more`` and ``summary``.
          The summary counters cover all retained items, not only the page.
    """
    # The WHERE clause is assembled from constant fragments only; every
    # caller-supplied value is passed as a bound parameter.
    where = "status='active' AND COALESCE(display_bucket,'watch_pool') != 'history'"
    params = []

    version = str(version or "").strip()
    if version:
        where += " AND strategy_version=%s"
        params.append(version)

    if watch_symbols:
        symbols = [str(s).strip().upper() for s in watch_symbols if str(s).strip()]
        if symbols:
            where += " AND symbol IN (" + ",".join(["%s"] * len(symbols)) + ")"
            params.extend(symbols)

    try:
        hours = float(hours or 0)
    except (TypeError, ValueError, OverflowError):
        hours = 0
    if hours > 0:
        try:
            cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
        except OverflowError:
            # An infinite or huge window is equivalent to having no time
            # filter at all; do not crash on it.
            hours = 0
        else:
            where += " AND rec_time >= %s"
            params.append(cutoff)

    limit = _coerce_non_negative_int(limit)
    offset = _coerce_non_negative_int(offset)

    # The connection is opened only after argument parsing and is always
    # closed, so a failing query no longer leaks it.
    conn = get_conn()
    try:
        # NOTE(review): if a recommendation can have several paper_orders
        # rows, the LEFT JOIN yields one result row per order — confirm the
        # schema guarantees at most one.
        rows = conn.execute(
            f"""
            SELECT r.*,
                lpc.price AS latest_cache_price,
                lpc.updated_at AS latest_cache_updated_at,
                po.id AS paper_order_id,
                po.recommendation_id AS paper_order_recommendation_id,
                po.symbol AS paper_order_symbol,
                po.side AS paper_order_side,
                po.order_type AS paper_order_type,
                po.status AS paper_order_status_raw,
                po.target_price AS paper_order_target_price,
                po.current_price_at_create AS paper_order_current_price_at_create,
                po.fill_price AS paper_order_fill_price,
                po.stop_loss AS paper_order_stop_loss,
                po.tp1 AS paper_order_tp1,
                po.tp2 AS paper_order_tp2,
                po.created_at AS paper_order_created_at,
                po.updated_at AS paper_order_updated_at,
                po.expires_at AS paper_order_expires_at,
                po.filled_at AS paper_order_filled_at,
                po.canceled_at AS paper_order_canceled_at,
                po.cancel_reason AS paper_order_cancel_reason
            FROM recommendation r
            LEFT JOIN latest_price_cache lpc ON lpc.symbol = r.symbol
            LEFT JOIN paper_orders po ON po.recommendation_id = r.id
            JOIN (
                SELECT symbol, MAX(id) AS max_id
                FROM recommendation
                WHERE {where}
                GROUP BY symbol
            ) latest ON latest.max_id = r.id
            ORDER BY r.rec_time DESC
            """,
            tuple(params),
        ).fetchall()
    finally:
        conn.close()

    all_items = []
    summary = {
        "buy_now": 0,
        "wait_pullback": 0,
        "observe": 0,
        "observe_strong": 0,
        "observe_weak": 0,
        "expired": 0,
        "total": 0,
        "discovery_burst": 0,
        "executable_now": 0,
        "planned_entry": 0,
        "watch_pool": 0,
    }
    now = datetime.now()
    for row in rows:
        item = dict(row)
        # The result classification runs before the derived execution fields
        # are added, matching the original evaluation order.
        rec_result, rec_result_label = _classify_recommendation_result(item)
        item["recommendation_result"] = rec_result
        item["recommendation_result_label"] = rec_result_label
        _derive_execution_fields(item)
        _attach_paper_order(item)

        if _recommendation_is_expired(item, now, hours):
            summary["expired"] += 1
            continue
        if actionable_only and not _is_actionable_execution_status(item.get("execution_status")):
            continue
        all_items.append(item)
        _tally_recommendation_summary(summary, item)

    summary["total"] = len(all_items)
    # Callers receive the expired counter under the name "expired_filtered".
    summary["expired_filtered"] = summary.pop("expired", 0)

    if not with_meta:
        _attach_onchain_context(all_items)
        return attach_recommendation_insights(all_items)

    # Enrich only the requested page to avoid work for items not returned.
    page_items = all_items[offset : offset + limit] if limit else all_items[offset:]
    _attach_onchain_context(page_items)
    attach_recommendation_insights(page_items)
    return {
        "items": page_items,
        "total": len(all_items),
        "limit": limit,
        "offset": offset,
        "has_more": bool(limit and offset + len(page_items) < len(all_items)),
        "summary": summary,
    }
# Public API of this module. Besides the two query functions defined here,
# it re-exports helpers imported above from the recommendation_commands,
# push_queries and tracking_queries modules so callers can import them from
# a single place.
__all__ = [
"apply_recommendation_state_transition",
"get_active_recommendations",
"get_active_recommendations_deduped",
"get_recommendation_for_push",
"log_push",
"should_push",
"update_recommendation_tracking",
]