alphax/app/services/paper_trader.py
2026-06-08 00:15:54 +08:00

98 lines
4.1 KiB
Python

"""Paper trading job entrypoint."""
from __future__ import annotations
import json
from datetime import datetime
import ccxt
from app.db.altcoin_db import init_db, log_cron_run, update_latest_price_cache
from app.db.paper_trading import get_paper_trading_summary, sync_pending_paper_orders, sync_recommendation
from app.db.recommendation_queries import get_active_recommendations_deduped
from app.services.live_trading_sync import sync_live_protection_from_paper, sync_paper_trade_to_live
# Module-level ccxt Binance client, created once at import time and used by
# run_once() to fetch tickers. ``enableRateLimit`` turns on ccxt's built-in
# request throttling.
exchange = ccxt.binance({"enableRateLimit": True})
def _wants_live_sync(outcome: dict) -> bool:
    """Return True when a paper-sync outcome should be mirrored to live trading.

    An outcome qualifies when it carries a truthy ``trade_id`` and either
    reports ``opened`` or has a ``paper_order`` flagged ``filled``.
    """
    if not outcome.get("trade_id"):
        return False
    # ``paper_order`` can be present but None; dict.get's default only covers a
    # missing key, so normalise before the nested lookup.
    paper_order = outcome.get("paper_order") or {}
    return bool(outcome.get("opened") or paper_order.get("filled"))


def run_once(limit: int = 100) -> dict:
    """Run a single paper-trading pass and return a report of what happened.

    Steps, in order:

    1. Initialise the database and load up to ``limit`` actionable, deduplicated
       recommendations.
    2. For each recommendation, fetch the ticker from ``exchange``, store the
       last price in the latest-price cache, sync the recommendation against
       that price and, when a trade was opened or its paper order filled,
       mirror the trade to live trading.
    3. Sync pending paper orders and mirror any that opened/filled.
    4. Sync live protection from paper trades.
    5. Print the report as JSON and return it.

    Failures are isolated per item: an exception while handling one
    recommendation or one pending order is recorded in ``failed`` and does not
    stop the rest of the pass.

    Args:
        limit: Maximum number of recommendations / pending orders / protection
            entries passed to the respective helpers.

    Returns:
        A dict with the keys ``status``, ``processed_count``,
        ``pending_processed_count``, ``pending_filled_count``, ``failed_count``,
        ``failed``, ``results``, ``pending_results``, ``live_protection_sync``,
        ``summary`` and ``run_time``.
    """
    init_db()
    recs = get_active_recommendations_deduped(actionable_only=True, limit=limit, with_meta=False)
    results = []
    failed = []

    for rec in recs:
        symbol = rec.get("symbol")
        try:
            ticker = exchange.fetch_ticker(symbol)
            last_price = ticker.get("last")
            current_price = float(last_price or 0)
            # A missing or non-positive last price must never reach the price
            # cache or the trade sync; ``not > 0`` also rejects NaN.
            if not current_price > 0:
                raise ValueError(f"no valid last price for {symbol}: {last_price!r}")
            event_time = datetime.now().isoformat()
            update_latest_price_cache(symbol, current_price, updated_at=event_time, source="paper_trader")
            result = sync_recommendation(rec, current_price, event_time=event_time)
            if _wants_live_sync(result):
                result["live_sync"] = sync_paper_trade_to_live(int(result["trade_id"]), execute=True)
            result.update({"symbol": symbol, "rec_id": rec.get("id"), "current_price": current_price})
            results.append(result)
        except Exception as exc:
            # Per-symbol boundary: record the failure and keep processing.
            failed.append({"symbol": symbol, "error": str(exc)})

    pending_result = sync_pending_paper_orders(limit=limit, event_time=datetime.now().isoformat())
    pending_items = pending_result.get("results", [])
    for item in pending_items:
        if not _wants_live_sync(item):
            continue
        try:
            item["live_sync"] = sync_paper_trade_to_live(int(item["trade_id"]), execute=True)
        except Exception as exc:
            # Same isolation as the recommendation loop: one failing live sync
            # must not skip the remaining items or the protection sync below.
            failed.append(
                {
                    "symbol": item.get("symbol"),
                    "trade_id": item.get("trade_id"),
                    "error": str(exc),
                }
            )

    protection_result = sync_live_protection_from_paper(limit=limit)
    output = {
        "status": "completed",
        "processed_count": len(results),
        "pending_processed_count": pending_result.get("processed_count", 0),
        "pending_filled_count": pending_result.get("filled_count", 0),
        "failed_count": len(failed),
        "failed": failed,
        "results": results,
        "pending_results": pending_items,
        "live_protection_sync": protection_result,
        "summary": get_paper_trading_summary(days=30),
        "run_time": datetime.now().isoformat(),
    }
    # default=str keeps the dump from failing on values json cannot encode.
    print(json.dumps(output, ensure_ascii=False, indent=2, default=str))
    return output
def main(limit: int = 100):
    """Job entry point: run one pass and record the outcome via ``log_cron_run``.

    On success a ``run_status="success"`` entry is logged with a summary of the
    counters taken from the run report, and the report is returned. If
    ``run_once`` raises, a ``run_status="error"`` entry carrying the exception
    text is logged and the exception is re-raised.

    Args:
        limit: Forwarded unchanged to ``run_once``.

    Returns:
        The report dict produced by ``run_once``.
    """
    started_at = datetime.now()
    job_identity = {"job_name": "策略交易", "script_name": "paper_trader.py"}

    def _timing(finished_at: datetime) -> dict:
        # Start/finish timestamps plus elapsed wall time in whole milliseconds.
        elapsed = finished_at - started_at
        return {
            "started_at": started_at.isoformat(),
            "finished_at": finished_at.isoformat(),
            "duration_ms": int(elapsed.total_seconds() * 1000),
        }

    try:
        output = run_once(limit=limit)
    except Exception as exc:
        log_cron_run(
            **job_identity,
            run_status="error",
            result_status="exception",
            **_timing(datetime.now()),
            summary={},
            error_message=str(exc),
        )
        raise

    finished_at = datetime.now()
    result_status = output.get("status", "completed")

    # Counters copied from the top level of the report...
    top_level_counters = (
        "processed_count",
        "pending_processed_count",
        "pending_filled_count",
        "failed_count",
    )
    run_summary = {key: output.get(key, 0) for key in top_level_counters}
    # ...and the two taken from the nested "summary" entry.
    trading_summary = output.get("summary", {})
    for key in ("open_count", "closed_count"):
        run_summary[key] = trading_summary.get(key, 0)

    log_cron_run(
        **job_identity,
        run_status="success",
        result_status=result_status,
        **_timing(finished_at),
        summary=run_summary,
        error_message="",
    )
    return output