This commit is contained in:
aaron 2025-05-06 12:09:13 +08:00
parent bbde9e935c
commit 3602a15280
8 changed files with 322 additions and 8 deletions

View File

@ -350,6 +350,7 @@ class CryptoAgent:
# 把分析结果调用大模型转化成交易建议
print(f"开始把JSON分析结果调用大模型转化成可读的交易建议...")
message = self.convert_analysis_to_trading_suggestions(results)
print(f"交易建议: {message}")
if self.dingtalk_bot:
@ -359,6 +360,15 @@ class CryptoAgent:
if self.discord_bot:
print(f"发送交易建议到Discord...")
self.discord_bot.send_message(content=message)
# 保存交易建议到数据库
try:
saved = self.db_manager.save_agent_feed(
agent_name="加密货币AI助理",
content=message
)
except Exception as e:
print(f"保存交易建议到数据库时出错: {e}")
# 导出 DeepSeek API token 使用情况
self._export_token_usage()
@ -394,6 +404,8 @@ class CryptoAgent:
message = self.deepseek_api.extract_text_from_response(response)
# 保存交易建议到数据库
return message
def _export_token_usage(self) -> None:

View File

@ -32,10 +32,10 @@ crypto:
base_currencies:
- "BTC"
- "ETH"
- "SOL"
- "SUI"
- "DOGE"
- "LTC"
# - "SOL"
# - "SUI"
# - "DOGE"
# - "LTC"
quote_currency: "USDT"
time_interval: "4h" # 可选: 1m, 5m, 15m, 30m, 1h, 4h, 1d
historical_days: 90

View File

@ -16,6 +16,7 @@ import time
from typing import Dict, Any
from cryptoai.routes.agent import router as agent_router
from cryptoai.routes.feed import router as feed_router
# 配置日志
logging.basicConfig(
@ -46,6 +47,7 @@ app.add_middleware(
# 添加API路由
app.include_router(agent_router, prefix="/agent")
app.include_router(feed_router, prefix="/feed", tags=["AI Agent信息流"])
# 请求计时中间件
@app.middleware("http")

108
cryptoai/routes/feed.py Normal file
View File

@ -0,0 +1,108 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
AI Agent信息流API路由模块提供信息流的增删改查功能
"""
import logging
from fastapi import APIRouter, HTTPException, status, Query
from pydantic import BaseModel
from typing import Dict, Any, List, Optional
from datetime import datetime
from cryptoai.utils.db_manager import get_db_manager
# 配置日志
logger = logging.getLogger("feed_router")
# 创建路由
router = APIRouter()
# 请求模型
class AgentFeedCreate(BaseModel):
"""创建信息流请求模型"""
agent_name: str
content: str
avatar_url: Optional[str] = None
# 响应模型
class AgentFeedResponse(BaseModel):
"""信息流响应模型"""
id: int
agent_name: str
avatar_url: Optional[str] = None
content: str
create_time: datetime
@router.post("/", response_model=Dict[str, Any], status_code=status.HTTP_201_CREATED)
async def create_feed(feed: AgentFeedCreate) -> Dict[str, Any]:
"""
创建新的AI Agent信息流
Args:
feed: 信息流创建请求
Returns:
创建成功的状态信息
"""
try:
# 获取数据库管理器
db_manager = get_db_manager()
# 保存信息流
success = db_manager.save_agent_feed(
agent_name=feed.agent_name,
content=feed.content,
avatar_url=feed.avatar_url
)
if not success:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="保存信息流失败"
)
return {
"status": "success",
"message": "信息流创建成功"
}
except Exception as e:
logger.error(f"创建信息流失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"创建信息流失败: {str(e)}"
)
@router.get("/", response_model=List[AgentFeedResponse])
async def get_feeds(
agent_name: Optional[str] = Query(None, description="AI Agent名称可选"),
limit: int = Query(20, description="返回的最大记录数默认20条"),
skip: int = Query(0, description="跳过的记录数默认0条")
) -> List[AgentFeedResponse]:
"""
获取AI Agent信息流列表
Args:
agent_name: 可选指定获取特定Agent的信息流
limit: 返回的最大记录数默认20条
Returns:
信息流列表
"""
try:
# 获取数据库管理器
db_manager = get_db_manager()
# 获取信息流
feeds = db_manager.get_agent_feeds(agent_name=agent_name, limit=limit, skip=skip)
return feeds
except Exception as e:
logger.error(f"获取信息流失败: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"获取信息流失败: {str(e)}"
)

View File

@ -13,7 +13,7 @@ from sqlalchemy.orm import sessionmaker
from sqlalchemy.dialects.mysql import JSON
from sqlalchemy.pool import QueuePool
from utils.config_loader import ConfigLoader
from cryptoai.utils.config_loader import ConfigLoader
# 配置日志
logging.basicConfig(
@ -46,6 +46,24 @@ class AnalysisResult(Base):
Index('idx_created_at', 'created_at'),
)
# 定义AI Agent信息流模型
class AgentFeed(Base):
"""AI Agent信息流表模型"""
__tablename__ = 'agent_feeds'
id = Column(Integer, primary_key=True, autoincrement=True)
agent_name = Column(String(50), nullable=False, comment='AI Agent名称')
avatar_url = Column(String(255), nullable=True, comment='头像URL')
content = Column(Text, nullable=False, comment='内容')
create_time = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间')
# 索引和表属性
__table_args__ = (
Index('idx_agent_name', 'agent_name'),
Index('idx_create_time', 'create_time'),
{'mysql_charset': 'utf8mb4', 'mysql_collate': 'utf8mb4_unicode_ci'}
)
class DBManager:
"""数据库管理工具用于连接MySQL数据库并保存智能体分析结果"""
@ -85,7 +103,8 @@ class DBManager:
max_overflow=10, # 最大溢出连接数
pool_timeout=30, # 连接超时时间
pool_recycle=1800, # 连接回收时间(秒)
pool_pre_ping=True # 在使用连接前先ping一下确保连接有效
pool_pre_ping=True, # 在使用连接前先ping一下确保连接有效
connect_args={'charset': 'utf8mb4'}
)
# 创建会话工厂
@ -160,6 +179,114 @@ class DBManager:
pass
return False
def save_agent_feed(self, agent_name: str, content: str, avatar_url: Optional[str] = None) -> bool:
"""
保存AI Agent信息流到数据库
Args:
agent_name: AI Agent名称
content: 内容
avatar_url: 头像URL可选
Returns:
保存是否成功
"""
if not self.engine:
try:
self._init_db()
except Exception as e:
logger.error(f"重新连接数据库失败: {e}")
return False
try:
# 创建会话
session = self.Session()
try:
# 创建新记录
new_feed = AgentFeed(
agent_name=agent_name,
content=content,
avatar_url=avatar_url
)
# 添加并提交
session.add(new_feed)
session.commit()
logger.info(f"成功保存 {agent_name} 的信息流")
return True
except Exception as e:
session.rollback()
logger.error(f"保存信息流失败: {e}")
return False
finally:
session.close()
except Exception as e:
logger.error(f"创建数据库会话失败: {e}")
# 如果是连接错误,尝试重新初始化
try:
self._init_db()
except:
pass
return False
def get_agent_feeds(self, agent_name: Optional[str] = None, limit: int = 20, skip: int = 0) -> List[Dict[str, Any]]:
"""
获取AI Agent信息流
Args:
agent_name: 可选指定获取特定Agent的信息流
limit: 返回的最大记录数默认20条
Returns:
信息流列表如果查询失败则返回空列表
"""
if not self.engine:
try:
self._init_db()
except Exception as e:
logger.error(f"重新连接数据库失败: {e}")
return []
try:
# 创建会话
session = self.Session()
try:
# 构建查询
query = session.query(AgentFeed)
# 如果指定了agent_name则筛选
if agent_name:
query = query.filter(AgentFeed.agent_name == agent_name)
# 按创建时间降序排序并限制数量
results = query.order_by(AgentFeed.create_time.desc()).offset(skip).limit(limit).all()
# 转换为字典列表
feeds = []
for result in results:
feeds.append({
'id': result.id,
'agent_name': result.agent_name,
'avatar_url': result.avatar_url,
'content': result.content,
'create_time': result.create_time
})
return feeds
finally:
session.close()
except Exception as e:
logger.error(f"获取信息流失败: {e}")
return []
def get_latest_result(self, agent: str, symbol: str, time_interval: str) -> Optional[Dict[str, Any]]:
"""
获取最新的分析结果

View File

@ -0,0 +1,65 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
更新数据库表字符集为utf8mb4以支持emoji和其他特殊字符
"""
import logging
from cryptoai.utils.db_manager import get_db_manager
from sqlalchemy import text
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('update_db_charset')
def update_table_charset():
"""更新数据库表字符集为utf8mb4"""
try:
# 获取数据库管理器
db_manager = get_db_manager()
if not db_manager.engine:
logger.error("数据库连接失败")
return False
# 创建会话
session = db_manager.Session()
try:
# 更新数据库字符集
session.execute(text("ALTER DATABASE CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;"))
# 更新agent_feeds表字符集
session.execute(text("""
ALTER TABLE agent_feeds
CONVERT TO CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
"""))
# 特别更新content列的字符集
session.execute(text("""
ALTER TABLE agent_feeds
MODIFY content TEXT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
"""))
session.commit()
logger.info("成功更新数据库表字符集为utf8mb4")
return True
except Exception as e:
session.rollback()
logger.error(f"更新数据库表字符集失败: {e}")
return False
finally:
session.close()
except Exception as e:
logger.error(f"更新数据库表字符集失败: {e}")
return False
if __name__ == "__main__":
update_table_charset()

View File

@ -4,7 +4,7 @@ services:
cryptoai-task:
build: .
container_name: cryptoai-task
image: cryptoai:0.0.14
image: cryptoai:0.0.15
restart: always
volumes:
- ./cryptoai/data:/app/cryptoai/data
@ -29,7 +29,7 @@ services:
cryptoai-api:
build: .
container_name: cryptoai-api
image: cryptoai-api:0.0.1
image: cryptoai-api:0.0.2
restart: always
ports:
- "8000:8000"