From e155274828cfa0417ef55370c69b13b355aadcd9 Mon Sep 17 00:00:00 2001 From: aaron <> Date: Thu, 19 Feb 2026 21:45:27 +0800 Subject: [PATCH] update --- backend/app/api/signals.py | 186 +++++++ backend/app/crypto_agent/crypto_agent.py | 9 + backend/app/main.py | 11 +- backend/app/models/signal.py | 91 ++++ .../app/services/signal_database_service.py | 290 +++++++++++ .../app/services/signal_storage_service.py | 147 ++++++ backend/app/stock_agent/stock_agent.py | 9 + frontend/signals.html | 459 ++++++++++++++++++ 8 files changed, 1201 insertions(+), 1 deletion(-) create mode 100644 backend/app/api/signals.py create mode 100644 backend/app/models/signal.py create mode 100644 backend/app/services/signal_database_service.py create mode 100644 backend/app/services/signal_storage_service.py create mode 100644 frontend/signals.html diff --git a/backend/app/api/signals.py b/backend/app/api/signals.py new file mode 100644 index 0000000..49808ec --- /dev/null +++ b/backend/app/api/signals.py @@ -0,0 +1,186 @@ +""" +信号 API - 提供加密货币和美股信号查询接口(数据库版本) +""" +from fastapi import APIRouter, HTTPException, Query +from typing import Dict, List, Optional, Any + +from app.services.signal_database_service import get_signal_db_service +from app.utils.logger import logger + + +router = APIRouter(prefix="/api/signals", tags=["信号管理"]) + + +@router.get("/crypto") +async def get_crypto_signals( + limit: int = Query(50, ge=1, le=200, description="返回数量限制"), + symbol: Optional[str] = Query(None, description="过滤指定交易对"), + days: int = Query(7, ge=1, le=30, description="查询最近多少天的信号") +) -> Dict[str, Any]: + """ + 获取加密货币信号列表 + + Args: + limit: 返回数量限制(默认50) + symbol: 过滤指定交易对 + days: 查询最近多少天的信号(默认7天) + + Returns: + 信号列表 + """ + try: + service = get_signal_db_service() + + if symbol: + # 获取指定交易对的最新信号 + signal = service.get_latest_signal('crypto', symbol) + return { + 'success': True, + 'symbol': symbol, + 'signal': signal, + 'count': 1 if signal else 0 + } + else: + # 获取所有信号 + signals = service.get_crypto_signals(limit=limit, days=days) + return { + 'success': True, + 'signals': signals, + 'count': len(signals) + } + except Exception as e: + logger.error(f"获取加密货币信号失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/stock") +async def get_stock_signals( + limit: int = Query(50, ge=1, le=200, description="返回数量限制"), + symbol: Optional[str] = Query(None, description="过滤指定股票"), + days: int = Query(7, ge=1, le=30, description="查询最近多少天的信号") +) -> Dict[str, Any]: + """ + 获取美股信号列表 + + Args: + limit: 返回数量限制(默认50) + symbol: 过滤指定股票 + days: 查询最近多少天的信号(默认7天) + + Returns: + 信号列表 + """ + try: + service = get_signal_db_service() + + if symbol: + # 获取指定股票的最新信号 + signal = service.get_latest_signal('stock', symbol) + return { + 'success': True, + 'symbol': symbol, + 'signal': signal, + 'count': 1 if signal else 0 + } + else: + # 获取所有信号 + signals = service.get_stock_signals(limit=limit, days=days) + return { + 'success': True, + 'signals': signals, + 'count': len(signals) + } + except Exception as e: + logger.error(f"获取美股信号失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/all") +async def get_all_signals( + limit: int = Query(50, ge=1, le=200, description="每种类型返回数量限制"), + days: int = Query(7, ge=1, le=30, description="查询最近多少天的信号") +) -> Dict[str, Any]: + """ + 获取所有信号(加密货币 + 美股) + + Args: + limit: 每种类型返回数量限制(默认50) + days: 查询最近多少天的信号(默认7天) + + Returns: + 所有信号 + """ + try: + service = get_signal_db_service() + signals = service.get_all_signals(limit=limit, days=days) + + return { + 'success': True, + 'crypto': { + 'signals': signals['crypto'], + 'count': len(signals['crypto']) + }, + 'stock': { + 'signals': signals['stock'], + 'count': len(signals['stock']) + }, + 'total_count': len(signals['crypto']) + len(signals['stock']) + } + except Exception as e: + logger.error(f"获取所有信号失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/latest") +async def get_latest_signals( + limit: int = Query(20, ge=1, le=100, description="返回数量限制"), + days: int = Query(7, ge=1, le=30, description="查询最近多少天的信号") +) -> Dict[str, Any]: + """ + 获取最新的所有信号(按时间排序) + + Args: + limit: 返回数量限制(默认20) + days: 查询最近多少天的信号(默认7天) + + Returns: + 最新信号列表 + """ + try: + service = get_signal_db_service() + signals = service.get_latest_signals(limit=limit, days=days) + + return { + 'success': True, + 'signals': signals, + 'count': len(signals) + } + except Exception as e: + logger.error(f"获取最新信号失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/stats") +async def get_signal_stats( + days: int = Query(7, ge=1, le=30, description="统计最近多少天的信号") +) -> Dict[str, Any]: + """ + 获取信号统计信息 + + Args: + days: 统计最近多少天的信号(默认7天) + + Returns: + 统计数据 + """ + try: + service = get_signal_db_service() + stats = service.get_signal_stats(days=days) + + return { + 'success': True, + **stats + } + except Exception as e: + logger.error(f"获取信号统计失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 2cea9bc..9fa0e4e 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -13,6 +13,7 @@ from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service from app.services.paper_trading_service import get_paper_trading_service from app.services.price_monitor_service import get_price_monitor_service +from app.services.signal_database_service import get_signal_db_service from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer @@ -26,6 +27,7 @@ class CryptoAgent: self.feishu = get_feishu_service() self.telegram = get_telegram_service() self.llm_analyzer = LLMSignalAnalyzer() + self.signal_db = get_signal_db_service() # 信号数据库服务 # 模拟交易服务 self.paper_trading_enabled = self.settings.paper_trading_enabled @@ -352,6 +354,13 @@ class CryptoAgent: logger.info(f" ✅ 已发送信号通知") + # 保存信号到数据库 + signal_to_save = best_signal.copy() + signal_to_save['signal_type'] = 'crypto' + signal_to_save['symbol'] = symbol + signal_to_save['current_price'] = current_price + self.signal_db.add_signal(signal_to_save) + # 更新状态 self.last_signals[symbol] = best_signal self.signal_cooldown[symbol] = datetime.now() diff --git a/backend/app/main.py b/backend/app/main.py index 4b89574..3c0e541 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -9,7 +9,7 @@ from fastapi.responses import FileResponse from contextlib import asynccontextmanager from app.config import get_settings from app.utils.logger import logger -from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks +from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals import os @@ -233,6 +233,7 @@ app.include_router(skills.router, prefix="/api/skills", tags=["技能管理"]) app.include_router(llm.router, tags=["LLM模型"]) app.include_router(paper_trading.router, tags=["模拟交易"]) app.include_router(stocks.router, prefix="/api/stocks", tags=["美股分析"]) +app.include_router(signals.router, tags=["信号管理"]) # 挂载静态文件 frontend_path = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "frontend") @@ -268,6 +269,14 @@ async def paper_trading_page(): return FileResponse(page_path) return {"message": "页面不存在"} +@app.get("/signals") +async def signals_page(): + """信号列表页面""" + page_path = os.path.join(frontend_path, "signals.html") + if os.path.exists(page_path): + return FileResponse(page_path) + return {"message": "页面不存在"} + if __name__ == "__main__": import uvicorn uvicorn.run( diff --git a/backend/app/models/signal.py b/backend/app/models/signal.py new file mode 100644 index 0000000..1c1787f --- /dev/null +++ b/backend/app/models/signal.py @@ -0,0 +1,91 @@ +""" +交易信号数据库模型 +""" +from datetime import datetime +from sqlalchemy import Column, Integer, String, Text, DateTime, JSON, Float, Boolean +from sqlalchemy.orm import relationship + +from app.models.database import Base + + +class TradingSignal(Base): + """交易信号表""" + __tablename__ = "trading_signals" + + id = Column(Integer, primary_key=True, index=True) + + # 信号基本信息 + signal_type = Column(String(20), nullable=False, index=True) # 'crypto' or 'stock' + symbol = Column(String(50), nullable=False, index=True) # 交易对或股票代码 + + # 信号方向和评级 + action = Column(String(10), nullable=False) # 'buy', 'sell', 'hold' + grade = Column(String(5), nullable=False) # 'A', 'B', 'C', 'D' + confidence = Column(Float, nullable=False) # 置信度 0-100 + + # 价格信息 + entry_price = Column(Float, nullable=True) + stop_loss = Column(Float, nullable=True) + take_profit = Column(Float, nullable=True) + current_price = Column(Float, nullable=True) # 信号生成时的当前价格 + + # 信号详情 + signal_type_detail = Column(String(20), nullable=True) # 'short_term', 'medium_term', 'long_term' + entry_type = Column(String(10), nullable=True) # 'market', 'limit' + position_size = Column(String(20), nullable=True) # 'light', 'medium', 'heavy' + + # 分析信息 + reason = Column(Text, nullable=True) # 信号理由 + risk_warning = Column(Text, nullable=True) # 风险提示 + analysis_summary = Column(Text, nullable=True) # 分析摘要 + news_sentiment = Column(String(20), nullable=True) # 新闻情绪 + news_impact = Column(String(100), nullable=True) # 消息影响 + + # 关键价位 + key_levels = Column(JSON, nullable=True) # 支撑位和阻力位 + + # 技术指标(JSON 格式存储) + indicators = Column(JSON, nullable=True) + + # 状态 + is_active = Column(Boolean, default=True) # 信号是否有效 + notified = Column(Boolean, default=False) # 是否已发送通知 + notification_sent_at = Column(DateTime, nullable=True) + + # 时间戳 + created_at = Column(DateTime, default=datetime.utcnow, index=True) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) + + def __repr__(self): + return f"" + + def to_dict(self): + """转换为字典""" + return { + 'id': self.id, + 'signal_type': self.signal_type, + 'symbol': self.symbol, + 'action': self.action, + 'grade': self.grade, + 'confidence': self.confidence, + 'entry_price': self.entry_price, + 'stop_loss': self.stop_loss, + 'take_profit': self.take_profit, + 'current_price': self.current_price, + 'signal_type_detail': self.signal_type_detail, + 'entry_type': self.entry_type, + 'position_size': self.position_size, + 'reason': self.reason, + 'risk_warning': self.risk_warning, + 'analysis_summary': self.analysis_summary, + 'news_sentiment': self.news_sentiment, + 'news_impact': self.news_impact, + 'key_levels': self.key_levels, + 'indicators': self.indicators, + 'is_active': self.is_active, + 'notified': self.notified, + 'notification_sent_at': self.notification_sent_at.isoformat() if self.notification_sent_at else None, + 'created_at': self.created_at.isoformat() if self.created_at else None, + 'updated_at': self.updated_at.isoformat() if self.updated_at else None, + 'timestamp': self.created_at.isoformat() if self.created_at else None + } diff --git a/backend/app/services/signal_database_service.py b/backend/app/services/signal_database_service.py new file mode 100644 index 0000000..e2423f2 --- /dev/null +++ b/backend/app/services/signal_database_service.py @@ -0,0 +1,290 @@ +""" +交易信号数据库服务 +""" +from typing import Dict, List, Optional, Any +from datetime import datetime, timedelta +from sqlalchemy.orm import Session +from sqlalchemy import desc, and_, or_ + +from app.models.signal import TradingSignal +from app.models.database import SessionLocal, engine, Base +from app.utils.logger import logger + + +class SignalDatabaseService: + """交易信号数据库服务""" + + def __init__(self): + """初始化服务""" + self._ensure_tables() + + def _ensure_tables(self): + """确保表已创建""" + try: + Base.metadata.create_all(bind=engine) + logger.info("交易信号表已创建") + except Exception as e: + logger.error(f"创建交易信号表失败: {e}") + + def _get_db(self) -> Session: + """获取数据库会话""" + db = SessionLocal() + try: + return db + except Exception as e: + logger.error(f"获取数据库会话失败: {e}") + raise + + def add_signal(self, signal_data: Dict[str, Any]) -> Optional[TradingSignal]: + """添加信号到数据库""" + db = self._get_db() + try: + # 创建信号对象 + signal = TradingSignal( + signal_type=signal_data.get('signal_type', 'crypto'), + symbol=signal_data.get('symbol', ''), + action=signal_data.get('action', 'hold'), + grade=signal_data.get('grade', 'D'), + confidence=signal_data.get('confidence', 0), + entry_price=signal_data.get('entry_price'), + stop_loss=signal_data.get('stop_loss'), + take_profit=signal_data.get('take_profit'), + current_price=signal_data.get('current_price'), + signal_type_detail=signal_data.get('type'), + entry_type=signal_data.get('entry_type'), + position_size=signal_data.get('position_size'), + reason=signal_data.get('reason'), + risk_warning=signal_data.get('risk_warning'), + analysis_summary=signal_data.get('analysis_summary'), + news_sentiment=signal_data.get('news_sentiment'), + news_impact=signal_data.get('news_impact'), + key_levels=signal_data.get('key_levels'), + indicators=signal_data.get('indicators'), + notified=True, + notification_sent_at=datetime.utcnow() + ) + + db.add(signal) + db.commit() + db.refresh(signal) + + logger.info(f"保存信号到数据库: {signal.signal_type} {signal.symbol} {signal.action} {signal.grade}") + return signal + + except Exception as e: + db.rollback() + logger.error(f"保存信号失败: {e}") + return None + finally: + db.close() + + def get_crypto_signals( + self, + limit: int = 50, + symbol: Optional[str] = None, + days: int = 7 + ) -> List[Dict[str, Any]]: + """获取加密货币信号""" + db = self._get_db() + try: + cutoff_time = datetime.utcnow() - timedelta(days=days) + + query = db.query(TradingSignal).filter( + TradingSignal.signal_type == 'crypto', + TradingSignal.created_at >= cutoff_time + ) + + if symbol: + query = query.filter(TradingSignal.symbol == symbol.upper()) + + signals = query.order_by(desc(TradingSignal.created_at)).limit(limit).all() + + return [signal.to_dict() for signal in signals] + + except Exception as e: + logger.error(f"获取加密货币信号失败: {e}") + return [] + finally: + db.close() + + def get_stock_signals( + self, + limit: int = 50, + symbol: Optional[str] = None, + days: int = 7 + ) -> List[Dict[str, Any]]: + """获取美股信号""" + db = self._get_db() + try: + cutoff_time = datetime.utcnow() - timedelta(days=days) + + query = db.query(TradingSignal).filter( + TradingSignal.signal_type == 'stock', + TradingSignal.created_at >= cutoff_time + ) + + if symbol: + query = query.filter(TradingSignal.symbol == symbol.upper()) + + signals = query.order_by(desc(TradingSignal.created_at)).limit(limit).all() + + return [signal.to_dict() for signal in signals] + + except Exception as e: + logger.error(f"获取美股信号失败: {e}") + return [] + finally: + db.close() + + def get_all_signals(self, limit: int = 100, days: int = 7) -> Dict[str, List[Dict[str, Any]]]: + """获取所有信号""" + db = self._get_db() + try: + cutoff_time = datetime.utcnow() - timedelta(days=days) + + signals = db.query(TradingSignal).filter( + TradingSignal.created_at >= cutoff_time + ).order_by(desc(TradingSignal.created_at)).limit(limit).all() + + crypto_signals = [] + stock_signals = [] + + for signal in signals: + signal_dict = signal.to_dict() + if signal.signal_type == 'crypto': + crypto_signals.append(signal_dict) + else: + stock_signals.append(signal_dict) + + return { + 'crypto': crypto_signals, + 'stock': stock_signals + } + + except Exception as e: + logger.error(f"获取所有信号失败: {e}") + return {'crypto': [], 'stock': []} + finally: + db.close() + + def get_latest_signals(self, limit: int = 20, days: int = 7) -> List[Dict[str, Any]]: + """获取最新信号(混合)""" + db = self._get_db() + try: + cutoff_time = datetime.utcnow() - timedelta(days=days) + + signals = db.query(TradingSignal).filter( + TradingSignal.created_at >= cutoff_time + ).order_by(desc(TradingSignal.created_at)).limit(limit).all() + + return [signal.to_dict() for signal in signals] + + except Exception as e: + logger.error(f"获取最新信号失败: {e}") + return [] + finally: + db.close() + + def get_signal_stats(self, days: int = 7) -> Dict[str, Any]: + """获取信号统计""" + db = self._get_db() + try: + cutoff_time = datetime.utcnow() - timedelta(days=days) + + # 获取所有信号 + all_signals = db.query(TradingSignal).filter( + TradingSignal.created_at >= cutoff_time + ).all() + + # 统计加密货币信号 + crypto_signals = [s for s in all_signals if s.signal_type == 'crypto'] + crypto_buy = sum(1 for s in crypto_signals if s.action == 'buy') + crypto_sell = sum(1 for s in crypto_signals if s.action == 'sell') + + # 统计美股信号 + stock_signals = [s for s in all_signals if s.signal_type == 'stock'] + stock_buy = sum(1 for s in stock_signals if s.action == 'buy') + stock_sell = sum(1 for s in stock_signals if s.action == 'sell') + + # 按等级统计 + grade_stats = {} + for signal in all_signals: + grade_stats[signal.grade] = grade_stats.get(signal.grade, 0) + 1 + + # 最近24小时信号 + recent_cutoff = datetime.utcnow() - timedelta(hours=24) + recent_crypto = sum(1 for s in crypto_signals if s.created_at >= recent_cutoff) + recent_stock = sum(1 for s in stock_signals if s.created_at >= recent_cutoff) + + return { + 'crypto': { + 'total': len(crypto_signals), + 'buy': crypto_buy, + 'sell': crypto_sell, + 'recent_24h': recent_crypto + }, + 'stock': { + 'total': len(stock_signals), + 'buy': stock_buy, + 'sell': stock_sell, + 'recent_24h': recent_stock + }, + 'grades': grade_stats, + 'total': len(all_signals) + } + + except Exception as e: + logger.error(f"获取信号统计失败: {e}") + return {} + finally: + db.close() + + def get_latest_signal(self, signal_type: str, symbol: str) -> Optional[Dict[str, Any]]: + """获取指定交易对的最新信号""" + db = self._get_db() + try: + signal = db.query(TradingSignal).filter( + TradingSignal.signal_type == signal_type, + TradingSignal.symbol == symbol.upper() + ).order_by(desc(TradingSignal.created_at)).first() + + if signal: + return signal.to_dict() + return None + + except Exception as e: + logger.error(f"获取最新信号失败: {e}") + return None + finally: + db.close() + + def clear_old_signals(self, days: int = 30): + """清理旧信号""" + db = self._get_db() + try: + cutoff_time = datetime.utcnow() - timedelta(days=days) + + deleted = db.query(TradingSignal).filter( + TradingSignal.created_at < cutoff_time + ).delete() + + db.commit() + logger.info(f"清理了 {deleted} 条旧信号(超过 {days} 天)") + + except Exception as e: + db.rollback() + logger.error(f"清理旧信号失败: {e}") + finally: + db.close() + + +# 全局单例 +_signal_db_service: Optional[SignalDatabaseService] = None + + +def get_signal_db_service() -> SignalDatabaseService: + """获取信号数据库服务单例""" + global _signal_db_service + if _signal_db_service is None: + _signal_db_service = SignalDatabaseService() + return _signal_db_service diff --git a/backend/app/services/signal_storage_service.py b/backend/app/services/signal_storage_service.py new file mode 100644 index 0000000..f7ec2dd --- /dev/null +++ b/backend/app/services/signal_storage_service.py @@ -0,0 +1,147 @@ +""" +信号存储服务 - 保存加密货币和美股的交易信号 +""" +import json +import os +from datetime import datetime +from typing import Dict, List, Optional, Any +from app.utils.logger import logger + + +class SignalStorageService: + """信号存储服务""" + + def __init__(self): + """初始化服务""" + self.storage_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'data', 'signals') + os.makedirs(self.storage_dir, exist_ok=True) + + # 信号文件 + self.crypto_file = os.path.join(self.storage_dir, 'crypto_signals.json') + self.stock_file = os.path.join(self.storage_dir, 'stock_signals.json') + + # 加载现有信号 + self._crypto_signals = self._load_signals(self.crypto_file) + self._stock_signals = self._load_signals(self.stock_file) + + logger.info(f"信号存储服务初始化完成,加密货币信号: {len(self._crypto_signals)},美股信号: {len(self._stock_signals)}") + + def _load_signals(self, file_path: str) -> List[Dict[str, Any]]: + """从文件加载信号""" + if not os.path.exists(file_path): + return [] + + try: + with open(file_path, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + logger.error(f"加载信号失败 {file_path}: {e}") + return [] + + def _save_signals(self, file_path: str, signals: List[Dict[str, Any]]): + """保存信号到文件""" + try: + with open(file_path, 'w', encoding='utf-8') as f: + json.dump(signals, f, ensure_ascii=False, indent=2) + except Exception as e: + logger.error(f"保存信号失败 {file_path}: {e}") + + def add_crypto_signal(self, signal: Dict[str, Any]): + """添加加密货币信号""" + # 添加时间戳和类型 + signal['timestamp'] = datetime.now().isoformat() + signal['signal_type'] = 'crypto' + + # 保存到内存 + self._crypto_signals.insert(0, signal) + + # 只保留最近 100 条 + if len(self._crypto_signals) > 100: + self._crypto_signals = self._crypto_signals[:100] + + # 持久化 + self._save_signals(self.crypto_file, self._crypto_signals) + + logger.info(f"添加加密货币信号: {signal.get('symbol', 'N/A')} - {signal.get('action', 'N/A')}") + + def add_stock_signal(self, signal: Dict[str, Any]): + """添加美股信号""" + # 添加时间戳和类型 + signal['timestamp'] = datetime.now().isoformat() + signal['signal_type'] = 'stock' + + # 保存到内存 + self._stock_signals.insert(0, signal) + + # 只保留最近 100 条 + if len(self._stock_signals) > 100: + self._stock_signals = self._stock_signals[:100] + + # 持久化 + self._save_signals(self.stock_file, self._stock_signals) + + logger.info(f"添加美股信号: {signal.get('symbol', 'N/A')} - {signal.get('action', 'N/A')}") + + def get_crypto_signals(self, limit: int = 50) -> List[Dict[str, Any]]: + """获取加密货币信号列表""" + return self._crypto_signals[:limit] + + def get_stock_signals(self, limit: int = 50) -> List[Dict[str, Any]]: + """获取美股信号列表""" + return self._stock_signals[:limit] + + def get_all_signals(self, limit: int = 100) -> Dict[str, List[Dict[str, Any]]]: + """获取所有信号""" + return { + 'crypto': self._crypto_signals[:limit], + 'stock': self._stock_signals[:limit] + } + + def get_latest_signal(self, signal_type: str, symbol: str) -> Optional[Dict[str, Any]]: + """获取指定交易对的最新信号""" + if signal_type == 'crypto': + signals = self._crypto_signals + elif signal_type == 'stock': + signals = self._stock_signals + else: + return None + + for signal in signals: + if signal.get('symbol') == symbol: + return signal + + return None + + def clear_old_signals(self, days: int = 7): + """清理旧信号""" + from datetime import timedelta + + cutoff_time = (datetime.now() - timedelta(days=days)).isoformat() + + # 清理加密货币信号 + self._crypto_signals = [ + s for s in self._crypto_signals + if s.get('timestamp', '') >= cutoff_time + ] + self._save_signals(self.crypto_file, self._crypto_signals) + + # 清理美股信号 + self._stock_signals = [ + s for s in self._stock_signals + if s.get('timestamp', '') >= cutoff_time + ] + self._save_signals(self.stock_file, self._stock_signals) + + logger.info(f"清理旧信号完成,保留 {days} 天内的信号") + + +# 全局单例 +_signal_storage: Optional[SignalStorageService] = None + + +def get_signal_storage() -> SignalStorageService: + """获取信号存储服务单例""" + global _signal_storage + if _signal_storage is None: + _signal_storage = SignalStorageService() + return _signal_storage diff --git a/backend/app/stock_agent/stock_agent.py b/backend/app/stock_agent/stock_agent.py index a6e119b..9c97a63 100644 --- a/backend/app/stock_agent/stock_agent.py +++ b/backend/app/stock_agent/stock_agent.py @@ -12,6 +12,7 @@ from app.config import get_settings from app.services.yfinance_service import get_yfinance_service from app.services.feishu_service import get_feishu_service from app.services.telegram_service import get_telegram_service +from app.services.signal_database_service import get_signal_db_service from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer @@ -25,6 +26,7 @@ class StockAgent: self.feishu = get_feishu_service() self.telegram = get_telegram_service() self.llm_analyzer = LLMSignalAnalyzer() + self.signal_db = get_signal_db_service() # 信号数据库服务 # 状态管理 self.last_signals: Dict[str, Dict[str, Any]] = {} @@ -299,6 +301,13 @@ class StockAgent: logger.info(f"✅ 信号通知已发送: {title}") + # 保存信号到数据库 + signal_to_save = signal.copy() + signal_to_save['signal_type'] = 'stock' + signal_to_save['symbol'] = symbol + signal_to_save['current_price'] = current_price + self.signal_db.add_signal(signal_to_save) + except Exception as e: logger.error(f"发送通知失败: {e}") diff --git a/frontend/signals.html b/frontend/signals.html new file mode 100644 index 0000000..907e3f4 --- /dev/null +++ b/frontend/signals.html @@ -0,0 +1,459 @@ + + + + + + 交易信号 - Stock Agent + + + +
+ +
+

🎯 交易信号中心

+
+
+

加密货币信号

+
-
+
最近24小时: -
+
+
+

美股信号

+
-
+
最近24小时: -
+
+
+

总信号数

+
-
+
+
+
+ + +
+ + + +
+ + +
+
+ 加载中... +
+
+
+ + + +