diff --git a/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc b/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc index d74572d..c954b72 100644 Binary files a/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc and b/cryptoai/agents/__pycache__/crypto_agent.cpython-313.pyc differ diff --git a/cryptoai/agents/crypto_agent.py b/cryptoai/agents/crypto_agent.py index c5eb45b..ee92ea4 100644 --- a/cryptoai/agents/crypto_agent.py +++ b/cryptoai/agents/crypto_agent.py @@ -350,6 +350,7 @@ class CryptoAgent: # 把分析结果调用大模型转化成交易建议 + print(f"开始把JSON分析结果调用大模型转化成可读的交易建议...") message = self.convert_analysis_to_trading_suggestions(results) print(f"交易建议: {message}") if self.dingtalk_bot: @@ -359,6 +360,15 @@ class CryptoAgent: if self.discord_bot: print(f"发送交易建议到Discord...") self.discord_bot.send_message(content=message) + + # 保存交易建议到数据库 + try: + saved = self.db_manager.save_agent_feed( + agent_name="加密货币AI助理", + content=message + ) + except Exception as e: + print(f"保存交易建议到数据库时出错: {e}") # 导出 DeepSeek API token 使用情况 self._export_token_usage() @@ -394,6 +404,8 @@ class CryptoAgent: message = self.deepseek_api.extract_text_from_response(response) + # 保存交易建议到数据库 + return message def _export_token_usage(self) -> None: diff --git a/cryptoai/config/config.yaml b/cryptoai/config/config.yaml index d198f37..987c325 100644 --- a/cryptoai/config/config.yaml +++ b/cryptoai/config/config.yaml @@ -32,10 +32,10 @@ crypto: base_currencies: - "BTC" - "ETH" - - "SOL" - - "SUI" - - "DOGE" - - "LTC" + # - "SOL" + # - "SUI" + # - "DOGE" + # - "LTC" quote_currency: "USDT" time_interval: "4h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d historical_days: 90 diff --git a/cryptoai/routes/fastapi_app.py b/cryptoai/routes/fastapi_app.py index 879e4f6..04d3262 100644 --- a/cryptoai/routes/fastapi_app.py +++ b/cryptoai/routes/fastapi_app.py @@ -16,6 +16,7 @@ import time from typing import Dict, Any from cryptoai.routes.agent import router as agent_router +from cryptoai.routes.feed import router as feed_router # 配置日志 logging.basicConfig( @@ -46,6 +47,7 @@ app.add_middleware( # 添加API路由 app.include_router(agent_router, prefix="/agent") +app.include_router(feed_router, prefix="/feed", tags=["AI Agent信息流"]) # 请求计时中间件 @app.middleware("http") diff --git a/cryptoai/routes/feed.py b/cryptoai/routes/feed.py new file mode 100644 index 0000000..81bfaf7 --- /dev/null +++ b/cryptoai/routes/feed.py @@ -0,0 +1,108 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +AI Agent信息流API路由模块,提供信息流的增删改查功能 +""" + +import logging +from fastapi import APIRouter, HTTPException, status, Query +from pydantic import BaseModel +from typing import Dict, Any, List, Optional +from datetime import datetime + +from cryptoai.utils.db_manager import get_db_manager + +# 配置日志 +logger = logging.getLogger("feed_router") + +# 创建路由 +router = APIRouter() + +# 请求模型 +class AgentFeedCreate(BaseModel): + """创建信息流请求模型""" + agent_name: str + content: str + avatar_url: Optional[str] = None + +# 响应模型 +class AgentFeedResponse(BaseModel): + """信息流响应模型""" + id: int + agent_name: str + avatar_url: Optional[str] = None + content: str + create_time: datetime + +@router.post("/", response_model=Dict[str, Any], status_code=status.HTTP_201_CREATED) +async def create_feed(feed: AgentFeedCreate) -> Dict[str, Any]: + """ + 创建新的AI Agent信息流 + + Args: + feed: 信息流创建请求 + + Returns: + 创建成功的状态信息 + """ + try: + # 获取数据库管理器 + db_manager = get_db_manager() + + # 保存信息流 + success = db_manager.save_agent_feed( + agent_name=feed.agent_name, + content=feed.content, + avatar_url=feed.avatar_url + ) + + if not success: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="保存信息流失败" + ) + + return { + "status": "success", + "message": "信息流创建成功" + } + + except Exception as e: + logger.error(f"创建信息流失败: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"创建信息流失败: {str(e)}" + ) + +@router.get("/", response_model=List[AgentFeedResponse]) +async def get_feeds( + agent_name: Optional[str] = Query(None, description="AI Agent名称,可选"), + limit: int = Query(20, description="返回的最大记录数,默认20条"), + skip: int = Query(0, description="跳过的记录数,默认0条") +) -> List[AgentFeedResponse]: + """ + 获取AI Agent信息流列表 + + Args: + agent_name: 可选,指定获取特定Agent的信息流 + limit: 返回的最大记录数,默认20条 + + Returns: + 信息流列表 + """ + try: + # 获取数据库管理器 + db_manager = get_db_manager() + + # 获取信息流 + feeds = db_manager.get_agent_feeds(agent_name=agent_name, limit=limit, skip=skip) + + return feeds + + except Exception as e: + logger.error(f"获取信息流失败: {str(e)}") + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"获取信息流失败: {str(e)}" + ) \ No newline at end of file diff --git a/cryptoai/utils/db_manager.py b/cryptoai/utils/db_manager.py index d7ba2ec..55a9838 100644 --- a/cryptoai/utils/db_manager.py +++ b/cryptoai/utils/db_manager.py @@ -13,7 +13,7 @@ from sqlalchemy.orm import sessionmaker from sqlalchemy.dialects.mysql import JSON from sqlalchemy.pool import QueuePool -from utils.config_loader import ConfigLoader +from cryptoai.utils.config_loader import ConfigLoader # 配置日志 logging.basicConfig( @@ -46,6 +46,24 @@ class AnalysisResult(Base): Index('idx_created_at', 'created_at'), ) +# 定义AI Agent信息流模型 +class AgentFeed(Base): + """AI Agent信息流表模型""" + __tablename__ = 'agent_feeds' + + id = Column(Integer, primary_key=True, autoincrement=True) + agent_name = Column(String(50), nullable=False, comment='AI Agent名称') + avatar_url = Column(String(255), nullable=True, comment='头像URL') + content = Column(Text, nullable=False, comment='内容') + create_time = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间') + + # 索引和表属性 + __table_args__ = ( + Index('idx_agent_name', 'agent_name'), + Index('idx_create_time', 'create_time'), + {'mysql_charset': 'utf8mb4', 'mysql_collate': 'utf8mb4_unicode_ci'} + ) + class DBManager: """数据库管理工具,用于连接MySQL数据库并保存智能体分析结果""" @@ -85,7 +103,8 @@ class DBManager: max_overflow=10, # 最大溢出连接数 pool_timeout=30, # 连接超时时间 pool_recycle=1800, # 连接回收时间(秒) - pool_pre_ping=True # 在使用连接前先ping一下,确保连接有效 + pool_pre_ping=True, # 在使用连接前先ping一下,确保连接有效 + connect_args={'charset': 'utf8mb4'} ) # 创建会话工厂 @@ -160,6 +179,114 @@ class DBManager: pass return False + def save_agent_feed(self, agent_name: str, content: str, avatar_url: Optional[str] = None) -> bool: + """ + 保存AI Agent信息流到数据库 + + Args: + agent_name: AI Agent名称 + content: 内容 + avatar_url: 头像URL,可选 + + Returns: + 保存是否成功 + """ + if not self.engine: + try: + self._init_db() + except Exception as e: + logger.error(f"重新连接数据库失败: {e}") + return False + + try: + # 创建会话 + session = self.Session() + + try: + # 创建新记录 + new_feed = AgentFeed( + agent_name=agent_name, + content=content, + avatar_url=avatar_url + ) + + # 添加并提交 + session.add(new_feed) + session.commit() + + logger.info(f"成功保存 {agent_name} 的信息流") + return True + + except Exception as e: + session.rollback() + logger.error(f"保存信息流失败: {e}") + return False + + finally: + session.close() + + except Exception as e: + logger.error(f"创建数据库会话失败: {e}") + # 如果是连接错误,尝试重新初始化 + try: + self._init_db() + except: + pass + return False + + def get_agent_feeds(self, agent_name: Optional[str] = None, limit: int = 20, skip: int = 0) -> List[Dict[str, Any]]: + """ + 获取AI Agent信息流 + + Args: + agent_name: 可选,指定获取特定Agent的信息流 + limit: 返回的最大记录数,默认20条 + + Returns: + 信息流列表,如果查询失败则返回空列表 + """ + if not self.engine: + try: + self._init_db() + except Exception as e: + logger.error(f"重新连接数据库失败: {e}") + return [] + + try: + # 创建会话 + session = self.Session() + + try: + # 构建查询 + query = session.query(AgentFeed) + + # 如果指定了agent_name,则筛选 + if agent_name: + query = query.filter(AgentFeed.agent_name == agent_name) + + # 按创建时间降序排序并限制数量 + results = query.order_by(AgentFeed.create_time.desc()).offset(skip).limit(limit).all() + + # 转换为字典列表 + feeds = [] + for result in results: + feeds.append({ + 'id': result.id, + 'agent_name': result.agent_name, + 'avatar_url': result.avatar_url, + 'content': result.content, + 'create_time': result.create_time + }) + + return feeds + + finally: + session.close() + + except Exception as e: + logger.error(f"获取信息流失败: {e}") + return [] + def get_latest_result(self, agent: str, symbol: str, time_interval: str) -> Optional[Dict[str, Any]]: """ 获取最新的分析结果 diff --git a/cryptoai/utils/update_db_charset.py b/cryptoai/utils/update_db_charset.py new file mode 100644 index 0000000..65be866 --- /dev/null +++ b/cryptoai/utils/update_db_charset.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +""" +更新数据库表字符集为utf8mb4,以支持emoji和其他特殊字符 +""" + +import logging +from cryptoai.utils.db_manager import get_db_manager +from sqlalchemy import text + +# 配置日志 +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger('update_db_charset') + +def update_table_charset(): + """更新数据库表字符集为utf8mb4""" + try: + # 获取数据库管理器 + db_manager = get_db_manager() + + if not db_manager.engine: + logger.error("数据库连接失败") + return False + + # 创建会话 + session = db_manager.Session() + + try: + # 更新数据库字符集 + session.execute(text("ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;")) + + # 更新agent_feeds表字符集 + session.execute(text(""" + ALTER TABLE agent_feeds + CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + """)) + + # 特别更新content列的字符集 + session.execute(text(""" + ALTER TABLE agent_feeds + MODIFY content TEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci; + """)) + + session.commit() + logger.info("成功更新数据库表字符集为utf8mb4") + return True + + except Exception as e: + session.rollback() + logger.error(f"更新数据库表字符集失败: {e}") + return False + + finally: + session.close() + + except Exception as e: + logger.error(f"更新数据库表字符集失败: {e}") + return False + +if __name__ == "__main__": + update_table_charset() \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 88fade1..5608b54 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -4,7 +4,7 @@ services: cryptoai-task: build: . container_name: cryptoai-task - image: cryptoai:0.0.14 + image: cryptoai:0.0.15 restart: always volumes: - ./cryptoai/data:/app/cryptoai/data @@ -29,7 +29,7 @@ services: cryptoai-api: build: . container_name: cryptoai-api - image: cryptoai-api:0.0.1 + image: cryptoai-api:0.0.2 restart: always ports: - "8000:8000"