stock-ai-agent/backend/app/services/signal_storage_service.py
2026-02-19 21:45:27 +08:00

148 lines
5.0 KiB
Python

"""
信号存储服务 - 保存加密货币和美股的交易信号
"""
import json
import os
from datetime import datetime
from typing import Dict, List, Optional, Any
from app.utils.logger import logger
class SignalStorageService:
    """Keep recent crypto and US-stock trading signals in memory and on disk.

    Each market has its own newest-first list, mirrored to a JSON file
    (``crypto_signals.json`` / ``stock_signals.json``) inside the storage
    directory. Disk persistence is best-effort: I/O or serialization
    failures are logged and never raised to the caller.
    """

    # Default cap on the number of signals retained per market.
    DEFAULT_MAX_SIGNALS = 100

    def __init__(self, storage_dir: Optional[str] = None,
                 max_signals: int = DEFAULT_MAX_SIGNALS):
        """Create the storage directory if needed and load existing signals.

        Args:
            storage_dir: Directory holding the JSON files. Defaults to
                ``<three levels above this file>/data/signals``.
            max_signals: Maximum number of signals retained per market
                after each insertion.

        Raises:
            ValueError: If ``max_signals`` is smaller than 1.
        """
        if max_signals < 1:
            raise ValueError("max_signals must be at least 1")
        self.max_signals = max_signals

        if storage_dir is None:
            base_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
            storage_dir = os.path.join(base_dir, 'data', 'signals')
        self.storage_dir = storage_dir
        os.makedirs(self.storage_dir, exist_ok=True)

        # One JSON file per market.
        self.crypto_file = os.path.join(self.storage_dir, 'crypto_signals.json')
        self.stock_file = os.path.join(self.storage_dir, 'stock_signals.json')

        # Populate the in-memory lists from whatever is already on disk.
        self._crypto_signals = self._load_signals(self.crypto_file)
        self._stock_signals = self._load_signals(self.stock_file)
        logger.info(f"信号存储服务初始化完成,加密货币信号: {len(self._crypto_signals)},美股信号: {len(self._stock_signals)}")

    def _load_signals(self, file_path: str) -> List[Dict[str, Any]]:
        """Read the signal list stored at ``file_path``.

        Returns an empty list when the file is missing, unreadable, or does
        not contain a JSON list. Entries that are not JSON objects are
        dropped, because every other method treats signals as dicts.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            # No file yet simply means no signals have been stored.
            return []
        except Exception as e:
            logger.error(f"加载信号失败 {file_path}: {e}")
            return []

        if not isinstance(data, list):
            logger.error(f"加载信号失败 {file_path}: 文件内容不是列表")
            return []
        return [entry for entry in data if isinstance(entry, dict)]

    def _save_signals(self, file_path: str, signals: List[Dict[str, Any]]):
        """Write ``signals`` to ``file_path`` as JSON, replacing it atomically.

        The payload is serialized before any file is opened and is then
        written to a sibling temp file that replaces the target via
        ``os.replace``. A serialization error or an interrupted write
        therefore never truncates the previously saved file. Failures are
        logged, not raised.
        """
        tmp_path = f"{file_path}.tmp"
        try:
            payload = json.dumps(signals, ensure_ascii=False, indent=2)
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(tmp_path, file_path)
        except Exception as e:
            logger.error(f"保存信号失败 {file_path}: {e}")
            # Do not leave a half-written temp file behind.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def _add_signal(self, signals: List[Dict[str, Any]], file_path: str,
                    signal: Dict[str, Any], signal_type: str, label: str):
        """Stamp ``signal``, put it at the front of ``signals`` and persist.

        ``signal`` is modified in place (``timestamp`` and ``signal_type``
        keys are set) and the same dict object is stored. ``label`` is the
        market name used in the log message.
        """
        signal['timestamp'] = datetime.now().isoformat()
        signal['signal_type'] = signal_type

        # Newest first; drop everything beyond the retention cap.
        signals.insert(0, signal)
        del signals[self.max_signals:]

        self._save_signals(file_path, signals)
        logger.info(f"添加{label}信号: {signal.get('symbol', 'N/A')} - {signal.get('action', 'N/A')}")

    @staticmethod
    def _head(signals: List[Dict[str, Any]], limit: Optional[int]) -> List[Dict[str, Any]]:
        """Return a new list with at most ``limit`` leading signals.

        ``None`` means "no limit"; a negative limit yields an empty list
        instead of Python's "all but the last k" slice semantics.
        """
        if limit is None:
            return list(signals)
        return signals[:max(limit, 0)]

    def add_crypto_signal(self, signal: Dict[str, Any]):
        """Record a cryptocurrency signal (stamped in place) and persist it."""
        self._add_signal(self._crypto_signals, self.crypto_file, signal, 'crypto', '加密货币')

    def add_stock_signal(self, signal: Dict[str, Any]):
        """Record a US-stock signal (stamped in place) and persist it."""
        self._add_signal(self._stock_signals, self.stock_file, signal, 'stock', '美股')

    def get_crypto_signals(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Return up to ``limit`` cryptocurrency signals, newest first."""
        return self._head(self._crypto_signals, limit)

    def get_stock_signals(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Return up to ``limit`` US-stock signals, newest first."""
        return self._head(self._stock_signals, limit)

    def get_all_signals(self, limit: int = 100) -> Dict[str, List[Dict[str, Any]]]:
        """Return up to ``limit`` signals per market under 'crypto' and 'stock'."""
        return {
            'crypto': self._head(self._crypto_signals, limit),
            'stock': self._head(self._stock_signals, limit),
        }

    def get_latest_signal(self, signal_type: str, symbol: str) -> Optional[Dict[str, Any]]:
        """Return the newest signal for ``symbol`` in the given market.

        Args:
            signal_type: ``'crypto'`` or ``'stock'``.
            symbol: Value compared against each signal's ``'symbol'`` key.

        Returns:
            The first matching signal, or ``None`` when the market is
            unknown or no signal matches.
        """
        if signal_type == 'crypto':
            signals = self._crypto_signals
        elif signal_type == 'stock':
            signals = self._stock_signals
        else:
            return None
        # Lists are newest-first, so the first hit is the latest signal.
        return next((s for s in signals if s.get('symbol') == symbol), None)

    def clear_old_signals(self, days: int = 7):
        """Drop signals older than ``days`` days and persist both lists.

        Signals whose ``timestamp`` is missing or is not a string are
        dropped as well.
        """
        from datetime import timedelta

        cutoff_time = (datetime.now() - timedelta(days=days)).isoformat()

        def is_recent(signal: Dict[str, Any]) -> bool:
            # Timestamps are written by this class with datetime.isoformat(),
            # so comparing them as strings orders them chronologically. The
            # isinstance guard keeps a None/numeric value from raising.
            stamp = signal.get('timestamp')
            return isinstance(stamp, str) and stamp >= cutoff_time

        self._crypto_signals = [s for s in self._crypto_signals if is_recent(s)]
        self._save_signals(self.crypto_file, self._crypto_signals)

        self._stock_signals = [s for s in self._stock_signals if is_recent(s)]
        self._save_signals(self.stock_file, self._stock_signals)

        logger.info(f"清理旧信号完成,保留 {days} 天内的信号")
# Module-wide singleton instance, created lazily on first access.
_signal_storage: Optional[SignalStorageService] = None


def get_signal_storage() -> SignalStorageService:
    """Return the shared SignalStorageService, constructing it on first use."""
    global _signal_storage
    if _signal_storage is not None:
        return _signal_storage
    _signal_storage = SignalStorageService()
    return _signal_storage