import json
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import requests
from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query, status
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

from cryptoai.api.adata_api import AStockAPI
from cryptoai.utils.db_manager import get_db_manager
from cryptoai.api.deepseek_api import DeepSeekAPI
from cryptoai.utils.config_loader import ConfigLoader
from cryptoai.routes.user import get_current_user
from cryptoai.api.binance_api import get_binance_api
from cryptoai.models.data_processor import DataProcessor

# Create the router
router = APIRouter()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Intervals fetched when the caller does not request a specific timeframe.
_DEFAULT_KLINE_TIMEFRAMES = ("1h", "4h", "1d", "1w")


@router.get("/search/{key}")
async def search_crypto(key: str):
    """Return whatever the DB manager's ``search_token`` yields for ``key``."""
    return get_db_manager().search_token(key)


class CryptoAnalysisRequest(BaseModel):
    """Request body accepted by the crypto analysis endpoints."""

    # Symbol of the asset to analyse.
    symbol: str
    # Optional timeframe for the analysis; ``None`` when the caller omits it.
    timeframe: Optional[str] = None


@router.get("/kline/{symbol}")
async def get_crypto_kline(symbol: str, timeframe: Optional[str] = None, limit: Optional[int] = 200):
    """Fetch historical klines for ``symbol`` from the Binance API wrapper.

    Args:
        symbol: Trading symbol; ``"USDT"`` is appended unless the symbol
            already ends with it.
        timeframe: Single interval to fetch. When ``None``, the 1h, 4h, 1d
            and 1w intervals are all fetched.
        limit: Passed through as ``limit`` to ``get_historical_klines``.

    Returns:
        A dict mapping each fetched interval to the result of
        ``.to_dict(orient="records")`` on the object returned by
        ``get_historical_klines`` (presumably a pandas DataFrame, giving a
        list of row dicts -- confirm against the Binance wrapper).
    """
    if not symbol.endswith("USDT"):
        symbol = symbol + "USDT"

    intervals = _DEFAULT_KLINE_TIMEFRAMES if timeframe is None else (timeframe,)
    binance_api = get_binance_api()

    def _fetch_all() -> Dict[str, Any]:
        # Dict order follows ``intervals`` so the response layout is unchanged.
        return {
            interval: binance_api.get_historical_klines(
                symbol=symbol,
                interval=interval,
                limit=limit,
                ts_transform=False,
            ).to_dict(orient="records")
            for interval in intervals
        }

    # The Binance calls are synchronous; running them directly inside this
    # coroutine would block the event loop (and every other request) until
    # all of them finish, so hand them to the worker thread pool instead.
    return await run_in_threadpool(_fetch_all)
# Dify endpoints used by the analysis routes.
_DIFY_WORKFLOW_URL = 'https://mate.aimateplus.com/v1/workflows/run'
_DIFY_CHAT_URL = "https://mate.aimateplus.com/v1/chat-messages"

# (connect, read) timeouts in seconds for upstream Dify calls. Without a
# timeout, ``requests`` waits indefinitely on an unresponsive server. The read
# timeout bounds the gap between received bytes, not the total stream length.
_DIFY_TIMEOUT = (10, 300)

# NOTE(security): this credential is committed to source control. It is kept
# only as a backward-compatible fallback; set DIFY_WORKFLOW_TOKEN in the
# environment, then rotate this value and remove it from the code.
_DEFAULT_WORKFLOW_TOKEN = 'app-BbaqIAMPi0ktgaV9IizMlc2N'


def _workflow_token() -> str:
    """Return the Dify workflow token, preferring the environment override."""
    import os  # local import: only this helper needs it

    return os.environ.get("DIFY_WORKFLOW_TOKEN", _DEFAULT_WORKFLOW_TOKEN)


def _open_dify_stream(url: str, token: str, payload: Dict[str, Any]):
    """POST ``payload`` to Dify and return the open streaming response.

    Raises:
        HTTPException: 502 when the request itself fails (connection error,
            timeout, ...); the upstream status code when Dify answers with
            anything other than 200.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    try:
        upstream = requests.post(
            url, headers=headers, json=payload, stream=True, timeout=_DIFY_TIMEOUT
        )
    except requests.RequestException as err:
        logger.exception("Request to Dify API failed")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Failed to reach Dify API",
        ) from err

    if upstream.status_code != 200:
        # With stream=True the connection stays open until the body is
        # consumed or the response is closed, so close it explicitly.
        try:
            error_body = upstream.text
        finally:
            upstream.close()
        raise HTTPException(
            status_code=upstream.status_code,
            detail=f"Failed to get response from Dify API: {error_body}"
        )
    return upstream


def _relay_chunks(upstream):
    """Yield the upstream body in 1 KiB chunks, closing it when done."""
    try:
        for chunk in upstream.iter_content(chunk_size=1024):
            if chunk:
                yield chunk
    finally:
        # Also runs when the generator is closed early, so the upstream
        # connection is released even if streaming stops part-way.
        upstream.close()


async def _stream_from_dify(url: str, token: str, payload: Dict[str, Any]) -> StreamingResponse:
    """Call Dify without blocking the event loop and relay its stream."""
    # Imported locally so this helper does not rely on the module header.
    from fastapi.concurrency import run_in_threadpool

    # requests.post is synchronous; run it in the worker thread pool so the
    # event loop keeps serving other requests while Dify responds.
    upstream = await run_in_threadpool(_open_dify_stream, url, token, payload)
    return StreamingResponse(_relay_chunks(upstream), media_type="text/plain")


@router.post("/analysis_v2")
async def analysis_crypto_v2(request: CryptoAnalysisRequest, current_user: dict = Depends(get_current_user)):
    """Run the Dify analysis workflow for a symbol and stream the result.

    Args:
        request: Symbol (``"USDT"`` is appended unless already present) and
            optional timeframe, both forwarded as workflow inputs.
        current_user: Authenticated user; ``"mail"`` is sent to Dify as the
            user identifier and ``"id"`` is used when saving the question.

    Returns:
        A ``text/plain`` StreamingResponse relaying Dify's response body.

    Raises:
        HTTPException: 502 if Dify cannot be reached, or Dify's own status
            code when it responds with a non-200 status.
    """
    symbol = request.symbol if request.symbol.endswith("USDT") else request.symbol + "USDT"

    payload = {
        "inputs": {
            "symbol": symbol,
            "timeframe": request.timeframe,
        },
        "response_mode": "streaming",
        "user": current_user["mail"],
    }

    # Save the user's question
    get_db_manager().save_user_question(current_user["id"], symbol, "请分析以下加密货币:" + symbol + ",并给出分析报告。")

    return await _stream_from_dify(_DIFY_WORKFLOW_URL, _workflow_token(), payload)


@router.post("/analysis")
async def analysis_crypto(request: CryptoAnalysisRequest, current_user: dict = Depends(get_current_user)):
    """Ask the Dify chat agent (agent id 1) to analyse a symbol and stream the answer.

    Args:
        request: Symbol to analyse and optional timeframe; both are embedded
            in the prompt sent to the agent.
        current_user: Authenticated user; ``"mail"`` is sent to Dify as the
            user identifier and ``"id"`` is used when saving the question.

    Returns:
        A ``text/plain`` StreamingResponse relaying Dify's response body.

    Raises:
        HTTPException: 400 if the agent lookup raises ``ValueError`` or finds
            no agent; 500 if the agent has no Dify token; 502 if Dify cannot
            be reached; Dify's own status code on a non-200 response.
    """
    agent_id = 1
    if request.timeframe:
        user_prompt = f"请分析以下加密货币:{request.symbol},并给出 {request.timeframe} 级别的分析报告。"
    else:
        user_prompt = f"请分析以下加密货币:{request.symbol},并给出分析报告。"

    # Try to load the agent from the database
    try:
        agent = get_db_manager().get_agent_by_id(agent_id)
    except ValueError as err:
        raise HTTPException(status_code=400, detail="Invalid agent ID format") from err
    if not agent:
        raise HTTPException(status_code=400, detail="Invalid agent ID")

    token = agent.get("dify_token")
    if not token:
        # Otherwise the request would go out as "Bearer None" and the caller
        # would see Dify's authentication error as if it were their own.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Agent has no Dify token configured",
        )

    # Copy before adding the date so the agent record itself is not mutated.
    inputs = {**(agent.get("inputs") or {})}
    inputs["current_date"] = datetime.now().strftime("%Y-%m-%d")

    data = {
        "inputs": inputs,
        "query": user_prompt,
        "response_mode": "streaming",
        "user": current_user["mail"],
    }
    logger.info("Chat request data: %s", data)

    # Save the user's question
    get_db_manager().save_user_question(current_user["id"], agent_id, user_prompt)

    return await _stream_from_dify(_DIFY_CHAT_URL, token, data)