diff --git a/cryptoai/api/__pycache__/binance_api.cpython-313.pyc b/cryptoai/api/__pycache__/binance_api.cpython-313.pyc index 68d6366..0831f15 100644 Binary files a/cryptoai/api/__pycache__/binance_api.cpython-313.pyc and b/cryptoai/api/__pycache__/binance_api.cpython-313.pyc differ diff --git a/cryptoai/api/binance_api.py b/cryptoai/api/binance_api.py index ba40749..5d33849 100644 --- a/cryptoai/api/binance_api.py +++ b/cryptoai/api/binance_api.py @@ -7,8 +7,9 @@ import datetime import time class BinanceAPI: - """Binance API交互类,用于获取市场数据和执行交易""" + """Binance API交互类,用于获取市场数据和执行交易""" + def __init__(self, api_key: str, api_secret: str, test_mode: bool = True): """ 初始化Binance API客户端 @@ -224,4 +225,21 @@ class BinanceAPI: ) except BinanceAPIException as e: print(f"下单时出错: {e}") - return {} \ No newline at end of file + return {} + + +from cryptoai.utils.config_loader import ConfigLoader + +def get_binance_api() -> BinanceAPI: + """ + 获取Binance API实例 + + Returns: + BinanceAPI实例 + """ + config = ConfigLoader() + api_key = config.get_binance_config()['api_key'] + api_secret = config.get_binance_config()['api_secret'] + test_mode = config.get_binance_config()['test_mode'] + + return BinanceAPI(api_key, api_secret, test_mode) diff --git a/cryptoai/routes/adata.py b/cryptoai/routes/adata.py index 3e0415a..95fed54 100644 --- a/cryptoai/routes/adata.py +++ b/cryptoai/routes/adata.py @@ -4,6 +4,14 @@ from fastapi import APIRouter, Depends, HTTPException, status, Body, Query, Path from cryptoai.api.adata_api import AStockAPI from cryptoai.utils.db_manager import get_db_manager from datetime import datetime +from typing import Dict, Any, List, Optional +from pydantic import BaseModel + +from cryptoai.api.deepseek_api import DeepSeekAPI +from cryptoai.utils.config_loader import ConfigLoader +from fastapi.responses import StreamingResponse +from cryptoai.routes.user import get_current_user +import requests # 创建路由 router = APIRouter() @@ -107,3 +115,46 @@ async def get_stock_data_all(stock_code: str): return {} return result + + +@router.post('/{stock_code}/analysis', summary="获取股票分析数据") +async def get_stock_analysis(stock_code: str, current_user: Dict[str, Any] = Depends(get_current_user)): + + + url = 'https://mate.aimateplus.com/v1/workflows/run' + token = 'app-nWuCOa0YfQVtAosTY3Jr5vFV' + headers = { + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json' + } + + data = { + "inputs" : { + "stock_code" : stock_code + }, + "response_mode": "streaming", + "user": current_user["mail"] + } + + response = requests.post(url, headers=headers, json=data, stream=True) + + # 如果响应不成功,返回错误 + if response.status_code != 200: + raise HTTPException( + status_code=response.status_code, + detail=f"Failed to get response from Dify API: {response.text}" + ) + + # 获取response的stream + def stream_response(): + for chunk in response.iter_content(chunk_size=1024): + if chunk: + yield chunk + + return StreamingResponse(stream_response(), media_type="text/plain") + + + + + + diff --git a/cryptoai/routes/crypto.py b/cryptoai/routes/crypto.py new file mode 100644 index 0000000..aced56c --- /dev/null +++ b/cryptoai/routes/crypto.py @@ -0,0 +1,90 @@ +import json +import logging +from fastapi import APIRouter, Depends, HTTPException, status, Body, Query, Path +from cryptoai.api.adata_api import AStockAPI +from cryptoai.utils.db_manager import get_db_manager +from datetime import datetime +from typing import Dict, Any, List, Optional +from pydantic import BaseModel + +from cryptoai.api.deepseek_api import DeepSeekAPI +from cryptoai.utils.config_loader import ConfigLoader +from fastapi.responses import StreamingResponse +from cryptoai.routes.user import get_current_user +import requests + +# 创建路由 +router = APIRouter() + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +@router.get("/crypto/search/{key}") +async def search_crypto(key: str): + manager = get_db_manager() + result = manager.search_token(key) + return result + +class CryptoAnalysisRequest(BaseModel): + symbol: str + timeframe: Optional[str] = None + +@router.post("/crypto/analysis") +async def analysis_crypto(request: CryptoAnalysisRequest, + current_user: dict = Depends(get_current_user)): + + # 尝试从数据库获取Agent + try: + agent_id = 1 + if request.timeframe: + user_prompt = f"请分析以下加密货币:{request.symbol},并给出 {request.timeframe} 级别的分析报告。" + else: + user_prompt = f"请分析以下加密货币:{request.symbol},并给出分析报告。" + + agent = get_db_manager().get_agent_by_id(agent_id) + + if not agent: + raise HTTPException(status_code=400, detail="Invalid agent ID") + + token = agent.get("dify_token") + inputs = agent.get("inputs") or {} + inputs["current_date"] = datetime.now().strftime("%Y-%m-%d") + + except ValueError: + raise HTTPException(status_code=400, detail="Invalid agent ID format") + + url = "https://mate.aimateplus.com/v1/chat-messages" + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json" + } + data = { + "inputs": inputs, + "query": user_prompt, + "response_mode": "streaming", + "user": current_user["mail"] + } + + logging.info(f"Chat request data: {data}") + + # 保存用户提问 + get_db_manager().save_user_question(current_user["id"], agent_id, user_prompt) + + response = requests.post(url, headers=headers, json=data, stream=True) + + # 如果响应不成功,返回错误 + if response.status_code != 200: + raise HTTPException( + status_code=response.status_code, + detail=f"Failed to get response from Dify API: {response.text}" + ) + + # 获取response的stream + def stream_response(): + for chunk in response.iter_content(chunk_size=1024): + if chunk: + yield chunk + + return StreamingResponse(stream_response(), media_type="text/plain") + + diff --git a/cryptoai/routes/fastapi_app.py b/cryptoai/routes/fastapi_app.py index 6f7ffc0..6b79552 100644 --- a/cryptoai/routes/fastapi_app.py +++ b/cryptoai/routes/fastapi_app.py @@ -20,6 +20,7 @@ from cryptoai.routes.feed import router as feed_router from cryptoai.routes.user import router as user_router from cryptoai.routes.adata import router as adata_router from cryptoai.routes.question import router as question_router +from cryptoai.routes.crypto import router as crypto_router # 配置日志 logging.basicConfig( @@ -54,7 +55,7 @@ app.include_router(feed_router, prefix="/feed", tags=["AI Agent信息流"]) app.include_router(user_router, prefix="/user", tags=["用户管理"]) app.include_router(question_router, prefix="/question", tags=["用户提问"]) app.include_router(adata_router, prefix="/adata", tags=["A股数据"]) - +app.include_router(crypto_router, prefix="/crypto", tags=["加密货币数据"]) # 请求计时中间件 @app.middleware("http") async def add_process_time_header(request: Request, call_next): diff --git a/cryptoai/routes/usstock.py b/cryptoai/routes/usstock.py new file mode 100644 index 0000000..e69de29 diff --git a/cryptoai/utils/db_manager.py b/cryptoai/utils/db_manager.py index f3db435..bb1352c 100644 --- a/cryptoai/utils/db_manager.py +++ b/cryptoai/utils/db_manager.py @@ -12,6 +12,7 @@ from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy.dialects.mysql import JSON from sqlalchemy.pool import QueuePool +from sqlalchemy import or_ from cryptoai.utils.config_loader import ConfigLoader @@ -25,6 +26,25 @@ logger = logging.getLogger('db_manager') # 创建模型基类 Base = declarative_base() +# 定义Token模型 +class Token(Base): + """Token信息表模型""" + __tablename__ = 'tokens' + + id = Column(Integer, primary_key=True, autoincrement=True) + symbol = Column(String(50), nullable=False, unique=True, comment='交易对符号') + base_asset = Column(String(20), nullable=False, comment='基础资产') + quote_asset = Column(String(20), nullable=False, comment='计价资产') + created_at = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间') + + # 索引 + __table_args__ = ( + Index('idx_symbol', 'symbol'), + Index('idx_base_asset', 'base_asset'), + Index('idx_quote_asset', 'quote_asset'), + {'mysql_charset': 'utf8mb4', 'mysql_collate': 'utf8mb4_unicode_ci'} + ) + # 定义分析结果模型 class AnalysisResult(Base): """分析结果表模型""" @@ -1648,7 +1668,136 @@ class DBManager: except Exception as e: logger.error(f"获取股票列表失败: {e}") return [] + + def create_token(self, symbol: str, base_asset: str, quote_asset: str) -> bool: + """ + 创建新的Token信息 + + Args: + symbol: 交易对符号 + base_asset: 基础资产 + quote_asset: 计价资产 + + Returns: + 创建是否成功 + """ + if not self.engine: + try: + self._init_db() + except Exception as e: + logger.error(f"重新连接数据库失败: {e}") + return False + + try: + session = self.Session() + + try: + # 检查交易对是否已存在 + existing_token = session.query(Token).filter(Token.symbol == symbol).first() + if existing_token: + logger.warning(f"交易对 {symbol} 已存在") + return False + + # 创建新Token记录 + new_token = Token( + symbol=symbol, + base_asset=base_asset, + quote_asset=quote_asset + ) + + session.add(new_token) + session.commit() + + logger.info(f"成功创建Token信息: {symbol}") + return True + + except Exception as e: + session.rollback() + logger.error(f"创建Token信息失败: {e}") + return False + + finally: + session.close() + + except Exception as e: + logger.error(f"创建数据库会话失败: {e}") + return False + + def delete_token(self, symbol: str) -> bool: + """ + 删除Token信息 + + Args: + symbol: 交易对符号 + + Returns: + 删除是否成功 + """ + if not self.engine: + try: + self._init_db() + except Exception as e: + logger.error(f"重新连接数据库失败: {e}") + return False + + try: + session = self.Session() + + try: + # 查询Token + token = session.query(Token).filter(Token.symbol == symbol).first() + if not token: + logger.warning(f"交易对 {symbol} 不存在") + return False + + # 删除Token + session.delete(token) + session.commit() + + logger.info(f"成功删除Token信息: {symbol}") + return True + + except Exception as e: + session.rollback() + logger.error(f"删除Token信息失败: {e}") + return False + + finally: + session.close() + + except Exception as e: + logger.error(f"创建数据库会话失败: {e}") + return False + def search_token(self, key: str, limit: int = 10) -> List[Dict[str, Any]]: + """ + 搜索Token + """ + if not self.engine: + try: + self._init_db() + except Exception as e: + logger.error(f"重新连接数据库失败: {e}") + return [] + + try: + session = self.Session() + + # 使用 SQLAlchemy 的 ORM 查询 + tokens = session.query(Token).filter(Token.symbol.like(f"{key}%")).limit(limit).all() + + return [{ + 'symbol': token.symbol, + 'base_asset': token.base_asset, + 'quote_asset': token.quote_asset + } for token in tokens] + + except Exception as e: + logger.error(f"获取Token信息失败: {e}") + return [] + + finally: + session.close() # 单例模式 _db_instance = None diff --git a/test.py b/test.py index 0f907e0..94819a7 100644 --- a/test.py +++ b/test.py @@ -1,17 +1,20 @@ from cryptoai.utils.db_manager import get_db_manager from cryptoai.api.adata_api import AStockAPI +from cryptoai.api.binance_api import get_binance_api import json from time import sleep if __name__ == "__main__": - print("开始获取A股数据") - api = AStockAPI() - stock_codes = api.get_all_stock_codes() - list = json.loads(stock_codes.to_json(orient="records")) - # print(list[0]) + + symbols = get_binance_api().get_all_symbols() + # for symbol in symbols: + # # 移除 symbol 中的 USDT + # symbol = symbol.replace('USDT', '') + # print(symbol) - # 保存到数据库 - for stock in list: - print(f"创建股票: {stock['stock_code']} - {stock['short_name']}") - get_db_manager().create_stock(stock["stock_code"], stock["short_name"], stock["exchange"], stock["list_date"]) - print(f"创建股票: {stock['stock_code']} - {stock['short_name']} 完成") - sleep(1) \ No newline at end of file + manager = get_db_manager() + + for symbol in symbols: + base_asset = symbol.split('USDT')[0] + quote_asset = 'USDT' + + manager.create_token(symbol,base_asset, quote_asset)