import json import logging from fastapi import APIRouter, Depends, HTTPException, status, Body, Query, Path from cryptoai.api.adata_api import AStockAPI from datetime import datetime from typing import Dict, Any, List, Optional from pydantic import BaseModel from cryptoai.api.deepseek_api import DeepSeekAPI from cryptoai.utils.config_loader import ConfigLoader from fastapi.responses import StreamingResponse from cryptoai.routes.user import get_current_user import requests from cryptoai.api.binance_api import get_binance_api from cryptoai.models.data_processor import DataProcessor from datetime import timedelta from sqlalchemy.orm import Session from cryptoai.utils.db_manager import get_db # 创建路由 router = APIRouter() logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @router.get("/search/{key}") async def search_crypto(key: str, session: Session = Depends(get_db)): """ 搜索加密货币交易对 从币安API获取所有USDT交易对,然后根据关键字过滤 """ binance_api = get_binance_api() all_symbols = binance_api.get_all_symbols() # 将搜索关键字转换为大写 key_upper = key.upper() # 过滤包含关键字的交易对 filtered_symbols = [symbol for symbol in all_symbols if key_upper in symbol] # 构建返回结果,格式与原来保持一致 result = [] for symbol in filtered_symbols[:20]: # 限制返回前20个结果 # 从symbol中提取基础资产名称 (如BTCUSDT -> BTC) base_asset = symbol.replace('USDT', '') if symbol.endswith('USDT') else symbol result.append({ "symbol": symbol, "base_asset": base_asset, "quote_asset": "USDT" }) return result class CryptoAnalysisRequest(BaseModel): symbol: str timeframe: Optional[str] = None @router.get("/kline/{symbol}") async def get_crypto_kline(symbol: str, timeframe: Optional[str] = None, limit: Optional[int] = 100, session: Session = Depends(get_db)): # 检查symbol是否存在 - 从币安API获取所有交易对进行验证 binance_api = get_binance_api() all_symbols = binance_api.get_all_symbols() # 将输入的symbol转换为大写并确保以USDT结尾 symbol_upper = symbol.upper() if not symbol_upper.endswith('USDT'): symbol_upper += 'USDT' if symbol_upper not in all_symbols: raise HTTPException(status_code=400, detail="您输入的币种在币安不存在,请检查后重新输入。") symbol = symbol_upper result = {} if timeframe is None: result["15m"] = binance_api.get_historical_klines(symbol=symbol, interval="15m", limit=limit).to_dict(orient="records") result["1h"] = binance_api.get_historical_klines(symbol=symbol, interval="1h", limit=limit).to_dict(orient="records") result["4h"] = binance_api.get_historical_klines(symbol=symbol, interval="4h", limit=limit).to_dict(orient="records") # result["1d"] = binance_api.get_historical_klines(symbol=symbol, interval="1d", limit=limit).to_dict(orient="records") else: result[timeframe] = binance_api.get_historical_klines(symbol=symbol, interval=timeframe, limit=limit).to_dict(orient="records") return result @router.get("/news") async def get_crypto_news(page: Optional[int] = 1, limit: Optional[int] = 0, session: Session = Depends(get_db)): url = f'https://api.blockbeats.cn/h5v1/newsflash/list?page={page}&limit={limit}&ios=1&end_time=&detective=-2' response = requests.get(url) data = response.json() result = [] for item in data["data"]["list"]: timestamp = item["add_time"] add_time = datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") result.append({ "title": item["title"], "content": item["content"], "timestamp": timestamp, "add_time": add_time, "url" : item["url"] }) return result