import json
import logging
import os
from datetime import datetime
from typing import Dict, Any, List, Optional

from fastapi import APIRouter, Depends, HTTPException, status, Body, Query, Path
from pydantic import BaseModel
from fastapi.responses import StreamingResponse
import requests

from cryptoai.api.adata_api import AStockAPI
from cryptoai.utils.db_manager import get_db_manager
from cryptoai.api.deepseek_api import DeepSeekAPI
from cryptoai.utils.config_loader import ConfigLoader
from cryptoai.routes.user import get_current_user

# Create the router
router = APIRouter()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# First day of the history window requested by the /stock/data route.
_MARKET_DATA_START_DATE = "2025-01-01"

# Workflow endpoint called by the analysis route.
_DIFY_WORKFLOW_URL = 'https://mate.aimateplus.com/v1/workflows/run'

# SECURITY: this token was hard-coded in source. It can now be overridden with
# the DIFY_API_TOKEN environment variable; the literal is kept only as a
# backward-compatible fallback and should be rotated and removed from the repo.
_DIFY_TOKEN_ENV_VAR = "DIFY_API_TOKEN"
_DIFY_FALLBACK_TOKEN = 'app-nWuCOa0YfQVtAosTY3Jr5vFV'

# (connect, read) timeout for the upstream call. Only the connect phase is
# bounded: the response is a long-lived stream, so a read timeout could cut off
# a legitimately slow workflow.
_DIFY_TIMEOUT = (10, None)


def _frame_to_records(frame) -> Any:
    """Convert a tabular result into plain JSON-compatible Python objects.

    The value is serialised with ``to_json(orient="records")`` and parsed back
    with ``json.loads`` so the route can return it directly.

    Args:
        frame: Object exposing ``to_json(orient=...)`` (presumably a pandas
            DataFrame returned by ``AStockAPI`` -- TODO confirm).

    Returns:
        Whatever ``json.loads`` yields for the serialised records.
    """
    return json.loads(frame.to_json(orient="records"))


def _load_base_info(api, stock_code: str) -> Dict[str, Any]:
    """Fetch share-capital, concept and plate information for one stock.

    Args:
        api: An ``AStockAPI`` instance.
        stock_code: Code of the stock to query.

    Returns:
        Dict with the keys ``stock_shares``, ``concept_east`` and
        ``plate_east``, fetched in that order.
    """
    return {
        # Share capital information
        "stock_shares": _frame_to_records(api.get_stock_shares(stock_code)),
        # Concept sectors
        "concept_east": _frame_to_records(api.get_concept_east(stock_code)),
        # Plates / sectors
        "plate_east": _frame_to_records(api.get_plate_east(stock_code)),
    }


@router.get("/stock/search")
async def search_stock(key: str, limit: int = 10):
    """Search stocks through the database manager.

    Args:
        key: Search keyword forwarded to ``manager.search_stock``.
        limit: Limit forwarded to ``manager.search_stock`` (default 10).

    Returns:
        The result of ``manager.search_stock(key, limit)``, unchanged.
    """
    manager = get_db_manager()
    return manager.search_stock(key, limit)


@router.get("/stock/base", summary="获取股票基础信息")
async def get_stock_base(stock_code: str):
    """Return share-capital, concept and plate information for a stock.

    Args:
        stock_code: Code of the stock to query.

    Returns:
        Dict with ``stock_shares``, ``concept_east`` and ``plate_east``; an
        empty dict if any of the lookups fails (the failure is logged).
    """
    api = AStockAPI()
    try:
        return _load_base_info(api, stock_code)
    except Exception as e:
        # Best-effort endpoint: keep returning {} to callers, but record the
        # traceback so the failure can actually be diagnosed.
        logger.exception("获取股票基础信息失败: %s", e)
        return {}


@router.get("/stock/data", summary="获取股票数据")
async def get_stock_data(stock_code: str):
    """Return market, capital-flow and mine-clearance data for a stock.

    Market and capital-flow data cover the range from
    ``_MARKET_DATA_START_DATE`` up to today's local date.

    Args:
        stock_code: Code of the stock to query.

    Returns:
        Dict with ``market_data``, ``flow_data`` and ``mine_clearance``; an
        empty dict if any of the lookups fails (the failure is logged).
    """
    api = AStockAPI()
    try:
        start_date = _MARKET_DATA_START_DATE
        end_date = datetime.now().strftime("%Y-%m-%d")
        return {
            # Market data
            "market_data": _frame_to_records(
                api.get_market_data(stock_code, start_date, end_date)
            ),
            # Capital flow data
            "flow_data": _frame_to_records(
                api.get_capital_flow(stock_code, start_date, end_date)
            ),
            # Mine-clearance (risk avoidance) data
            "mine_clearance": _frame_to_records(
                api.get_mine_clearance_tdx(stock_code)
            ),
        }
    except Exception as e:
        # Best-effort endpoint: keep returning {} but log the traceback.
        logger.exception("获取股票数据失败: %s", e)
        return {}


@router.get("/stock/data/all", summary="获取所有股票数据")
async def get_stock_data_all(stock_code: str):
    """Return base information plus market, minute-line and flow data.

    Args:
        stock_code: Code of the stock to query.

    Returns:
        Dict with ``stock_shares``, ``concept_east``, ``plate_east``,
        ``market_data``, ``min_data`` and ``flow_data``; an empty dict if
        anything fails, including construction of the API client.
    """
    try:
        api = AStockAPI()
        result = _load_base_info(api, stock_code)
        # Market data
        result["market_data"] = _frame_to_records(api.get_market_data(stock_code))
        # Minute-line data
        result["min_data"] = _frame_to_records(api.get_market_min_data(stock_code))
        # Capital flow data
        result["flow_data"] = _frame_to_records(api.get_capital_flow(stock_code))
    except Exception as e:
        # Best-effort endpoint: keep returning {} but log the traceback.
        logger.exception("获取股票数据失败: %s", e)
        return {}
    return result


@router.post('/{stock_code}/analysis', summary="获取股票分析数据")
async def get_stock_analysis(stock_code: str, current_user: Dict[str, Any] = Depends(get_current_user)):
    """Run the remote analysis workflow for a stock and stream its output.

    Args:
        stock_code: Code of the stock, sent as the workflow's ``stock_code``
            input.
        current_user: Authenticated user; its ``mail`` entry is sent as the
            workflow ``user``.

    Returns:
        A ``StreamingResponse`` (``text/plain``) relaying the upstream body in
        chunks of up to 1024 bytes.

    Raises:
        HTTPException: 502 if the upstream service cannot be reached, or the
            upstream status code if it answers with anything other than 200.
    """
    token = os.getenv(_DIFY_TOKEN_ENV_VAR, _DIFY_FALLBACK_TOKEN)
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    data = {
        "inputs": {
            "stock_code": stock_code
        },
        "response_mode": "streaming",
        "user": current_user["mail"]
    }

    # NOTE(review): requests is synchronous, so this call blocks the event loop
    # while the connection is established; consider a threadpool/async client.
    try:
        response = requests.post(
            _DIFY_WORKFLOW_URL,
            headers=headers,
            json=data,
            stream=True,
            timeout=_DIFY_TIMEOUT,
        )
    except requests.RequestException as exc:
        # Report a transport failure as a gateway error instead of letting it
        # surface as an unhandled 500.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"Failed to reach Dify API: {exc}"
        ) from exc

    # If the upstream call was not successful, return the error
    if response.status_code != 200:
        try:
            detail = f"Failed to get response from Dify API: {response.text}"
        finally:
            # stream=True keeps the connection open until it is closed.
            response.close()
        raise HTTPException(status_code=response.status_code, detail=detail)

    # Relay the upstream response stream
    def stream_response():
        try:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    yield chunk
        finally:
            # Release the upstream connection once streaming ends or the
            # client disconnects.
            response.close()

    return StreamingResponse(stream_response(), media_type="text/plain")