""" 信号存储服务 - 保存加密货币和美股的交易信号 """ import json import os from datetime import datetime from typing import Dict, List, Optional, Any from app.utils.logger import logger class SignalStorageService: """信号存储服务""" def __init__(self): """初始化服务""" self.storage_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'data', 'signals') os.makedirs(self.storage_dir, exist_ok=True) # 信号文件 self.crypto_file = os.path.join(self.storage_dir, 'crypto_signals.json') self.stock_file = os.path.join(self.storage_dir, 'stock_signals.json') # 加载现有信号 self._crypto_signals = self._load_signals(self.crypto_file) self._stock_signals = self._load_signals(self.stock_file) logger.info(f"信号存储服务初始化完成,加密货币信号: {len(self._crypto_signals)},美股信号: {len(self._stock_signals)}") def _load_signals(self, file_path: str) -> List[Dict[str, Any]]: """从文件加载信号""" if not os.path.exists(file_path): return [] try: with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"加载信号失败 {file_path}: {e}") return [] def _save_signals(self, file_path: str, signals: List[Dict[str, Any]]): """保存信号到文件""" try: with open(file_path, 'w', encoding='utf-8') as f: json.dump(signals, f, ensure_ascii=False, indent=2) except Exception as e: logger.error(f"保存信号失败 {file_path}: {e}") def add_crypto_signal(self, signal: Dict[str, Any]): """添加加密货币信号""" # 添加时间戳和类型 signal['timestamp'] = datetime.now().isoformat() signal['signal_type'] = 'crypto' # 保存到内存 self._crypto_signals.insert(0, signal) # 只保留最近 100 条 if len(self._crypto_signals) > 100: self._crypto_signals = self._crypto_signals[:100] # 持久化 self._save_signals(self.crypto_file, self._crypto_signals) logger.info(f"添加加密货币信号: {signal.get('symbol', 'N/A')} - {signal.get('action', 'N/A')}") def add_stock_signal(self, signal: Dict[str, Any]): """添加美股信号""" # 添加时间戳和类型 signal['timestamp'] = datetime.now().isoformat() signal['signal_type'] = 'stock' # 保存到内存 self._stock_signals.insert(0, signal) # 只保留最近 100 条 if len(self._stock_signals) > 100: self._stock_signals = self._stock_signals[:100] # 持久化 self._save_signals(self.stock_file, self._stock_signals) logger.info(f"添加美股信号: {signal.get('symbol', 'N/A')} - {signal.get('action', 'N/A')}") def get_crypto_signals(self, limit: int = 50) -> List[Dict[str, Any]]: """获取加密货币信号列表""" return self._crypto_signals[:limit] def get_stock_signals(self, limit: int = 50) -> List[Dict[str, Any]]: """获取美股信号列表""" return self._stock_signals[:limit] def get_all_signals(self, limit: int = 100) -> Dict[str, List[Dict[str, Any]]]: """获取所有信号""" return { 'crypto': self._crypto_signals[:limit], 'stock': self._stock_signals[:limit] } def get_latest_signal(self, signal_type: str, symbol: str) -> Optional[Dict[str, Any]]: """获取指定交易对的最新信号""" if signal_type == 'crypto': signals = self._crypto_signals elif signal_type == 'stock': signals = self._stock_signals else: return None for signal in signals: if signal.get('symbol') == symbol: return signal return None def clear_old_signals(self, days: int = 7): """清理旧信号""" from datetime import timedelta cutoff_time = (datetime.now() - timedelta(days=days)).isoformat() # 清理加密货币信号 self._crypto_signals = [ s for s in self._crypto_signals if s.get('timestamp', '') >= cutoff_time ] self._save_signals(self.crypto_file, self._crypto_signals) # 清理美股信号 self._stock_signals = [ s for s in self._stock_signals if s.get('timestamp', '') >= cutoff_time ] self._save_signals(self.stock_file, self._stock_signals) logger.info(f"清理旧信号完成,保留 {days} 天内的信号") # 全局单例 _signal_storage: Optional[SignalStorageService] = None def get_signal_storage() -> SignalStorageService: """获取信号存储服务单例""" global _signal_storage if _signal_storage is None: _signal_storage = SignalStorageService() return _signal_storage