161 lines
5.6 KiB
Python
161 lines
5.6 KiB
Python
import json
|
|
import logging
|
|
from fastapi import APIRouter, Depends, HTTPException, status, Body, Query, Path
|
|
from cryptoai.api.adata_api import AStockAPI
|
|
from cryptoai.utils.db_manager import get_db_manager
|
|
from datetime import datetime
|
|
from typing import Dict, Any, List, Optional
|
|
from pydantic import BaseModel
|
|
|
|
from cryptoai.api.deepseek_api import DeepSeekAPI
|
|
from cryptoai.utils.config_loader import ConfigLoader
|
|
from fastapi.responses import StreamingResponse
|
|
from cryptoai.routes.user import get_current_user
|
|
import requests
|
|
from cryptoai.api.binance_api import get_binance_api
|
|
from cryptoai.models.data_processor import DataProcessor
|
|
from datetime import timedelta
|
|
# Module-level logger; its level is pinned to DEBUG here.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Create the router that the endpoints in this module are registered on.
router = APIRouter()
|
|
|
|
@router.get("/search/{key}")
async def search_crypto(key: str):
    """Look up tokens matching ``key`` through the database manager.

    Args:
        key: Search term taken from the URL path.

    Returns:
        The value returned by ``search_token`` for ``key``, passed through
        unchanged.
    """
    return get_db_manager().search_token(key)
|
|
|
|
class CryptoAnalysisRequest(BaseModel):
    """Request body accepted by the ``/analysis`` and ``/analysis_v2`` endpoints."""

    # Symbol (or search term) of the cryptocurrency to analyse.
    symbol: str

    # Optional timeframe for the analysis; ``None`` when the client omits it.
    timeframe: Optional[str] = None
|
|
|
|
@router.get("/kline/{symbol}")
async def get_crypto_kline(
    symbol: str,
    timeframe: Optional[str] = None,
    limit: Optional[int] = 100,
):
    """Return historical K-line records for a token, keyed by interval.

    Args:
        symbol: Search term from the URL path; it is resolved to the symbol of
            the first match returned by ``search_token``.
        timeframe: Interval to fetch. When ``None``, the ``15m``, ``1h`` and
            ``4h`` intervals are all fetched.
        limit: Forwarded as ``limit`` to ``get_historical_klines``.

    Returns:
        A dict mapping each requested interval to the K-line data converted
        with ``to_dict(orient="records")``.

    Raises:
        HTTPException: 400 when the token lookup returns no match.
    """
    # Make sure the symbol exists before calling the exchange API.
    matches = get_db_manager().search_token(symbol)
    if not matches:
        raise HTTPException(status_code=400, detail="您输入的币种在币安不存在,请检查后重新输入。")

    resolved_symbol = matches[0]["symbol"]
    client = get_binance_api()

    # Default set of intervals when none is requested; "1d" is not included.
    intervals = ("15m", "1h", "4h") if timeframe is None else (timeframe,)

    return {
        interval: client.get_historical_klines(
            symbol=resolved_symbol, interval=interval, limit=limit
        ).to_dict(orient="records")
        for interval in intervals
    }
|
|
|
|
@router.post("/analysis_v2")
async def analysis_crypto_v2(request: CryptoAnalysisRequest,
                             current_user: dict = Depends(get_current_user)):
    """Run the remote analysis workflow for a token and stream its output.

    The requested symbol is resolved through ``search_token``, the user's
    question is saved, and the workflow endpoint is called in streaming mode.
    The upstream body is relayed to the client chunk by chunk.

    Args:
        request: Body carrying the ``symbol`` to analyse and an optional
            ``timeframe``.
        current_user: Authenticated user; its ``"id"`` and ``"mail"`` entries
            are read.

    Returns:
        A ``StreamingResponse`` (``text/plain``) relaying the upstream bytes.

    Raises:
        HTTPException: 400 when the symbol lookup returns no match; 502 when
            the upstream request fails at the transport level (connection
            error or timeout); otherwise the upstream status code when it is
            not 200.
    """
    # Imported locally so this block stays self-contained.
    from fastapi.concurrency import run_in_threadpool

    # Make sure the symbol exists before doing any further work.
    tokens = get_db_manager().search_token(request.symbol)
    if not tokens:
        raise HTTPException(status_code=400, detail="您输入的币种在币安不存在,请检查后重新输入。")

    symbol = tokens[0]["symbol"]

    url = 'https://mate.aimateplus.com/v1/workflows/run'
    # SECURITY(review): this credential is committed to source. It should be
    # moved to configuration / a secret store and rotated.
    token = 'app-BbaqIAMPi0ktgaV9IizMlc2N'
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }
    data = {
        "inputs": {
            "symbol": symbol,
            "timeframe": request.timeframe,
        },
        "response_mode": "streaming",
        "user": current_user["mail"],
    }

    # Save the user's question.
    # NOTE(review): analysis_crypto passes an agent id as the second argument,
    # whereas the symbol is passed here - confirm which one is intended.
    get_db_manager().save_user_question(current_user["id"], symbol, "请分析以下加密货币:" + symbol + ",并给出分析报告。")

    # requests is blocking, so run it in the thread pool to keep the event
    # loop free. The (connect, read) timeout bounds how long a stalled
    # upstream can hold this request; the read value is the maximum gap
    # between bytes, not a limit on the whole stream.
    try:
        response = await run_in_threadpool(
            requests.post,
            url,
            headers=headers,
            json=data,
            stream=True,
            timeout=(10, 300),
        )
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502,
            detail=f"Failed to reach Dify API: {exc}",
        ) from exc

    # Surface upstream failures to the caller with the upstream status code.
    if response.status_code != 200:
        try:
            detail = f"Failed to get response from Dify API: {response.text}"
        finally:
            response.close()
        raise HTTPException(status_code=response.status_code, detail=detail)

    def stream_response():
        """Yield non-empty upstream chunks, closing the connection when done."""
        try:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    yield chunk
        finally:
            # Also runs when the generator is closed early, so the upstream
            # connection is released if the client disconnects mid-stream.
            response.close()

    return StreamingResponse(stream_response(), media_type="text/plain")
|
|
|
|
@router.post("/analysis")
async def analysis_crypto(request: CryptoAnalysisRequest,
                          current_user: dict = Depends(get_current_user)):
    """Ask the configured agent to analyse a token and stream its answer.

    The agent with id 1 is loaded from the database; its ``dify_token`` and
    ``inputs`` are used to call the chat-messages endpoint in streaming mode.
    The user's prompt is saved before the upstream call is made.

    Args:
        request: Body carrying the ``symbol`` to analyse and an optional
            ``timeframe`` that is mentioned in the prompt when present.
        current_user: Authenticated user; its ``"id"`` and ``"mail"`` entries
            are read.

    Returns:
        A ``StreamingResponse`` (``text/plain``) relaying the upstream bytes.

    Raises:
        HTTPException: 400 when the agent cannot be found or the lookup raises
            ``ValueError``; 502 when the upstream request fails at the
            transport level (connection error or timeout); otherwise the
            upstream status code when it is not 200.
    """
    # Imported locally so this block stays self-contained.
    from fastapi.concurrency import run_in_threadpool

    agent_id = 1
    if request.timeframe:
        user_prompt = f"请分析以下加密货币:{request.symbol},并给出 {request.timeframe} 级别的分析报告。"
    else:
        user_prompt = f"请分析以下加密货币:{request.symbol},并给出分析报告。"

    # Try to load the agent from the database.
    try:
        agent = get_db_manager().get_agent_by_id(agent_id)
    except ValueError:
        raise HTTPException(status_code=400, detail="Invalid agent ID format")

    if not agent:
        raise HTTPException(status_code=400, detail="Invalid agent ID")

    token = agent.get("dify_token")
    # Build a fresh dict so the agent record returned by the DB manager is not
    # mutated in place when the current date is added.
    inputs = {
        **(agent.get("inputs") or {}),
        "current_date": datetime.now().strftime("%Y-%m-%d"),
    }

    url = "https://mate.aimateplus.com/v1/chat-messages"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    data = {
        "inputs": inputs,
        "query": user_prompt,
        "response_mode": "streaming",
        "user": current_user["mail"],
    }

    # Use the module logger (not the root logger) with lazy formatting.
    logger.info("Chat request data: %s", data)

    # Save the user's question.
    get_db_manager().save_user_question(current_user["id"], agent_id, user_prompt)

    # requests is blocking, so run it in the thread pool to keep the event
    # loop free. The (connect, read) timeout bounds how long a stalled
    # upstream can hold this request; the read value is the maximum gap
    # between bytes, not a limit on the whole stream.
    try:
        response = await run_in_threadpool(
            requests.post,
            url,
            headers=headers,
            json=data,
            stream=True,
            timeout=(10, 300),
        )
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502,
            detail=f"Failed to reach Dify API: {exc}",
        ) from exc

    # Surface upstream failures to the caller with the upstream status code.
    if response.status_code != 200:
        try:
            detail = f"Failed to get response from Dify API: {response.text}"
        finally:
            response.close()
        raise HTTPException(status_code=response.status_code, detail=detail)

    def stream_response():
        """Yield non-empty upstream chunks, closing the connection when done."""
        try:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    yield chunk
        finally:
            # Also runs when the generator is closed early, so the upstream
            # connection is released if the client disconnects mid-stream.
            response.close()

    return StreamingResponse(stream_response(), media_type="text/plain")
|
|
|
|
|