#!/usr/bin/env python # -*- coding: utf-8 -*- from typing import Dict, Any, List, Optional from datetime import datetime from sqlalchemy import Column, Integer, String, Text, DateTime, Index, ForeignKey from sqlalchemy.orm import relationship from cryptoai.models.base import Base, logger # 定义分析历史模型 class AnalysisHistory(Base): """分析历史表模型""" __tablename__ = 'analysis_history' id = Column(Integer, primary_key=True, autoincrement=True) user_id = Column(Integer, ForeignKey('users.id'), nullable=False, comment='用户ID') type = Column(String(20), nullable=False, comment='分析类型(crypto, astock)') symbol = Column(String(50), nullable=False, comment='交易符号') timeframe = Column(String(20), nullable=True, comment='时间框架') content = Column(Text, nullable=False, comment='分析内容') create_time = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间') # 关系 user = relationship("User", back_populates="analysis_histories") # 索引和表属性 __table_args__ = ( Index('idx_user_id', 'user_id'), Index('idx_type', 'type'), Index('idx_symbol', 'symbol'), Index('idx_create_time', 'create_time'), {'mysql_charset': 'utf8mb4', 'mysql_collate': 'utf8mb4_unicode_ci'} ) class AnalysisHistoryManager: """分析历史管理类""" def __init__(self, db_session): self.session = db_session def add_analysis_history(self, user_id: int, type: str, symbol: str, content: str, timeframe: str = None) -> bool: """ 添加分析历史记录 Args: user_id: 用户ID type: 分析类型(crypto, astock) symbol: 交易符号 content: 分析内容 timeframe: 时间框架(可选) Returns: 添加是否成功 """ try: # 创建新记录 new_history = AnalysisHistory( user_id=user_id, type=type, symbol=symbol, timeframe=timeframe, content=content, create_time=datetime.now() ) # 添加并提交 self.session.add(new_history) self.session.commit() logger.info(f"成功添加分析历史记录,用户ID: {user_id}, 类型: {type}, 交易符号: {symbol}") return True except Exception as e: self.session.rollback() logger.error(f"添加分析历史记录失败: {e}") return False def delete_analysis_history(self, history_id: int) -> bool: """ 删除分析历史记录 Args: history_id: 历史记录ID Returns: 删除是否成功 """ try: # 查询记录 history = self.session.query(AnalysisHistory).filter(AnalysisHistory.id == history_id).first() if not history: logger.warning(f"分析历史记录ID {history_id} 不存在") return False # 删除记录 self.session.delete(history) self.session.commit() logger.info(f"成功删除分析历史记录,ID: {history_id}") return True except Exception as e: self.session.rollback() logger.error(f"删除分析历史记录失败: {e}") return False def get_analysis_history_by_id(self, history_id: int) -> Optional[Dict[str, Any]]: """ 根据ID获取分析历史记录 Args: history_id: 历史记录ID Returns: 分析历史记录,如果不存在则返回None """ try: # 查询记录 history = self.session.query(AnalysisHistory).filter(AnalysisHistory.id == history_id).first() if history: # 转换为字典 return { 'id': history.id, 'user_id': history.user_id, 'type': history.type, 'symbol': history.symbol, 'timeframe': history.timeframe, 'content': history.content, 'create_time': history.create_time } else: return None except Exception as e: logger.error(f"获取分析历史记录失败: {e}") return None def get_user_analysis_history(self, user_id: int, type: str = None, symbol: str = None, limit: int = 10, offset: int = 0) -> List[Dict[str, Any]]: """ 获取用户的分析历史记录 Args: user_id: 用户ID type: 分析类型筛选(可选) symbol: 交易符号筛选(可选) limit: 返回记录数量限制 offset: 分页偏移量 Returns: 分析历史记录列表 """ try: # 构建查询 query = self.session.query(AnalysisHistory).filter(AnalysisHistory.user_id == user_id) # 添加可选过滤条件 if type: query = query.filter(AnalysisHistory.type == type) if symbol: query = query.filter(AnalysisHistory.symbol == symbol) # 按创建时间降序排序,并应用分页 results = query.order_by(AnalysisHistory.create_time.desc()).limit(limit).offset(offset).all() # 转换为字典列表 history_list = [] for history in results: history_list.append({ 'id': history.id, 'user_id': history.user_id, 'type': history.type, 'symbol': history.symbol, 'timeframe': history.timeframe, 'content': history.content, 'create_time': history.create_time }) return history_list except Exception as e: logger.error(f"获取用户分析历史记录失败: {e}") return []