crypto.ai/cryptoai/models/analysis_result.py
2025-05-30 22:09:45 +08:00

111 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import Dict, Any, List, Optional
from datetime import datetime
from sqlalchemy.orm import Session
from sqlalchemy import Column, Integer, String, DateTime, Index
from sqlalchemy.dialects.mysql import JSON
from cryptoai.models.base import Base, logger
from cryptoai.utils.db_manager import get_db_context
# 定义分析结果模型
class AnalysisResult(Base):
"""分析结果表模型"""
__tablename__ = 'analysis_results'
id = Column(Integer, primary_key=True, autoincrement=True)
agent = Column(String(50), nullable=False, comment='智能体类型(crypto, gold)')
symbol = Column(String(50), nullable=False, comment='交易对符号')
time_interval = Column(String(20), nullable=False, comment='时间间隔')
completion_result = Column(JSON, nullable=False, comment='分析结果JSON')
created_at = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间')
updated_at = Column(DateTime, nullable=False, default=datetime.now, onupdate=datetime.now, comment='更新时间')
# 索引
__table_args__ = (
Index('idx_agent', 'agent'),
Index('idx_symbol', 'symbol'),
Index('idx_time_interval', 'time_interval'),
Index('idx_created_at', 'created_at'),
)
class AnalysisResultManager:
"""分析结果管理类"""
def __init__(self, session: Session = None):
self.session = session
def save_analysis_result(self, agent: str, symbol: str, time_interval: str,
analysis_result: Dict[str, Any]) -> bool:
"""
保存分析结果到数据库
Args:
agent: 智能体类型,例如 'crypto''gold'
symbol: 交易对符号,例如 'BTCUSDT'
time_interval: 时间间隔,例如 '1h', '4h', '1d'
analysis_result: 分析结果数据
Returns:
保存是否成功
"""
try:
# 创建新记录
new_result = AnalysisResult(
agent=agent,
symbol=symbol,
time_interval=time_interval,
completion_result=analysis_result,
created_at=datetime.now(),
updated_at=datetime.now()
)
# 添加并提交
self.session.add(new_result)
self.session.commit()
logger.info(f"成功保存 {agent} 分析结果,交易对: {symbol}, 时间间隔: {time_interval}")
return True
except Exception as e:
self.session.rollback()
logger.error(f"保存分析结果失败: {e}")
return False
def get_latest_result(self, agent: str, symbol: str, time_interval: str) -> Optional[Dict[str, Any]]:
"""
获取最新的分析结果
Args:
agent: 智能体类型,例如 'crypto''gold'
symbol: 交易对符号,例如 'BTCUSDT'
time_interval: 时间间隔,例如 '1h', '4h', '1d'
Returns:
最新分析结果如果查询失败则返回None
"""
try:
# 查询最新的结果
result = self.session.query(AnalysisResult).filter(
AnalysisResult.agent == agent,
AnalysisResult.symbol == symbol,
AnalysisResult.time_interval == time_interval
).order_by(AnalysisResult.created_at.desc()).first()
if result:
# 转换为字典
return {
'id': result.id,
'agent': result.agent,
'symbol': result.symbol,
'time_interval': result.time_interval,
'completion_result': result.completion_result,
'created_at': result.created_at
}
else:
return None
except Exception as e:
logger.error(f"获取最新分析结果失败: {e}")
return None