crypto.ai/cryptoai/routes/crypto.py
2025-06-15 09:46:24 +08:00

79 lines
3.0 KiB
Python

import json
import logging
from fastapi import APIRouter, Depends, HTTPException, status, Body, Query, Path
from cryptoai.api.adata_api import AStockAPI
from datetime import datetime
from typing import Dict, Any, List, Optional
from pydantic import BaseModel
from cryptoai.api.deepseek_api import DeepSeekAPI
from cryptoai.utils.config_loader import ConfigLoader
from fastapi.responses import StreamingResponse
from cryptoai.routes.user import get_current_user
import requests
from cryptoai.api.binance_api import get_binance_api
from cryptoai.models.data_processor import DataProcessor
from datetime import timedelta
from cryptoai.models.token import TokenManager
from sqlalchemy.orm import Session
from cryptoai.utils.db_manager import get_db
# Create the router that the endpoints in this module register on.
router = APIRouter()
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Set this logger's own threshold to DEBUG.
logger.setLevel(logging.DEBUG)
@router.get("/search/{key}")
async def search_crypto(key: str, session: Session = Depends(get_db)):
    """Search tokens by ``key`` and return the lookup result unchanged.

    Args:
        key: Search term taken from the URL path.
        session: Database session injected through ``get_db``.

    Returns:
        Whatever ``TokenManager.search_token`` returns for ``key``.
    """
    return TokenManager(session).search_token(key)
class CryptoAnalysisRequest(BaseModel):
    """Request schema carrying a symbol and an optional timeframe.

    NOTE(review): no route in the visible part of this module references this
    model -- confirm where it is consumed.
    """

    # Required string; presumably a trading symbol or token name -- TODO confirm.
    symbol: str
    # Optional string; None when the client omits it.
    timeframe: Optional[str] = None
# Intervals returned when the caller does not ask for a specific timeframe.
_DEFAULT_KLINE_INTERVALS = ("15m", "1h", "4h")


@router.get("/kline/{symbol}")
async def get_crypto_kline(symbol: str, timeframe: Optional[str] = None, limit: Optional[int] = 100, session: Session = Depends(get_db)):
    """Return kline records with technical indicators for a token.

    The path value is first resolved through ``TokenManager.search_token``;
    the ``symbol`` field of the first match is what is sent to the Binance API.

    Args:
        symbol: User-supplied search key for the token.
        timeframe: Kline interval to fetch. When ``None``, the 15m, 1h and 4h
            intervals are all returned.
        limit: Number of klines requested per interval (forwarded as-is).
        session: Database session injected through ``get_db``.

    Returns:
        A dict mapping each interval name to the list produced by
        ``to_dict(orient="records")`` on the indicator-enriched klines
        (presumably a pandas DataFrame -- confirm against ``DataProcessor``).

    Raises:
        HTTPException: 400 when the token search yields no match.
    """
    # Check that the symbol exists before calling the exchange API.
    matches = TokenManager(session).search_token(symbol)
    if not matches:
        raise HTTPException(status_code=400, detail="您输入的币种在币安不存在,请检查后重新输入。")
    # Use the symbol of the first match rather than the raw user input.
    symbol = matches[0]["symbol"]

    binance_api = get_binance_api()
    data_processor = DataProcessor()
    intervals = _DEFAULT_KLINE_INTERVALS if timeframe is None else (timeframe,)

    # One fetch -> indicators -> records pipeline per interval, in order.
    result = {}
    for interval in intervals:
        klines = binance_api.get_historical_klines(symbol=symbol, interval=interval, limit=limit)
        enriched = data_processor.add_technical_indicators(klines)
        result[interval] = enriched.to_dict(orient="records")
    return result
@router.get("/news")
async def get_crypto_news(session: Session = Depends(get_db)):
    """Fetch the first page of BlockBeats newsflash items.

    Args:
        session: Database session injected through ``get_db``. It is not used
            by this handler but is kept so the route signature is unchanged.

    Returns:
        A list of dicts with ``title``, ``content`` and ``add_time`` keys.
        ``add_time`` is the upstream timestamp formatted as
        ``YYYY-MM-DD HH:MM:SS`` in the server's local timezone.

    Raises:
        HTTPException: 502 when the upstream request fails, returns a non-2xx
            status, or responds with a payload that cannot be parsed.
    """
    url = 'https://api.blockbeats.cn/h5v1/newsflash/list?page=1&limit=15&ios=1&end_time=&detective=-2'
    try:
        # requests never times out unless told to; bound the wait so a stalled
        # upstream cannot hang this handler indefinitely.
        # NOTE(review): requests is synchronous, so this call blocks the event
        # loop while waiting -- consider moving it to a worker thread.
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures on older requests versions.
        logger.warning("BlockBeats newsflash request failed: %s", exc)
        raise HTTPException(status_code=502, detail="获取快讯失败,请稍后重试。") from exc

    try:
        return [
            {
                "title": item["title"],
                "content": item["content"],
                # Assumes add_time is a Unix timestamp in seconds -- TODO confirm.
                "add_time": datetime.fromtimestamp(item["add_time"]).strftime("%Y-%m-%d %H:%M:%S"),
            }
            for item in data["data"]["list"]
        ]
    except (KeyError, TypeError, ValueError, OverflowError, OSError) as exc:
        # The upstream payload did not have the expected shape or values.
        logger.warning("Unexpected BlockBeats newsflash payload: %s", exc)
        raise HTTPException(status_code=502, detail="获取快讯失败,请稍后重试。") from exc