diff --git a/backend/test_bitget_trading.py b/backend/test_bitget_trading.py deleted file mode 100644 index c21d217..0000000 --- a/backend/test_bitget_trading.py +++ /dev/null @@ -1,472 +0,0 @@ -""" -测试 Bitget 实盘交易 API - -运行前请确保已在 .env 中配置: -- bitget_api_key -- bitget_api_secret -- bitget_passphrase -- bitget_use_testnet = true (建议先在测试网测试) -""" -import sys -import os -import asyncio - -# 添加项目路径 -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -from app.services.bitget_trading_api_sdk import BitgetTradingAPI -from app.config import get_settings - - -def print_section(title: str): - """打印分节标题""" - print("\n" + "=" * 60) - print(f" {title}") - print("=" * 60) - - -def test_connection(api: BitgetTradingAPI): - """测试 API 连接""" - print_section("1. 测试 API 连接") - - if api.test_connection(): - print("✅ API 连接成功") - return True - else: - print("❌ API 连接失败") - return False - - -def test_balance(api: BitgetTradingAPI): - """测试查询余额""" - print_section("2. 查询账户余额") - - balance = api.get_balance() - if balance: - print(f"✅ 查询余额成功") - for currency, info in balance.items(): - available = info.get('available', '0') - frozen = info.get('frozen', '0') - print(f" {currency}: 可用={available}, 冻结={frozen}") - return True - else: - print("❌ 查询余额失败") - return False - - -def test_position(api: BitgetTradingAPI, symbol: str = "BTCUSDT"): - """测试查询持仓""" - print_section(f"3. 查询持仓 ({symbol})") - - positions = api.get_position(symbol) - if positions is not None: - if positions: - print(f"✅ 查询到 {len(positions)} 个持仓") - for pos in positions: - pos_side = pos.get('side', 'N/A') - contracts = pos.get('contracts', 0) - unrealized_pnl = pos.get('unrealizedPnl', 0) - print(f" {pos_side}: {contracts}张, 未实现盈亏={unrealized_pnl}") - else: - print("✅ 当前无持仓") - return True - else: - print("❌ 查询持仓失败") - return False - - -def test_leverage(api: BitgetTradingAPI, symbol: str = "BTCUSDT", leverage: int = 10): - """测试设置杠杆""" - print_section(f"4. 设置杠杆 ({symbol} {leverage}x)") - - if api.set_leverage(symbol, leverage): - print(f"✅ 设置杠杆成功: {leverage}x") - return True - else: - print(f"❌ 设置杠杆失败") - return False - - -def test_open_orders(api: BitgetTradingAPI): - """测试查询挂单""" - print_section("5. 查询当前挂单") - - orders = api.get_open_orders() - if orders is not None: - if orders: - print(f"✅ 查询到 {len(orders)} 个挂单") - for order in orders[:5]: # 只显示前5个 - order_id = order.get('id', 'N/A') - symbol = order.get('symbol', 'N/A') - side = order.get('side', 'N/A') - amount = order.get('amount', 'N/A') - price = order.get('price', 'N/A') - print(f" {order_id}: {symbol} {side} {amount} @ {price}") - else: - print("✅ 当前无挂单") - return True - else: - print("❌ 查询挂单失败") - return False - - -def test_place_order(api: BitgetTradingAPI, symbol: str = "BTCUSDT", - side: str = "buy", usdt_value: float = 100, - order_type: str = "market", leverage: int = 20, price: float = None): - """ - 测试下单(按 USDT 价值) - - Args: - api: Bitget API 实例 - symbol: 交易对 - side: 买卖方向 - usdt_value: 下单的 USDT 价值 - order_type: 订单类型 - leverage: 杠杆倍数 - price: 限价单价格(可选) - """ - print_section(f"6. 下单测试 ({symbol} {side} 价值${usdt_value}, {leverage}x杠杆)") - - settings = get_settings() - - # 先设置杠杆 - print(f"设置杠杆: {leverage}x") - api.set_leverage(symbol, leverage) - - # 获取当前价格 - import ccxt - ccxt_symbol = api._standardize_symbol(symbol) - ticker = api.exchange.fetch_ticker(ccxt_symbol) - current_price = ticker['last'] if ticker else None - - if not current_price: - print("❌ 无法获取当前价格") - return None - - print(f"当前价格: ${current_price:.2f}") - - # 选择订单类型 - print("\n选择订单类型:") - print(" 1. 市价单 (立即成交)") - print(" 2. 限价单 (挂单,可测试撤单)") - order_type_choice = input("请选择 (1-2, 默认2): ").strip() - if order_type_choice == '1': - order_type = 'market' - limit_price = None - else: - order_type = 'limit' - # 限价单需要输入价格 - price_input = input("挂单价格 (留空使用默认远离市价的价格): ").strip() - if price_input: - limit_price = float(price_input) - else: - # 默认设置一个远离当前价的价格,确保不会立即成交 - if side == 'buy': - limit_price = current_price * 0.95 # 买单价格低于市价5% - print(f"默认买单价格: ${limit_price:.2f} (低于市价5%)") - else: - limit_price = current_price * 1.05 # 卖单价格高于市价5% - print(f"默认卖单价格: ${limit_price:.2f} (高于市价5%)") - - # 计算合约数量 - # Bitget 合约规格: BTC/USDT 每张 0.01 BTC, ETH/USDT 每张 0.1 ETH - if 'BTC' in symbol: - contract_size = 0.01 # 每张 0.01 BTC - min_size = 1 # Bitget 最小 1 张 - coin_price = current_price - elif 'ETH' in symbol: - contract_size = 0.1 # 每张 0.1 ETH - min_size = 1 # Bitget 最小 1 张 - coin_price = current_price - else: - contract_size = 1 - min_size = 1 - coin_price = current_price - - # 计算需要的数量 (USDT价值 / (币价 * 合约规格)) - size = usdt_value / (coin_price * contract_size) - - # 确保满足最小下单量 - size = max(size, min_size) - - # 计算实际仓位价值和所需保证金 - position_value = size * contract_size * coin_price - required_margin = position_value / leverage - - print(f"合约规格: {contract_size} 币/张") - print(f"下单数量: {size:.3f}张 (最小{min_size}张)") - print(f"仓位价值: ${position_value:.2f}") - print(f"所需保证金: ${required_margin:.2f} ({leverage}x杠杆)") - print(f"模式: {'测试网' if settings.bitget_use_testnet else '生产网'}") - - # 检查余额 - balance = api.get_balance() - available = float(balance.get('USDT', {}).get('available', 0)) - print(f"账户可用余额: ${available:.2f}") - - if required_margin > available: - print(f"\n❌ 余额不足!") - print(f" 需要: ${required_margin:.2f}") - print(f" 可用: ${available:.2f}") - print(f" 缺口: ${required_margin - available:.2f}") - - # 建议更高的杠杆 - suggested_leverage = int(required_margin / available * leverage) + 1 - if suggested_leverage <= 125: # Bitget 最大杠杆 - print(f"\n建议: 使用 {suggested_leverage}x 杠杆或减少下单金额") - return None - - order_type_text = "限价单" if order_type == "limit" else "市价单" - print(f"订单类型: {order_type_text}") - if price: - print(f"挂单价格: ${price:.2f}") - - confirm = input(f"\n⚠️ 即将下单: {symbol} {side} {size:.3f}张 ({order_type_text}, 仓位价值${position_value:.2f}, 保证金${required_margin:.2f}),是否继续?(y/n): ") - if confirm.lower() != 'y': - print("已取消下单") - return None - - order = api.place_order( - symbol=symbol, - side=side, - order_type=order_type, - size=size, - price=price - ) - - if order: - print(f"✅ 下单成功") - print(f" 订单ID: {order.get('id', 'N/A')}") - print(f" 状态: {order.get('status', 'N/A')}") - print(f" 价格: {order.get('price', 'N/A')}") - print(f" 数量: {order.get('amount', 'N/A')}") - fee = order.get('fee') - if fee and isinstance(fee, dict): - print(f" 手续费: {fee.get('cost', 'N/A')}") - else: - print(f" 手续费: N/A") - return order - else: - print("❌ 下单失败") - return None - - -def test_cancel_order(api: BitgetTradingAPI, symbol: str = "BTCUSDT", order_id: str = None): - """测试撤单""" - print_section("7. 撤单测试") - - if not order_id: - # 如果没有提供订单ID,先查询挂单 - orders = api.get_open_orders(symbol) - if not orders: - print("⚠️ 无可撤订单") - return None - order_id = orders[0].get('id') - - print(f"订单ID: {order_id}") - - if api.cancel_order(symbol, order_id=order_id): - print("✅ 撤单成功") - return True - else: - print("❌ 撤单失败") - return False - - -def test_close_position(api: BitgetTradingAPI, symbol: str = "BTCUSDT"): - """测试平仓""" - print_section(f"8. 平仓测试 ({symbol})") - - positions = api.get_position(symbol) - if not positions: - print("⚠️ 无持仓可平") - return None - - print(f"持仓数量: {len(positions)}") - for pos in positions: - pos_side = pos.get('side', 'N/A') - contracts = pos.get('contracts', 0) - print(f" {pos_side}: {contracts}张") - - confirm = input("\n⚠️ 即将平仓,是否继续?(y/n): ") - if confirm.lower() != 'y': - print("已取消平仓") - return None - - result = api.close_position(symbol) - if result: - print("✅ 平仓成功") - return True - else: - print("❌ 平仓失败") - return False - - -def test_modify_sl_tp(api: BitgetTradingAPI, symbol: str = "BTCUSDT"): - """测试修改止损止盈""" - print_section(f"9. 修改止损止盈 ({symbol})") - - positions = api.get_position(symbol) - if not positions: - print("⚠️ 无持仓,跳过止损止盈测试") - return None - - pos = positions[0] - pos_side = pos.get('side') - mark_price = float(pos.get('markPrice', 0)) - - print(f"当前持仓: {pos_side}, 标记价: {mark_price}") - - # 根据持仓方向计算测试用的止损止盈价格 - if pos_side == 'long': - test_sl = mark_price * 0.98 # 止损设为标记价的 98% - test_tp = mark_price * 1.02 # 止盈设为标记价的 102% - else: - test_sl = mark_price * 1.02 # 做空止损设为标记价的 102% - test_tp = mark_price * 0.98 # 做空止盈设为标记价的 98% - - print(f"测试参数: 止损={test_sl:.2f}, 止盈={test_tp:.2f}") - - confirm = input("\n⚠️ 即将设置止损止盈,是否继续?(y/n): ") - if confirm.lower() != 'y': - print("已取消设置") - return None - - if api.modify_sl_tp(symbol, stop_loss=test_sl, take_profit=test_tp): - print("✅ 设置止损止盈成功") - return True - else: - print("❌ 设置止损止盈失败") - return False - - -def test_cancel_all_orders(api: BitgetTradingAPI, symbol: str = "BTCUSDT"): - """测试撤销所有挂单""" - print_section(f"10. 撤销所有挂单 ({symbol})") - - # 先查询挂单 - orders = api.get_open_orders(symbol) - if not orders: - print("⚠️ 无挂单可撤") - return None - - print(f"当前挂单数: {len(orders)}") - - confirm = input("\n⚠️ 即将撤销所有挂单,是否继续?(y/n): ") - if confirm.lower() != 'y': - print("已取消") - return None - - if api.cancel_all_orders(symbol): - print("✅ 撤销所有挂单成功") - return True - else: - print("❌ 撤销所有挂单失败") - return False - - -def main(): - """主测试流程""" - print("\n" + "=" * 60) - print(" Bitget 实盘交易 API 测试") - print("=" * 60) - - # 加载配置 - settings = get_settings() - - # 检查配置 - if not settings.bitget_api_key or not settings.bitget_api_secret: - print("❌ 未配置 Bitget API 密钥") - print("请在 .env 文件中设置:") - print(" bitget_api_key=your_api_key") - print(" bitget_api_secret=your_api_secret") - print(" bitget_passphrase=your_passphrase (可选)") - print(" bitget_use_testnet=true/false (测试网/生产网)") - return - - # 创建 API 实例 - api = BitgetTradingAPI( - api_key=settings.bitget_api_key, - api_secret=settings.bitget_api_secret, - passphrase=settings.bitget_passphrase or "", - use_testnet=settings.bitget_use_testnet - ) - - mode = '测试网' if settings.bitget_use_testnet else '生产网' - print(f"\n模式: {mode}") - print(f"API Key: {settings.bitget_api_key[:8]}...{settings.bitget_api_key[-4:]}") - - try: - # 基础测试(不涉及交易) - if not test_connection(api): - return - test_balance(api) - test_position(api) - test_leverage(api) - test_open_orders(api) - - # 交易相关测试(需要确认) - print("\n" + "=" * 60) - print(" 交易功能测试(需要手动确认)") - print("=" * 60) - - # 检查用户是否要测试交易功能 - test_trading = input("\n是否测试交易功能(下单、平仓等)?(y/n): ") - if test_trading.lower() == 'y': - # 选择交易对 - print("\n选择交易对:") - print(" 1. BTCUSDT") - print(" 2. ETHUSDT") - print(" 3. 自定义") - choice = input("请选择 (1-3): ").strip() - - if choice == '1': - symbol = 'BTCUSDT' - elif choice == '2': - symbol = 'ETHUSDT' - else: - symbol = input("请输入交易对 (如 BTCUSDT): ").strip().upper() - - # 选择买卖方向 - direction = input("方向 (buy/sell, 默认buy): ").strip().lower() - side = direction if direction in ['buy', 'sell'] else 'buy' - - # 输入 USDT 价值 - usdt_input = input("下单价值 (USDT, 默认100): ").strip() - usdt_value = float(usdt_input) if usdt_input else 100 - - # 输入杠杆倍数 - leverage_input = input("杠杆倍数 (默认20, 最大125): ").strip() - leverage = int(leverage_input) if leverage_input else 20 - leverage = min(max(leverage, 1), 125) # 限制在 1-125 之间 - - # 下单测试(函数内部会询问订单类型) - order = test_place_order(api, symbol=symbol, side=side, usdt_value=usdt_value, leverage=leverage) - if order: - import time - print("\n等待 3 秒后撤单...") - time.sleep(3) - # 撤单测试 - test_cancel_order(api, symbol, order_id=order.get('id')) - - # 平仓测试 - test_close_position(api, symbol) - - # 止损止盈测试 - test_modify_sl_tp(api, symbol) - - # 撤销所有挂单 - test_cancel_all_orders(api, symbol) - else: - print("跳过交易功能测试") - - finally: - # 关闭连接 - api.close() - print("\n" + "=" * 60) - print(" 测试完成") - print("=" * 60) - - -if __name__ == "__main__": - main() diff --git a/backend/test_crypto_agent.py b/backend/test_crypto_agent.py deleted file mode 100644 index f076c30..0000000 --- a/backend/test_crypto_agent.py +++ /dev/null @@ -1,166 +0,0 @@ -#!/usr/bin/env python3 -""" -测试加密货币智能体 - 单次分析 -""" -import asyncio -import sys -import os - -# 添加项目路径 -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - - -async def test_binance_service(): - """测试 Binance 数据服务""" - print("\n" + "=" * 50) - print("测试 Binance 数据服务") - print("=" * 50) - - from app.services.binance_service import binance_service - - # 测试获取 K 线数据 - print("\n1. 获取 BTCUSDT 1小时 K线数据...") - df = binance_service.get_klines('BTCUSDT', '1h', limit=10) - if not df.empty: - print(f" ✓ 获取成功,共 {len(df)} 条数据") - print(f" 最新收盘价: ${df.iloc[-1]['close']:,.2f}") - else: - print(" ✗ 获取失败") - return False - - # 测试多周期数据 - print("\n2. 获取 BTCUSDT 多周期数据...") - data = binance_service.get_multi_timeframe_data('BTCUSDT') - for interval, df in data.items(): - if not df.empty: - print(f" ✓ {interval}: {len(df)} 条数据") - else: - print(f" ✗ {interval}: 无数据") - - # 测试技术指标 - print("\n3. 检查技术指标...") - df = data['1h'] - indicators = ['ma5', 'ma20', 'rsi', 'macd', 'bb_upper', 'k', 'd', 'atr'] - for ind in indicators: - if ind in df.columns: - value = df.iloc[-1][ind] - print(f" ✓ {ind}: {value:.4f}" if value else f" - {ind}: N/A") - - return True - - -async def test_signal_analyzer(): - """测试信号分析器 - 使用新架构""" - print("\n" + "=" * 50) - print("测试市场信号分析器(新架构)") - print("=" * 50) - - from app.services.binance_service import binance_service - from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer - - analyzer = MarketSignalAnalyzer() - - # 获取数据 - data = binance_service.get_multi_timeframe_data('BTCUSDT') - - # 测试 LLM 分析 - print("\n1. LLM 市场分析...") - signal = await analyzer.analyze('BTCUSDT', data, symbols=['BTCUSDT', 'ETHUSDT']) - - print(f" 市场状态: {signal.get('market_state')}") - print(f" 趋势: {signal.get('trend')}") - print(f" 信号数量: {len(signal.get('signals', []))}") - - return True - - -async def test_feishu_service(): - """测试飞书通知服务""" - print("\n" + "=" * 50) - print("测试飞书通知服务") - print("=" * 50) - - from app.services.feishu_service import get_feishu_service - - feishu = get_feishu_service() - - if not feishu.enabled: - print(" ⚠ 飞书服务未配置,跳过测试") - return True - - # 发送测试消息 - print("\n1. 发送测试文本消息...") - result = await feishu.send_text("🧪 这是一条测试消息,来自加密货币智能体") - print(f" {'✓ 发送成功' if result else '✗ 发送失败'}") - - return result - - -async def test_full_analysis(): - """测试完整分析流程""" - print("\n" + "=" * 50) - print("测试完整分析流程") - print("=" * 50) - - from app.crypto_agent.crypto_agent import CryptoAgent - - agent = CryptoAgent() - - for symbol in ['BTCUSDT', 'ETHUSDT']: - print(f"\n分析 {symbol}...") - result = await agent.analyze_once(symbol) - - if 'error' in result: - print(f" ✗ 错误: {result['error']}") - else: - print(f" 价格: ${result['price']:,.2f}") - print(f" 趋势: {result['trend']}") - print(f" 动作: {result['action']}") - print(f" 置信度: {result['confidence']}%") - if result.get('stop_loss'): - print(f" 止损: ${result['stop_loss']:,.2f}") - if result.get('take_profit'): - print(f" 止盈: ${result['take_profit']:,.2f}") - - return True - - -async def main(): - """主测试函数""" - print("\n" + "=" * 60) - print("加密货币智能体测试") - print("=" * 60) - - tests = [ - ("Binance 数据服务", test_binance_service), - ("信号分析器", test_signal_analyzer), - ("飞书通知服务", test_feishu_service), - ("完整分析流程", test_full_analysis), - ] - - results = [] - for name, test_func in tests: - try: - result = await test_func() - results.append((name, result)) - except Exception as e: - print(f"\n✗ {name} 测试出错: {e}") - results.append((name, False)) - - # 打印总结 - print("\n" + "=" * 60) - print("测试总结") - print("=" * 60) - for name, result in results: - status = "✓ 通过" if result else "✗ 失败" - print(f" {name}: {status}") - - all_passed = all(r for _, r in results) - print("\n" + ("所有测试通过!" if all_passed else "部分测试失败")) - - return all_passed - - -if __name__ == "__main__": - success = asyncio.run(main()) - sys.exit(0 if success else 1) diff --git a/backend/test_dingtalk.py b/backend/test_dingtalk.py deleted file mode 100644 index 8765ff9..0000000 --- a/backend/test_dingtalk.py +++ /dev/null @@ -1,102 +0,0 @@ -""" -测试钉钉通知服务 -""" -import asyncio -import sys -import os - -# 添加项目路径 -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -from app.services.dingtalk_service import get_dingtalk_service - - -async def test_dingtalk(): - """测试钉钉消息发送""" - dingtalk = get_dingtalk_service() - - print("=" * 50) - print("测试钉钉通知服务") - print("=" * 50) - print(f"服务状态: {'启用' if dingtalk.enabled else '禁用'}") - print(f"Webhook URL: {dingtalk.webhook_url[:50]}...") - print(f"Secret: {dingtalk.secret[:20]}...") - print() - - # 测试 1: 发送文本消息 - print("测试 1: 发送文本消息...") - success = await dingtalk.send_text("📊 钉钉通知服务测试\n\n这是一条测试消息,来自 Stock Agent 系统。") - print(f"结果: {'✅ 成功' if success else '❌ 失败'}") - print() - - # 等待一下 - await asyncio.sleep(2) - - # 测试 2: 发送 Markdown 消息 - print("测试 2: 发送 Markdown 消息...") - markdown_content = """### 📈 交易信号测试 - -> **交易对**: BTCUSDT -> **方向**: 🟢 做多 -> **价格**: $95,000.00 -> **信心度**: 85% - -**分析理由**: -- 突破关键阻力位 -- 量价配合良好 -- 多周期共振向上 - -*来自 Stock Agent 系统* -""" - success = await dingtalk.send_markdown("交易信号", markdown_content) - print(f"结果: {'✅ 成功' if success else '❌ 失败'}") - print() - - await asyncio.sleep(2) - - # 测试 3: 发送 ActionCard 消息 - print("测试 3: 发送 ActionCard 消息...") - card_content = """### 📊 模拟交易报告 - -#### 统计概览 -- 总交易次数: 50 -- 胜率: 65% -- 总盈亏: +15.2% - -#### 最近交易 -| 交易对 | 方向 | 盈亏 | -|--------|------|------| -| BTCUSDT | 做多 | +5.2% | -| ETHUSDT | 做空 | +2.1% | -""" - success = await dingtalk.send_action_card( - title="📊 4小时交易报告", - content=card_content, - btn_orientation="0", - btn_title="查看详情", - btn_url="https://example.com" - ) - print(f"结果: {'✅ 成功' if success else '❌ 失败'}") - print() - - # 测试 4: 发送交易信号 - print("测试 4: 发送交易信号...") - signal = { - 'action': 'buy', - 'symbol': 'BTCUSDT', - 'price': 95000, - 'trend': 'uptrend', - 'confidence': 85, - 'agent_type': 'crypto' - } - success = await dingtalk.send_trading_signal(signal) - print(f"结果: {'✅ 成功' if success else '❌ 失败'}") - print() - - print("=" * 50) - print("测试完成!") - print("=" * 50) - - -if __name__ == "__main__": - asyncio.run(test_dingtalk()) diff --git a/backend/test_exception_handler.py b/backend/test_exception_handler.py deleted file mode 100644 index 5d8b37e..0000000 --- a/backend/test_exception_handler.py +++ /dev/null @@ -1,53 +0,0 @@ -""" -测试全局异常处理器 -""" -import sys -import os -sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) - -import asyncio -from app.utils.error_handler import setup_global_exception_handler, init_error_notifier -from app.services.feishu_service import get_feishu_service - - -async def test_exception_handler(): - """测试异常处理器""" - print("=" * 60) - print("测试全局异常处理器") - print("=" * 60) - print() - - # 初始化 - setup_global_exception_handler() - print("✅ 全局异常处理器已安装") - print() - - # 初始化飞书通知 - try: - feishu = get_feishu_service() - init_error_notifier(feishu_service=feishu, enabled=True, cooldown=10) - print("✅ 飞书错误通知已启用(冷却时间10秒)") - except Exception as e: - print(f"❌ 飞书错误通知初始化失败: {e}") - return - print() - - # 模拟一个未捕获的异常 - print("-" * 60) - print("将触发一个测试异常...") - print("-" * 60) - print() - - # 这个异常会被全局异常处理器捕获 - # 注意:这会触发飞书通知 - raise ValueError("这是一个测试异常,用于验证全局异常处理器是否正常工作") - - -if __name__ == "__main__": - try: - asyncio.run(test_exception_handler()) - except SystemExit: - # 异常被处理后,程序可能会退出 - pass - print() - print("测试结束") diff --git a/backend/test_limit_up_approach.py b/backend/test_limit_up_approach.py deleted file mode 100644 index d665a1e..0000000 --- a/backend/test_limit_up_approach.py +++ /dev/null @@ -1,100 +0,0 @@ -""" -测试A股异动监控 - 使用涨停板数据作为替代方案 -当 eastmoney API 不可用时,使用涨停板数据分析 -""" -import asyncio -import sys -import os - -# 添加项目根目录到 Python 路径 -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -import akshare as ak -import pandas as pd -from app.utils.logger import logger - - -def test_limit_up_approach(): - """使用涨停板数据发现异动板块""" - print("=" * 60) - print("涨停板数据分析测试") - print("=" * 60) - print() - - try: - # 获取涨停板数据 - print("获取涨停板数据...") - df = ak.stock_zt_pool_em(date='20260227') - - if df.empty: - print("没有涨停数据") - return - - print(f"获取到 {len(df)} 只涨停股") - print() - - # 按行业分组统计 - print("-" * 60) - print("按行业统计涨停股数:") - print("-" * 60) - - # 获取行业列 - industry_col = '所属行业' - if industry_col not in df.columns: - # 尝试其他可能的列名 - for col in df.columns: - if '行业' in col or '板块' in col: - industry_col = col - break - - if industry_col in df.columns: - # 按行业分组 - industry_stats = df.groupby(industry_col).agg({ - '代码': 'count', - '涨跌幅': 'mean', - '最新价': 'mean' - }).rename(columns={'代码': '涨停数', '涨跌幅': '平均涨幅', '最新价': '平均价格'}) - - # 排序 - industry_stats = industry_stats.sort_values('涨停数', ascending=False) - - # 显示Top 10 - for idx, (industry, row) in enumerate(industry_stats.head(10).iterrows(), 1): - print(f"{idx}. {industry}: {int(row['涨停数'])}只涨停, 平均涨幅 {row['平均涨幅']:.2f}%") - - print() - print("-" * 60) - print("涨停股详情 (Top 5):") - print("-" * 60) - - # 显示涨幅最大的5只 - top_5 = df.nlargest(5, '涨跌幅') - for idx, row in top_5.iterrows(): - print(f"{int(row['序号'])}. {row['名称']} ({row['代码']})") - print(f" 涨幅: {row['涨跌幅']:.2f}% | 价格: {row['最新价']:.2f}") - print(f" 行业: {row.get(industry_col, 'N/A')}") - print(f" 封板时间: {row.get('最后封板时间', 'N/A')}") - print() - - return True - - except Exception as e: - print(f"测试失败: {e}") - import traceback - traceback.print_exc() - return False - - -if __name__ == "__main__": - success = test_limit_up_approach() - if success: - print("=" * 60) - print("测试成功!涨停板API可用") - print("=" * 60) - print() - print("建议: 可以使用涨停板数据作为板块异动监控的替代方案") - print(" - 按行业统计涨停股数") - print(" - 发现涨停集中的板块即为异动板块") - print(" - 涨停股本身就是最好的龙头股候选") - else: - print("测试失败") diff --git a/backend/test_market_signal.py b/backend/test_market_signal.py deleted file mode 100644 index 8ffcbf4..0000000 --- a/backend/test_market_signal.py +++ /dev/null @@ -1,142 +0,0 @@ -""" -测试市场信号分析 - 验证趋势判断功能 -""" -import asyncio -import sys -import os -import json - -# 添加项目路径 -sys.path.insert(0, os.path.dirname(__file__)) - -from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer -from app.utils.logger import logger -from app.services.bitget_service import BitgetService - - -async def test_trend_analysis(symbol: str = "BTCUSDT"): - """测试趋势分析""" - - logger.info("=" * 60) - logger.info(f"开始测试市场信号分析: {symbol}") - logger.info("=" * 60) - - # 1. 获取 K 线数据 - logger.info(f"\n[1/3] 获取 {symbol} 的 K 线数据...") - bitget_service = BitgetService() - data = bitget_service.get_multi_timeframe_data(symbol) - - if not data: - logger.error(f"❌ 无法获取 {symbol} 的 K 线数据") - return - - logger.info(f"✅ K 线数据获取成功") - for tf, df in data.items(): - if df is not None and len(df) > 0: - latest_price = float(df.iloc[-1]['close']) - logger.info(f" {tf}: 最新价格 ${latest_price:,.2f}, 数据量 {len(df)} 条") - - # 2. 分析市场信号 - logger.info(f"\n[2/3] 分析市场信号...") - analyzer = MarketSignalAnalyzer() - result = await analyzer.analyze(symbol, data) - - # 3. 打印分析结果 - logger.info(f"\n[3/3] 分析结果:") - logger.info("-" * 60) - - # 打印趋势判断(新功能) - logger.info("\n📊 趋势判断(新增):") - logger.info(f" 趋势方向: {result.get('trend_direction', 'unknown')}") - logger.info(f" 趋势强度: {result.get('trend_strength', 'unknown')}") - - # 打印市场状态 - logger.info(f"\n📈 市场状态:") - logger.info(f" {result.get('market_state', 'unknown')}") - logger.info(f" 分析摘要: {result.get('analysis_summary', 'unknown')}") - logger.info(f" 量价分析: {result.get('volume_analysis', 'unknown')}") - - # 打印关键价位 - if result.get('key_levels'): - logger.info(f"\n💰 关键价位:") - levels = result['key_levels'] - if levels.get('support'): - logger.info(f" 支撑位: {[f'${s:,.0f}' for s in levels['support'] if s]}") - if levels.get('resistance'): - logger.info(f" 阻力位: {[f'${r:,.0f}' for r in levels['resistance'] if r]}") - - # 打印信号 - logger.info(f"\n🎯 交易信号:") - signals = result.get('signals', []) - if not signals: - logger.warning(" ⚠️ 无交易信号(观望)") - else: - for i, sig in enumerate(signals, 1): - action_emoji = "🟢" if sig.get('action') == 'buy' else "🔴" if sig.get('action') == 'sell' else "⏪" - grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐', 'D': ''}.get(sig.get('grade', ''), '') - entry_type_emoji = "⚡现价" if sig.get('entry_type') == 'market' else "⏳挂单" - - logger.info(f"\n 信号 #{i} {action_emoji} {grade_icon}") - logger.info(f" ├─ 类型: {sig.get('timeframe', 'unknown')} | {entry_type_emoji}") - logger.info(f" ├─ 操作: {sig.get('action', 'unknown').upper()}") - logger.info(f" ├─ 信心度: {sig.get('confidence', 0)}% | 等级: {sig.get('grade', 'unknown')}") - logger.info(f" ├─ 入场价: ${sig.get('entry_price', 0):,.2f}") - logger.info(f" ├─ 止损: ${sig.get('stop_loss', 0):,.2f} | 止盈: ${sig.get('take_profit', 0):,.2f}") - logger.info(f" └─ 理由: {sig.get('reasoning', 'unknown')}") - - # 检查是否顺势 - trend_dir = result.get('trend_direction', 'neutral') - action = sig.get('action', '') - if trend_dir == 'uptrend' and action == 'sell': - logger.warning(f" ⚠️ 警告: 上升趋势中做空(逆势)") - elif trend_dir == 'downtrend' and action == 'buy': - logger.warning(f" ⚠️ 警告: 下降趋势中做多(逆势)") - elif trend_dir == 'uptrend' and action == 'buy': - logger.info(f" ✅ 顺势交易") - elif trend_dir == 'downtrend' and action == 'sell': - logger.info(f" ✅ 顺势交易") - - logger.info("\n" + "=" * 60) - logger.info("测试完成") - logger.info("=" * 60) - - return result - - -async def test_multiple_symbols(): - """测试多个交易对""" - symbols = ['BTCUSDT', 'ETHUSDT', 'SOLUSDT'] - - for symbol in symbols: - logger.info(f"\n\n{'#' * 60}") - logger.info(f"# 测试 {symbol}") - logger.info(f"{'#' * 60}") - - try: - await test_trend_analysis(symbol) - except Exception as e: - logger.error(f"❌ {symbol} 测试失败: {e}") - import traceback - traceback.print_exc() - - # 等待一段时间,避免 API 限流 - await asyncio.sleep(2) - - -def main(): - import argparse - - parser = argparse.ArgumentParser(description='测试市场信号分析') - parser.add_argument('--symbol', default='BTCUSDT', help='交易对(默认: BTCUSDT)') - parser.add_argument('--multi', action='store_true', help='测试多个交易对') - - args = parser.parse_args() - - if args.multi: - asyncio.run(test_multiple_symbols()) - else: - asyncio.run(test_trend_analysis(args.symbol)) - - -if __name__ == '__main__': - main() diff --git a/backend/test_sector_monitor.py b/backend/test_sector_monitor.py deleted file mode 100644 index 886af54..0000000 --- a/backend/test_sector_monitor.py +++ /dev/null @@ -1,129 +0,0 @@ -""" -测试A股板块异动监控 -运行一次检查并打印结果,不发送钉钉通知 -""" -import asyncio -import sys -import os - -# 添加项目根目录到 Python 路径 -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from app.astock_agent import SectorMonitor -from app.utils.logger import logger - - -async def test_sector_monitor(): - """测试板块异动监控""" - print("=" * 60) - print("A股板块异动监控测试") - print("=" * 60) - print() - - # 配置参数 - change_threshold = 2.0 # 涨跌幅阈值 2% - top_n = 3 # 每个板块返回前3只龙头股 - - print(f"配置参数:") - print(f" - 涨跌幅阈值: {change_threshold}%") - print(f" - 龙头股数量: Top{top_n}") - print(f" - 钉钉通知: 已禁用(测试模式)") - print() - - # 创建监控器(不启用通知) - monitor = SectorMonitor( - change_threshold=change_threshold, - top_n=top_n, - enable_notifier=False # 测试时不发送通知 - ) - - print("开始检查板块异动...") - print("-" * 60) - print() - - # 执行一次检查 - result = await monitor.check_once() - - # 打印结果 - print() - print("-" * 60) - print("检查结果:") - print(f" - 异动板块数: {result['hot_sectors']}") - print(f" - 龙头股总数: {result['stocks']}") - print() - - # 显示详细信息 - if 'results' in result and result['results']: - print("=" * 60) - print("异动板块详情:") - print("=" * 60) - print() - - for idx, item in enumerate(result['results'], 1): - sector = item['sector'] - stocks = item['stocks'] - reason = item['reason'] - - # 板块信息 - change_icon = "📈" if sector['change_pct'] > 0 else "📉" - print(f"{idx}. 【{sector['name']}】{change_icon} {sector['change_pct']:+.2f}%") - print(f" - 涨跌额: {sector['change_amount']:+.2f}") - print(f" - 上涨家数: {sector['ups']}") - print(f" - 下跌家数: {sector['downs']}") - print(f" - 异动原因: {reason}") - print() - - # 龙头股信息 - print(f" 🏆 龙头股 Top {len(stocks)}:") - for s_idx, stock in enumerate(stocks, 1): - change_pct = stock['change_pct'] - if change_pct >= 5: - speed_icon = "⚡⚡⚡" - elif change_pct >= 3: - speed_icon = "⚡⚡" - elif change_pct >= 1: - speed_icon = "⚡" - else: - speed_icon = "🐌" - - # 成交额格式化 - amount = stock['amount'] - if amount >= 100000: - amount_str = f"{amount/100000:.1f}亿" - elif amount >= 10000: - amount_str = f"{amount/10000:.1f}万" - else: - amount_str = f"{amount:.0f}元" - - print(f" {s_idx}. {stock['name']} ({stock['code']})") - print(f" 价格: ¥{stock['price']:.2f} | 涨跌幅: {change_pct:+.2f}% {speed_icon}") - print(f" 成交额: {amount_str} | 换手率: {stock['turnover']:.2f}%") - print(f" 评分: {stock['score']:.1f} | 涨速: {stock['speed_level']}") - print() - - # 显示统计信息 - print("=" * 60) - print("统计信息:") - print("=" * 60) - stats = monitor.get_stats() - print(f" - 总检查次数: {stats['total_checks']}") - print(f" - 总异动板块: {stats['total_hot_sectors']}") - print(f" - 总龙头股数: {stats['total_stocks']}") - if stats['total_checks'] > 0: - print(f" - 平均龙头股: {stats['avg_stocks_per_check']:.1f}") - print(f" - 最后检查时间: {stats['last_check_time']}") - print() - print("=" * 60) - print("测试完成!") - print("=" * 60) - - -if __name__ == "__main__": - try: - asyncio.run(test_sector_monitor()) - except KeyboardInterrupt: - print("\n\n测试被用户中断") - except Exception as e: - print(f"\n\n测试失败: {e}") - import traceback - traceback.print_exc() diff --git a/backend/test_tushare_api_fields.py b/backend/test_tushare_api_fields.py deleted file mode 100644 index 1fc43af..0000000 --- a/backend/test_tushare_api_fields.py +++ /dev/null @@ -1,113 +0,0 @@ -""" -测试 Tushare 接口返回数据 -检查各个接口返回的实际字段 -""" -import sys -import os -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -# 直接使用 token -token = '0ed6419a00d8923dc19c0b58fc92d94c9a0696949ab91a13aa58a0cc' - -def test_tushare_api(): - """测试 Tushare API 返回的数据""" - print("=" * 60) - print("Tushare API 数据测试") - print("=" * 60) - print() - - import tushare as ts - ts.set_token(token) - pro = ts.pro_api() - - from datetime import datetime, timedelta - yesterday = (datetime.now() - timedelta(days=5)).strftime('%Y%m%d') - today = datetime.now().strftime('%Y%m%d') - - # 1. 测试概念板块列表 - print("1. 测试 ths_index (概念板块列表)") - print("-" * 40) - sectors_df = pro.ths_index(type='N') - print(f"返回列: {sectors_df.columns.tolist()}") - print(f"前3行数据:") - print(sectors_df.head(3)) - print() - - # 获取第一个板块的代码 - if not sectors_df.empty: - first_sector_code = sectors_df.iloc[0]['ts_code'] - first_sector_name = sectors_df.iloc[0]['name'] - print(f"使用第一个板块测试: {first_sector_name} ({first_sector_code})") - print() - - # 2. 测试板块日线数据 - print("2. 测试 ths_daily (板块日线数据)") - print("-" * 40) - - daily_df = pro.ths_daily( - ts_code=first_sector_code, - start_date=yesterday, - end_date=today - ) - print(f"返回列: {daily_df.columns.tolist()}") - print(f"最新一天数据:") - if not daily_df.empty: - latest = daily_df.sort_values('trade_date').iloc[-1] - print(latest) - print() - - # 3. 测试成分股数据 - print("3. 测试 ths_member (成分股数据)") - print("-" * 40) - members_df = pro.ths_member(ts_code=first_sector_code) - print(f"返回列: {members_df.columns.tolist()}") - print(f"前5个成分股:") - print(members_df.head(5)) - print() - - # 4. 测试个股日线数据 - if not members_df.empty: - first_stock_code = members_df.iloc[0]['con_code'] - print(f"使用第一个成分股测试: {first_stock_code}") - print() - - print("4. 测试 daily (个股日线数据)") - print("-" * 40) - stock_daily_df = pro.daily( - ts_code=first_stock_code, - start_date=yesterday, - end_date=today - ) - print(f"返回列: {stock_daily_df.columns.tolist()}") - if not stock_daily_df.empty: - print(f"最新一天数据:") - latest_stock = stock_daily_df.sort_values('trade_date').iloc[-1] - print(latest_stock) - print() - - # 5. 测试个股每日指标 - print("5. 测试 daily_basic (个股每日指标)") - print("-" * 40) - basic_df = pro.daily_basic( - ts_code=first_stock_code, - trade_date=today, - fields='ts_code,trade_date,turnover_rate,volume_ratio,pe,pb' - ) - print(f"返回列: {basic_df.columns.tolist()}") - if not basic_df.empty: - print(f"数据:") - print(basic_df) - print() - - print("=" * 60) - print("测试完成") - print("=" * 60) - - -if __name__ == "__main__": - try: - test_tushare_api() - except Exception as e: - print(f"\n测试失败: {e}") - import traceback - traceback.print_exc() diff --git a/backend/test_tushare_detailed.py b/backend/test_tushare_detailed.py deleted file mode 100644 index 2cacd90..0000000 --- a/backend/test_tushare_detailed.py +++ /dev/null @@ -1,87 +0,0 @@ -""" -测试A股板块异动监控 - Tushare 版本(详细调试) -""" -import sys -import os -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from app.config import get_settings - - -def test_tushare_detailed(): - """详细测试 Tushare 板块数据""" - print("=" * 60) - print("Tushare 板块数据详细测试") - print("=" * 60) - print() - - settings = get_settings() - token = settings.tushare_token - - from app.astock_agent.tushare_client import TushareClient - - ts_client = TushareClient(token=token) - - # 1. 测试获取概念板块列表 - print("1. 获取概念板块列表...") - sectors_df = ts_client.get_concept_sectors() - print(f" ✅ 获取到 {len(sectors_df)} 个概念板块") - print(f" 示例板块:") - for idx, row in sectors_df.head(5).iterrows(): - print(f" - {row['name']}: {row['ts_code']}") - print() - - # 2. 测试获取单个板块行情 - print("2. 获取单个板块行情(以人工智能为例)...") - ai_sectors = sectors_df[sectors_df['name'].str.contains('人工智能', na=False)] - - if not ai_sectors.empty: - ai_code = ai_sectors.iloc[0]['ts_code'] - print(f" 找到人工智能板块: {ai_code}") - - daily_df = ts_client.get_sector_daily(ai_code) - if not daily_df.empty: - print(f" ✅ 获取到 {len(daily_df)} 天行情数据") - latest = daily_df.sort_values('trade_date').iloc[-1] - print(f" 最新行情 ({latest['trade_date']}):") - print(f" - 收盘价: {latest['close']:.2f}") - print(f" - 涨跌幅: {latest.get('pct_chg', 'N/A')}%") - else: - print(f" ❌ 行情数据为空") - else: - print(f" ⚠️ 未找到人工智能板块") - print() - - # 3. 测试获取板块成分股 - print("3. 获取板块成分股...") - if not ai_sectors.empty: - members_df = ts_client.get_sector_members(ai_code) - if not members_df.empty: - print(f" ✅ 获取到 {len(members_df)} 个成分股") - print(f" 前5个成分股:") - for idx, row in members_df.head(5).iterrows(): - print(f" - {row.get('name', row.get('member_name', 'N/A'))}: {row['ts_code']}") - else: - print(f" ❌ 成分股数据为空") - print() - - # 4. 测试获取异动板块(降低阈值) - print("4. 检测异动板块(阈值 0%)...") - hot_df = ts_client.get_hot_sectors(threshold=0.0) - - if hot_df.empty: - print(" ⚠️ 未检测到上涨的板块") - else: - print(f" ✅ 检测到 {len(hot_df)} 个上涨板块") - print(f" 涨幅最高的 5 个板块:") - for idx, row in hot_df.head(5).iterrows(): - print(f" {idx}. {row['name']}: {row['change_pct']:+.2f}%") - print() - - print("=" * 60) - print("测试完成!") - print("=" * 60) - - -if __name__ == "__main__": - test_tushare_detailed() diff --git a/backend/test_tushare_sector_monitor.py b/backend/test_tushare_sector_monitor.py deleted file mode 100644 index 2d6e671..0000000 --- a/backend/test_tushare_sector_monitor.py +++ /dev/null @@ -1,166 +0,0 @@ -""" -测试A股板块异动监控 - Tushare 版本 -运行一次检查并打印结果并发送钉钉通知 -""" -import asyncio -import sys -import os - -# 添加项目根目录到 Python 路径 -sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) - -from app.config import get_settings -from app.utils.logger import logger - - -def test_tushare_sector_monitor(): - """测试板块异动监控(Tushare 版本)""" - print("=" * 60) - print("A股板块异动监控测试 - Tushare 版本") - print("=" * 60) - print() - - # 获取配置 - settings = get_settings() - token = settings.tushare_token - - if not token: - print("❌ Tushare token 未配置!") - print("请在 .env 文件中设置 TUSHARE_TOKEN") - return - - print(f"✅ Tushare token 已配置: {token[:10]}...") - print() - - # 导入 Tushare 客户端和通知器 - from app.astock_agent.tushare_client import TushareClient - from app.astock_agent.tushare_sector_analyzer import TushareSectorAnalyzer - from app.astock_agent.tushare_stock_selector import TushareStockSelector - from app.astock_agent.notifier import DingTalkNotifier - - # 检查钉钉配置 - webhook = settings.dingtalk_astock_webhook or settings.dingtalk_webhook_url - secret = settings.dingtalk_astock_secret or settings.dingtalk_secret - notifier = None - - if webhook: - notifier = DingTalkNotifier(webhook, secret) - print(f"✅ 钉钉通知已配置") - else: - print("⚠️ 钉钉通知未配置,仅打印结果") - print() - - # 配置参数 - change_threshold = 2.0 # 涨跌幅阈值 2% - top_n = 3 # 每个板块返回前3只龙头股 - - print(f"配置参数:") - print(f" - 涨跌幅阈值: {change_threshold}%") - print(f" - 龙头股数量: Top{top_n}") - print() - - # 创建 Tushare 客户端 - print("初始化 Tushare 客户端...") - ts_client = TushareClient(token=token) - print("✅ Tushare 客户端初始化成功") - print() - - # 创建分析器 - print("-" * 60) - print("开始检查板块异动...") - print("-" * 60) - print() - - analyzer = TushareSectorAnalyzer(ts_client, change_threshold=change_threshold) - - # 执行检查 - hot_sectors = analyzer.detect_sector_changes() - - if not hot_sectors: - print("未检测到异动板块") - return - - print(f"检测到 {len(hot_sectors)} 个异动板块:") - print() - - # 显示异动板块详情 - notified_count = 0 - for idx, sector in enumerate(hot_sectors, 1): - print(f"{idx}. 【{sector['name']}】") - print(f" 代码: {sector['ts_code']}") - print(f" 涨跌幅: {sector['change_pct']:+.2f}%") - print(f" 收盘价: {sector['close']:.2f}") - print(f" 成交额: {sector['amount_str']}") - print(f" 交易日期: {sector['trade_date']}") - print() - - # 获取龙头股 - print(f" 正在获取龙头股...") - selector = TushareStockSelector(ts_client, top_n=top_n) - top_stocks = selector.select_leading_stocks(sector['ts_code'], sector['name']) - - if top_stocks: - print(f" 🏆 龙头股 Top {len(top_stocks)}:") - for stock in top_stocks: - # 成交额格式化 - amount = stock['amount'] - if amount >= 100000000: - amount_str = f"{amount/100000000:.2f}亿" - elif amount >= 10000: - amount_str = f"{amount/10000:.2f}万" - else: - amount_str = f"{amount:.0f}元" - - print(f" {stock['name']} ({stock['code']})") - print(f" 价格: {stock['price']:.2f} | 涨跌幅: {stock['change_pct']:+.2f}% {stock['speed_level']}") - print(f" 成交额: {amount_str} | 评分: {stock['score']:.1f}") - else: - print(f" ⚠️ 未找到龙头股") - print() - - # 发送钉钉通知 - if notifier and top_stocks: - # 使用实际的板块数据 - sector_for_notify = { - 'name': sector['name'], - 'change_pct': sector['change_pct'], - 'change_amount': sector.get('change', 0), # 涨跌额 - 'amount': sector['amount'], - 'leading_stock': top_stocks[0]['name'] if top_stocks else '', - 'ts_code': sector['ts_code'], - } - - # 分析异动原因 - reason = analyzer.get_hot_reason(sector['name'], top_stocks) - - # 发送通知 - print(f" 📲 发送钉钉通知...") - success = notifier.send_sector_alert( - sector_data=sector_for_notify, - top_stocks=top_stocks, - reason=reason - ) - if success: - print(f" ✅ 通知发送成功") - notified_count += 1 - else: - print(f" ❌ 通知发送失败") - print() - - print("=" * 60) - print("测试完成!") - print(f"检测到 {len(hot_sectors)} 个异动板块") - if notifier: - print(f"发送通知 {notified_count}/{len(hot_sectors)}") - print("=" * 60) - - -if __name__ == "__main__": - try: - test_tushare_sector_monitor() - except KeyboardInterrupt: - print("\n\n测试被用户中断") - except Exception as e: - print(f"\n\n测试失败: {e}") - import traceback - traceback.print_exc() diff --git a/backend/test_websocket.py b/backend/test_websocket.py deleted file mode 100644 index 3b21ddf..0000000 --- a/backend/test_websocket.py +++ /dev/null @@ -1,205 +0,0 @@ -""" -测试 Bitget WebSocket 价格监控 -""" -import asyncio -import sys -import os - -# 添加项目路径 -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -from app.services.bitget_websocket import BitgetWebSocketClient -from app.utils.logger import logger - - -async def test_websocket(): - """测试 WebSocket 连接和价格订阅""" - - logger.info("=" * 60) - logger.info("Bitget WebSocket 测试开始") - logger.info("=" * 60) - - # 创建 WebSocket 客户端 - client = BitgetWebSocketClient() - - # 注册价格回调 - def on_price_update(symbol: str, price: float, data: dict): - """价格更新回调""" - import datetime - timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3] - print(f"[{timestamp}] {symbol}: ${price:,.2f}") - - # 显示更多详细信息 - if 'open24h' in data: - change_24h = ((price - float(data['open24h'])) / float(data['open24h'])) * 100 - print(f" └─ 24h涨跌: {change_24h:+.2f}%") - - client.on_price_update('*', on_price_update) - - # 连接 - logger.info("正在连接 Bitget WebSocket...") - if not await client.connect(): - logger.error("❌ 连接失败") - return False - - logger.info("✅ 连接成功") - - # 订阅交易对 - symbols = ['BTCUSDT', 'ETHUSDT', 'SOLUSDT'] - logger.info(f"正在订阅: {', '.join(symbols)}") - - if not await client.subscribe(symbols): - logger.error("❌ 订阅失败") - return False - - logger.info(f"✅ 订阅成功: {symbols}") - - # 显示当前订阅状态 - logger.info(f"已订阅交易对: {client.subscribed_symbols}") - logger.info(f"连接状态: {client.is_connected}") - - # 接收价格更新(30秒) - logger.info("\n开始接收价格更新(30秒)...") - logger.info("-" * 60) - - try: - # 等待30秒接收数据 - await asyncio.sleep(30) - - except KeyboardInterrupt: - logger.info("\n用户中断") - - # 显示接收到的价格 - logger.info("-" * 60) - logger.info("当前价格缓存:") - prices = client.get_all_prices() - for symbol, price in prices.items(): - print(f" {symbol}: ${price:,.2f}") - - # 断开连接 - logger.info("\n正在断开连接...") - await client.disconnect() - logger.info("✅ 已断开连接") - - logger.info("=" * 60) - logger.info("测试完成") - logger.info("=" * 60) - - return True - - -async def test_reconnect(): - """测试自动重连功能""" - - logger.info("=" * 60) - logger.info("测试自动重连功能") - logger.info("=" * 60) - - client = BitgetWebSocketClient() - - price_updates = [] - - def on_price_update(symbol: str, price: float, data: dict): - price_updates.append((symbol, price)) - logger.info(f"[重连测试] {symbol}: ${price:,.2f}") - - client.on_price_update('*', on_price_update) - - # 第一次连接 - logger.info("第一次连接...") - await client.connect() - await client.subscribe(['BTCUSDT']) - - # 等待5秒接收数据 - await asyncio.sleep(5) - logger.info(f"接收到的价格更新: {len(price_updates)} 次") - - # 模拟断线(手动断开) - logger.info("\n模拟断线...") - await client.disconnect() - - # 等待2秒 - await asyncio.sleep(2) - - # 重新连接 - logger.info("重新连接...") - if await client.connect(): - logger.info("✅ 重连成功") - await client.subscribe(['BTCUSDT']) - - # 等待5秒接收数据 - await asyncio.sleep(5) - - initial_count = len(price_updates) - logger.info(f"重连后接收到的价格更新: {len(price_updates) - initial_count} 次") - else: - logger.error("❌ 重连失败") - - await client.disconnect() - logger.info("重连测试完成") - - return True - - -def test_integration(): - """测试与 price_monitor_service 的集成""" - - logger.info("=" * 60) - logger.info("测试 PriceMonitorService 集成") - logger.info("=" * 60) - - # 设置环境变量启用 WebSocket - os.environ['USE_BITGET_WEBSOCKET'] = 'true' - - from app.services.price_monitor_service import get_price_monitor_service - - monitor = get_price_monitor_service() - - # 添加回调 - update_count = {'count': 0} - - def on_update(symbol: str, price: float): - update_count['count'] += 1 - logger.info(f"[集成测试] {symbol}: ${price:,.2f}") - - monitor.add_price_callback(on_update) - - # 订阅交易对 - logger.info("订阅 BTCUSDT...") - monitor.subscribe_symbol('BTCUSDT') - - # 等待10秒 - logger.info("等待10秒接收价格更新...") - import time - time.sleep(10) - - logger.info(f"接收到 {update_count['count']} 次价格更新") - logger.info(f"当前价格: {monitor.get_latest_price('BTCUSDT')}") - - # 停止监控 - monitor.stop() - - logger.info("集成测试完成") - - return True - - -def main(): - """主测试函数""" - import argparse - - parser = argparse.ArgumentParser(description='测试 Bitget WebSocket') - parser.add_argument('--test', choices=['basic', 'reconnect', 'integration'], - default='basic', help='测试类型') - args = parser.parse_args() - - if args.test == 'basic': - asyncio.run(test_websocket()) - elif args.test == 'reconnect': - asyncio.run(test_reconnect()) - elif args.test == 'integration': - test_integration() - - -if __name__ == '__main__': - main() diff --git a/backend/tests/__init__.py b/backend/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/backend/tests/test_intelligent_agent.py b/backend/tests/test_intelligent_agent.py deleted file mode 100644 index 2ceb2a7..0000000 --- a/backend/tests/test_intelligent_agent.py +++ /dev/null @@ -1,87 +0,0 @@ -""" -测试智能AI Agent -""" -import asyncio -import sys -import os - -# 添加项目路径 -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) - -from app.agent.smart_agent import SmartStockAgent - - -async def test_intelligent_mode(): - """测试智能模式""" - print("=" * 60) - print("测试智能AI Agent") - print("=" * 60) - - # 创建Agent实例 - agent = SmartStockAgent() - - # 测试用例 - test_cases = [ - { - "name": "简单股票查询", - "message": "贵州茅台怎么样", - "session_id": "test_session_1" - }, - { - "name": "技术分析关注", - "message": "比亚迪的MACD指标怎么样,有没有金叉", - "session_id": "test_session_2" - }, - { - "name": "基本面关注", - "message": "宁德时代的基本面如何,盈利能力强吗", - "session_id": "test_session_3" - } - ] - - for i, test_case in enumerate(test_cases, 1): - print(f"\n{'=' * 60}") - print(f"测试用例 {i}: {test_case['name']}") - print(f"{'=' * 60}") - print(f"用户问题: {test_case['message']}") - print() - - try: - # 处理消息 - response = await agent.process_message( - message=test_case['message'], - session_id=test_case['session_id'] - ) - - # 打印结果 - print("响应:") - print(response.get('message', '')) - print() - - # 打印元数据 - metadata = response.get('metadata', {}) - if metadata: - print("元数据:") - print(f" 类型: {metadata.get('type')}") - if 'intent' in metadata: - intent = metadata['intent'] - print(f" 意图类型: {intent.get('type')}") - print(f" 关注维度: {intent.get('dimensions')}") - print(f" 分析深度: {intent.get('analysis_depth')}") - if 'plan' in metadata: - plan = metadata['plan'] - skills = [s['name'] for s in plan.get('skills', [])] - print(f" 调用技能: {skills}") - - except Exception as e: - print(f"错误: {e}") - import traceback - traceback.print_exc() - - print(f"\n{'=' * 60}") - print("测试完成") - print("=" * 60) - - -if __name__ == "__main__": - asyncio.run(test_intelligent_mode()) diff --git a/scripts/migrate_add_entry_zone.py b/scripts/migrate_add_entry_zone.py deleted file mode 100644 index ae46ec6..0000000 --- a/scripts/migrate_add_entry_zone.py +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python3 -""" -数据库迁移脚本:添加 entry_zone 字段到 trading_signals 表 - -使用方法: - python scripts/migrate_add_entry_zone.py - -或者在服务器上直接执行 SQL: - sqlite3 backend/stock_agent.db "ALTER TABLE trading_signals ADD COLUMN entry_zone FLOAT;" -""" -import sqlite3 -import os -from pathlib import Path - - -def migrate_add_entry_zone(): - """添加 entry_zone 字段""" - - # 数据库路径 - db_path = Path(__file__).parent.parent / "backend" / "stock_agent.db" - - if not db_path.exists(): - print(f"❌ 数据库文件不存在: {db_path}") - return False - - try: - # 连接数据库 - conn = sqlite3.connect(str(db_path)) - cursor = conn.cursor() - - # 检查字段是否已存在 - cursor.execute("PRAGMA table_info(trading_signals)") - columns = [col[1] for col in cursor.fetchall()] - - if 'entry_zone' in columns: - print("✅ entry_zone 字段已存在,无需迁移") - conn.close() - return True - - # 添加字段 - print(f"📝 正在添加 entry_zone 字段到 {db_path}...") - cursor.execute("ALTER TABLE trading_signals ADD COLUMN entry_zone FLOAT") - conn.commit() - - # 验证 - cursor.execute("PRAGMA table_info(trading_signals)") - columns = [col[1] for col in cursor.fetchall()] - - if 'entry_zone' in columns: - print("✅ entry_zone 字段添加成功") - conn.close() - return True - else: - print("❌ 字段添加失败") - conn.close() - return False - - except Exception as e: - print(f"❌ 迁移失败: {e}") - return False - - -if __name__ == "__main__": - print("=" * 60) - print("数据库迁移:添加 entry_zone 字段") - print("=" * 60) - - success = migrate_add_entry_zone() - - if success: - print("\n✅ 迁移完成!") - print("\n请重启服务以使更改生效:") - print(" pm2 restart stock-agent") - else: - print("\n❌ 迁移失败!") - print("\n如果自动迁移失败,可以手动执行 SQL:") - print(" sqlite3 backend/stock_agent.db \"ALTER TABLE trading_signals ADD COLUMN entry_zone FLOAT;\"") diff --git a/scripts/test_crypto_news.py b/scripts/test_crypto_news.py deleted file mode 100755 index 903804b..0000000 --- a/scripts/test_crypto_news.py +++ /dev/null @@ -1,71 +0,0 @@ -#!/usr/bin/env python3 -""" -测试加密货币新闻获取(多源聚合) -""" -import asyncio -import sys -import os - -# 确保路径正确 -script_dir = os.path.dirname(os.path.abspath(__file__)) -project_root = os.path.dirname(script_dir) -backend_dir = os.path.join(project_root, 'backend') -sys.path.insert(0, backend_dir) - -from app.services.news_service import get_news_service - - -async def main(): - print("=" * 60) - print("📰 测试加密货币新闻获取(多源聚合)") - print("=" * 60) - - news_service = get_news_service() - - # 获取最新新闻 - print("\n🔍 获取最新加密货币新闻...") - news_list = await news_service.get_latest_news(limit=30) - - print(f"\n✅ 获取到 {len(news_list)} 条新闻\n") - - # 按来源分组统计 - sources = {} - for news in news_list: - source = news.get('source', 'Unknown') - sources[source] = sources.get(source, 0) + 1 - - print("📊 新闻来源统计:") - for source, count in sources.items(): - print(f" {source}: {count} 条") - - # 显示最新10条新闻 - print("\n" + "=" * 60) - print("📰 最新 10 条新闻") - print("=" * 60) - - for i, news in enumerate(news_list[:10], 1): - time_str = news.get('time_str', '') - title = news.get('title', '') - source = news.get('source', '') - desc = news.get('description', '')[:100] - - print(f"\n{i}. [{time_str}] {source}") - print(f" {title}") - if desc: - print(f" {desc}...") - - # 测试格式化给 LLM - print("\n" + "=" * 60) - print("🤖 格式化给 LLM 的新闻") - print("=" * 60) - - formatted_news = news_service.format_news_for_llm(news_list[:5], max_items=5) - print(formatted_news) - - print("\n" + "=" * 60) - print("✅ 测试完成") - print("=" * 60) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/scripts/test_error_notification.py b/scripts/test_error_notification.py deleted file mode 100755 index 20a64af..0000000 --- a/scripts/test_error_notification.py +++ /dev/null @@ -1,218 +0,0 @@ -#!/usr/bin/env python3 -""" -测试系统异常通知功能 - -测试场景: -1. 测试普通异常通知 -2. 测试带堆栈信息的异常通知 -3. 测试冷却时间(避免重复通知) -""" -import sys -from pathlib import Path - -# 添加项目路径 -backend_dir = Path(__file__).parent.parent / "backend" -sys.path.insert(0, str(backend_dir)) - - -def test_normal_exception(): - """测试普通异常通知""" - print("\n" + "=" * 60) - print("🧪 测试1: 普通异常通知") - print("=" * 60) - - from app.utils.error_handler import get_exception_handler - - handler = get_exception_handler() - - # 模拟一个异常 - try: - result = 1 / 0 # ZeroDivisionError - except Exception as e: - import traceback - exc_type, exc_value, exc_traceback = sys.exc_info() - handler.handle_exception(exc_type, exc_value, exc_traceback) - print("✅ 已触发异常处理(请检查飞书是否收到通知)") - - -def test_custom_exception(): - """测试自定义异常通知""" - print("\n" + "=" * 60) - print("🧪 测试2: 自定义异常通知") - print("=" * 60) - - from app.utils.error_handler import get_exception_handler - - handler = get_exception_handler() - - # 模拟一个自定义异常 - try: - raise ValueError("这是一个测试异常,用于验证飞书通知功能") - except Exception as e: - import traceback - exc_type, exc_value, exc_traceback = sys.exc_info() - handler.handle_exception(exc_type, exc_value, exc_traceback) - print("✅ 已触发自定义异常处理(请检查飞书是否收到通知)") - - -def test_nested_exception(): - """测试嵌套调用异常""" - print("\n" + "=" * 60) - print("🧪 测试3: 嵌套调用异常(带完整堆栈)") - print("=" * 60) - - from app.utils.error_handler import get_exception_handler - - handler = get_exception_handler() - - def level_3(): - """第三层调用""" - raise RuntimeError("深层错误:数据库连接失败") - - def level_2(): - """第二层调用""" - level_3() - - def level_1(): - """第一层调用""" - level_2() - - # 模拟嵌套调用异常 - try: - level_1() - except Exception as e: - import traceback - exc_type, exc_value, exc_traceback = sys.exc_info() - handler.handle_exception(exc_type, exc_value, exc_traceback) - print("✅ 已触发嵌套异常处理(请检查飞书是否收到完整堆栈信息)") - - -def test_cooldown(): - """测试冷却时间""" - print("\n" + "=" * 60) - print("🧪 测试4: 冷却时间(连续触发异常)") - print("=" * 60) - - from app.utils.error_handler import get_exception_handler - import time - - handler = get_exception_handler() - handler.set_cooldown(10) # 设置10秒冷却时间 - - print(f"⏰ 冷却时间设置为 10 秒") - - # 第一次触发 - try: - raise RuntimeError("第一次异常") - except Exception: - exc_type, exc_value, exc_traceback = sys.exc_info() - handler.handle_exception(exc_type, exc_value, exc_traceback) - print("✅ 第一次异常已处理") - - # 立即第二次触发(应该在冷却期内) - print("\n⏳ 立即触发第二次异常(应该在冷却期内)...") - try: - raise RuntimeError("第二次异常") - except Exception: - exc_type, exc_value, exc_traceback = sys.exc_info() - handler.handle_exception(exc_type, exc_value, exc_traceback) - print("✅ 第二次异常已处理(应该被冷却,不发送飞书通知)") - - print("\n⏳ 等待 11 秒后再次触发...") - time.sleep(11) - - # 第三次触发(冷却期已过) - try: - raise RuntimeError("第三次异常") - except Exception: - exc_type, exc_value, exc_traceback = sys.exc_info() - handler.handle_exception(exc_type, exc_value, exc_traceback) - print("✅ 第三次异常已处理(冷却期已过,应该发送飞书通知)") - - -def test_with_feishu_integration(): - """测试与飞书集成的完整流程""" - print("\n" + "=" * 60) - print("🧪 测试5: 完整集成测试(带飞书服务)") - print("=" * 60) - - from app.utils.error_handler import setup_global_exception_handler, get_exception_handler - from app.services.feishu_service import get_feishu_service - - # 设置全局异常处理器 - setup_global_exception_handler() - print("✅ 全局异常处理器已安装") - - # 初始化飞书通知 - feishu_service = get_feishu_service() - handler = get_exception_handler() - handler.set_feishu_service(feishu_service) - handler.set_enabled(True) - print("✅ 飞书通知已启用") - - # 触发一个测试异常 - print("\n🔔 触发测试异常...") - try: - raise Exception("【测试】这是一个集成测试异常,用于验证飞书通知功能是否正常工作") - except Exception: - exc_type, exc_value, exc_traceback = sys.exc_info() - handler.handle_exception(exc_type, exc_value, exc_traceback) - print("✅ 测试异常已处理,请检查飞书是否收到通知") - - -def main(): - """主测试函数""" - print("\n" + "🚀" * 30) - print("系统异常通知功能测试") - print("🚀" * 30) - - print("\n⚠️ 注意:以下测试会触发真实的异常,并尝试发送飞书通知") - print("⚠️ 请确保飞书 webhook 已正确配置\n") - - try: - # 测试1: 普通异常 - test_normal_exception() - - # 等待用户确认 - input("\n按 Enter 键继续下一个测试...") - - # 测试2: 自定义异常 - test_custom_exception() - - # 等待用户确认 - input("\n按 Enter 键继续下一个测试...") - - # 测试3: 嵌套异常 - test_nested_exception() - - # 等待用户确认 - input("\n按 Enter 键继续下一个测试...") - - # 测试4: 冷却时间 - test_cooldown() - - # 等待用户确认 - input("\n按 Enter 键继续最后一个测试...") - - # 测试5: 完整集成 - test_with_feishu_integration() - - print("\n" + "=" * 60) - print("✅ 所有测试完成!") - print("=" * 60) - print("\n📋 请检查飞书聊天窗口,确认是否收到异常通知") - print("💡 提示:如果未收到通知,请检查:") - print(" 1. 飞书 webhook URL 是否正确") - print(" 2. 网络连接是否正常") - print(" 3. .env 文件中的 FEISHU_ENABLED 是否为 true\n") - - except KeyboardInterrupt: - print("\n\n⚠️ 测试被用户中断") - except Exception as e: - print(f"\n\n❌ 测试失败: {e}") - import traceback - traceback.print_exc() - - -if __name__ == "__main__": - main() diff --git a/scripts/test_futures_data.py b/scripts/test_futures_data.py deleted file mode 100755 index c4ee093..0000000 --- a/scripts/test_futures_data.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python3 -""" -测试合约市场数据获取 -""" -import sys -import os - -# 确保路径正确 -script_dir = os.path.dirname(os.path.abspath(__file__)) -project_root = os.path.dirname(script_dir) -backend_dir = os.path.join(project_root, 'backend') -sys.path.insert(0, backend_dir) - -from app.services.binance_service import binance_service - - -def main(): - print("=" * 60) - print("📊 测试 Binance 合约市场数据获取") - print("=" * 60) - - symbols = ['BTCUSDT', 'ETHUSDT'] - - for symbol in symbols: - print(f"\n{'='*60}") - print(f"📈 {symbol} 合约市场数据") - print(f"{'='*60}") - - # 1. 获取资金费率 - print(f"\n🔍 获取资金费率...") - funding_rate = binance_service.get_funding_rate(symbol) - if funding_rate: - print(f"✅ 资金费率: {funding_rate['funding_rate_percent']:.4f}%") - print(f" 市场情绪: {funding_rate['sentiment']}") - print(f" 标记价格: ${funding_rate['mark_price']:,.2f}") - print(f" 指数价格: ${funding_rate['index_price']:,.2f}") - - # 2. 获取持仓量 - print(f"\n🔍 获取持仓量...") - open_interest = binance_service.get_open_interest(symbol) - if open_interest: - print(f"✅ 持仓量: {open_interest['open_interest']:,.0f} 张") - print(f" 持仓金额: ${open_interest['open_interest_value']:,.0f}") - - # 3. 获取历史持仓量 - print(f"\n🔍 获取历史持仓量(计算24h变化)...") - hist_oi = binance_service.get_open_interest_hist(symbol, period='1h', limit=24) - if hist_oi and len(hist_oi) >= 2: - oi_now = float(hist_oi[0].get('sumOpenInterest', 0)) - oi_24h = float(hist_oi[-1].get('sumOpenInterest', 0)) - oi_change = oi_now - oi_24h - oi_change_pct = (oi_change / oi_24h * 100) if oi_24h > 0 else 0 - print(f"✅ 24h前: {oi_24h:,.0f} 张") - print(f" 当前: {oi_now:,.0f} 张") - print(f" 变化: {oi_change:+,.0f} 张 ({oi_change_pct:+.2f}%)") - - # 4. 获取综合数据 - print(f"\n🔍 获取综合合约数据...") - market_data = binance_service.get_futures_market_data(symbol) - if market_data: - print(f"✅ 综合数据获取成功") - print(f" 资金费率: {market_data['funding_rate']['funding_rate_percent']:.4f}%") - print(f" 持仓24h变化: {market_data['oi_change_percent_24h']:+.2f}%") - print(f" 溢价率: {market_data['premium_rate']:+.2f}%") - print(f" 市场情绪: {market_data['market_sentiment']}") - - # 5. 格式化给 LLM 的数据 - print(f"\n🤖 格式化给 LLM 的数据:") - print(f"{'─'*60}") - if market_data: - formatted = binance_service.format_futures_data_for_llm(symbol, market_data) - print(formatted) - - print("\n" + "=" * 60) - print("✅ 测试完成") - print("=" * 60) - - -if __name__ == "__main__": - main() diff --git a/scripts/test_modify_order.py b/scripts/test_modify_order.py deleted file mode 100644 index 441c177..0000000 --- a/scripts/test_modify_order.py +++ /dev/null @@ -1,105 +0,0 @@ -#!/usr/bin/env python3 -""" -测试使用 modify-order 修改止损止盈 -""" -import sys -import os -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'backend')) - -import time -import uuid -from app.services.bitget_trading_api_sdk import get_bitget_trading_api - -def print_section(title): - print("\n" + "=" * 60) - print(f" {title}") - print("=" * 60) - -def main(): - print("\n" + "🧪" * 30) - print(" 测试 modify-order 修改止损止盈") - print("🧪" * 30) - - api = get_bitget_trading_api() - exchange = api.exchange - - # 检查是否有 modify order 相关的方法 - print("\n检查 modify order 方法:") - methods = [m for m in dir(exchange) if 'modify' in m.lower() and 'order' in m.lower()] - for m in methods: - print(f" {m}") - - # 1. 开仓 - print_section("1. 开仓") - order = api.place_order( - symbol="BTC/USDT:USDT", - side='buy', - order_type='market', - size=0.0001, - client_order_id=f"test_{uuid.uuid4().hex[:8]}" - ) - if not order: - print("❌ 开仓失败") - return - - order_id = order.get('id') - print(f"✅ 开仓成功! 订单ID: {order_id}") - time.sleep(2) - - # 2. 查询持仓 - print_section("2. 查询持仓") - positions = api.get_position("BTC/USDT:USDT") - position = None - for pos in positions: - if float(pos.get('contracts', 0)) > 0: - position = pos - break - - if not position: - print("❌ 无持仓") - return - - mark_price = float(position.get('markPrice')) - print(f"持仓: {position.get('contracts')} 张, 标记价: ${mark_price:,.2f}") - - # 3. 尝试使用 edit_order 修改(添加止损止盈) - print_section("3. 使用 edit_order 添加止损止盈") - - stop_loss = mark_price * 0.98 - take_profit = mark_price * 1.03 - - print(f"目标止损: ${stop_loss:,.2f}") - print(f"目标止盈: ${take_profit:,.2f}") - - try: - # 尝试使用 CCXT 的 edit_order 方法 - # 注意:可能需要传入订单ID和新参数 - result = exchange.edit_order( - id=order_id, - symbol="BTC/USDT:USDT", - type='market', # 原订单类型 - side='buy', # 原订单方向 - amount=0.0001, # 原订单数量 - params={ - 'stopLoss': str(stop_loss), - 'takeProfit': str(take_profit), - 'tdMode': 'cross', - 'marginCoin': 'USDT', - } - ) - print(f"✅ edit_order 成功: {result}") - except Exception as e: - print(f"❌ edit_order 失败: {e}") - - # 4. 平仓 - print_section("4. 平仓") - api.close_position("BTC/USDT:USDT") - print("✅ 已平仓") - -if __name__ == "__main__": - try: - main() - except Exception as e: - print(f"\n\n❌ 测试出错: {e}") - import traceback - traceback.print_exc() diff --git a/scripts/test_news_notification.py b/scripts/test_news_notification.py deleted file mode 100644 index a4f7a75..0000000 --- a/scripts/test_news_notification.py +++ /dev/null @@ -1,118 +0,0 @@ -#!/usr/bin/env python3 -""" -测试新闻通知格式 -""" -import asyncio -import sys -from pathlib import Path - -# 添加项目路径 -script_dir = Path(__file__).parent -project_root = script_dir.parent -backend_dir = project_root / "backend" - -sys.path.insert(0, str(backend_dir)) - -import os -os.chdir(backend_dir) - -from app.news_agent.notifier import get_news_notifier - - -# 模拟高影响新闻数据 -test_articles = [ - { - 'id': 1, - 'title': 'Bitcoin ETFs See Record $500M Inflows as Institutions Pile In', - 'source': 'CoinDesk', - 'category': 'crypto', - 'url': 'https://example.com/article1', - 'market_impact': 'high', - 'impact_type': 'bullish', - 'sentiment': 'positive', - 'summary': '比特币现货ETF昨日吸引5亿美元资金流入,创历史新高,显示机构投资者持续增持。', - 'key_points': [ - '贝莱德IBIT录得3亿美元流入', - '富达FBTC流入1.5亿美元', - '机构持仓占比超过60%' - ], - 'trading_advice': '建议持有BTC,关注回调后的买入机会', - 'relevant_symbols': ['BTC', 'IBIT', 'FBTC'], - 'priority': 85, - 'created_at': '2026-02-25T12:00:00' - }, - { - 'id': 2, - 'title': 'SEC Delays Decision on Ethereum ETF Options Listings', - 'source': 'Bloomberg', - 'category': 'crypto', - 'url': 'https://example.com/article2', - 'market_impact': 'medium', - 'impact_type': 'neutral', - 'sentiment': 'neutral', - 'summary': 'SEC再次推迟对以太坊ETF期权上市的决议,新的截止日期为4月底。', - 'key_points': [ - 'SEC引用需要额外审查时间', - '这是第三次推迟', - '市场反应温和' - ], - 'trading_advice': 'ETH持仓观望,等待ETF期权批准', - 'relevant_symbols': ['ETH', 'ETHA'], - 'priority': 55, - 'created_at': '2026-02-25T11:30:00' - }, - { - 'id': 3, - 'title': 'NVIDIA Surpasses $4 Trillion Market Cap Amid AI Chip Demand', - 'source': 'CNBC', - 'category': 'stock', - 'url': 'https://example.com/article3', - 'market_impact': 'high', - 'impact_type': 'bullish', - 'sentiment': 'positive', - 'summary': '英伟达市值突破4万亿美元大关,成为全球市值最高的公司,AI芯片需求持续爆发。', - 'key_points': [ - '股价上涨5%至每股1600美元', - 'H100芯片供不应求', - '数据中心收入同比增长300%' - ], - 'trading_advice': '建议继续持有NVDA,AI趋势未完', - 'relevant_symbols': ['NVDA', 'AMD'], - 'priority': 80, - 'created_at': '2026-02-25T10:15:00' - } -] - - -async def main(): - print("=" * 70) - print("🧪 测试新闻通知格式") - print("=" * 70) - print() - - notifier = get_news_notifier() - - print("📊 测试数据:") - for i, article in enumerate(test_articles, 1): - print(f" {i}. [{article['market_impact']}] {article['title'][:50]}...") - print(f" 摘要: {article['summary'][:50]}...") - print(f" 建议: {article['trading_advice']}") - print() - - print("📢 发送测试通知...") - result = await notifier.notify_news_batch(test_articles) - - if result: - print("✅ 通知发送成功!") - else: - print("❌ 通知发送失败") - - print() - print("=" * 70) - - return 0 - - -if __name__ == "__main__": - exit_code = asyncio.run(main()) - sys.exit(exit_code) diff --git a/scripts/test_real_trading.py b/scripts/test_real_trading.py deleted file mode 100644 index 839ca47..0000000 --- a/scripts/test_real_trading.py +++ /dev/null @@ -1,285 +0,0 @@ -#!/usr/bin/env python3 -""" -实盘交易功能测试脚本 - -测试 Bitget 实盘交易 API 的各项功能 -⚠️ 注意: 此脚本会进行真实交易,请确保在测试网环境运行! -""" -import sys -import os -from pathlib import Path - -# 添加项目路径 -project_root = Path(__file__).parent.parent -sys.path.insert(0, str(project_root / "backend")) - -from app.config import get_settings -from app.services.bitget_trading_api import BitgetTradingAPI, get_bitget_trading_api -from app.services.real_trading_service import get_real_trading_service - - -def print_section(title: str): - """打印分节标题""" - print("\n" + "=" * 80) - print(f" {title}") - print("=" * 80) - - -def test_api_connection(): - """测试 API 连接""" - print_section("1. 测试 API 连接") - - settings = get_settings() - - print(f"\n配置信息:") - print(f" API Key: {settings.bitget_api_key[:10]}..." if settings.bitget_api_key else " API Key: 未配置") - print(f" 测试网: {'是 ✅' if settings.bitget_use_testnet else '否 ⚠️'}") - - if not settings.bitget_api_key or not settings.bitget_api_secret: - print("\n❌ API Key 未配置,请在 .env 文件中设置:") - print(" BITGET_API_KEY=your_api_key") - print(" BITGET_API_SECRET=your_api_secret") - print(" BITGET_PASSPHRASE=your_passphrase") - print(" BITGET_USE_TESTNET=true") - return False - - # 创建 API 实例 - api = BitgetTradingAPI( - api_key=settings.bitget_api_key, - api_secret=settings.bitget_api_secret, - passphrase=settings.bitget_passphrase, - use_testnet=settings.bitget_use_testnet - ) - - # 测试连接 - if api.test_connection(): - print("\n✅ API 连接成功!") - return True - else: - print("\n❌ API 连接失败!") - print(" 请检查:") - print(" 1. API Key 和 Secret 是否正确") - print(" 2. 是否在测试网环境") - print(" 3. API 权限是否正确") - return False - - -def test_query_account(): - """测试查询账户""" - print_section("2. 测试查询账户") - - api = get_bitget_trading_api() - if not api: - print("\n❌ 交易 API 未初始化") - return False - - # 查询余额 - print("\n查询账户余额...") - balance = api.get_balance() - - if balance: - usdt_info = balance.get('USDT', {}) - print(f"\n✅ USDT 余额:") - print(f" 可用: ${float(usdt_info.get('available', 0)):,.2f}") - print(f" 冻结: ${float(usdt_info.get('frozen', 0)):,.2f}") - print(f" 锁定: ${float(usdt_info.get('locked', 0)):,.2f}") - return True - else: - print("\n❌ 查询余额失败") - return False - - -def test_query_position(): - """测试查询持仓""" - print_section("3. 测试查询持仓") - - api = get_bitget_trading_api() - if not api: - print("\n❌ 交易 API 未初始化") - return False - - # 查询持仓 - print("\n查询当前持仓...") - positions = api.get_position() - - print(f"\n✅ 当前持仓数量: {len(positions)}") - - if positions: - for pos in positions: - symbol = pos.get('symbol', 'N/A') - total = pos.get('total', 0) - available = pos.get('available', 0) - hold_side = pos.get('holdSide', 'N/A') - avg_price = pos.get('averageOpenPrice', 0) - - print(f"\n {symbol} ({hold_side})") - print(f" 持仓: {total}") - print(f" 可用: {available}") - print(f" 均价: ${avg_price}") - else: - print(" 无持仓") - - return True - - -def test_place_small_order(): - """ - 测试下单(小仓位测试) - - ⚠️ 警告: 此函数会进行真实交易! - 建议在测试网运行,并使用最小仓位 - """ - print_section("4. 测试下单 (⚠️ 实盘交易)") - - # 确认是否要测试 - print("\n⚠️ 警告: 此测试会进行真实下单!") - print(" 建议: 1. 确保在测试网环境") - print(" 2. 使用最小仓位测试") - print(" 3. 先手动撤销测试订单") - - api = get_bitget_trading_api() - if not api: - print("\n❌ 交易 API 未初始化") - return False - - # 使用 BTCUSDT 进行测试 - symbol = "BTCUSDT" - - # 查询当前价格 - from app.services.bitget_service import bitget_service - current_price = bitget_service.get_current_price(symbol) - - if not current_price: - print(f"\n❌ 无法获取 {symbol} 当前价格") - return False - - print(f"\n{symbol} 当前价格: ${current_price:,.2f}") - - # 使用最小数量(1张,通常约等于1 USDT) - test_size = 1 - - # 使用市价单测试(快速成交) - print(f"\n准备下测试单:") - print(f" 交易对: {symbol}") - print(f" 方向: 开多 (open_long)") - print(f" 类型: 市价单") - print(f" 数量: {test_size} 张 (约 ${test_size})") - - # 取消注释以实际下单 - print("\n❗ 下单功能已禁用,避免意外交易") - print(" 如需测试,请手动取消下面的注释并确认:") - print("") - - """ - # 实际下单代码(已注释,需手动取消注释) - result = api.place_order( - symbol=symbol, - side='open_long', - order_type='market', - size=test_size - ) - - if result: - order_id = result.get('orderId') - print(f"\n✅ 下单成功!") - print(f" 订单ID: {order_id}") - print(f" 请在交易所检查订单状态") - - # 询问是否立即撤单 - print("\n是否立即撤单?(测试用)") - # input("按回车键撤单...") - - # 撤单 - if api.cancel_order(symbol, order_id=order_id): - print(f"\n✅ 撤单成功") - else: - print(f"\n❌ 撤单失败,请手动在交易所撤单") - - return True - else: - print(f"\n❌ 下单失败") - return False - """ - - return False - - -def test_real_trading_service(): - """测试实盘交易服务""" - print_section("5. 测试实盘交易服务") - - service = get_real_trading_service() - - if not service: - print("\n❌ 实盘交易服务未启用") - print(" 请在 .env 中设置:") - print(" REAL_TRADING_ENABLED=true") - return False - - # 查询账户状态 - print("\n查询账户状态...") - account = service.get_account_status() - - print(f"\n✅ 账户状态:") - print(f" 总余额: ${account['current_balance']:,.2f}") - print(f" 可用: ${account['available']:,.2f}") - print(f" 已用: ${account['used_margin']:,.2f}") - print(f" 持仓: ${account['total_position_value']:,.2f}") - - # 同步持仓 - print("\n同步交易所持仓...") - positions = service.sync_positions_from_exchange() - - print(f"\n✅ 交易所持仓: {len(positions)} 个") - - return True - - -def main(): - """主函数""" - print("\n" + "🚀" * 40) - print("\nBitget 实盘交易功能测试") - print(f"测试时间: {datetime.now()}") - - settings = get_settings() - - print("\n⚠️ 重要提醒:") - print(f" 当前环境: {'测试网 ✅' if settings.bitget_use_testnet else '生产网 ⚠️'}") - - try: - # 1. 测试 API 连接 - if not test_api_connection(): - print("\n❌ API 连接测试失败,请检查配置") - return - - # 2. 测试查询账户 - if not test_query_account(): - print("\n❌ 查询账户测试失败") - return - - # 3. 测试查询持仓 - if not test_query_position(): - print("\n❌ 查询持仓测试失败") - return - - # 4. 测试下单(需要手动取消注释) - # test_place_small_order() - - # 5. 测试实盘交易服务 - if not test_real_trading_service(): - print("\n⚠️ 实盘交易服务未启用(可忽略)") - - print("\n" + "=" * 80) - print(" ✅ 所有测试通过!") - print("=" * 80) - print("\n实盘交易功能已就绪!") - - except Exception as e: - print(f"\n❌ 测试出错: {e}") - import traceback - traceback.print_exc() - - -if __name__ == "__main__": - from datetime import datetime - main() diff --git a/scripts/test_real_trading_api.py b/scripts/test_real_trading_api.py deleted file mode 100644 index abd1356..0000000 --- a/scripts/test_real_trading_api.py +++ /dev/null @@ -1,57 +0,0 @@ -""" -测试实盘交易 API 端点 -""" -import requests -import json - -BASE_URL = "http://localhost:8000" - -def test_api(): - print("=" * 60) - print("测试实盘交易 API") - print("=" * 60) - - # 测试 /api/real-trading/status - print("\n1. 测试 /api/real-trading/status") - try: - response = requests.get(f"{BASE_URL}/api/real-trading/status") - print(f" 状态码: {response.status_code}") - print(f" 响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}") - except Exception as e: - print(f" 错误: {e}") - - # 测试 /api/real-trading/account - print("\n2. 测试 /api/real-trading/account") - try: - response = requests.get(f"{BASE_URL}/api/real-trading/account") - print(f" 状态码: {response.status_code}") - print(f" 响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}") - except Exception as e: - print(f" 错误: {e}") - - # 测试 /api/real-trading/positions - print("\n3. 测试 /api/real-trading/positions") - try: - response = requests.get(f"{BASE_URL}/api/real-trading/positions") - print(f" 状态码: {response.status_code}") - print(f" 响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}") - except Exception as e: - print(f" 错误: {e}") - - # 测试 /api/real-trading/orders - print("\n4. 测试 /api/real-trading/orders?status=orders&limit=10") - try: - response = requests.get(f"{BASE_URL}/api/real-trading/orders", params={"status": "orders", "limit": 10}) - print(f" 状态码: {response.status_code}") - data = response.json() - print(f" Success: {data.get('success')}") - print(f" Count: {data.get('count')}") - if data.get('orders'): - print(f" Orders: {len(data.get('orders', []))} 条") - except Exception as e: - print(f" 错误: {e}") - - print("\n" + "=" * 60) - -if __name__ == "__main__": - test_api() diff --git a/scripts/test_real_trading_full.py b/scripts/test_real_trading_full.py deleted file mode 100644 index 3112ed8..0000000 --- a/scripts/test_real_trading_full.py +++ /dev/null @@ -1,324 +0,0 @@ -""" -实盘交易完整流程测试脚本 - -测试内容: -1. 获取账户余额 -2. 获取当前持仓 -3. 下测试单(极小金额) -4. 查询订单状态 -5. 修改止损止盈 -6. 平仓 - -风险控制: -- 使用极小金额测试(5-10 USDT) -- 测试完成后自动平仓 -""" -import sys -import os -sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'backend')) - -import asyncio -import time -from app.services.bitget_trading_api_sdk import get_bitget_trading_api -from app.services.real_trading_service import get_real_trading_service -from app.utils.logger import logger - - -def print_section(title): - """打印分隔线""" - print("\n" + "=" * 60) - print(f" {title}") - print("=" * 60) - - -def test_account_info(trading_api): - """测试1: 获取账户信息""" - print_section("1. 获取账户信息") - - try: - balance = trading_api.get_balance() - print(f"✅ 账户余额:") - for currency, info in balance.items(): - if isinstance(info, dict): - available = info.get('available', 0) - frozen = info.get('frozen', 0) - locked = info.get('locked', 0) - if float(available) > 0 or float(frozen) > 0: - print(f" {currency}: 可用={float(available):.2f}, 冻结={float(frozen):.2f}, 锁定={float(locked):.2f}") - return True - except Exception as e: - print(f"❌ 获取账户信息失败: {e}") - return False - - -def test_get_positions(trading_api): - """测试2: 获取当前持仓""" - print_section("2. 获取当前持仓") - - try: - positions = trading_api.get_position() - print(f"✅ 当前持仓数量: {len(positions)}") - - open_positions = [p for p in positions if float(p.get('contracts', 0)) != 0] - if open_positions: - print(f" 活跃持仓:") - for pos in open_positions: - symbol = pos.get('symbol') - side = pos.get('side') - contracts = pos.get('contracts') - entry_price = pos.get('entryPrice') - mark_price = pos.get('markPrice') - unrealized_pnl = pos.get('unrealizedPnl', 0) - print(f" {symbol} {side} | 数量: {contracts} | 入场: {entry_price} | 标记: {mark_price} | 浮盈: {unrealized_pnl}") - else: - print(f" 当前无持仓") - - return True - except Exception as e: - print(f"❌ 获取持仓失败: {e}") - return False - - -def test_place_order(trading_api, symbol="BTCUSDT", side='buy', size=5): - """测试3: 下测试单""" - print_section("3. 下测试单") - - # 生成订单ID - import uuid - client_order_id = f"test_{uuid.uuid4().hex[:8]}" - - print(f" 交易对: {symbol}") - print(f" 方向: {side}") - print(f" 数量: {size} USDT") - print(f" 客户端ID: {client_order_id}") - - try: - # 先获取当前价格 - ticker = trading_api.exchange.fetch_ticker(symbol) - current_price = ticker['last'] - print(f" 当前价格: ${current_price:,.2f}") - - # 下市价单 - print(f" 📝 正在下单...") - result = trading_api.place_order( - symbol=symbol, - side=side, - order_type='market', - size=size, - client_order_id=client_order_id - ) - - if result: - order_id = result.get('id') - filled_price = result.get('average') or result.get('price') - print(f"✅ 下单成功!") - print(f" 订单ID: {order_id}") - print(f" 成交价格: ${filled_price:,.2f}") - print(f" 成交数量: {size} USDT") - - # 等待一下让订单完全成交 - time.sleep(2) - return order_id, symbol, side - else: - print(f"❌ 下单失败: 无返回结果") - return None - except Exception as e: - print(f"❌ 下单失败: {e}") - import traceback - print(traceback.format_exc()) - return None - - -def test_get_order(trading_api, order_id, symbol): - """测试4: 查询订单状态""" - print_section("4. 查询订单状态") - - try: - order = trading_api.exchange.fetch_order(order_id, symbol) - print(f"✅ 订单状态: {order.get('status')}") - print(f" 订单ID: {order.get('id')}") - print(f" 价格: {order.get('price')}") - print(f" 已成交: {order.get('filled')}") - print(f" 剩余: {order.get('remaining')}") - return order - except Exception as e: - print(f"❌ 查询订单失败: {e}") - return None - - -def test_modify_sl_tp(trading_api, symbol, side='buy'): - """测试5: 修改止损止盈""" - print_section("5. 修改止损止盈") - - # 获取当前持仓 - try: - positions = trading_api.get_position() - position = None - - for pos in positions: - if pos.get('symbol') == symbol and float(pos.get('contracts', 0)) != 0: - position = pos - break - - if not position: - print(f"❌ 未找到 {symbol} 的持仓") - return False - - # 获取当前价格来设置合理的止损止盈 - ticker = trading_api.exchange.fetch_ticker(symbol) - current_price = float(ticker['last']) - - if side == 'buy': - stop_loss = current_price * 0.97 # 3% 止损 - take_profit = current_price * 1.05 # 5% 止盈 - else: - stop_loss = current_price * 1.03 # 3% 止损 - take_profit = current_price * 0.95 # 5% 止盈 - - print(f" 当前价格: ${current_price:,.2f}") - print(f" 设置止损: ${stop_loss:,.2f}") - print(f" 设置止盈: ${take_profit:,.2f}") - - # 使用交易所API修改止损止盈(这需要根据具体交易所API实现) - # 注意: Bitget 可能需要使用不同的API来修改止损止盈 - - print(f"⚠️ 注意: 修改止损止盈功能需要根据交易所具体API实现") - print(f"✅ 止损止盈参数已计算(实际修改需要查看交易所API文档)") - - return True - except Exception as e: - print(f"❌ 修改止损止盈失败: {e}") - import traceback - print(traceback.format_exc()) - return False - - -def test_close_position(trading_api, symbol, side): - """测试6: 平仓""" - print_section("6. 平仓测试") - - # 获取当前持仓 - try: - positions = trading_api.get_position() - position = None - - for pos in positions: - if pos.get('symbol') == symbol and float(pos.get('contracts', 0)) != 0: - position = pos - break - - if not position: - print(f"❌ 未找到 {symbol} 的持仓,无法平仓") - return False - - contracts = float(position.get('contracts', 0)) - print(f" 持仓数量: {contracts}") - - # 平仓方向与开仓相反 - close_side = 'sell' if side == 'buy' else 'buy' - print(f" 平仓方向: {close_side}") - - # 生成平仓订单ID - import uuid - client_order_id = f"close_{uuid.uuid4().hex[:8]}" - - print(f" 📝 正在平仓...") - result = trading_api.place_order( - symbol=symbol, - side=close_side, - order_type='market', - size=contracts, # 平掉所有持仓 - client_order_id=client_order_id, - reduce_only=True # 只平仓,不开新仓 - ) - - if result: - print(f"✅ 平仓成功!") - print(f" 订单ID: {result.get('id')}") - return True - else: - print(f"❌ 平仓失败: 无返回结果") - return False - - except Exception as e: - print(f"❌ 平仓失败: {e}") - import traceback - print(traceback.format_exc()) - return False - - -def main(): - """主测试流程""" - print("\n" + "🚀" * 30) - print(" 实盘交易完整流程测试") - print("🚀" * 30) - print("\n⚠️ 警告: 此测试将使用真实资金,但金额很小(5-10 USDT)") - print("⚠️ 请确保:") - print(" 1. Bitget API 配置正确") - print(" 2. 账户有足够的测试资金") - print(" 3. 使用测试网或小额资金") - - # 确认开始测试 - confirm = input("\n是否开始测试? (输入 'yes' 继续): ") - if confirm.lower() != 'yes': - print("测试已取消") - return - - # 获取交易API - trading_api = get_bitget_trading_api() - if not trading_api: - print("❌ Bitget API 未初始化,请检查配置") - return - - print(f"\n📡 交易所: {'Bitget 测试网' if trading_api.use_testnet else 'Bitget 正式网'}") - - # 测试1: 获取账户信息 - if not test_account_info(trading_api): - return - - # 测试2: 获取当前持仓 - if not test_get_positions(trading_api): - return - - # 测试3: 下测试单 - symbol = "BTCUSDT" - side = 'buy' # 做多 - size = 5 # 5 USDT - - order_result = test_place_order(trading_api, symbol, side, size) - if not order_result: - print("\n❌ 下单失败,终止测试") - return - - order_id, _, _ = order_result - - # 测试4: 查询订单状态 - test_get_order(trading_api, order_id, symbol) - - # 测试5: 修改止损止盈 - test_modify_sl_tp(trading_api, symbol, side) - - # 测试6: 平仓 - time.sleep(2) # 等待一下 - test_close_position(trading_api, symbol, side) - - # 最终状态 - print_section("测试完成") - print("✅ 所有测试已完成,请检查:") - print(" 1. Bitget 账户中的订单记录") - print(" 2. 余额变化") - print(" 3. 持仓情况") - - # 再次获取持仓确认已平仓 - test_get_positions(trading_api) - - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - print("\n\n⚠️ 测试被用户中断") - except Exception as e: - print(f"\n\n❌ 测试出错: {e}") - import traceback - print(traceback.format_exc()) diff --git a/scripts/test_real_trading_simple.py b/scripts/test_real_trading_simple.py deleted file mode 100644 index c3c3ef2..0000000 --- a/scripts/test_real_trading_simple.py +++ /dev/null @@ -1,200 +0,0 @@ -""" -实盘交易简单测试 - 使用 API 直接测试 - -测试流程: -1. 获取账户余额 -2. 下一个小单 -3. 查询订单 -4. 平仓 -""" -import requests -import json -import time - -BASE_URL = "http://localhost:8000" - -def print_section(title): - print("\n" + "=" * 50) - print(f" {title}") - print("=" * 50) - - -def test_get_account(): - """获取账户信息""" - print_section("1. 获取账户信息") - - response = requests.get(f"{BASE_URL}/api/real-trading/account") - data = response.json() - - if data.get('success'): - account = data['account'] - print(f"✅ 账户余额: ${account['current_balance']:.2f}") - print(f" 可用: ${account['available']:.2f}") - print(f" 已用: ${account['used_margin']:.2f}") - print(f" 持仓价值: ${account['total_position_value']:.2f}") - return account - else: - print(f"❌ 获取账户信息失败: {data.get('message')}") - return None - - -def test_get_positions(): - """获取持仓""" - print_section("2. 获取当前持仓") - - response = requests.get(f"{BASE_URL}/api/real-trading/positions") - data = response.json() - - if data.get('success'): - positions = data['positions'] - print(f"✅ 持仓数量: {len(positions)}") - - for pos in positions: - if pos.get('holding', 0) > 0: - print(f" {pos['symbol']}: {pos['holding']} USDT") - return positions - else: - print(f"❌ 获取持仓失败: {data.get('message')}") - return [] - - -def test_place_small_order(): - """下一个小测试单""" - print_section("3. 下测试单") - - # 使用模拟交易API测试(更安全) - symbol = "BTCUSDT" - - # 先获取当前价格 - ticker_response = requests.get(f"{BASE_URL}/api/bitget/ticker?symbol={symbol}") - ticker_data = ticker_response.json() - - if not ticker_data.get('success'): - print(f"❌ 获取价格失败") - return None - - current_price = ticker_data['data'].get('last_price', 0) - print(f" 当前价格: ${current_price:,.2f}") - - # 计算测试参数 - stop_loss = current_price * 0.95 - take_profit = current_price * 1.05 - - order_data = { - "symbol": symbol, - "action": "buy", - "entry_type": "market", - "entry_price": current_price, - "stop_loss": stop_loss, - "take_profit": take_profit, - "confidence": 60, - "signal_grade": "C", - "position_size": "light" - } - - print(f" 测试参数:") - print(f" 止损: ${stop_loss:,.2f}") - print(f" 止盈: ${take_profit:,.2f}") - - # 先用模拟交易测试 - print(f"\n 📝 正在创建模拟交易订单...") - response = requests.post(f"{BASE_URL}/api/paper-trading/order", json=order_data) - data = response.json() - - if data.get('success'): - order = data.get('order') - print(f"✅ 模拟订单创建成功!") - print(f" 订单ID: {order['order_id']}") - print(f" 数量: ${order['quantity']:.2f}") - return order - else: - print(f"❌ 下单失败: {data.get('message')}") - return None - - -def test_get_orders(): - """获取订单列表""" - print_section("4. 获取订单列表") - - response = requests.get(f"{BASE_URL}/api/paper-trading/orders?status=active&limit=10") - data = response.json() - - if data.get('success'): - orders = data['orders'] - print(f"✅ 活跃订单数量: {len(orders)}") - - for order in orders: - print(f" {order['symbol']} {order['side']} | " - f"状态: {order['status']} | " - f"数量: ${order['quantity']:.2f}") - return orders - else: - print(f"❌ 获取订单失败: {data.get('message')}") - return [] - - -def test_close_order(order_id): - """平仓""" - print_section("5. 平仓测试") - - print(f" 正在平仓订单: {order_id}") - - response = requests.post(f"{BASE_URL}/api/paper-trading/close", json={ - "order_id": order_id, - "reason": "测试平仓" - }) - - data = response.json() - - if data.get('success'): - print(f"✅ 平仓成功!") - print(f" 平仓价格: ${data.get('close_price', 0):,.2f}") - print(f" 盈亏: ${data.get('pnl', 0):.2f}") - return True - else: - print(f"❌ 平仓失败: {data.get('message')}") - return False - - -def main(): - print("\n" + "🧪" * 25) - print(" 实盘交易流程测试 (模拟模式)") - print("🧪" * 25) - print("\n此测试使用模拟交易,不会动用真实资金") - - input("\n按 Enter 开始测试...") - - # 测试1: 获取账户 - test_get_account() - - # 测试2: 获取持仓 - test_get_positions() - - # 测试3: 下单 - order = test_place_small_order() - - if order: - time.sleep(1) - - # 测试4: 查询订单 - test_get_orders() - - time.sleep(1) - - # 测试5: 平仓 - test_close_order(order['order_id']) - - print_section("测试完成") - print("✅ 所有测试已完成") - print(" 请检查前端页面确认订单状态") - - -if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - print("\n\n⚠️ 测试被用户中断") - except Exception as e: - print(f"\n\n❌ 测试出错: {e}") - import traceback - traceback.print_exc() diff --git a/scripts/test_stock.py b/scripts/test_stock.py deleted file mode 100755 index efb6b3d..0000000 --- a/scripts/test_stock.py +++ /dev/null @@ -1,648 +0,0 @@ -#!/usr/bin/env python3 -""" -美股分析脚本(新架构版) - -用法: - python3 scripts/test_stock.py AAPL - python3 scripts/test_stock.py AAPL TSLA NVDA -""" -import sys -import os - -# 确保路径正确 -script_dir = os.path.dirname(os.path.abspath(__file__)) -project_root = os.path.dirname(script_dir) -backend_dir = os.path.join(project_root, 'backend') -sys.path.insert(0, backend_dir) - -import asyncio -from app.services.yfinance_service import get_yfinance_service -from app.services.feishu_service import get_feishu_service -from app.services.telegram_service import get_telegram_service -from app.services.fundamental_service import get_fundamental_service -from app.services.news_service import get_news_service -from app.stock_agent.market_signal_analyzer import StockMarketSignalAnalyzer -from app.config import get_settings -from app.utils.logger import logger - - -async def analyze(symbol: str, send_notification: bool = True): - """分析单只股票 - - Args: - symbol: 股票代码 - send_notification: 是否发送通知(默认True) - - Returns: - 分析结果字典 - """ - # 获取服务 - yf_service = get_yfinance_service() - market_analyzer = StockMarketSignalAnalyzer() # 使用新的市场信号分析器 - fundamental = get_fundamental_service() # 基本面服务 - news = get_news_service() # 新闻服务 - feishu = get_feishu_service() - telegram = get_telegram_service() - - result = { - 'symbol': symbol, - 'stock_name': '', # 从基本面数据获取 - 'price': 0, - 'signals': [], - 'notified': False - } - - # 获取配置 - settings = get_settings() - threshold = settings.stock_llm_threshold * 100 # 转换为百分比 - - print(f"\n{'='*60}") - print(f"📊 分析 {symbol}") - print(f"{'='*60}") - - try: - # 获取行情 - print(f"获取行情...") - ticker = yf_service.get_ticker(symbol) - if not ticker: - print(f"❌ 无法获取 {symbol} 行情") - return result - - price = ticker['lastPrice'] - change = ticker['priceChangePercent'] - result['price'] = price - print(f"价格: ${price:,.2f} ({change:+.2f}%)") - print(f"成交量: {ticker['volume']:,}") - - # 获取K线 - print(f"获取K线数据...") - data = yf_service.get_multi_timeframe_data(symbol) - - if not data: - print(f"❌ 无法获取K线数据") - return result - - print(f"时间周期: {', '.join(data.keys())}") - - # 获取基本面数据(包含公司名称) - print(f"\n📈 基本面分析中...") - fundamental_data = None - fundamental_summary = "" - stock_name = "" - try: - fundamental_data = fundamental.get_fundamental_data(symbol) - if fundamental_data: - # 传递已获取的数据,避免重复调用 - fundamental_summary = fundamental.get_fundamental_summary(symbol, fundamental_data) - # 获取公司名称 - stock_name = fundamental_data.get('company_name', '') - result['stock_name'] = stock_name - # 输出基本面详细信息 - score = fundamental_data.get('score', {}) - print(f" ✓ 基本面数据获取成功") - - # 公司信息 - company = fundamental_data.get('company_name', 'N/A') - sector = fundamental_data.get('sector', 'N/A') - print(f" 【公司】{company} | {sector}") - - # 更新显示 - symbol_display = f"{stock_name}({symbol})" if stock_name else symbol - print(f"\n{'='*60}") - print(f"📊 分析 {symbol_display} @ ${price:,.2f}") - print(f"{'='*60}") - - # 评分 - print(f" 【评分】总分: {score.get('total', 0):.0f}/100 ({score.get('rating', 'N/A')}级) | " - f"估值:{score.get('valuation', 0)} 盈利:{score.get('profitability', 0)} " - f"成长:{score.get('growth', 0)} 财务:{score.get('financial_health', 0)}") - - # 估值指标 - val = fundamental_data.get('valuation', {}) - if val.get('pe_ratio'): - pe = val['pe_ratio'] - pb = val.get('pb_ratio') - ps = val.get('ps_ratio') - peg = val.get('peg_ratio') - pb_str = f"{pb:.2f}" if pb is not None else "N/A" - ps_str = f"{ps:.2f}" if ps is not None else "N/A" - peg_str = f"{peg:.2f}" if peg is not None else "N/A" - print(f" 【估值】PE:{pe:.2f} | PB:{pb_str} | PS:{ps_str} | PEG:{peg_str}") - - # 盈利能力 - prof = fundamental_data.get('profitability', {}) - if prof.get('return_on_equity'): - roe = prof['return_on_equity'] - pm = prof.get('profit_margin') - gm = prof.get('gross_margin') - pm_str = f"{pm:.1f}" if pm is not None else "N/A" - gm_str = f"{gm:.1f}" if gm is not None else "N/A" - print(f" 【盈利】ROE:{roe:.2f}% | 净利率:{pm_str}% | 毛利率:{gm_str}%") - - # 成长性 - growth = fundamental_data.get('growth', {}) - rg = growth.get('revenue_growth') - eg = growth.get('earnings_growth') - if rg is not None or eg is not None: - rg_str = f"{rg:.1f}" if rg is not None else "N/A" - eg_str = f"{eg:.1f}" if eg is not None else "N/A" - print(f" 【成长】营收增长:{rg_str}% | 盈利增长:{eg_str}%") - - # 财务健康 - fin = fundamental_data.get('financial_health', {}) - if fin.get('debt_to_equity'): - de = fin['debt_to_equity'] - cr = fin.get('current_ratio') - cr_str = f"{cr:.2f}" if cr is not None else "N/A" - print(f" 【财务】债务股本比:{de:.2f} | 流动比率:{cr_str}") - - # 分析师建议 - analyst = fundamental_data.get('analyst', {}) - tp = analyst.get('target_price') - if tp: - rec = analyst.get('recommendation', 'N/A') - print(f" 【分析师】目标价:${tp:.2f} | 评级:{rec}") - - else: - print(f" ⚠️ 无法获取基本面数据") - except Exception as e: - print(f" ⚠️ 获取基本面数据失败: {e}") - - # 获取新闻数据 - print(f"\n📰 新闻分析...") - news_data = None - try: - news_data = await news.search_stock_news(symbol, stock_name, max_results=5) - if news_data: - print(f" 获取到 {len(news_data)} 条相关新闻") - else: - print(f" 暂无相关新闻") - except Exception as e: - print(f" ⚠️ 获取新闻数据失败: {e}") - - # 市场信号分析(使用新架构 - 技术面 + 基本面 + 新闻) - print(f"\n🤖 市场信号分析中...\n") - market_signal = await market_analyzer.analyze( - symbol, data, - symbols=[symbol], - fundamental_data=fundamental_data, - news_data=news_data - ) - - # 输出结果 - summary = market_signal.get('analysis_summary', '') - signals = market_signal.get('signals', []) - result['signals'] = signals - - print(f"市场状态: {summary}") - - if signals: - print(f"\n🎯 发现 {len(signals)} 个信号:") - for sig in signals: - action = sig.get('action', 'wait') - grade = sig.get('grade', 'D') - conf = sig.get('confidence', 0) - - action_text = {'buy': '🟢 做多', 'sell': '🔴 做空'}.get(action, action) - grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '⭐'}.get(grade, '') - - print(f"\n{action_text} [{grade}{grade_icon}] {conf}%") - - entry = sig.get('entry_price') - sl = sig.get('stop_loss') - tp = sig.get('take_profit') - - if entry and sl and tp: - print(f" 入场: ${entry:,.2f}") - print(f" 止损: ${sl:,.2f}") - print(f" 止盈: ${tp:,.2f}") - - reason = sig.get('reason', '') - if reason: - # 限制理由长度 - short_reason = reason[:80] + "..." if len(reason) > 80 else reason - print(f" 理由: {short_reason}") - - # 发送通知(仅发送置信度 >= 阈值的信号) - if send_notification: - best_signal = None - for sig in signals: - if sig.get('confidence', 0) >= threshold and sig.get('grade', 'D') != 'D': - best_signal = sig - break - - if best_signal: - # 使用格式化工具格式化通知 - from app.utils.signal_formatter import get_signal_formatter - formatter = get_signal_formatter() - - card = formatter.format_feishu_card(best_signal, symbol, agent_type='stock') - title = card['title'] - content = card['content'] - - # 发送通知 - 使用 send_card 方法 - # 根据信号方向选择颜色 - color = "green" if best_signal.get('action') == 'buy' else "red" - await feishu.send_card(title, content, color) - await telegram.send_message(formatter.format_signal_message(best_signal, symbol, agent_type='stock')) - print(f"\n📬 通知已发送:{title}") - result['notified'] = True - else: - print(f"\n⏸️ 置信度不足,不发送通知(阈值: {threshold}%)") - else: - print(f"\n⏸️ 无交易信号") - - return result - - except Exception as e: - print(f"❌ 错误: {e}") - import traceback - traceback.print_exc() - return result - - -def print_summary_report(results: list, send_notification: bool = True): - """打印汇总报告并发送通知 - - Args: - results: 分析结果列表 - send_notification: 是否发送通知(默认True) - """ - from app.config import get_settings - settings = get_settings() - threshold = settings.stock_llm_threshold * 100 # 获取阈值 - - total = len(results) - with_signals = [r for r in results if r.get('signals')] - notified = [r for r in results if r.get('notified')] - - # 区分美股和港股 - us_results = [r for r in results if not r['symbol'].endswith('.HK')] - hk_results = [r for r in results if r['symbol'].endswith('.HK')] - - # 统计信号 - buy_count = 0 - sell_count = 0 - high_quality_signals = [] # A/B级信号且达到阈值 - all_signals = [] # 所有信号 - - for r in with_signals: - for sig in r.get('signals', []): - sig['symbol'] = r['symbol'] - sig['current_price'] = r.get('price', 0) - sig['is_hk'] = r['symbol'].endswith('.HK') - sig['stock_name'] = r.get('stock_name', '') - all_signals.append(sig) - - if sig.get('action') == 'buy': - buy_count += 1 - elif sig.get('action') == 'sell': - sell_count += 1 - - # 只统计达到阈值的A/B级信号 - if sig.get('grade') in ['A', 'B'] and sig.get('confidence', 0) >= threshold: - high_quality_signals.append(sig) - - # 按置信度排序 - high_quality_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True) - all_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True) - - # 打印汇总 - print("\n" + "="*80) - print("📊 股票分析汇总报告") - print("="*80) - print(f"分析总数: {total} 只 (美股: {len(us_results)}, 港股: {len(hk_results)})") - print(f"有信号: {len(with_signals)} 只") - print(f"已通知: {len(notified)} 只") - print(f"通知阈值: {threshold}%") - print("") - - # 显示高等级信号(达到阈值的) - if high_quality_signals: - print(f"⭐ 高等级信号达到阈值 (A/B级 >= {threshold}%): {len(high_quality_signals)} 个") - for sig in high_quality_signals[:10]: - symbol = sig['symbol'] - stock_name = sig.get('stock_name', '') - market_tag = '[港股]' if sig.get('is_hk') else '[美股]' - action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' - grade = sig.get('grade', 'D') - confidence = sig.get('confidence', 0) - price = sig.get('current_price', 0) - entry = sig.get('entry_price', 0) - - # 构建带名称的股票显示 - symbol_display = f"{stock_name}({symbol})" if stock_name else symbol - - print(f" {market_tag} {symbol_display} {action} [{grade}级] {confidence}% @ ${price:,.2f}") - if entry > 0: - print(f" 入场: ${entry:,.2f}") - print("") - - # 显示未达到阈值但质量不错的信号 - below_threshold = [s for s in all_signals - if s.get('grade') in ['A', 'B'] and s.get('confidence', 0) < threshold] - if below_threshold: - print(f"⚠️ 以下信号未达到通知阈值 ({threshold}%):") - for sig in below_threshold[:10]: - symbol = sig['symbol'] - stock_name = sig.get('stock_name', '') - market_tag = '[港股]' if sig.get('is_hk') else '[美股]' - action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' - grade = sig.get('grade', 'D') - confidence = sig.get('confidence', 0) - - symbol_display = f"{stock_name}({symbol})" if stock_name else symbol - print(f" {market_tag} {symbol_display} {action} {grade}级 {confidence}%") - print("") - - # 统计汇总 - print(f"📈 做多信号: {buy_count} 个") - print(f"📉 做空信号: {sell_count} 个") - print("="*80) - - # 发送汇总通知 - if send_notification: - asyncio.run(send_summary_notification( - results, total, with_signals, notified, - buy_count, sell_count, high_quality_signals, all_signals, threshold, - len(us_results), len(hk_results) - )) - - -async def send_summary_notification( - results: list, - total: int, - with_signals: list, - notified: list, - buy_count: int, - sell_count: int, - high_quality_signals: list, - all_signals: list, - threshold: float, - us_count: int = 0, - hk_count: int = 0 -): - """发送汇总报告到飞书和Telegram - - Args: - results: 分析结果列表 - total: 总数 - with_signals: 有信号的股票列表 - notified: 已通知的股票列表 - buy_count: 做多信号数量 - sell_count: 做空信号数量 - high_quality_signals: 达到阈值的高等级信号列表 - all_signals: 所有信号列表 - threshold: 通知阈值 - us_count: 美股数量 - hk_count: 港股数量 - """ - try: - from datetime import datetime - feishu = get_feishu_service() - telegram = get_telegram_service() - - now = datetime.now() - - # 构建飞书汇总内容 - content_parts = [ - f"**📊 股票分析汇总报告**", - f"", - f"⏰ 时间: {now.strftime('%Y-%m-%d %H:%M')}", - f"", - f"📊 **分析概况**", - f"• 美股: {us_count} 只 | 港股: {hk_count} 只", - f"• 发现信号: {len(with_signals)} 只", - f"• 已发通知: {len(notified)} 只", - f"• 通知阈值: {threshold:.0f}%", - f"", - ] - - # 高等级信号(达到阈值的) - if high_quality_signals: - content_parts.append(f"⭐ **高等级信号 (A/B级 ≥ {threshold:.0f}%)**") - for sig in high_quality_signals[:5]: - symbol = sig['symbol'] - stock_name = sig.get('stock_name', '') - market_tag = '[港股]' if sig.get('is_hk') else '[美股]' - action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' - grade = sig.get('grade', 'D') - confidence = sig.get('confidence', 0) - - # 构建带名称的股票显示 - symbol_display = f"{stock_name}({symbol})" if stock_name else symbol - - content_parts.append(f"• {market_tag} {symbol_display} {action} {grade}级 {confidence}%") - content_parts.append(f"") - - # 信号统计 - content_parts.extend([ - f"📈 做多信号: {buy_count} 个", - f"📉 做空信号: {sell_count} 个", - f"", - f"*⚠️ 仅供参考,不构成投资建议*" - ]) - - content = "\n".join(content_parts) - - # 发送飞书 - title = f"📊 股票分析汇总 ({now.strftime('%H:%M')})" - color = "blue" - - await feishu.send_card(title, content, color) - - # 发送 Telegram - telegram_msg = f"📊 *股票分析汇总*\n\n" - telegram_msg += f"时间: {now.strftime('%H:%M')}\n" - telegram_msg += f"美股: {us_count}只 | 港股: {hk_count}只\n" - telegram_msg += f"信号: {len(with_signals)}只 | 通知: {len(notified)}只\n" - telegram_msg += f"阈值: {threshold:.0f}%\n\n" - - if high_quality_signals: - telegram_msg += f"⭐ *高等级信号 (≥{threshold:.0f}%)*\n" - for sig in high_quality_signals[:5]: - symbol = sig['symbol'] - stock_name = sig.get('stock_name', '') - market_tag = '[港股]' if sig.get('is_hk') else '[美股]' - action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' - grade = sig.get('grade', 'D') - confidence = sig.get('confidence', 0) - - symbol_display = f"{stock_name}({symbol})" if stock_name else symbol - telegram_msg += f"{market_tag} {symbol_display} {action} {grade}级 {confidence}%\n" - telegram_msg += "\n" - - telegram_msg += f"做多: {buy_count} | 做空: {sell_count}" - - await telegram.send_message(telegram_msg) - - print(f"\n📬 汇总报告已发送到飞书和Telegram") - - except Exception as e: - print(f"❌ 发送汇总通知失败: {e}") - import traceback - traceback.print_exc() - - -async def main(): - # 从命令行参数获取股票代码 - if len(sys.argv) < 2: - print("用法: python3 scripts/test_stock.py AAPL [TSLA] [NVDA] ...") - print("\n示例:") - print(" python3 scripts/test_stock.py AAPL") - print(" python3 scripts/test_stock.py AAPL TSLA NVDA") - sys.exit(1) - - # 过滤掉无效的参数(如环境变量路径、配置项等) - # 只接受有效的股票代码(字母开头,1-5个字符,或包含数字的港股代码如0700.HK) - raw_symbols = sys.argv[1:] - symbols = [] - for arg in raw_symbols: - # 跳过包含路径分隔符、等号、或明显不是股票代码的参数 - if '/' in arg or '=' in arg or ':' in arg or len(arg) > 10: - logger.debug(f"跳过无效参数: {arg}") - continue - # 接受纯字母的参数(美股股票代码) - if arg.isalpha() and 1 <= len(arg) <= 5: - symbols.append(arg.upper()) - # 接受包含数字和点号的参数(港股代码,如0700.HK) - elif '.HK' in arg.upper() and len(arg) <= 10: - symbols.append(arg.upper()) - else: - logger.debug(f"跳过非股票代码参数: {arg}") - - if not symbols: - print("❌ 未找到有效的股票代码") - print(f"接收到的参数: {raw_symbols}") - print("\n用法: python3 scripts/test_stock.py AAPL [TSLA] [NVDA] ...") - sys.exit(1) - - print("="*60) - print("🤖 美股分析脚本") - print("="*60) - print(f"股票: {', '.join(symbols)}") - print(f"时间: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") - - # 收集所有分析结果 - results = [] - for symbol in symbols: - result = await analyze(symbol.upper(), send_notification=True) - results.append(result) - - # 生成汇总报告并发送通知 - await send_summary_notification_async(results) - - print("\n" + "="*60) - print("✅ 分析完成") - print("="*60) - - -async def send_summary_notification_async(results: list): - """异步发送汇总通知""" - from app.config import get_settings - settings = get_settings() - threshold = settings.stock_llm_threshold * 100 # 获取阈值 - - total = len(results) - with_signals = [r for r in results if r.get('signals')] - notified = [r for r in results if r.get('notified')] - - # 区分美股和港股 - us_results = [r for r in results if not r['symbol'].endswith('.HK')] - hk_results = [r for r in results if r['symbol'].endswith('.HK')] - - # 统计信号 - buy_count = 0 - sell_count = 0 - high_quality_signals = [] # A/B级信号且达到阈值 - all_signals = [] # 所有信号 - - for r in with_signals: - for sig in r.get('signals', []): - sig['symbol'] = r['symbol'] - sig['current_price'] = r.get('price', 0) - sig['is_hk'] = r['symbol'].endswith('.HK') - sig['stock_name'] = r.get('stock_name', '') - all_signals.append(sig) - - if sig.get('action') == 'buy': - buy_count += 1 - elif sig.get('action') == 'sell': - sell_count += 1 - - # 只统计达到阈值的A/B级信号 - if sig.get('grade') in ['A', 'B'] and sig.get('confidence', 0) >= threshold: - high_quality_signals.append(sig) - - # 按置信度排序 - high_quality_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True) - all_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True) - - # 打印汇总 - print("\n" + "="*80) - print("📊 股票分析汇总报告") - print("="*80) - print(f"分析总数: {total} 只 (美股: {len(us_results)}, 港股: {len(hk_results)})") - print(f"有信号: {len(with_signals)} 只") - print(f"已通知: {len(notified)} 只") - print(f"通知阈值: {threshold}%") - print("") - - # 显示高等级信号(达到阈值的) - if high_quality_signals: - print(f"⭐ 高等级信号达到阈值 (A/B级 >= {threshold}%): {len(high_quality_signals)} 个") - for sig in high_quality_signals[:10]: - symbol = sig['symbol'] - stock_name = sig.get('stock_name', '') - market_tag = '[港股]' if sig.get('is_hk') else '[美股]' - action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' - grade = sig.get('grade', 'D') - confidence = sig.get('confidence', 0) - price = sig.get('current_price', 0) - entry = sig.get('entry_price', 0) - - # 构建带名称的股票显示 - symbol_display = f"{stock_name}({symbol})" if stock_name else symbol - - print(f" {market_tag} {symbol_display} {action} [{grade}级] {confidence}% @ ${price:,.2f}") - if entry > 0: - print(f" 入场: ${entry:,.2f}") - print("") - - # 显示未达到阈值但质量不错的信号 - below_threshold = [s for s in all_signals - if s.get('grade') in ['A', 'B'] and s.get('confidence', 0) < threshold] - if below_threshold: - print(f"⚠️ 以下信号未达到通知阈值 ({threshold}%):") - for sig in below_threshold[:10]: - symbol = sig['symbol'] - stock_name = sig.get('stock_name', '') - market_tag = '[港股]' if sig.get('is_hk') else '[美股]' - action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空' - grade = sig.get('grade', 'D') - confidence = sig.get('confidence', 0) - - # 构建带名称的股票显示 - symbol_display = f"{stock_name}({symbol})" if stock_name else symbol - - print(f" {market_tag} {symbol_display} {action} {grade}级 {confidence}%") - print("") - - # 统计汇总 - print(f"📈 做多信号: {buy_count} 个") - print(f"📉 做空信号: {sell_count} 个") - print("="*80) - - # 发送汇总通知 - await send_summary_notification( - results, total, with_signals, notified, - buy_count, sell_count, high_quality_signals, all_signals, threshold, - len(us_results), len(hk_results) - ) - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\n\n⚠️ 用户中断") diff --git a/scripts/test_volatility_filter.py b/scripts/test_volatility_filter.py deleted file mode 100644 index c92fa7f..0000000 --- a/scripts/test_volatility_filter.py +++ /dev/null @@ -1,215 +0,0 @@ -#!/usr/bin/env python3 -""" -测试组合波动率过滤功能 - -测试场景: -1. 低波动率(1小时和5分钟都低)- 应该跳过分析 -2. 高波动率(1小时高)- 应该允许分析 -3. 突发波动(1小时低,但5分钟高)- 应该允许分析 -""" -import sys -import pandas as pd -from pathlib import Path - -# 添加项目路径 -backend_dir = Path(__file__).parent.parent / "backend" -sys.path.insert(0, str(backend_dir)) - -from app.crypto_agent.crypto_agent import CryptoAgent - - -def create_test_data_1h_low_volatility(): - """ - 场景1: 低波动率 - 1小时K线:价格在 50000 - 50040 之间波动(波动率约0.08%) - 5分钟K线:价格稳定,无突发波动 - """ - import numpy as np - - # 创建1小时数据(20根K线)- 价格波动范围很小 - base_price = 50000 - timestamps_1h = pd.date_range(end='2024-01-01 12:00:00', periods=20, freq='1h') - df_1h = pd.DataFrame({ - 'timestamp': timestamps_1h, - 'open': [base_price + i * 2 for i in range(20)], # 缓慢上涨 - 'high': [base_price + 20 + i * 2 for i in range(20)], # 最高价只高20 - 'low': [base_price - 20 + i * 2 for i in range(20)], # 最低价只低20 - 'close': [base_price + 10 + i * 2 for i in range(20)], - 'volume': [1000] * 20 - }) - - # 创建5分钟数据(3根K线)- 价格稳定 - timestamps_5m = pd.date_range(end='2024-01-01 12:00:00', periods=3, freq='5min') - df_5m = pd.DataFrame({ - 'timestamp': timestamps_5m, - 'open': [base_price + 38, base_price + 39, base_price + 40], - 'high': [base_price + 42, base_price + 43, base_price + 44], - 'low': [base_price + 36, base_price + 37, base_price + 38], - 'close': [base_price + 39, base_price + 40, base_price + 41], - 'volume': [100] * 3 - }) - - return { - '1h': df_1h, - '5m': df_5m, - '15m': df_1h, - '4h': df_1h - } - - -def create_test_data_1h_high_volatility(): - """ - 场景2: 高波动率(1小时趋势活跃) - 1小时K线:价格在 50000 - 51000 之间波动(波动率约2%) - """ - import numpy as np - - # 创建1小时数据(20根K线) - timestamps_1h = pd.date_range(end='2024-01-01 12:00:00', periods=20, freq='1h') - df_1h = pd.DataFrame({ - 'timestamp': timestamps_1h, - 'open': [50000 + i * 50 for i in range(20)], - 'high': [50200 + i * 50 for i in range(20)], - 'low': [49800 + i * 50 for i in range(20)], - 'close': [50100 + i * 50 for i in range(20)], - 'volume': [1000] * 20 - }) - - # 创建5分钟数据 - timestamps_5m = pd.date_range(end='2024-01-01 12:00:00', periods=3, freq='5min') - df_5m = pd.DataFrame({ - 'timestamp': timestamps_5m, - 'open': [50980, 50985, 50990], - 'high': [51000, 51005, 51010], - 'low': [50975, 50980, 50985], - 'close': [50985, 50990, 50995], - 'volume': [100] * 3 - }) - - return { - '1h': df_1h, - '5m': df_5m, - '15m': df_1h, - '4h': df_1h - } - - -def create_test_data_5m_surge(): - """ - 场景3: 突发波动(1小时低,但5分钟高) - 1小时K线:价格在 50000 - 50040 之间波动(波动率约0.08%) - 5分钟K线:价格从 50000 突然涨到 50600(涨幅约1.2%) - """ - import numpy as np - - # 创建1小时数据(20根K线)- 低波动 - base_price = 50000 - timestamps_1h = pd.date_range(end='2024-01-01 12:00:00', periods=20, freq='1h') - df_1h = pd.DataFrame({ - 'timestamp': timestamps_1h, - 'open': [base_price + i * 2 for i in range(20)], - 'high': [base_price + 20 + i * 2 for i in range(20)], - 'low': [base_price - 20 + i * 2 for i in range(20)], - 'close': [base_price + 10 + i * 2 for i in range(20)], - 'volume': [1000] * 20 - }) - - # 创建5分钟数据(3根K线)- 突发波动 - # 第一根K线从正常价格开始,然后突然暴涨 - timestamps_5m = pd.date_range(end='2024-01-01 12:00:00', periods=3, freq='5min') - df_5m = pd.DataFrame({ - 'timestamp': timestamps_5m, - 'open': [base_price + 38, base_price + 300, base_price + 600], # 突然暴涨 - 'high': [base_price + 42, base_price + 350, base_price + 650], - 'low': [base_price + 36, base_price + 250, base_price + 550], - 'close': [base_price + 40, base_price + 600, base_price + 620], # 第一根收盘价接近开盘价,然后暴涨 - 'volume': [100, 200, 300] # 成交量也放大 - }) - - return { - '1h': df_1h, - '5m': df_5m, - '15m': df_1h, - '4h': df_1h - } - - -def test_volatility_filter(): - """测试波动率过滤功能""" - print("\n" + "=" * 60) - print("🧪 测试组合波动率过滤功能") - print("=" * 60) - - # 创建 CryptoAgent 实例 - agent = CryptoAgent() - - # 测试场景1: 低波动率 - print("\n📋 场景1: 低波动率(1小时和5分钟都低)") - print("-" * 60) - data1 = create_test_data_1h_low_volatility() - should_analyze1, reason1, vol1 = agent._check_volatility('BTCUSDT', data1) - print(f"结果: {'✅ 允许分析' if should_analyze1 else '⏸️ 跳过分析'}") - print(f"原因: {reason1}") - print(f"1小时波动率: {vol1:.2f}%") - print(f"预期: ⏸️ 跳过分析(波动率过低)") - print(f"测试: {'✅ 通过' if not should_analyze1 else '❌ 失败'}") - - # 测试场景2: 高波动率 - print("\n📋 场景2: 高波动率(1小时趋势活跃)") - print("-" * 60) - data2 = create_test_data_1h_high_volatility() - should_analyze2, reason2, vol2 = agent._check_volatility('BTCUSDT', data2) - print(f"结果: {'✅ 允许分析' if should_analyze2 else '⏸️ 跳过分析'}") - print(f"原因: {reason2}") - print(f"1小时波动率: {vol2:.2f}%") - print(f"预期: ✅ 允许分析(1小时趋势活跃)") - print(f"测试: {'✅ 通过' if should_analyze2 else '❌ 失败'}") - - # 测试场景3: 突发波动 - print("\n📋 场景3: 突发波动(1小时低,但5分钟高)") - print("-" * 60) - data3 = create_test_data_5m_surge() - - # 打印5分钟数据用于调试 - df_5m_3 = data3['5m'] - print(f"5分钟K线数据:") - for idx, row in df_5m_3.iterrows(): - print(f" 开: {row['open']:.2f}, 高: {row['high']:.2f}, 低: {row['low']:.2f}, 收: {row['close']:.2f}") - - # 手动计算价格变化 - price_start = float(df_5m_3.iloc[0]['close']) - price_end = float(df_5m_3.iloc[-1]['close']) - price_change_5m = abs(price_end - price_start) / price_start * 100 - print(f"5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}%") - - should_analyze3, reason3, vol3 = agent._check_volatility('BTCUSDT', data3) - print(f"结果: {'✅ 允许分析' if should_analyze3 else '⏸️ 跳过分析'}") - print(f"原因: {reason3}") - print(f"1小时波动率: {vol3:.2f}%") - print(f"预期: ✅ 允许分析(5分钟突发波动)") - print(f"测试: {'✅ 通过' if should_analyze3 else '❌ 失败'}") - - # 总结 - print("\n" + "=" * 60) - print("📊 测试总结") - print("=" * 60) - all_passed = ( - not should_analyze1 and # 场景1应该跳过 - should_analyze2 and # 场景2应该允许 - should_analyze3 # 场景3应该允许 - ) - if all_passed: - print("✅ 所有测试通过!") - print("\n组合波动率过滤功能正常工作:") - print(" • 低波动率时正确跳过分析(节省API调用)") - print(" • 高波动率时正确触发分析(捕捉趋势机会)") - print(" • 突发波动时正确触发分析(捕捉突发机会)") - else: - print("❌ 部分测试失败,请检查逻辑") - - return all_passed - - -if __name__ == "__main__": - success = test_volatility_filter() - sys.exit(0 if success else 1) diff --git a/tests/test_pullback_selector.py b/tests/test_pullback_selector.py deleted file mode 100644 index 65d0853..0000000 --- a/tests/test_pullback_selector.py +++ /dev/null @@ -1,83 +0,0 @@ -""" -龙回头选股器测试脚本 -运行: cd backend && python ../tests/test_pullback_selector.py -""" -import asyncio -import sys -import os -from pathlib import Path - -# 添加 backend 目录到 Python 路径 -backend_dir = Path(__file__).parent.parent / "backend" -sys.path.insert(0, str(backend_dir)) - -# 切换到 backend 目录(确保 .env 能被找到) -os.chdir(backend_dir) - -from app.config import get_settings -from app.astock_agent.pullback_selector import get_pullback_selector -import logging - -# 启用详细日志 -logging.getLogger('stock_agent').setLevel(logging.DEBUG) - - -async def test_pullback_selector(): - """测试龙回头选股器""" - print("=" * 60) - print("龙回头选股器测试") - print("=" * 60) - - settings = get_settings() - print(f"\n配置:") - print(f" Tushare Token: {'已配置' if settings.tushare_token else '未配置'}") - print(f" 选股启用: {settings.pullback_selector_enabled}") - print(f" 检查板块数: {settings.pullback_sectors_to_check}") - - if not settings.tushare_token: - print("\n❌ 错误: 请先在 .env 中配置 TUSHARE_TOKEN") - return - - # 创建选股器 - selector = get_pullback_selector() - - print(f"\n开始选股...") - print(f"检查板块数: {settings.pullback_sectors_to_check}") - print(f"每个板块最多选: 2 只\n") - - # 执行选股 - results = selector.select_from_hot_sectors(top_n=settings.pullback_sectors_to_check) - - # 显示结果 - if results: - print("\n" + "=" * 60) - print("选股结果") - print("=" * 60) - - for sector_name, stocks in results.items(): - print(f"\n【{sector_name}】") - for stock in stocks: - print(f" 📈 {stock['name']}({stock['ts_code']})") - print(f" 现价: ¥{stock['close']:.2f} | 回踩: {stock['pullback_pct']:.2f}% | 涨幅: {stock['rise_pct']:.2f}%") - print(f" MA5: ¥{stock['ma5']:.2f} | MA10: ¥{stock['ma10']:.2f} | MA30: ¥{stock['ma30']:.2f} | MA60: ¥{stock['ma60']:.2f}") - print(f" 缩量比: {stock['volume_shrink_ratio']:.0%} | 再放量: {stock['recent_volume_ratio']:.2f}x") - - total = sum(len(stocks) for stocks in results.values()) - print(f"\n共选出 {total} 只股票") - - # 格式化完整消息 - print("\n" + "=" * 60) - print("通知消息预览") - print("=" * 60) - print(selector.format_result(results)) - - else: - print("\n未选出符合条件的股票") - - print("\n" + "=" * 60) - print("测试完成") - print("=" * 60) - - -if __name__ == "__main__": - asyncio.run(test_pullback_selector())