crypto.ai/cryptoai/routes/adata.py
2025-05-18 16:43:24 +08:00

177 lines
5.4 KiB
Python

import json
import logging
from fastapi import APIRouter, Depends, HTTPException, status, Body, Query, Path
from cryptoai.api.adata_api import AStockAPI
from cryptoai.utils.db_manager import get_db_manager
from datetime import datetime
from typing import Dict, Any, List, Optional
from pydantic import BaseModel
from cryptoai.api.deepseek_api import DeepSeekAPI
from cryptoai.utils.config_loader import ConfigLoader
from fastapi.responses import StreamingResponse
from cryptoai.routes.user import get_current_user
import requests
# Create the router that the handlers in this module register on.
router = APIRouter()
logger = logging.getLogger(__name__)
# Set this module's logger to DEBUG explicitly.
logger.setLevel(logging.DEBUG)
@router.get("/stock/search")
async def search_stock(key: str, limit: int = 10):
    """Search stocks through the shared DB manager.

    Args:
        key: Search term forwarded to the DB manager's ``search_stock``.
        limit: Second argument forwarded to ``search_stock``; defaults to 10.

    Returns:
        Whatever ``search_stock`` returns, passed through unmodified.
    """
    return get_db_manager().search_stock(key, limit)
@router.get("/stock/base", summary="获取股票基础信息")
async def get_stock_base(stock_code: str):
    """Collect the basic-information sections for one stock.

    Args:
        stock_code: Stock code handed to every ``AStockAPI`` lookup.

    Returns:
        A dict with the keys ``finance_core_index``, ``stock_shares``,
        ``concept_east`` and ``plate_east``, each holding the record list
        decoded from the corresponding lookup. If any lookup or conversion
        raises, the error is logged and an empty dict is returned instead.
    """

    def to_records(frame):
        # Serialize to JSON text and parse it back so the payload contains
        # only plain JSON-compatible values.
        # Presumably `frame` is a pandas DataFrame; verify against AStockAPI.
        return json.loads(frame.to_json(orient="records"))

    client = AStockAPI()
    payload = {}
    try:
        # Core financial indicators
        payload["finance_core_index"] = to_records(
            client.get_stock_finance_core_index(stock_code)
        )
        # Share-capital information
        payload["stock_shares"] = to_records(client.get_stock_shares(stock_code))
        # Concept sectors
        payload["concept_east"] = to_records(client.get_concept_east(stock_code))
        # Plate sectors
        payload["plate_east"] = to_records(client.get_plate_east(stock_code))
    except Exception as e:
        logger.error(f"获取股票基础信息失败: {e}")
        return {}
    else:
        return payload
@router.get("/stock/data", summary="获取股票数据")
async def get_stock_data(stock_code: str, start_date: Optional[str] = None, end_date: Optional[str] = None):
    """Fetch market and capital-flow data for one stock over a date window.

    Args:
        stock_code: Stock code handed to the ``AStockAPI`` lookups.
        start_date: Window start; ``"2025-01-01"`` is used when omitted.
        end_date: Window end; today's date formatted as ``%Y-%m-%d`` is used
            when omitted.

    Returns:
        A dict with the keys ``market_data`` and ``flow_data``, each holding
        the record list decoded from the corresponding lookup. If anything
        raises, the error is logged and an empty dict is returned instead.
    """

    def to_records(frame):
        # Serialize to JSON text and parse it back so the payload contains
        # only plain JSON-compatible values.
        return json.loads(frame.to_json(orient="records"))

    client = AStockAPI()
    payload = {}
    try:
        window_start = "2025-01-01" if start_date is None else start_date
        window_end = (
            datetime.now().strftime("%Y-%m-%d") if end_date is None else end_date
        )
        # Market data
        payload["market_data"] = to_records(
            client.get_market_data(stock_code, window_start, window_end)
        )
        # Capital-flow data
        payload["flow_data"] = to_records(
            client.get_capital_flow(stock_code, window_start, window_end)
        )
    except Exception as e:
        logger.error(f"获取股票数据失败: {e}")
        return {}
    else:
        return payload
@router.get("/stock/data/all", summary="获取所有股票数据")
async def get_stock_data_all(stock_code: str):
    """Gather every available data section for one stock in a single call.

    Args:
        stock_code: Stock code handed to every ``AStockAPI`` lookup.

    Returns:
        A dict with the keys ``stock_shares``, ``concept_east``,
        ``plate_east``, ``market_data``, ``min_data`` and ``flow_data``, each
        holding the record list decoded from the corresponding lookup. If
        constructing the API client or any lookup raises, the error is logged
        and an empty dict is returned instead.
    """

    def to_records(frame):
        # Serialize to JSON text and parse it back so the payload contains
        # only plain JSON-compatible values.
        return json.loads(frame.to_json(orient="records"))

    payload = {}
    try:
        client = AStockAPI()
        # Share-capital information
        payload["stock_shares"] = to_records(client.get_stock_shares(stock_code))
        # Concept sectors
        payload["concept_east"] = to_records(client.get_concept_east(stock_code))
        # Plate sectors
        payload["plate_east"] = to_records(client.get_plate_east(stock_code))
        # Market data
        payload["market_data"] = to_records(client.get_market_data(stock_code))
        # Minute-level data
        payload["min_data"] = to_records(client.get_market_min_data(stock_code))
        # Capital-flow data
        payload["flow_data"] = to_records(client.get_capital_flow(stock_code))
    except Exception as e:
        logger.error(f"获取股票数据失败: {e}")
        return {}
    else:
        return payload
@router.post('/{stock_code}/analysis', summary="获取股票分析数据")
async def get_stock_analysis(stock_code: str, current_user: Dict[str, Any] = Depends(get_current_user)):
    """Run the remote stock-analysis workflow and stream its raw output back.

    The user's question is recorded through the DB manager, the stock code is
    POSTed to the workflow endpoint in streaming mode, and the upstream body
    is relayed to the client in 1 KiB chunks.

    Args:
        stock_code: Path parameter forwarded as the workflow's ``stock_code``
            input. It is not validated here.
        current_user: Authenticated user supplied by ``get_current_user``;
            its ``"mail"`` and ``"id"`` entries are read.

    Returns:
        A ``StreamingResponse`` (media type ``text/plain``) relaying the
        upstream response body.

    Raises:
        HTTPException: 502 when the upstream request fails at the transport
            level; otherwise the upstream status code when it is not 200.
    """
    # Imported locally so this handler does not depend on a new module-level
    # import; fastapi is already a dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    # NOTE(review): the stock-code existence check that used
    # get_db_manager().search_stock() was disabled in the original code and
    # stays disabled; the code is forwarded to the workflow unchecked.
    url = 'https://mate.aimateplus.com/v1/workflows/run'
    # NOTE(review): hard-coded bearer credential. It should be moved to
    # configuration / secret storage and rotated.
    token = 'app-nWuCOa0YfQVtAosTY3Jr5vFV'
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }
    data = {
        "inputs": {
            "stock_code": stock_code,
        },
        "response_mode": "streaming",
        "user": current_user["mail"],
    }

    # Record the user's question before contacting the workflow.
    get_db_manager().save_user_question(
        current_user["id"],
        stock_code,
        f"请分析以下股票:{stock_code},并给出分析报告。",
    )

    try:
        # requests is blocking: run it in the threadpool so the event loop is
        # not stalled while waiting for the upstream headers. Only the connect
        # phase is bounded; the read timeout stays unlimited because the
        # workflow streams for an unknown duration.
        response = await run_in_threadpool(
            requests.post,
            url,
            headers=headers,
            json=data,
            stream=True,
            timeout=(10, None),
        )
    except requests.RequestException as exc:
        logger.exception("Request to the analysis workflow failed")
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Failed to reach Dify API: {exc}",
        ) from exc

    # Propagate upstream failures, releasing the connection first.
    if response.status_code != 200:
        try:
            upstream_body = response.text
        finally:
            response.close()
        raise HTTPException(
            status_code=response.status_code,
            detail=f"Failed to get response from Dify API: {upstream_body}"
        )

    def stream_response():
        # Relay the upstream body; the finally clause releases the upstream
        # connection even when the client disconnects mid-stream.
        try:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    yield chunk
        finally:
            response.close()

    return StreamingResponse(stream_response(), media_type="text/plain")