This commit is contained in:
aaron 2026-02-20 23:07:15 +08:00
parent 7235baaf61
commit 21ccd2aca3
7 changed files with 95 additions and 456 deletions

View File

@ -206,26 +206,19 @@ class CryptoAgent:
logger.info(f" 模拟交易: 已启用")
logger.info("=" * 60 + "\n")
# 启动 WebSocket 价格监控(替代轮询模式)
# 启动价格监控(轮询模式)
if self.paper_trading_enabled and self.symbols:
try:
from app.services.websocket_monitor import get_ws_price_monitor
self.ws_monitor = get_ws_price_monitor()
for symbol in self.symbols:
self.ws_monitor.subscribe_symbol(symbol)
# 同时注册回调(用于模拟交易的价格触发)
self.ws_monitor.add_price_callback(self._on_price_update)
logger.info(f"已启动 WebSocket 价格监控: {', '.join(self.symbols)}")
except Exception as e:
logger.error(f"WebSocket 价格监控启动失败,将使用轮询模式: {e}")
# 降级到轮询模式
from app.services.price_monitor_service import get_price_monitor_service
self.price_monitor = get_price_monitor_service()
for symbol in self.symbols:
self.price_monitor.subscribe_symbol(symbol)
self.price_monitor.add_price_callback(self._on_price_update)
logger.info(f"已启动轮询模式价格监控: {', '.join(self.symbols)}")
except Exception as e:
logger.error(f"价格监控启动失败: {e}")
# 发送启动通知
await self.feishu.send_text(
@ -267,11 +260,7 @@ class CryptoAgent:
"""停止运行"""
self.running = False
# 停止 WebSocket 价格监控
if hasattr(self, 'ws_monitor') and self.ws_monitor:
self.ws_monitor.stop()
# 停止轮询价格监控(如果存在)
# 停止轮询价格监控
if hasattr(self, 'price_monitor') and self.price_monitor:
self.price_monitor.stop()

View File

@ -21,18 +21,17 @@ _crypto_agent_task = None
async def price_monitor_loop():
"""后台价格监控循环 - 使用 WebSocket 实时检查止盈止损"""
"""后台价格监控循环 - 使用轮询检查止盈止损"""
from app.services.paper_trading_service import get_paper_trading_service
from app.services.binance_service import binance_service
from app.services.feishu_service import get_feishu_service
from app.services.telegram_service import get_telegram_service
from app.services.websocket_monitor import get_ws_price_monitor
logger.info("后台价格监控任务已启动(WebSocket 模式)")
logger.info("后台价格监控任务已启动(轮询模式)")
feishu = get_feishu_service()
telegram = get_telegram_service()
paper_trading = get_paper_trading_service()
ws_monitor = get_ws_price_monitor()
# 价格更新回调 - 检查止盈止损
async def on_price_update(symbol: str, price: float):
@ -104,43 +103,97 @@ async def price_monitor_loop():
except Exception as e:
logger.error(f"处理 {symbol} 价格更新失败: {e}")
# 注册 WebSocket 回调
ws_monitor.add_price_callback(lambda s, p: asyncio.create_task(on_price_update(s, p)))
# 持续监控活跃订单,动态订阅/取消订阅
monitored_symbols = set()
# 持续监控活跃订单
while True:
try:
# 获取活跃订单
active_orders = paper_trading.get_active_orders()
# 获取需要监控的交易对
current_symbols = set(order.get('symbol') for order in active_orders if order.get('symbol'))
# 订阅新交易对
new_symbols = current_symbols - monitored_symbols
for symbol in new_symbols:
ws_monitor.subscribe_symbol(symbol)
logger.info(f"后台监控订阅 {symbol}")
# 取消订阅不再需要的交易对(可选,这里保留订阅以便快速响应)
# unmonitored_symbols = monitored_symbols - current_symbols
# for symbol in unmonitored_symbols:
# ws_monitor.unsubscribe_symbol(symbol)
monitored_symbols = current_symbols
# 没有活跃订单时,等待 10 秒
if not active_orders:
await asyncio.sleep(10)
else:
# 有活跃订单时,每 30 秒检查一次订阅状态
await asyncio.sleep(30)
await asyncio.sleep(10) # 没有活跃订单时10秒检查一次
continue
# 获取所有需要的交易对
symbols = set(order.get('symbol') for order in active_orders if order.get('symbol'))
# 获取价格并检查止盈止损
for symbol in symbols:
try:
price = binance_service.get_current_price(symbol)
if not price:
continue
# 检查止盈止损
triggered = paper_trading.check_price_triggers(symbol, price)
# 发送通知
for result in triggered:
status = result.get('status', '')
event_type = result.get('event_type', 'order_closed')
# 处理挂单成交事件
if event_type == 'order_filled':
side_text = "做多" if result.get('side') == 'long' else "做空"
grade = result.get('signal_grade', 'N/A')
message = f"""✅ 挂单成交
交易对: {result.get('symbol')}
方向: {side_text}
等级: {grade}
挂单价: ${result.get('entry_price', 0):,.2f}
成交价: ${result.get('filled_price', 0):,.2f}
仓位: ${result.get('quantity', 0):,.0f}
止损: ${result.get('stop_loss', 0):,.2f}
止盈: ${result.get('take_profit', 0):,.2f}"""
# 发送通知
await feishu.send_text(message)
await telegram.send_message(message)
logger.info(f"后台监控触发挂单成交: {result.get('order_id')} | {symbol}")
continue
# 处理订单平仓事件
is_win = result.get('is_win', False)
if status == 'closed_tp':
emoji = "🎯"
status_text = "止盈平仓"
elif status == 'closed_sl':
emoji = "🛑"
status_text = "止损平仓"
elif status == 'closed_be':
emoji = "🔒"
status_text = "保本止损"
else:
emoji = "📤"
status_text = "平仓"
win_text = "盈利" if is_win else "亏损"
side_text = "做多" if result.get('side') == 'long' else "做空"
message = f"""{emoji} 订单{status_text}
交易对: {result.get('symbol')}
方向: {side_text}
入场: ${result.get('entry_price', 0):,.2f}
出场: ${result.get('exit_price', 0):,.2f}
{win_text}: {result.get('pnl_percent', 0):+.2f}% (${result.get('pnl_amount', 0):+.2f})
持仓时间: {result.get('hold_duration', 'N/A')}"""
# 发送通知
await feishu.send_text(message)
await telegram.send_message(message)
logger.info(f"后台监控触发平仓: {result.get('order_id')} | {symbol}")
except Exception as e:
logger.error(f"检查 {symbol} 价格失败: {e}")
# 每 3 秒检查一次
await asyncio.sleep(3)
except Exception as e:
logger.error(f"价格监控循环出错: {e}")
await asyncio.sleep(60)
await asyncio.sleep(5)
async def periodic_report_loop():

View File

@ -1,6 +1,6 @@
"""
Binance 数据服务 - 获取加密货币 K 线数据和技术指标
使用 requests 直接调用 REST API避免与 WebSocket 的事件循环冲突
使用 requests 直接调用 REST API
"""
import pandas as pd
import numpy as np

View File

@ -1,284 +0,0 @@
"""
WebSocket 价格监控服务 - 使用 Binance WebSocket API 实现实时价格推送
"""
import json
import asyncio
import threading
from typing import Dict, List, Callable, Optional, Set
from datetime import datetime
import websockets
from app.utils.logger import logger
class WebSocketPriceMonitor:
"""WebSocket 实时价格监控服务"""
# Binance WebSocket 端点
BASE_WS_URL = "wss://stream.binance.com:9443/ws"
def __init__(self):
"""初始化 WebSocket 价格监控服务"""
self._ws = None
self._loop = None
self._thread = None
self._running = False
self._subscribed_symbols: Set[str] = set()
self._price_callbacks: List[Callable[[str, float], None]] = []
self._latest_prices: Dict[str, float] = {}
self._lock = threading.Lock()
self._last_heartbeat: Optional[datetime] = None
# 连接和重连配置
self._reconnect_delay = 5 # 重连延迟(秒)
self._max_reconnect_attempts = 10
logger.info("WebSocket 价格监控服务初始化完成")
def is_running(self) -> bool:
"""检查服务是否在运行"""
return self._running and self._ws is not None and self._running
def subscribe_symbol(self, symbol: str):
"""
订阅交易对的价格推送
Args:
symbol: 交易对 "BTCUSDT"
"""
symbol = symbol.upper()
need_start = False
with self._lock:
if symbol in self._subscribed_symbols:
logger.debug(f"[WS:{id(self)}] {symbol} 已订阅,跳过")
return
self._subscribed_symbols.add(symbol)
# 检查是否需要启动服务
if not self.is_running():
need_start = True
# 在锁外启动服务(避免死锁)
if need_start:
self.start()
# 在锁外获取当前价格(避免阻塞)
self._fetch_current_price(symbol)
logger.info(f"[WS:{id(self)}] 已订阅 {symbol} 价格更新 (当前订阅: {self._subscribed_symbols})")
def unsubscribe_symbol(self, symbol: str):
"""取消订阅交易对"""
symbol = symbol.upper()
with self._lock:
if symbol in self._subscribed_symbols:
self._subscribed_symbols.discard(symbol)
self._latest_prices.pop(symbol, None)
logger.info(f"[WS:{id(self)}] 已取消订阅 {symbol}")
# 如果没有订阅了,可以考虑断开连接
if not self._subscribed_symbols:
logger.info(f"[WS:{id(self)}] 没有订阅的交易对,准备断开连接")
def add_price_callback(self, callback: Callable[[str, float], None]):
"""添加价格更新回调函数"""
with self._lock:
if callback not in self._price_callbacks:
self._price_callbacks.append(callback)
def remove_price_callback(self, callback: Callable):
"""移除价格回调函数"""
with self._lock:
if callback in self._price_callbacks:
self._price_callbacks.remove(callback)
def get_latest_price(self, symbol: str) -> Optional[float]:
"""获取交易对的最新缓存价格"""
return self._latest_prices.get(symbol.upper())
def get_subscribed_symbols(self) -> List[str]:
"""获取已订阅的交易对列表"""
with self._lock:
return list(self._subscribed_symbols)
def start(self):
"""启动 WebSocket 连接"""
with self._lock:
if self._running:
logger.debug(f"[WS:{id(self)}] WebSocket 服务已在运行")
return
self._running = True
# 在新线程中运行事件循环
self._thread = threading.Thread(target=self._run_event_loop, daemon=True)
self._thread.start()
def stop(self):
"""停止 WebSocket 连接"""
with self._lock:
if not self._running:
return
self._running = False
# 关闭 WebSocket 连接
if self._loop and self._loop.is_running():
self._loop.call_soon_threadsafe(self._close_ws())
if self._thread and self._thread.is_alive():
self._thread.join(timeout=5)
logger.info(f"[WS:{id(self)}] WebSocket 价格监控服务已停止")
def _run_event_loop(self):
"""运行 WebSocket 事件循环(在单独线程中)"""
# 创建新的事件循环
self._loop = asyncio.new_event_loop()
asyncio.set_event_loop(self._loop)
try:
self._loop.run_until_complete(self._connect_and_listen())
except Exception as e:
logger.error(f"[WS:{id(self)}] WebSocket 事件循环出错: {e}")
finally:
self._loop.close()
async def _connect_and_listen(self):
"""连接并监听 WebSocket 消息"""
retry_count = 0
while self._running and retry_count < self._max_reconnect_attempts:
try:
# 构建订阅流
with self._lock:
symbols = list(self._subscribed_symbols)
if not symbols:
# 没有订阅的交易对,等待订阅
logger.debug(f"[WS:{id(self)}] 没有订阅的交易对,等待 5 秒")
await asyncio.sleep(5)
continue
# 构建 WebSocket 流
streams = []
for symbol in symbols:
streams.append(f"{symbol.lower()}@ticker")
# Binance 组合流 URL 格式: /stream?streams=btcusdt@ticker/ethusdt@ticker
url = f"{self.BASE_WS_URL}/stream?streams={'/'.join(streams)}"
logger.info(f"[WS:{id(self)}] 正在连接 WebSocket... (订阅: {', '.join(symbols)})")
logger.debug(f"[WS:{id(self)}] WebSocket URL: {url}")
async with websockets.connect(url, ping_interval=30) as ws:
self._ws = ws
retry_count = 0 # 连接成功,重置重试计数
self._last_heartbeat = datetime.now()
logger.info(f"[WS:{id(self)}] WebSocket 已连接")
# 监听消息
async for message in self._ws:
await self._on_message(message)
except websockets.exceptions.ConnectionClosed as e:
logger.warning(f"[WS:{id(self)}] WebSocket 连接关闭: {e}")
except websockets.exceptions.ConnectionError as e:
logger.error(f"[WS:{id(self)}] WebSocket 连接错误: {e}")
except Exception as e:
logger.error(f"[WS:{id(self)}] WebSocket 异常: {e}")
# 检查是否需要重连
with self._lock:
should_reconnect = self._running and self._subscribed_symbols and retry_count < self._max_reconnect_attempts
if should_reconnect:
retry_count += 1
logger.info(f"[WS:{id(self)}] 将在 {self._reconnect_delay} 秒后重连... (尝试 {retry_count}/{self._max_reconnect_attempts})")
await asyncio.sleep(self._reconnect_delay)
else:
if self._running:
logger.warning(f"[WS:{id(self)}] 达到最大重连次数,停止服务")
self._running = False
break
async def _on_message(self, message):
"""处理 WebSocket 消息"""
try:
data = json.loads(message)
# 处理不同的消息类型
if data.get('e') == '24hrTicker': # 24小时价格变动
symbol = data.get('s')
if symbol:
# 解析价格
price = float(data.get('c', 0)) # 当前价格
self._update_price(symbol.upper(), price)
elif data.get('result') is not None and isinstance(data['result'], list):
# 多个交易对的价格推送
for item in data['result']:
symbol = item.get('s')
if symbol:
price = float(item.get('c', 0))
self._update_price(symbol.upper(), price)
except json.JSONDecodeError as e:
logger.error(f"[WS:{id(self)}] 解析 WebSocket 消息失败: {e}")
except Exception as e:
logger.error(f"[WS:{id(self)}] 处理 WebSocket 消息出错: {e}")
def _update_price(self, symbol: str, price: float):
"""更新价格并触发回调"""
old_price = self._latest_prices.get(symbol)
# 只有价格变化时才触发回调
if old_price != price:
self._latest_prices[symbol] = price
# 调用所有注册的回调函数
with self._lock:
callbacks = self._price_callbacks.copy()
# 在线程中执行回调
for callback in callbacks:
try:
callback(symbol, price)
except Exception as e:
logger.error(f"[WS:{id(self)}] 价格回调执行出错: {e}")
async def _close_ws(self):
"""关闭 WebSocket 连接"""
if self._ws:
await self._ws.close()
self._ws = None
logger.info(f"[WS:{id(self)}] WebSocket 连接已关闭")
def _fetch_current_price(self, symbol: str):
"""立即获取当前价格WebSocket 连接建立前的临时方案)"""
try:
import requests
url = f"https://api.binance.com/api/v3/ticker/price?symbol={symbol}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
data = response.json()
price = float(data['price'])
self._latest_prices[symbol] = price
logger.debug(f"[WS:{id(self)}] 获取 {symbol} 当前价格: ${price:,.2f}")
except Exception as e:
logger.warning(f"[WS:{id(self)}] 获取 {symbol} 当前价格失败: {e}")
# 全局单例
_ws_monitor: Optional[WebSocketPriceMonitor] = None
def get_ws_price_monitor() -> WebSocketPriceMonitor:
"""获取 WebSocket 价格监控服务单例"""
global _ws_monitor
if _ws_monitor is None:
_ws_monitor = WebSocketPriceMonitor()
return _ws_monitor

View File

@ -10,7 +10,6 @@ pydantic==2.5.3
pydantic-settings==2.1.0
python-dotenv==1.0.0
slowapi==0.1.9
websockets>=13.0
pandas>=2.2.0
numpy>=1.26.0
python-multipart==0.0.6

View File

@ -34,6 +34,6 @@ if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
# 静默处理 Ctrl+C,避免 WebSocket 关闭时的错误刷屏
# 静默处理 Ctrl+C
print("\n程序已退出")
sys.exit(0)

View File

@ -1,118 +0,0 @@
#!/usr/bin/env python3
"""
测试 WebSocket 价格监控服务
"""
import sys
import os
import asyncio
import threading
import time
# 确保路径正确
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
backend_dir = os.path.join(project_root, 'backend')
sys.path.insert(0, backend_dir)
from app.services.websocket_monitor import get_ws_price_monitor
from app.utils.logger import logger
# 用于接收价格更新的队列
price_updates = []
def on_price_update(symbol: str, price: float):
"""价格更新回调"""
price_updates.append((symbol, price))
print(f"📊 [回调] {symbol}: ${price:,.2f}")
async def main():
print("=" * 60)
print("🔌 测试 WebSocket 价格监控服务")
print("=" * 60)
ws_monitor = get_ws_price_monitor()
# 订阅几个交易对
symbols = ['BTCUSDT', 'ETHUSDT']
print(f"\n订阅交易对: {', '.join(symbols)}")
for symbol in symbols:
print(f" 订阅 {symbol}...")
ws_monitor.subscribe_symbol(symbol)
# 注册回调
print("注册价格回调...")
ws_monitor.add_price_callback(on_price_update)
print("\n等待价格推送30秒...")
print("提示: WebSocket 连接可能需要几秒钟建立...")
print("-" * 60)
# 给 WebSocket 线程一些时间启动
await asyncio.sleep(2)
# 检查状态
print(f"\n📡 WebSocket 运行状态: {ws_monitor.is_running()}")
print(f"📡 已订阅交易对: {ws_monitor.get_subscribed_symbols()}")
print(f"📡 WebSocket 线程存活: {ws_monitor._thread.is_alive() if ws_monitor._thread else 'None'}")
print(f"📡 事件循环: {'已创建' if ws_monitor._loop else '未创建'}")
print("-" * 60)
# 等待价格更新
start_time = time.time()
last_price_count = 0
for i in range(30):
await asyncio.sleep(1)
# 每5秒打印一次状态
if (i + 1) % 5 == 0:
elapsed = time.time() - start_time
current_count = len(price_updates)
new_updates = current_count - last_price_count
last_price_count = current_count
print(f"\n⏱️ 已运行 {i+1} 秒 | 收到 {current_count} 次价格更新 (最近5秒: +{new_updates})")
print(f" WebSocket 状态: {'🟢 运行中' if ws_monitor.is_running() else '🔴 未运行'}")
# 显示当前缓存的价格
for symbol in symbols:
price = ws_monitor.get_latest_price(symbol)
if price:
print(f" 📊 {symbol}: ${price:,.2f}")
# 显示获取到的价格
print("\n" + "=" * 60)
print("📊 最终结果:")
print("=" * 60)
print(f" 共收到 {len(price_updates)} 次价格更新")
for symbol in symbols:
price = ws_monitor.get_latest_price(symbol)
if price:
print(f" {symbol}: ${price:,.2f}")
else:
print(f" {symbol}: 未获取到价格")
# 停止服务
print("\n停止服务...")
ws_monitor.stop()
# 等待线程结束
if ws_monitor._thread:
ws_monitor._thread.join(timeout=3)
print(f"线程已结束: {not ws_monitor._thread.is_alive()}")
print("=" * 60)
print("✅ 测试完成")
print("=" * 60)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n⚠️ 测试中断")
sys.exit(0)