This commit is contained in:
aaron 2025-05-14 21:44:39 +08:00
parent a261e92e67
commit d2878b31b2
8 changed files with 326 additions and 14 deletions

View File

@ -7,8 +7,9 @@ import datetime
import time
class BinanceAPI:
"""Binance API交互类用于获取市场数据和执行交易"""
"""Binance API交互类用于获取市场数据和执行交易"""
def __init__(self, api_key: str, api_secret: str, test_mode: bool = True):
"""
初始化Binance API客户端
@ -224,4 +225,21 @@ class BinanceAPI:
)
except BinanceAPIException as e:
print(f"下单时出错: {e}")
return {}
return {}
from cryptoai.utils.config_loader import ConfigLoader
def get_binance_api() -> BinanceAPI:
"""
获取Binance API实例
Returns:
BinanceAPI实例
"""
config = ConfigLoader()
api_key = config.get_binance_config()['api_key']
api_secret = config.get_binance_config()['api_secret']
test_mode = config.get_binance_config()['test_mode']
return BinanceAPI(api_key, api_secret, test_mode)

View File

@ -4,6 +4,14 @@ from fastapi import APIRouter, Depends, HTTPException, status, Body, Query, Path
from cryptoai.api.adata_api import AStockAPI
from cryptoai.utils.db_manager import get_db_manager
from datetime import datetime
from typing import Dict, Any, List, Optional
from pydantic import BaseModel
from cryptoai.api.deepseek_api import DeepSeekAPI
from cryptoai.utils.config_loader import ConfigLoader
from fastapi.responses import StreamingResponse
from cryptoai.routes.user import get_current_user
import requests
# 创建路由
router = APIRouter()
@ -107,3 +115,46 @@ async def get_stock_data_all(stock_code: str):
return {}
return result
@router.post('/{stock_code}/analysis', summary="获取股票分析数据")
async def get_stock_analysis(stock_code: str, current_user: Dict[str, Any] = Depends(get_current_user)):
url = 'https://mate.aimateplus.com/v1/workflows/run'
token = 'app-nWuCOa0YfQVtAosTY3Jr5vFV'
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
data = {
"inputs" : {
"stock_code" : stock_code
},
"response_mode": "streaming",
"user": current_user["mail"]
}
response = requests.post(url, headers=headers, json=data, stream=True)
# 如果响应不成功,返回错误
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to get response from Dify API: {response.text}"
)
# 获取response的stream
def stream_response():
for chunk in response.iter_content(chunk_size=1024):
if chunk:
yield chunk
return StreamingResponse(stream_response(), media_type="text/plain")

90
cryptoai/routes/crypto.py Normal file
View File

@ -0,0 +1,90 @@
import json
import logging
from fastapi import APIRouter, Depends, HTTPException, status, Body, Query, Path
from cryptoai.api.adata_api import AStockAPI
from cryptoai.utils.db_manager import get_db_manager
from datetime import datetime
from typing import Dict, Any, List, Optional
from pydantic import BaseModel
from cryptoai.api.deepseek_api import DeepSeekAPI
from cryptoai.utils.config_loader import ConfigLoader
from fastapi.responses import StreamingResponse
from cryptoai.routes.user import get_current_user
import requests
# 创建路由
router = APIRouter()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@router.get("/crypto/search/{key}")
async def search_crypto(key: str):
manager = get_db_manager()
result = manager.search_token(key)
return result
class CryptoAnalysisRequest(BaseModel):
symbol: str
timeframe: Optional[str] = None
@router.post("/crypto/analysis")
async def analysis_crypto(request: CryptoAnalysisRequest,
current_user: dict = Depends(get_current_user)):
# 尝试从数据库获取Agent
try:
agent_id = 1
if request.timeframe:
user_prompt = f"请分析以下加密货币:{request.symbol},并给出 {request.timeframe} 级别的分析报告。"
else:
user_prompt = f"请分析以下加密货币:{request.symbol},并给出分析报告。"
agent = get_db_manager().get_agent_by_id(agent_id)
if not agent:
raise HTTPException(status_code=400, detail="Invalid agent ID")
token = agent.get("dify_token")
inputs = agent.get("inputs") or {}
inputs["current_date"] = datetime.now().strftime("%Y-%m-%d")
except ValueError:
raise HTTPException(status_code=400, detail="Invalid agent ID format")
url = "https://mate.aimateplus.com/v1/chat-messages"
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
data = {
"inputs": inputs,
"query": user_prompt,
"response_mode": "streaming",
"user": current_user["mail"]
}
logging.info(f"Chat request data: {data}")
# 保存用户提问
get_db_manager().save_user_question(current_user["id"], agent_id, user_prompt)
response = requests.post(url, headers=headers, json=data, stream=True)
# 如果响应不成功,返回错误
if response.status_code != 200:
raise HTTPException(
status_code=response.status_code,
detail=f"Failed to get response from Dify API: {response.text}"
)
# 获取response的stream
def stream_response():
for chunk in response.iter_content(chunk_size=1024):
if chunk:
yield chunk
return StreamingResponse(stream_response(), media_type="text/plain")

View File

@ -20,6 +20,7 @@ from cryptoai.routes.feed import router as feed_router
from cryptoai.routes.user import router as user_router
from cryptoai.routes.adata import router as adata_router
from cryptoai.routes.question import router as question_router
from cryptoai.routes.crypto import router as crypto_router
# 配置日志
logging.basicConfig(
@ -54,7 +55,7 @@ app.include_router(feed_router, prefix="/feed", tags=["AI Agent信息流"])
app.include_router(user_router, prefix="/user", tags=["用户管理"])
app.include_router(question_router, prefix="/question", tags=["用户提问"])
app.include_router(adata_router, prefix="/adata", tags=["A股数据"])
app.include_router(crypto_router, prefix="/crypto", tags=["加密货币数据"])
# 请求计时中间件
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):

View File

View File

@ -12,6 +12,7 @@ from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.dialects.mysql import JSON
from sqlalchemy.pool import QueuePool
from sqlalchemy import or_
from cryptoai.utils.config_loader import ConfigLoader
@ -25,6 +26,25 @@ logger = logging.getLogger('db_manager')
# 创建模型基类
Base = declarative_base()
# 定义Token模型
class Token(Base):
"""Token信息表模型"""
__tablename__ = 'tokens'
id = Column(Integer, primary_key=True, autoincrement=True)
symbol = Column(String(50), nullable=False, unique=True, comment='交易对符号')
base_asset = Column(String(20), nullable=False, comment='基础资产')
quote_asset = Column(String(20), nullable=False, comment='计价资产')
created_at = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间')
# 索引
__table_args__ = (
Index('idx_symbol', 'symbol'),
Index('idx_base_asset', 'base_asset'),
Index('idx_quote_asset', 'quote_asset'),
{'mysql_charset': 'utf8mb4', 'mysql_collate': 'utf8mb4_unicode_ci'}
)
# 定义分析结果模型
class AnalysisResult(Base):
"""分析结果表模型"""
@ -1648,7 +1668,136 @@ class DBManager:
except Exception as e:
logger.error(f"获取股票列表失败: {e}")
return []
def create_token(self, symbol: str, base_asset: str, quote_asset: str) -> bool:
"""
创建新的Token信息
Args:
symbol: 交易对符号
base_asset: 基础资产
quote_asset: 计价资产
Returns:
创建是否成功
"""
if not self.engine:
try:
self._init_db()
except Exception as e:
logger.error(f"重新连接数据库失败: {e}")
return False
try:
session = self.Session()
try:
# 检查交易对是否已存在
existing_token = session.query(Token).filter(Token.symbol == symbol).first()
if existing_token:
logger.warning(f"交易对 {symbol} 已存在")
return False
# 创建新Token记录
new_token = Token(
symbol=symbol,
base_asset=base_asset,
quote_asset=quote_asset
)
session.add(new_token)
session.commit()
logger.info(f"成功创建Token信息: {symbol}")
return True
except Exception as e:
session.rollback()
logger.error(f"创建Token信息失败: {e}")
return False
finally:
session.close()
except Exception as e:
logger.error(f"创建数据库会话失败: {e}")
return False
def delete_token(self, symbol: str) -> bool:
"""
删除Token信息
Args:
symbol: 交易对符号
Returns:
删除是否成功
"""
if not self.engine:
try:
self._init_db()
except Exception as e:
logger.error(f"重新连接数据库失败: {e}")
return False
try:
session = self.Session()
try:
# 查询Token
token = session.query(Token).filter(Token.symbol == symbol).first()
if not token:
logger.warning(f"交易对 {symbol} 不存在")
return False
# 删除Token
session.delete(token)
session.commit()
logger.info(f"成功删除Token信息: {symbol}")
return True
except Exception as e:
session.rollback()
logger.error(f"删除Token信息失败: {e}")
return False
finally:
session.close()
except Exception as e:
logger.error(f"创建数据库会话失败: {e}")
return False
def search_token(self, key: str, limit: int = 10) -> List[Dict[str, Any]]:
"""
搜索Token
"""
if not self.engine:
try:
self._init_db()
except Exception as e:
logger.error(f"重新连接数据库失败: {e}")
return []
try:
session = self.Session()
# 使用 SQLAlchemy 的 ORM 查询
tokens = session.query(Token).filter(Token.symbol.like(f"{key}%")).limit(limit).all()
return [{
'symbol': token.symbol,
'base_asset': token.base_asset,
'quote_asset': token.quote_asset
} for token in tokens]
except Exception as e:
logger.error(f"获取Token信息失败: {e}")
return []
finally:
session.close()
# 单例模式
_db_instance = None

25
test.py
View File

@ -1,17 +1,20 @@
from cryptoai.utils.db_manager import get_db_manager
from cryptoai.api.adata_api import AStockAPI
from cryptoai.api.binance_api import get_binance_api
import json
from time import sleep
if __name__ == "__main__":
print("开始获取A股数据")
api = AStockAPI()
stock_codes = api.get_all_stock_codes()
list = json.loads(stock_codes.to_json(orient="records"))
# print(list[0])
symbols = get_binance_api().get_all_symbols()
# for symbol in symbols:
# # 移除 symbol 中的 USDT
# symbol = symbol.replace('USDT', '')
# print(symbol)
# 保存到数据库
for stock in list:
print(f"创建股票: {stock['stock_code']} - {stock['short_name']}")
get_db_manager().create_stock(stock["stock_code"], stock["short_name"], stock["exchange"], stock["list_date"])
print(f"创建股票: {stock['stock_code']} - {stock['short_name']} 完成")
sleep(1)
manager = get_db_manager()
for symbol in symbols:
base_asset = symbol.split('USDT')[0]
quote_asset = 'USDT'
manager.create_token(symbol,base_asset, quote_asset)