删除不必要的文件

This commit is contained in:
aaron 2026-03-02 23:40:36 +08:00
parent 6c9a7261d3
commit b7e07e5d0b
26 changed files with 0 additions and 4303 deletions

View File

@ -1,472 +0,0 @@
"""
测试 Bitget 实盘交易 API
运行前请确保已在 .env 中配置
- bitget_api_key
- bitget_api_secret
- bitget_passphrase
- bitget_use_testnet = true (建议先在测试网测试)
"""
import sys
import os
import asyncio
# 添加项目路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from app.services.bitget_trading_api_sdk import BitgetTradingAPI
from app.config import get_settings
def print_section(title: str):
"""打印分节标题"""
print("\n" + "=" * 60)
print(f" {title}")
print("=" * 60)
def test_connection(api: BitgetTradingAPI):
"""测试 API 连接"""
print_section("1. 测试 API 连接")
if api.test_connection():
print("✅ API 连接成功")
return True
else:
print("❌ API 连接失败")
return False
def test_balance(api: BitgetTradingAPI):
"""测试查询余额"""
print_section("2. 查询账户余额")
balance = api.get_balance()
if balance:
print(f"✅ 查询余额成功")
for currency, info in balance.items():
available = info.get('available', '0')
frozen = info.get('frozen', '0')
print(f" {currency}: 可用={available}, 冻结={frozen}")
return True
else:
print("❌ 查询余额失败")
return False
def test_position(api: BitgetTradingAPI, symbol: str = "BTCUSDT"):
"""测试查询持仓"""
print_section(f"3. 查询持仓 ({symbol})")
positions = api.get_position(symbol)
if positions is not None:
if positions:
print(f"✅ 查询到 {len(positions)} 个持仓")
for pos in positions:
pos_side = pos.get('side', 'N/A')
contracts = pos.get('contracts', 0)
unrealized_pnl = pos.get('unrealizedPnl', 0)
print(f" {pos_side}: {contracts}张, 未实现盈亏={unrealized_pnl}")
else:
print("✅ 当前无持仓")
return True
else:
print("❌ 查询持仓失败")
return False
def test_leverage(api: BitgetTradingAPI, symbol: str = "BTCUSDT", leverage: int = 10):
"""测试设置杠杆"""
print_section(f"4. 设置杠杆 ({symbol} {leverage}x)")
if api.set_leverage(symbol, leverage):
print(f"✅ 设置杠杆成功: {leverage}x")
return True
else:
print(f"❌ 设置杠杆失败")
return False
def test_open_orders(api: BitgetTradingAPI):
"""测试查询挂单"""
print_section("5. 查询当前挂单")
orders = api.get_open_orders()
if orders is not None:
if orders:
print(f"✅ 查询到 {len(orders)} 个挂单")
for order in orders[:5]: # 只显示前5个
order_id = order.get('id', 'N/A')
symbol = order.get('symbol', 'N/A')
side = order.get('side', 'N/A')
amount = order.get('amount', 'N/A')
price = order.get('price', 'N/A')
print(f" {order_id}: {symbol} {side} {amount} @ {price}")
else:
print("✅ 当前无挂单")
return True
else:
print("❌ 查询挂单失败")
return False
def test_place_order(api: BitgetTradingAPI, symbol: str = "BTCUSDT",
side: str = "buy", usdt_value: float = 100,
order_type: str = "market", leverage: int = 20, price: float = None):
"""
测试下单 USDT 价值
Args:
api: Bitget API 实例
symbol: 交易对
side: 买卖方向
usdt_value: 下单的 USDT 价值
order_type: 订单类型
leverage: 杠杆倍数
price: 限价单价格可选
"""
print_section(f"6. 下单测试 ({symbol} {side} 价值${usdt_value}, {leverage}x杠杆)")
settings = get_settings()
# 先设置杠杆
print(f"设置杠杆: {leverage}x")
api.set_leverage(symbol, leverage)
# 获取当前价格
import ccxt
ccxt_symbol = api._standardize_symbol(symbol)
ticker = api.exchange.fetch_ticker(ccxt_symbol)
current_price = ticker['last'] if ticker else None
if not current_price:
print("❌ 无法获取当前价格")
return None
print(f"当前价格: ${current_price:.2f}")
# 选择订单类型
print("\n选择订单类型:")
print(" 1. 市价单 (立即成交)")
print(" 2. 限价单 (挂单,可测试撤单)")
order_type_choice = input("请选择 (1-2, 默认2): ").strip()
if order_type_choice == '1':
order_type = 'market'
limit_price = None
else:
order_type = 'limit'
# 限价单需要输入价格
price_input = input("挂单价格 (留空使用默认远离市价的价格): ").strip()
if price_input:
limit_price = float(price_input)
else:
# 默认设置一个远离当前价的价格,确保不会立即成交
if side == 'buy':
limit_price = current_price * 0.95 # 买单价格低于市价5%
print(f"默认买单价格: ${limit_price:.2f} (低于市价5%)")
else:
limit_price = current_price * 1.05 # 卖单价格高于市价5%
print(f"默认卖单价格: ${limit_price:.2f} (高于市价5%)")
# 计算合约数量
# Bitget 合约规格: BTC/USDT 每张 0.01 BTC, ETH/USDT 每张 0.1 ETH
if 'BTC' in symbol:
contract_size = 0.01 # 每张 0.01 BTC
min_size = 1 # Bitget 最小 1 张
coin_price = current_price
elif 'ETH' in symbol:
contract_size = 0.1 # 每张 0.1 ETH
min_size = 1 # Bitget 最小 1 张
coin_price = current_price
else:
contract_size = 1
min_size = 1
coin_price = current_price
# 计算需要的数量 (USDT价值 / (币价 * 合约规格))
size = usdt_value / (coin_price * contract_size)
# 确保满足最小下单量
size = max(size, min_size)
# 计算实际仓位价值和所需保证金
position_value = size * contract_size * coin_price
required_margin = position_value / leverage
print(f"合约规格: {contract_size} 币/张")
print(f"下单数量: {size:.3f}张 (最小{min_size}张)")
print(f"仓位价值: ${position_value:.2f}")
print(f"所需保证金: ${required_margin:.2f} ({leverage}x杠杆)")
print(f"模式: {'测试网' if settings.bitget_use_testnet else '生产网'}")
# 检查余额
balance = api.get_balance()
available = float(balance.get('USDT', {}).get('available', 0))
print(f"账户可用余额: ${available:.2f}")
if required_margin > available:
print(f"\n❌ 余额不足!")
print(f" 需要: ${required_margin:.2f}")
print(f" 可用: ${available:.2f}")
print(f" 缺口: ${required_margin - available:.2f}")
# 建议更高的杠杆
suggested_leverage = int(required_margin / available * leverage) + 1
if suggested_leverage <= 125: # Bitget 最大杠杆
print(f"\n建议: 使用 {suggested_leverage}x 杠杆或减少下单金额")
return None
order_type_text = "限价单" if order_type == "limit" else "市价单"
print(f"订单类型: {order_type_text}")
if price:
print(f"挂单价格: ${price:.2f}")
confirm = input(f"\n⚠️ 即将下单: {symbol} {side} {size:.3f}张 ({order_type_text}, 仓位价值${position_value:.2f}, 保证金${required_margin:.2f}),是否继续?(y/n): ")
if confirm.lower() != 'y':
print("已取消下单")
return None
order = api.place_order(
symbol=symbol,
side=side,
order_type=order_type,
size=size,
price=price
)
if order:
print(f"✅ 下单成功")
print(f" 订单ID: {order.get('id', 'N/A')}")
print(f" 状态: {order.get('status', 'N/A')}")
print(f" 价格: {order.get('price', 'N/A')}")
print(f" 数量: {order.get('amount', 'N/A')}")
fee = order.get('fee')
if fee and isinstance(fee, dict):
print(f" 手续费: {fee.get('cost', 'N/A')}")
else:
print(f" 手续费: N/A")
return order
else:
print("❌ 下单失败")
return None
def test_cancel_order(api: BitgetTradingAPI, symbol: str = "BTCUSDT", order_id: str = None):
"""测试撤单"""
print_section("7. 撤单测试")
if not order_id:
# 如果没有提供订单ID先查询挂单
orders = api.get_open_orders(symbol)
if not orders:
print("⚠️ 无可撤订单")
return None
order_id = orders[0].get('id')
print(f"订单ID: {order_id}")
if api.cancel_order(symbol, order_id=order_id):
print("✅ 撤单成功")
return True
else:
print("❌ 撤单失败")
return False
def test_close_position(api: BitgetTradingAPI, symbol: str = "BTCUSDT"):
"""测试平仓"""
print_section(f"8. 平仓测试 ({symbol})")
positions = api.get_position(symbol)
if not positions:
print("⚠️ 无持仓可平")
return None
print(f"持仓数量: {len(positions)}")
for pos in positions:
pos_side = pos.get('side', 'N/A')
contracts = pos.get('contracts', 0)
print(f" {pos_side}: {contracts}")
confirm = input("\n⚠️ 即将平仓,是否继续?(y/n): ")
if confirm.lower() != 'y':
print("已取消平仓")
return None
result = api.close_position(symbol)
if result:
print("✅ 平仓成功")
return True
else:
print("❌ 平仓失败")
return False
def test_modify_sl_tp(api: BitgetTradingAPI, symbol: str = "BTCUSDT"):
"""测试修改止损止盈"""
print_section(f"9. 修改止损止盈 ({symbol})")
positions = api.get_position(symbol)
if not positions:
print("⚠️ 无持仓,跳过止损止盈测试")
return None
pos = positions[0]
pos_side = pos.get('side')
mark_price = float(pos.get('markPrice', 0))
print(f"当前持仓: {pos_side}, 标记价: {mark_price}")
# 根据持仓方向计算测试用的止损止盈价格
if pos_side == 'long':
test_sl = mark_price * 0.98 # 止损设为标记价的 98%
test_tp = mark_price * 1.02 # 止盈设为标记价的 102%
else:
test_sl = mark_price * 1.02 # 做空止损设为标记价的 102%
test_tp = mark_price * 0.98 # 做空止盈设为标记价的 98%
print(f"测试参数: 止损={test_sl:.2f}, 止盈={test_tp:.2f}")
confirm = input("\n⚠️ 即将设置止损止盈,是否继续?(y/n): ")
if confirm.lower() != 'y':
print("已取消设置")
return None
if api.modify_sl_tp(symbol, stop_loss=test_sl, take_profit=test_tp):
print("✅ 设置止损止盈成功")
return True
else:
print("❌ 设置止损止盈失败")
return False
def test_cancel_all_orders(api: BitgetTradingAPI, symbol: str = "BTCUSDT"):
"""测试撤销所有挂单"""
print_section(f"10. 撤销所有挂单 ({symbol})")
# 先查询挂单
orders = api.get_open_orders(symbol)
if not orders:
print("⚠️ 无挂单可撤")
return None
print(f"当前挂单数: {len(orders)}")
confirm = input("\n⚠️ 即将撤销所有挂单,是否继续?(y/n): ")
if confirm.lower() != 'y':
print("已取消")
return None
if api.cancel_all_orders(symbol):
print("✅ 撤销所有挂单成功")
return True
else:
print("❌ 撤销所有挂单失败")
return False
def main():
"""主测试流程"""
print("\n" + "=" * 60)
print(" Bitget 实盘交易 API 测试")
print("=" * 60)
# 加载配置
settings = get_settings()
# 检查配置
if not settings.bitget_api_key or not settings.bitget_api_secret:
print("❌ 未配置 Bitget API 密钥")
print("请在 .env 文件中设置:")
print(" bitget_api_key=your_api_key")
print(" bitget_api_secret=your_api_secret")
print(" bitget_passphrase=your_passphrase (可选)")
print(" bitget_use_testnet=true/false (测试网/生产网)")
return
# 创建 API 实例
api = BitgetTradingAPI(
api_key=settings.bitget_api_key,
api_secret=settings.bitget_api_secret,
passphrase=settings.bitget_passphrase or "",
use_testnet=settings.bitget_use_testnet
)
mode = '测试网' if settings.bitget_use_testnet else '生产网'
print(f"\n模式: {mode}")
print(f"API Key: {settings.bitget_api_key[:8]}...{settings.bitget_api_key[-4:]}")
try:
# 基础测试(不涉及交易)
if not test_connection(api):
return
test_balance(api)
test_position(api)
test_leverage(api)
test_open_orders(api)
# 交易相关测试(需要确认)
print("\n" + "=" * 60)
print(" 交易功能测试(需要手动确认)")
print("=" * 60)
# 检查用户是否要测试交易功能
test_trading = input("\n是否测试交易功能(下单、平仓等)?(y/n): ")
if test_trading.lower() == 'y':
# 选择交易对
print("\n选择交易对:")
print(" 1. BTCUSDT")
print(" 2. ETHUSDT")
print(" 3. 自定义")
choice = input("请选择 (1-3): ").strip()
if choice == '1':
symbol = 'BTCUSDT'
elif choice == '2':
symbol = 'ETHUSDT'
else:
symbol = input("请输入交易对 (如 BTCUSDT): ").strip().upper()
# 选择买卖方向
direction = input("方向 (buy/sell, 默认buy): ").strip().lower()
side = direction if direction in ['buy', 'sell'] else 'buy'
# 输入 USDT 价值
usdt_input = input("下单价值 (USDT, 默认100): ").strip()
usdt_value = float(usdt_input) if usdt_input else 100
# 输入杠杆倍数
leverage_input = input("杠杆倍数 (默认20, 最大125): ").strip()
leverage = int(leverage_input) if leverage_input else 20
leverage = min(max(leverage, 1), 125) # 限制在 1-125 之间
# 下单测试(函数内部会询问订单类型)
order = test_place_order(api, symbol=symbol, side=side, usdt_value=usdt_value, leverage=leverage)
if order:
import time
print("\n等待 3 秒后撤单...")
time.sleep(3)
# 撤单测试
test_cancel_order(api, symbol, order_id=order.get('id'))
# 平仓测试
test_close_position(api, symbol)
# 止损止盈测试
test_modify_sl_tp(api, symbol)
# 撤销所有挂单
test_cancel_all_orders(api, symbol)
else:
print("跳过交易功能测试")
finally:
# 关闭连接
api.close()
print("\n" + "=" * 60)
print(" 测试完成")
print("=" * 60)
if __name__ == "__main__":
main()

View File

@ -1,166 +0,0 @@
#!/usr/bin/env python3
"""
测试加密货币智能体 - 单次分析
"""
import asyncio
import sys
import os
# 添加项目路径
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
async def test_binance_service():
"""测试 Binance 数据服务"""
print("\n" + "=" * 50)
print("测试 Binance 数据服务")
print("=" * 50)
from app.services.binance_service import binance_service
# 测试获取 K 线数据
print("\n1. 获取 BTCUSDT 1小时 K线数据...")
df = binance_service.get_klines('BTCUSDT', '1h', limit=10)
if not df.empty:
print(f" ✓ 获取成功,共 {len(df)} 条数据")
print(f" 最新收盘价: ${df.iloc[-1]['close']:,.2f}")
else:
print(" ✗ 获取失败")
return False
# 测试多周期数据
print("\n2. 获取 BTCUSDT 多周期数据...")
data = binance_service.get_multi_timeframe_data('BTCUSDT')
for interval, df in data.items():
if not df.empty:
print(f"{interval}: {len(df)} 条数据")
else:
print(f"{interval}: 无数据")
# 测试技术指标
print("\n3. 检查技术指标...")
df = data['1h']
indicators = ['ma5', 'ma20', 'rsi', 'macd', 'bb_upper', 'k', 'd', 'atr']
for ind in indicators:
if ind in df.columns:
value = df.iloc[-1][ind]
print(f"{ind}: {value:.4f}" if value else f" - {ind}: N/A")
return True
async def test_signal_analyzer():
"""测试信号分析器 - 使用新架构"""
print("\n" + "=" * 50)
print("测试市场信号分析器(新架构)")
print("=" * 50)
from app.services.binance_service import binance_service
from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer
analyzer = MarketSignalAnalyzer()
# 获取数据
data = binance_service.get_multi_timeframe_data('BTCUSDT')
# 测试 LLM 分析
print("\n1. LLM 市场分析...")
signal = await analyzer.analyze('BTCUSDT', data, symbols=['BTCUSDT', 'ETHUSDT'])
print(f" 市场状态: {signal.get('market_state')}")
print(f" 趋势: {signal.get('trend')}")
print(f" 信号数量: {len(signal.get('signals', []))}")
return True
async def test_feishu_service():
"""测试飞书通知服务"""
print("\n" + "=" * 50)
print("测试飞书通知服务")
print("=" * 50)
from app.services.feishu_service import get_feishu_service
feishu = get_feishu_service()
if not feishu.enabled:
print(" ⚠ 飞书服务未配置,跳过测试")
return True
# 发送测试消息
print("\n1. 发送测试文本消息...")
result = await feishu.send_text("🧪 这是一条测试消息,来自加密货币智能体")
print(f" {'✓ 发送成功' if result else '✗ 发送失败'}")
return result
async def test_full_analysis():
"""测试完整分析流程"""
print("\n" + "=" * 50)
print("测试完整分析流程")
print("=" * 50)
from app.crypto_agent.crypto_agent import CryptoAgent
agent = CryptoAgent()
for symbol in ['BTCUSDT', 'ETHUSDT']:
print(f"\n分析 {symbol}...")
result = await agent.analyze_once(symbol)
if 'error' in result:
print(f" ✗ 错误: {result['error']}")
else:
print(f" 价格: ${result['price']:,.2f}")
print(f" 趋势: {result['trend']}")
print(f" 动作: {result['action']}")
print(f" 置信度: {result['confidence']}%")
if result.get('stop_loss'):
print(f" 止损: ${result['stop_loss']:,.2f}")
if result.get('take_profit'):
print(f" 止盈: ${result['take_profit']:,.2f}")
return True
async def main():
"""主测试函数"""
print("\n" + "=" * 60)
print("加密货币智能体测试")
print("=" * 60)
tests = [
("Binance 数据服务", test_binance_service),
("信号分析器", test_signal_analyzer),
("飞书通知服务", test_feishu_service),
("完整分析流程", test_full_analysis),
]
results = []
for name, test_func in tests:
try:
result = await test_func()
results.append((name, result))
except Exception as e:
print(f"\n{name} 测试出错: {e}")
results.append((name, False))
# 打印总结
print("\n" + "=" * 60)
print("测试总结")
print("=" * 60)
for name, result in results:
status = "✓ 通过" if result else "✗ 失败"
print(f" {name}: {status}")
all_passed = all(r for _, r in results)
print("\n" + ("所有测试通过!" if all_passed else "部分测试失败"))
return all_passed
if __name__ == "__main__":
success = asyncio.run(main())
sys.exit(0 if success else 1)

View File

@ -1,102 +0,0 @@
"""
测试钉钉通知服务
"""
import asyncio
import sys
import os
# 添加项目路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from app.services.dingtalk_service import get_dingtalk_service
async def test_dingtalk():
"""测试钉钉消息发送"""
dingtalk = get_dingtalk_service()
print("=" * 50)
print("测试钉钉通知服务")
print("=" * 50)
print(f"服务状态: {'启用' if dingtalk.enabled else '禁用'}")
print(f"Webhook URL: {dingtalk.webhook_url[:50]}...")
print(f"Secret: {dingtalk.secret[:20]}...")
print()
# 测试 1: 发送文本消息
print("测试 1: 发送文本消息...")
success = await dingtalk.send_text("📊 钉钉通知服务测试\n\n这是一条测试消息,来自 Stock Agent 系统。")
print(f"结果: {'✅ 成功' if success else '❌ 失败'}")
print()
# 等待一下
await asyncio.sleep(2)
# 测试 2: 发送 Markdown 消息
print("测试 2: 发送 Markdown 消息...")
markdown_content = """### 📈 交易信号测试
> **交易对**: BTCUSDT
> **方向**: 🟢 做多
> **价格**: $95,000.00
> **信心度**: 85%
**分析理由**:
- 突破关键阻力位
- 量价配合良好
- 多周期共振向上
*来自 Stock Agent 系统*
"""
success = await dingtalk.send_markdown("交易信号", markdown_content)
print(f"结果: {'✅ 成功' if success else '❌ 失败'}")
print()
await asyncio.sleep(2)
# 测试 3: 发送 ActionCard 消息
print("测试 3: 发送 ActionCard 消息...")
card_content = """### 📊 模拟交易报告
#### 统计概览
- 总交易次数: 50
- 胜率: 65%
- 总盈亏: +15.2%
#### 最近交易
| 交易对 | 方向 | 盈亏 |
|--------|------|------|
| BTCUSDT | 做多 | +5.2% |
| ETHUSDT | 做空 | +2.1% |
"""
success = await dingtalk.send_action_card(
title="📊 4小时交易报告",
content=card_content,
btn_orientation="0",
btn_title="查看详情",
btn_url="https://example.com"
)
print(f"结果: {'✅ 成功' if success else '❌ 失败'}")
print()
# 测试 4: 发送交易信号
print("测试 4: 发送交易信号...")
signal = {
'action': 'buy',
'symbol': 'BTCUSDT',
'price': 95000,
'trend': 'uptrend',
'confidence': 85,
'agent_type': 'crypto'
}
success = await dingtalk.send_trading_signal(signal)
print(f"结果: {'✅ 成功' if success else '❌ 失败'}")
print()
print("=" * 50)
print("测试完成!")
print("=" * 50)
if __name__ == "__main__":
asyncio.run(test_dingtalk())

View File

@ -1,53 +0,0 @@
"""
测试全局异常处理器
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import asyncio
from app.utils.error_handler import setup_global_exception_handler, init_error_notifier
from app.services.feishu_service import get_feishu_service
async def test_exception_handler():
"""测试异常处理器"""
print("=" * 60)
print("测试全局异常处理器")
print("=" * 60)
print()
# 初始化
setup_global_exception_handler()
print("✅ 全局异常处理器已安装")
print()
# 初始化飞书通知
try:
feishu = get_feishu_service()
init_error_notifier(feishu_service=feishu, enabled=True, cooldown=10)
print("✅ 飞书错误通知已启用冷却时间10秒")
except Exception as e:
print(f"❌ 飞书错误通知初始化失败: {e}")
return
print()
# 模拟一个未捕获的异常
print("-" * 60)
print("将触发一个测试异常...")
print("-" * 60)
print()
# 这个异常会被全局异常处理器捕获
# 注意:这会触发飞书通知
raise ValueError("这是一个测试异常,用于验证全局异常处理器是否正常工作")
if __name__ == "__main__":
try:
asyncio.run(test_exception_handler())
except SystemExit:
# 异常被处理后,程序可能会退出
pass
print()
print("测试结束")

View File

@ -1,100 +0,0 @@
"""
测试A股异动监控 - 使用涨停板数据作为替代方案
eastmoney API 不可用时使用涨停板数据分析
"""
import asyncio
import sys
import os
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import akshare as ak
import pandas as pd
from app.utils.logger import logger
def test_limit_up_approach():
"""使用涨停板数据发现异动板块"""
print("=" * 60)
print("涨停板数据分析测试")
print("=" * 60)
print()
try:
# 获取涨停板数据
print("获取涨停板数据...")
df = ak.stock_zt_pool_em(date='20260227')
if df.empty:
print("没有涨停数据")
return
print(f"获取到 {len(df)} 只涨停股")
print()
# 按行业分组统计
print("-" * 60)
print("按行业统计涨停股数:")
print("-" * 60)
# 获取行业列
industry_col = '所属行业'
if industry_col not in df.columns:
# 尝试其他可能的列名
for col in df.columns:
if '行业' in col or '板块' in col:
industry_col = col
break
if industry_col in df.columns:
# 按行业分组
industry_stats = df.groupby(industry_col).agg({
'代码': 'count',
'涨跌幅': 'mean',
'最新价': 'mean'
}).rename(columns={'代码': '涨停数', '涨跌幅': '平均涨幅', '最新价': '平均价格'})
# 排序
industry_stats = industry_stats.sort_values('涨停数', ascending=False)
# 显示Top 10
for idx, (industry, row) in enumerate(industry_stats.head(10).iterrows(), 1):
print(f"{idx}. {industry}: {int(row['涨停数'])}只涨停, 平均涨幅 {row['平均涨幅']:.2f}%")
print()
print("-" * 60)
print("涨停股详情 (Top 5):")
print("-" * 60)
# 显示涨幅最大的5只
top_5 = df.nlargest(5, '涨跌幅')
for idx, row in top_5.iterrows():
print(f"{int(row['序号'])}. {row['名称']} ({row['代码']})")
print(f" 涨幅: {row['涨跌幅']:.2f}% | 价格: {row['最新价']:.2f}")
print(f" 行业: {row.get(industry_col, 'N/A')}")
print(f" 封板时间: {row.get('最后封板时间', 'N/A')}")
print()
return True
except Exception as e:
print(f"测试失败: {e}")
import traceback
traceback.print_exc()
return False
if __name__ == "__main__":
success = test_limit_up_approach()
if success:
print("=" * 60)
print("测试成功涨停板API可用")
print("=" * 60)
print()
print("建议: 可以使用涨停板数据作为板块异动监控的替代方案")
print(" - 按行业统计涨停股数")
print(" - 发现涨停集中的板块即为异动板块")
print(" - 涨停股本身就是最好的龙头股候选")
else:
print("测试失败")

View File

@ -1,142 +0,0 @@
"""
测试市场信号分析 - 验证趋势判断功能
"""
import asyncio
import sys
import os
import json
# 添加项目路径
sys.path.insert(0, os.path.dirname(__file__))
from app.crypto_agent.market_signal_analyzer import MarketSignalAnalyzer
from app.utils.logger import logger
from app.services.bitget_service import BitgetService
async def test_trend_analysis(symbol: str = "BTCUSDT"):
"""测试趋势分析"""
logger.info("=" * 60)
logger.info(f"开始测试市场信号分析: {symbol}")
logger.info("=" * 60)
# 1. 获取 K 线数据
logger.info(f"\n[1/3] 获取 {symbol} 的 K 线数据...")
bitget_service = BitgetService()
data = bitget_service.get_multi_timeframe_data(symbol)
if not data:
logger.error(f"❌ 无法获取 {symbol} 的 K 线数据")
return
logger.info(f"✅ K 线数据获取成功")
for tf, df in data.items():
if df is not None and len(df) > 0:
latest_price = float(df.iloc[-1]['close'])
logger.info(f" {tf}: 最新价格 ${latest_price:,.2f}, 数据量 {len(df)}")
# 2. 分析市场信号
logger.info(f"\n[2/3] 分析市场信号...")
analyzer = MarketSignalAnalyzer()
result = await analyzer.analyze(symbol, data)
# 3. 打印分析结果
logger.info(f"\n[3/3] 分析结果:")
logger.info("-" * 60)
# 打印趋势判断(新功能)
logger.info("\n📊 趋势判断(新增):")
logger.info(f" 趋势方向: {result.get('trend_direction', 'unknown')}")
logger.info(f" 趋势强度: {result.get('trend_strength', 'unknown')}")
# 打印市场状态
logger.info(f"\n📈 市场状态:")
logger.info(f" {result.get('market_state', 'unknown')}")
logger.info(f" 分析摘要: {result.get('analysis_summary', 'unknown')}")
logger.info(f" 量价分析: {result.get('volume_analysis', 'unknown')}")
# 打印关键价位
if result.get('key_levels'):
logger.info(f"\n💰 关键价位:")
levels = result['key_levels']
if levels.get('support'):
logger.info(f" 支撑位: {[f'${s:,.0f}' for s in levels['support'] if s]}")
if levels.get('resistance'):
logger.info(f" 阻力位: {[f'${r:,.0f}' for r in levels['resistance'] if r]}")
# 打印信号
logger.info(f"\n🎯 交易信号:")
signals = result.get('signals', [])
if not signals:
logger.warning(" ⚠️ 无交易信号(观望)")
else:
for i, sig in enumerate(signals, 1):
action_emoji = "🟢" if sig.get('action') == 'buy' else "🔴" if sig.get('action') == 'sell' else ""
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(sig.get('grade', ''), '')
entry_type_emoji = "⚡现价" if sig.get('entry_type') == 'market' else "⏳挂单"
logger.info(f"\n 信号 #{i} {action_emoji} {grade_icon}")
logger.info(f" ├─ 类型: {sig.get('timeframe', 'unknown')} | {entry_type_emoji}")
logger.info(f" ├─ 操作: {sig.get('action', 'unknown').upper()}")
logger.info(f" ├─ 信心度: {sig.get('confidence', 0)}% | 等级: {sig.get('grade', 'unknown')}")
logger.info(f" ├─ 入场价: ${sig.get('entry_price', 0):,.2f}")
logger.info(f" ├─ 止损: ${sig.get('stop_loss', 0):,.2f} | 止盈: ${sig.get('take_profit', 0):,.2f}")
logger.info(f" └─ 理由: {sig.get('reasoning', 'unknown')}")
# 检查是否顺势
trend_dir = result.get('trend_direction', 'neutral')
action = sig.get('action', '')
if trend_dir == 'uptrend' and action == 'sell':
logger.warning(f" ⚠️ 警告: 上升趋势中做空(逆势)")
elif trend_dir == 'downtrend' and action == 'buy':
logger.warning(f" ⚠️ 警告: 下降趋势中做多(逆势)")
elif trend_dir == 'uptrend' and action == 'buy':
logger.info(f" ✅ 顺势交易")
elif trend_dir == 'downtrend' and action == 'sell':
logger.info(f" ✅ 顺势交易")
logger.info("\n" + "=" * 60)
logger.info("测试完成")
logger.info("=" * 60)
return result
async def test_multiple_symbols():
"""测试多个交易对"""
symbols = ['BTCUSDT', 'ETHUSDT', 'SOLUSDT']
for symbol in symbols:
logger.info(f"\n\n{'#' * 60}")
logger.info(f"# 测试 {symbol}")
logger.info(f"{'#' * 60}")
try:
await test_trend_analysis(symbol)
except Exception as e:
logger.error(f"{symbol} 测试失败: {e}")
import traceback
traceback.print_exc()
# 等待一段时间,避免 API 限流
await asyncio.sleep(2)
def main():
import argparse
parser = argparse.ArgumentParser(description='测试市场信号分析')
parser.add_argument('--symbol', default='BTCUSDT', help='交易对(默认: BTCUSDT')
parser.add_argument('--multi', action='store_true', help='测试多个交易对')
args = parser.parse_args()
if args.multi:
asyncio.run(test_multiple_symbols())
else:
asyncio.run(test_trend_analysis(args.symbol))
if __name__ == '__main__':
main()

View File

@ -1,129 +0,0 @@
"""
测试A股板块异动监控
运行一次检查并打印结果不发送钉钉通知
"""
import asyncio
import sys
import os
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.astock_agent import SectorMonitor
from app.utils.logger import logger
async def test_sector_monitor():
"""测试板块异动监控"""
print("=" * 60)
print("A股板块异动监控测试")
print("=" * 60)
print()
# 配置参数
change_threshold = 2.0 # 涨跌幅阈值 2%
top_n = 3 # 每个板块返回前3只龙头股
print(f"配置参数:")
print(f" - 涨跌幅阈值: {change_threshold}%")
print(f" - 龙头股数量: Top{top_n}")
print(f" - 钉钉通知: 已禁用(测试模式)")
print()
# 创建监控器(不启用通知)
monitor = SectorMonitor(
change_threshold=change_threshold,
top_n=top_n,
enable_notifier=False # 测试时不发送通知
)
print("开始检查板块异动...")
print("-" * 60)
print()
# 执行一次检查
result = await monitor.check_once()
# 打印结果
print()
print("-" * 60)
print("检查结果:")
print(f" - 异动板块数: {result['hot_sectors']}")
print(f" - 龙头股总数: {result['stocks']}")
print()
# 显示详细信息
if 'results' in result and result['results']:
print("=" * 60)
print("异动板块详情:")
print("=" * 60)
print()
for idx, item in enumerate(result['results'], 1):
sector = item['sector']
stocks = item['stocks']
reason = item['reason']
# 板块信息
change_icon = "📈" if sector['change_pct'] > 0 else "📉"
print(f"{idx}. 【{sector['name']}{change_icon} {sector['change_pct']:+.2f}%")
print(f" - 涨跌额: {sector['change_amount']:+.2f}")
print(f" - 上涨家数: {sector['ups']}")
print(f" - 下跌家数: {sector['downs']}")
print(f" - 异动原因: {reason}")
print()
# 龙头股信息
print(f" 🏆 龙头股 Top {len(stocks)}:")
for s_idx, stock in enumerate(stocks, 1):
change_pct = stock['change_pct']
if change_pct >= 5:
speed_icon = "⚡⚡⚡"
elif change_pct >= 3:
speed_icon = "⚡⚡"
elif change_pct >= 1:
speed_icon = ""
else:
speed_icon = "🐌"
# 成交额格式化
amount = stock['amount']
if amount >= 100000:
amount_str = f"{amount/100000:.1f}亿"
elif amount >= 10000:
amount_str = f"{amount/10000:.1f}"
else:
amount_str = f"{amount:.0f}"
print(f" {s_idx}. {stock['name']} ({stock['code']})")
print(f" 价格: ¥{stock['price']:.2f} | 涨跌幅: {change_pct:+.2f}% {speed_icon}")
print(f" 成交额: {amount_str} | 换手率: {stock['turnover']:.2f}%")
print(f" 评分: {stock['score']:.1f} | 涨速: {stock['speed_level']}")
print()
# 显示统计信息
print("=" * 60)
print("统计信息:")
print("=" * 60)
stats = monitor.get_stats()
print(f" - 总检查次数: {stats['total_checks']}")
print(f" - 总异动板块: {stats['total_hot_sectors']}")
print(f" - 总龙头股数: {stats['total_stocks']}")
if stats['total_checks'] > 0:
print(f" - 平均龙头股: {stats['avg_stocks_per_check']:.1f}")
print(f" - 最后检查时间: {stats['last_check_time']}")
print()
print("=" * 60)
print("测试完成!")
print("=" * 60)
if __name__ == "__main__":
try:
asyncio.run(test_sector_monitor())
except KeyboardInterrupt:
print("\n\n测试被用户中断")
except Exception as e:
print(f"\n\n测试失败: {e}")
import traceback
traceback.print_exc()

View File

@ -1,113 +0,0 @@
"""
测试 Tushare 接口返回数据
检查各个接口返回的实际字段
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# 直接使用 token
token = '0ed6419a00d8923dc19c0b58fc92d94c9a0696949ab91a13aa58a0cc'
def test_tushare_api():
"""测试 Tushare API 返回的数据"""
print("=" * 60)
print("Tushare API 数据测试")
print("=" * 60)
print()
import tushare as ts
ts.set_token(token)
pro = ts.pro_api()
from datetime import datetime, timedelta
yesterday = (datetime.now() - timedelta(days=5)).strftime('%Y%m%d')
today = datetime.now().strftime('%Y%m%d')
# 1. 测试概念板块列表
print("1. 测试 ths_index (概念板块列表)")
print("-" * 40)
sectors_df = pro.ths_index(type='N')
print(f"返回列: {sectors_df.columns.tolist()}")
print(f"前3行数据:")
print(sectors_df.head(3))
print()
# 获取第一个板块的代码
if not sectors_df.empty:
first_sector_code = sectors_df.iloc[0]['ts_code']
first_sector_name = sectors_df.iloc[0]['name']
print(f"使用第一个板块测试: {first_sector_name} ({first_sector_code})")
print()
# 2. 测试板块日线数据
print("2. 测试 ths_daily (板块日线数据)")
print("-" * 40)
daily_df = pro.ths_daily(
ts_code=first_sector_code,
start_date=yesterday,
end_date=today
)
print(f"返回列: {daily_df.columns.tolist()}")
print(f"最新一天数据:")
if not daily_df.empty:
latest = daily_df.sort_values('trade_date').iloc[-1]
print(latest)
print()
# 3. 测试成分股数据
print("3. 测试 ths_member (成分股数据)")
print("-" * 40)
members_df = pro.ths_member(ts_code=first_sector_code)
print(f"返回列: {members_df.columns.tolist()}")
print(f"前5个成分股:")
print(members_df.head(5))
print()
# 4. 测试个股日线数据
if not members_df.empty:
first_stock_code = members_df.iloc[0]['con_code']
print(f"使用第一个成分股测试: {first_stock_code}")
print()
print("4. 测试 daily (个股日线数据)")
print("-" * 40)
stock_daily_df = pro.daily(
ts_code=first_stock_code,
start_date=yesterday,
end_date=today
)
print(f"返回列: {stock_daily_df.columns.tolist()}")
if not stock_daily_df.empty:
print(f"最新一天数据:")
latest_stock = stock_daily_df.sort_values('trade_date').iloc[-1]
print(latest_stock)
print()
# 5. 测试个股每日指标
print("5. 测试 daily_basic (个股每日指标)")
print("-" * 40)
basic_df = pro.daily_basic(
ts_code=first_stock_code,
trade_date=today,
fields='ts_code,trade_date,turnover_rate,volume_ratio,pe,pb'
)
print(f"返回列: {basic_df.columns.tolist()}")
if not basic_df.empty:
print(f"数据:")
print(basic_df)
print()
print("=" * 60)
print("测试完成")
print("=" * 60)
if __name__ == "__main__":
try:
test_tushare_api()
except Exception as e:
print(f"\n测试失败: {e}")
import traceback
traceback.print_exc()

View File

@ -1,87 +0,0 @@
"""
测试A股板块异动监控 - Tushare 版本详细调试
"""
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.config import get_settings
def test_tushare_detailed():
"""详细测试 Tushare 板块数据"""
print("=" * 60)
print("Tushare 板块数据详细测试")
print("=" * 60)
print()
settings = get_settings()
token = settings.tushare_token
from app.astock_agent.tushare_client import TushareClient
ts_client = TushareClient(token=token)
# 1. 测试获取概念板块列表
print("1. 获取概念板块列表...")
sectors_df = ts_client.get_concept_sectors()
print(f" ✅ 获取到 {len(sectors_df)} 个概念板块")
print(f" 示例板块:")
for idx, row in sectors_df.head(5).iterrows():
print(f" - {row['name']}: {row['ts_code']}")
print()
# 2. 测试获取单个板块行情
print("2. 获取单个板块行情(以人工智能为例)...")
ai_sectors = sectors_df[sectors_df['name'].str.contains('人工智能', na=False)]
if not ai_sectors.empty:
ai_code = ai_sectors.iloc[0]['ts_code']
print(f" 找到人工智能板块: {ai_code}")
daily_df = ts_client.get_sector_daily(ai_code)
if not daily_df.empty:
print(f" ✅ 获取到 {len(daily_df)} 天行情数据")
latest = daily_df.sort_values('trade_date').iloc[-1]
print(f" 最新行情 ({latest['trade_date']}):")
print(f" - 收盘价: {latest['close']:.2f}")
print(f" - 涨跌幅: {latest.get('pct_chg', 'N/A')}%")
else:
print(f" ❌ 行情数据为空")
else:
print(f" ⚠️ 未找到人工智能板块")
print()
# 3. 测试获取板块成分股
print("3. 获取板块成分股...")
if not ai_sectors.empty:
members_df = ts_client.get_sector_members(ai_code)
if not members_df.empty:
print(f" ✅ 获取到 {len(members_df)} 个成分股")
print(f" 前5个成分股:")
for idx, row in members_df.head(5).iterrows():
print(f" - {row.get('name', row.get('member_name', 'N/A'))}: {row['ts_code']}")
else:
print(f" ❌ 成分股数据为空")
print()
# 4. 测试获取异动板块(降低阈值)
print("4. 检测异动板块(阈值 0%...")
hot_df = ts_client.get_hot_sectors(threshold=0.0)
if hot_df.empty:
print(" ⚠️ 未检测到上涨的板块")
else:
print(f" ✅ 检测到 {len(hot_df)} 个上涨板块")
print(f" 涨幅最高的 5 个板块:")
for idx, row in hot_df.head(5).iterrows():
print(f" {idx}. {row['name']}: {row['change_pct']:+.2f}%")
print()
print("=" * 60)
print("测试完成!")
print("=" * 60)
if __name__ == "__main__":
test_tushare_detailed()

View File

@ -1,166 +0,0 @@
"""
测试A股板块异动监控 - Tushare 版本
运行一次检查并打印结果并发送钉钉通知
"""
import asyncio
import sys
import os
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.config import get_settings
from app.utils.logger import logger
def test_tushare_sector_monitor():
"""测试板块异动监控Tushare 版本)"""
print("=" * 60)
print("A股板块异动监控测试 - Tushare 版本")
print("=" * 60)
print()
# 获取配置
settings = get_settings()
token = settings.tushare_token
if not token:
print("❌ Tushare token 未配置!")
print("请在 .env 文件中设置 TUSHARE_TOKEN")
return
print(f"✅ Tushare token 已配置: {token[:10]}...")
print()
# 导入 Tushare 客户端和通知器
from app.astock_agent.tushare_client import TushareClient
from app.astock_agent.tushare_sector_analyzer import TushareSectorAnalyzer
from app.astock_agent.tushare_stock_selector import TushareStockSelector
from app.astock_agent.notifier import DingTalkNotifier
# 检查钉钉配置
webhook = settings.dingtalk_astock_webhook or settings.dingtalk_webhook_url
secret = settings.dingtalk_astock_secret or settings.dingtalk_secret
notifier = None
if webhook:
notifier = DingTalkNotifier(webhook, secret)
print(f"✅ 钉钉通知已配置")
else:
print("⚠️ 钉钉通知未配置,仅打印结果")
print()
# 配置参数
change_threshold = 2.0 # 涨跌幅阈值 2%
top_n = 3 # 每个板块返回前3只龙头股
print(f"配置参数:")
print(f" - 涨跌幅阈值: {change_threshold}%")
print(f" - 龙头股数量: Top{top_n}")
print()
# 创建 Tushare 客户端
print("初始化 Tushare 客户端...")
ts_client = TushareClient(token=token)
print("✅ Tushare 客户端初始化成功")
print()
# 创建分析器
print("-" * 60)
print("开始检查板块异动...")
print("-" * 60)
print()
analyzer = TushareSectorAnalyzer(ts_client, change_threshold=change_threshold)
# 执行检查
hot_sectors = analyzer.detect_sector_changes()
if not hot_sectors:
print("未检测到异动板块")
return
print(f"检测到 {len(hot_sectors)} 个异动板块:")
print()
# 显示异动板块详情
notified_count = 0
for idx, sector in enumerate(hot_sectors, 1):
print(f"{idx}. 【{sector['name']}")
print(f" 代码: {sector['ts_code']}")
print(f" 涨跌幅: {sector['change_pct']:+.2f}%")
print(f" 收盘价: {sector['close']:.2f}")
print(f" 成交额: {sector['amount_str']}")
print(f" 交易日期: {sector['trade_date']}")
print()
# 获取龙头股
print(f" 正在获取龙头股...")
selector = TushareStockSelector(ts_client, top_n=top_n)
top_stocks = selector.select_leading_stocks(sector['ts_code'], sector['name'])
if top_stocks:
print(f" 🏆 龙头股 Top {len(top_stocks)}:")
for stock in top_stocks:
# 成交额格式化
amount = stock['amount']
if amount >= 100000000:
amount_str = f"{amount/100000000:.2f}亿"
elif amount >= 10000:
amount_str = f"{amount/10000:.2f}"
else:
amount_str = f"{amount:.0f}"
print(f" {stock['name']} ({stock['code']})")
print(f" 价格: {stock['price']:.2f} | 涨跌幅: {stock['change_pct']:+.2f}% {stock['speed_level']}")
print(f" 成交额: {amount_str} | 评分: {stock['score']:.1f}")
else:
print(f" ⚠️ 未找到龙头股")
print()
# 发送钉钉通知
if notifier and top_stocks:
# 使用实际的板块数据
sector_for_notify = {
'name': sector['name'],
'change_pct': sector['change_pct'],
'change_amount': sector.get('change', 0), # 涨跌额
'amount': sector['amount'],
'leading_stock': top_stocks[0]['name'] if top_stocks else '',
'ts_code': sector['ts_code'],
}
# 分析异动原因
reason = analyzer.get_hot_reason(sector['name'], top_stocks)
# 发送通知
print(f" 📲 发送钉钉通知...")
success = notifier.send_sector_alert(
sector_data=sector_for_notify,
top_stocks=top_stocks,
reason=reason
)
if success:
print(f" ✅ 通知发送成功")
notified_count += 1
else:
print(f" ❌ 通知发送失败")
print()
print("=" * 60)
print("测试完成!")
print(f"检测到 {len(hot_sectors)} 个异动板块")
if notifier:
print(f"发送通知 {notified_count}/{len(hot_sectors)}")
print("=" * 60)
if __name__ == "__main__":
try:
test_tushare_sector_monitor()
except KeyboardInterrupt:
print("\n\n测试被用户中断")
except Exception as e:
print(f"\n\n测试失败: {e}")
import traceback
traceback.print_exc()

View File

@ -1,205 +0,0 @@
"""
测试 Bitget WebSocket 价格监控
"""
import asyncio
import sys
import os
# 添加项目路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from app.services.bitget_websocket import BitgetWebSocketClient
from app.utils.logger import logger
async def test_websocket():
"""测试 WebSocket 连接和价格订阅"""
logger.info("=" * 60)
logger.info("Bitget WebSocket 测试开始")
logger.info("=" * 60)
# 创建 WebSocket 客户端
client = BitgetWebSocketClient()
# 注册价格回调
def on_price_update(symbol: str, price: float, data: dict):
"""价格更新回调"""
import datetime
timestamp = datetime.datetime.now().strftime("%H:%M:%S.%f")[:-3]
print(f"[{timestamp}] {symbol}: ${price:,.2f}")
# 显示更多详细信息
if 'open24h' in data:
change_24h = ((price - float(data['open24h'])) / float(data['open24h'])) * 100
print(f" └─ 24h涨跌: {change_24h:+.2f}%")
client.on_price_update('*', on_price_update)
# 连接
logger.info("正在连接 Bitget WebSocket...")
if not await client.connect():
logger.error("❌ 连接失败")
return False
logger.info("✅ 连接成功")
# 订阅交易对
symbols = ['BTCUSDT', 'ETHUSDT', 'SOLUSDT']
logger.info(f"正在订阅: {', '.join(symbols)}")
if not await client.subscribe(symbols):
logger.error("❌ 订阅失败")
return False
logger.info(f"✅ 订阅成功: {symbols}")
# 显示当前订阅状态
logger.info(f"已订阅交易对: {client.subscribed_symbols}")
logger.info(f"连接状态: {client.is_connected}")
# 接收价格更新30秒
logger.info("\n开始接收价格更新30秒...")
logger.info("-" * 60)
try:
# 等待30秒接收数据
await asyncio.sleep(30)
except KeyboardInterrupt:
logger.info("\n用户中断")
# 显示接收到的价格
logger.info("-" * 60)
logger.info("当前价格缓存:")
prices = client.get_all_prices()
for symbol, price in prices.items():
print(f" {symbol}: ${price:,.2f}")
# 断开连接
logger.info("\n正在断开连接...")
await client.disconnect()
logger.info("✅ 已断开连接")
logger.info("=" * 60)
logger.info("测试完成")
logger.info("=" * 60)
return True
async def test_reconnect():
"""测试自动重连功能"""
logger.info("=" * 60)
logger.info("测试自动重连功能")
logger.info("=" * 60)
client = BitgetWebSocketClient()
price_updates = []
def on_price_update(symbol: str, price: float, data: dict):
price_updates.append((symbol, price))
logger.info(f"[重连测试] {symbol}: ${price:,.2f}")
client.on_price_update('*', on_price_update)
# 第一次连接
logger.info("第一次连接...")
await client.connect()
await client.subscribe(['BTCUSDT'])
# 等待5秒接收数据
await asyncio.sleep(5)
logger.info(f"接收到的价格更新: {len(price_updates)}")
# 模拟断线(手动断开)
logger.info("\n模拟断线...")
await client.disconnect()
# 等待2秒
await asyncio.sleep(2)
# 重新连接
logger.info("重新连接...")
if await client.connect():
logger.info("✅ 重连成功")
await client.subscribe(['BTCUSDT'])
# 等待5秒接收数据
await asyncio.sleep(5)
initial_count = len(price_updates)
logger.info(f"重连后接收到的价格更新: {len(price_updates) - initial_count}")
else:
logger.error("❌ 重连失败")
await client.disconnect()
logger.info("重连测试完成")
return True
def test_integration():
"""测试与 price_monitor_service 的集成"""
logger.info("=" * 60)
logger.info("测试 PriceMonitorService 集成")
logger.info("=" * 60)
# 设置环境变量启用 WebSocket
os.environ['USE_BITGET_WEBSOCKET'] = 'true'
from app.services.price_monitor_service import get_price_monitor_service
monitor = get_price_monitor_service()
# 添加回调
update_count = {'count': 0}
def on_update(symbol: str, price: float):
update_count['count'] += 1
logger.info(f"[集成测试] {symbol}: ${price:,.2f}")
monitor.add_price_callback(on_update)
# 订阅交易对
logger.info("订阅 BTCUSDT...")
monitor.subscribe_symbol('BTCUSDT')
# 等待10秒
logger.info("等待10秒接收价格更新...")
import time
time.sleep(10)
logger.info(f"接收到 {update_count['count']} 次价格更新")
logger.info(f"当前价格: {monitor.get_latest_price('BTCUSDT')}")
# 停止监控
monitor.stop()
logger.info("集成测试完成")
return True
def main():
"""主测试函数"""
import argparse
parser = argparse.ArgumentParser(description='测试 Bitget WebSocket')
parser.add_argument('--test', choices=['basic', 'reconnect', 'integration'],
default='basic', help='测试类型')
args = parser.parse_args()
if args.test == 'basic':
asyncio.run(test_websocket())
elif args.test == 'reconnect':
asyncio.run(test_reconnect())
elif args.test == 'integration':
test_integration()
if __name__ == '__main__':
main()

View File

@ -1,87 +0,0 @@
"""
测试智能AI Agent
"""
import asyncio
import sys
import os
# 添加项目路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from app.agent.smart_agent import SmartStockAgent
async def test_intelligent_mode():
"""测试智能模式"""
print("=" * 60)
print("测试智能AI Agent")
print("=" * 60)
# 创建Agent实例
agent = SmartStockAgent()
# 测试用例
test_cases = [
{
"name": "简单股票查询",
"message": "贵州茅台怎么样",
"session_id": "test_session_1"
},
{
"name": "技术分析关注",
"message": "比亚迪的MACD指标怎么样有没有金叉",
"session_id": "test_session_2"
},
{
"name": "基本面关注",
"message": "宁德时代的基本面如何,盈利能力强吗",
"session_id": "test_session_3"
}
]
for i, test_case in enumerate(test_cases, 1):
print(f"\n{'=' * 60}")
print(f"测试用例 {i}: {test_case['name']}")
print(f"{'=' * 60}")
print(f"用户问题: {test_case['message']}")
print()
try:
# 处理消息
response = await agent.process_message(
message=test_case['message'],
session_id=test_case['session_id']
)
# 打印结果
print("响应:")
print(response.get('message', ''))
print()
# 打印元数据
metadata = response.get('metadata', {})
if metadata:
print("元数据:")
print(f" 类型: {metadata.get('type')}")
if 'intent' in metadata:
intent = metadata['intent']
print(f" 意图类型: {intent.get('type')}")
print(f" 关注维度: {intent.get('dimensions')}")
print(f" 分析深度: {intent.get('analysis_depth')}")
if 'plan' in metadata:
plan = metadata['plan']
skills = [s['name'] for s in plan.get('skills', [])]
print(f" 调用技能: {skills}")
except Exception as e:
print(f"错误: {e}")
import traceback
traceback.print_exc()
print(f"\n{'=' * 60}")
print("测试完成")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(test_intelligent_mode())

View File

@ -1,77 +0,0 @@
#!/usr/bin/env python3
"""
数据库迁移脚本添加 entry_zone 字段到 trading_signals
使用方法
python scripts/migrate_add_entry_zone.py
或者在服务器上直接执行 SQL
sqlite3 backend/stock_agent.db "ALTER TABLE trading_signals ADD COLUMN entry_zone FLOAT;"
"""
import sqlite3
import os
from pathlib import Path
def migrate_add_entry_zone():
"""添加 entry_zone 字段"""
# 数据库路径
db_path = Path(__file__).parent.parent / "backend" / "stock_agent.db"
if not db_path.exists():
print(f"❌ 数据库文件不存在: {db_path}")
return False
try:
# 连接数据库
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# 检查字段是否已存在
cursor.execute("PRAGMA table_info(trading_signals)")
columns = [col[1] for col in cursor.fetchall()]
if 'entry_zone' in columns:
print("✅ entry_zone 字段已存在,无需迁移")
conn.close()
return True
# 添加字段
print(f"📝 正在添加 entry_zone 字段到 {db_path}...")
cursor.execute("ALTER TABLE trading_signals ADD COLUMN entry_zone FLOAT")
conn.commit()
# 验证
cursor.execute("PRAGMA table_info(trading_signals)")
columns = [col[1] for col in cursor.fetchall()]
if 'entry_zone' in columns:
print("✅ entry_zone 字段添加成功")
conn.close()
return True
else:
print("❌ 字段添加失败")
conn.close()
return False
except Exception as e:
print(f"❌ 迁移失败: {e}")
return False
if __name__ == "__main__":
print("=" * 60)
print("数据库迁移:添加 entry_zone 字段")
print("=" * 60)
success = migrate_add_entry_zone()
if success:
print("\n✅ 迁移完成!")
print("\n请重启服务以使更改生效:")
print(" pm2 restart stock-agent")
else:
print("\n❌ 迁移失败!")
print("\n如果自动迁移失败,可以手动执行 SQL")
print(" sqlite3 backend/stock_agent.db \"ALTER TABLE trading_signals ADD COLUMN entry_zone FLOAT;\"")

View File

@ -1,71 +0,0 @@
#!/usr/bin/env python3
"""
测试加密货币新闻获取多源聚合
"""
import asyncio
import sys
import os
# 确保路径正确
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
backend_dir = os.path.join(project_root, 'backend')
sys.path.insert(0, backend_dir)
from app.services.news_service import get_news_service
async def main():
print("=" * 60)
print("📰 测试加密货币新闻获取(多源聚合)")
print("=" * 60)
news_service = get_news_service()
# 获取最新新闻
print("\n🔍 获取最新加密货币新闻...")
news_list = await news_service.get_latest_news(limit=30)
print(f"\n✅ 获取到 {len(news_list)} 条新闻\n")
# 按来源分组统计
sources = {}
for news in news_list:
source = news.get('source', 'Unknown')
sources[source] = sources.get(source, 0) + 1
print("📊 新闻来源统计:")
for source, count in sources.items():
print(f" {source}: {count}")
# 显示最新10条新闻
print("\n" + "=" * 60)
print("📰 最新 10 条新闻")
print("=" * 60)
for i, news in enumerate(news_list[:10], 1):
time_str = news.get('time_str', '')
title = news.get('title', '')
source = news.get('source', '')
desc = news.get('description', '')[:100]
print(f"\n{i}. [{time_str}] {source}")
print(f" {title}")
if desc:
print(f" {desc}...")
# 测试格式化给 LLM
print("\n" + "=" * 60)
print("🤖 格式化给 LLM 的新闻")
print("=" * 60)
formatted_news = news_service.format_news_for_llm(news_list[:5], max_items=5)
print(formatted_news)
print("\n" + "=" * 60)
print("✅ 测试完成")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,218 +0,0 @@
#!/usr/bin/env python3
"""
测试系统异常通知功能
测试场景
1. 测试普通异常通知
2. 测试带堆栈信息的异常通知
3. 测试冷却时间避免重复通知
"""
import sys
from pathlib import Path
# 添加项目路径
backend_dir = Path(__file__).parent.parent / "backend"
sys.path.insert(0, str(backend_dir))
def test_normal_exception():
"""测试普通异常通知"""
print("\n" + "=" * 60)
print("🧪 测试1: 普通异常通知")
print("=" * 60)
from app.utils.error_handler import get_exception_handler
handler = get_exception_handler()
# 模拟一个异常
try:
result = 1 / 0 # ZeroDivisionError
except Exception as e:
import traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
handler.handle_exception(exc_type, exc_value, exc_traceback)
print("✅ 已触发异常处理(请检查飞书是否收到通知)")
def test_custom_exception():
"""测试自定义异常通知"""
print("\n" + "=" * 60)
print("🧪 测试2: 自定义异常通知")
print("=" * 60)
from app.utils.error_handler import get_exception_handler
handler = get_exception_handler()
# 模拟一个自定义异常
try:
raise ValueError("这是一个测试异常,用于验证飞书通知功能")
except Exception as e:
import traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
handler.handle_exception(exc_type, exc_value, exc_traceback)
print("✅ 已触发自定义异常处理(请检查飞书是否收到通知)")
def test_nested_exception():
"""测试嵌套调用异常"""
print("\n" + "=" * 60)
print("🧪 测试3: 嵌套调用异常(带完整堆栈)")
print("=" * 60)
from app.utils.error_handler import get_exception_handler
handler = get_exception_handler()
def level_3():
"""第三层调用"""
raise RuntimeError("深层错误:数据库连接失败")
def level_2():
"""第二层调用"""
level_3()
def level_1():
"""第一层调用"""
level_2()
# 模拟嵌套调用异常
try:
level_1()
except Exception as e:
import traceback
exc_type, exc_value, exc_traceback = sys.exc_info()
handler.handle_exception(exc_type, exc_value, exc_traceback)
print("✅ 已触发嵌套异常处理(请检查飞书是否收到完整堆栈信息)")
def test_cooldown():
"""测试冷却时间"""
print("\n" + "=" * 60)
print("🧪 测试4: 冷却时间(连续触发异常)")
print("=" * 60)
from app.utils.error_handler import get_exception_handler
import time
handler = get_exception_handler()
handler.set_cooldown(10) # 设置10秒冷却时间
print(f"⏰ 冷却时间设置为 10 秒")
# 第一次触发
try:
raise RuntimeError("第一次异常")
except Exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
handler.handle_exception(exc_type, exc_value, exc_traceback)
print("✅ 第一次异常已处理")
# 立即第二次触发(应该在冷却期内)
print("\n⏳ 立即触发第二次异常(应该在冷却期内)...")
try:
raise RuntimeError("第二次异常")
except Exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
handler.handle_exception(exc_type, exc_value, exc_traceback)
print("✅ 第二次异常已处理(应该被冷却,不发送飞书通知)")
print("\n⏳ 等待 11 秒后再次触发...")
time.sleep(11)
# 第三次触发(冷却期已过)
try:
raise RuntimeError("第三次异常")
except Exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
handler.handle_exception(exc_type, exc_value, exc_traceback)
print("✅ 第三次异常已处理(冷却期已过,应该发送飞书通知)")
def test_with_feishu_integration():
"""测试与飞书集成的完整流程"""
print("\n" + "=" * 60)
print("🧪 测试5: 完整集成测试(带飞书服务)")
print("=" * 60)
from app.utils.error_handler import setup_global_exception_handler, get_exception_handler
from app.services.feishu_service import get_feishu_service
# 设置全局异常处理器
setup_global_exception_handler()
print("✅ 全局异常处理器已安装")
# 初始化飞书通知
feishu_service = get_feishu_service()
handler = get_exception_handler()
handler.set_feishu_service(feishu_service)
handler.set_enabled(True)
print("✅ 飞书通知已启用")
# 触发一个测试异常
print("\n🔔 触发测试异常...")
try:
raise Exception("【测试】这是一个集成测试异常,用于验证飞书通知功能是否正常工作")
except Exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
handler.handle_exception(exc_type, exc_value, exc_traceback)
print("✅ 测试异常已处理,请检查飞书是否收到通知")
def main():
"""主测试函数"""
print("\n" + "🚀" * 30)
print("系统异常通知功能测试")
print("🚀" * 30)
print("\n⚠️ 注意:以下测试会触发真实的异常,并尝试发送飞书通知")
print("⚠️ 请确保飞书 webhook 已正确配置\n")
try:
# 测试1: 普通异常
test_normal_exception()
# 等待用户确认
input("\n按 Enter 键继续下一个测试...")
# 测试2: 自定义异常
test_custom_exception()
# 等待用户确认
input("\n按 Enter 键继续下一个测试...")
# 测试3: 嵌套异常
test_nested_exception()
# 等待用户确认
input("\n按 Enter 键继续下一个测试...")
# 测试4: 冷却时间
test_cooldown()
# 等待用户确认
input("\n按 Enter 键继续最后一个测试...")
# 测试5: 完整集成
test_with_feishu_integration()
print("\n" + "=" * 60)
print("✅ 所有测试完成!")
print("=" * 60)
print("\n📋 请检查飞书聊天窗口,确认是否收到异常通知")
print("💡 提示:如果未收到通知,请检查:")
print(" 1. 飞书 webhook URL 是否正确")
print(" 2. 网络连接是否正常")
print(" 3. .env 文件中的 FEISHU_ENABLED 是否为 true\n")
except KeyboardInterrupt:
print("\n\n⚠️ 测试被用户中断")
except Exception as e:
print(f"\n\n❌ 测试失败: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()

View File

@ -1,80 +0,0 @@
#!/usr/bin/env python3
"""
测试合约市场数据获取
"""
import sys
import os
# 确保路径正确
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
backend_dir = os.path.join(project_root, 'backend')
sys.path.insert(0, backend_dir)
from app.services.binance_service import binance_service
def main():
print("=" * 60)
print("📊 测试 Binance 合约市场数据获取")
print("=" * 60)
symbols = ['BTCUSDT', 'ETHUSDT']
for symbol in symbols:
print(f"\n{'='*60}")
print(f"📈 {symbol} 合约市场数据")
print(f"{'='*60}")
# 1. 获取资金费率
print(f"\n🔍 获取资金费率...")
funding_rate = binance_service.get_funding_rate(symbol)
if funding_rate:
print(f"✅ 资金费率: {funding_rate['funding_rate_percent']:.4f}%")
print(f" 市场情绪: {funding_rate['sentiment']}")
print(f" 标记价格: ${funding_rate['mark_price']:,.2f}")
print(f" 指数价格: ${funding_rate['index_price']:,.2f}")
# 2. 获取持仓量
print(f"\n🔍 获取持仓量...")
open_interest = binance_service.get_open_interest(symbol)
if open_interest:
print(f"✅ 持仓量: {open_interest['open_interest']:,.0f}")
print(f" 持仓金额: ${open_interest['open_interest_value']:,.0f}")
# 3. 获取历史持仓量
print(f"\n🔍 获取历史持仓量计算24h变化...")
hist_oi = binance_service.get_open_interest_hist(symbol, period='1h', limit=24)
if hist_oi and len(hist_oi) >= 2:
oi_now = float(hist_oi[0].get('sumOpenInterest', 0))
oi_24h = float(hist_oi[-1].get('sumOpenInterest', 0))
oi_change = oi_now - oi_24h
oi_change_pct = (oi_change / oi_24h * 100) if oi_24h > 0 else 0
print(f"✅ 24h前: {oi_24h:,.0f}")
print(f" 当前: {oi_now:,.0f}")
print(f" 变化: {oi_change:+,.0f} 张 ({oi_change_pct:+.2f}%)")
# 4. 获取综合数据
print(f"\n🔍 获取综合合约数据...")
market_data = binance_service.get_futures_market_data(symbol)
if market_data:
print(f"✅ 综合数据获取成功")
print(f" 资金费率: {market_data['funding_rate']['funding_rate_percent']:.4f}%")
print(f" 持仓24h变化: {market_data['oi_change_percent_24h']:+.2f}%")
print(f" 溢价率: {market_data['premium_rate']:+.2f}%")
print(f" 市场情绪: {market_data['market_sentiment']}")
# 5. 格式化给 LLM 的数据
print(f"\n🤖 格式化给 LLM 的数据:")
print(f"{''*60}")
if market_data:
formatted = binance_service.format_futures_data_for_llm(symbol, market_data)
print(formatted)
print("\n" + "=" * 60)
print("✅ 测试完成")
print("=" * 60)
if __name__ == "__main__":
main()

View File

@ -1,105 +0,0 @@
#!/usr/bin/env python3
"""
测试使用 modify-order 修改止损止盈
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'backend'))
import time
import uuid
from app.services.bitget_trading_api_sdk import get_bitget_trading_api
def print_section(title):
print("\n" + "=" * 60)
print(f" {title}")
print("=" * 60)
def main():
print("\n" + "🧪" * 30)
print(" 测试 modify-order 修改止损止盈")
print("🧪" * 30)
api = get_bitget_trading_api()
exchange = api.exchange
# 检查是否有 modify order 相关的方法
print("\n检查 modify order 方法:")
methods = [m for m in dir(exchange) if 'modify' in m.lower() and 'order' in m.lower()]
for m in methods:
print(f" {m}")
# 1. 开仓
print_section("1. 开仓")
order = api.place_order(
symbol="BTC/USDT:USDT",
side='buy',
order_type='market',
size=0.0001,
client_order_id=f"test_{uuid.uuid4().hex[:8]}"
)
if not order:
print("❌ 开仓失败")
return
order_id = order.get('id')
print(f"✅ 开仓成功! 订单ID: {order_id}")
time.sleep(2)
# 2. 查询持仓
print_section("2. 查询持仓")
positions = api.get_position("BTC/USDT:USDT")
position = None
for pos in positions:
if float(pos.get('contracts', 0)) > 0:
position = pos
break
if not position:
print("❌ 无持仓")
return
mark_price = float(position.get('markPrice'))
print(f"持仓: {position.get('contracts')} 张, 标记价: ${mark_price:,.2f}")
# 3. 尝试使用 edit_order 修改(添加止损止盈)
print_section("3. 使用 edit_order 添加止损止盈")
stop_loss = mark_price * 0.98
take_profit = mark_price * 1.03
print(f"目标止损: ${stop_loss:,.2f}")
print(f"目标止盈: ${take_profit:,.2f}")
try:
# 尝试使用 CCXT 的 edit_order 方法
# 注意可能需要传入订单ID和新参数
result = exchange.edit_order(
id=order_id,
symbol="BTC/USDT:USDT",
type='market', # 原订单类型
side='buy', # 原订单方向
amount=0.0001, # 原订单数量
params={
'stopLoss': str(stop_loss),
'takeProfit': str(take_profit),
'tdMode': 'cross',
'marginCoin': 'USDT',
}
)
print(f"✅ edit_order 成功: {result}")
except Exception as e:
print(f"❌ edit_order 失败: {e}")
# 4. 平仓
print_section("4. 平仓")
api.close_position("BTC/USDT:USDT")
print("✅ 已平仓")
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"\n\n❌ 测试出错: {e}")
import traceback
traceback.print_exc()

View File

@ -1,118 +0,0 @@
#!/usr/bin/env python3
"""
测试新闻通知格式
"""
import asyncio
import sys
from pathlib import Path
# 添加项目路径
script_dir = Path(__file__).parent
project_root = script_dir.parent
backend_dir = project_root / "backend"
sys.path.insert(0, str(backend_dir))
import os
os.chdir(backend_dir)
from app.news_agent.notifier import get_news_notifier
# 模拟高影响新闻数据
test_articles = [
{
'id': 1,
'title': 'Bitcoin ETFs See Record $500M Inflows as Institutions Pile In',
'source': 'CoinDesk',
'category': 'crypto',
'url': 'https://example.com/article1',
'market_impact': 'high',
'impact_type': 'bullish',
'sentiment': 'positive',
'summary': '比特币现货ETF昨日吸引5亿美元资金流入创历史新高显示机构投资者持续增持。',
'key_points': [
'贝莱德IBIT录得3亿美元流入',
'富达FBTC流入1.5亿美元',
'机构持仓占比超过60%'
],
'trading_advice': '建议持有BTC关注回调后的买入机会',
'relevant_symbols': ['BTC', 'IBIT', 'FBTC'],
'priority': 85,
'created_at': '2026-02-25T12:00:00'
},
{
'id': 2,
'title': 'SEC Delays Decision on Ethereum ETF Options Listings',
'source': 'Bloomberg',
'category': 'crypto',
'url': 'https://example.com/article2',
'market_impact': 'medium',
'impact_type': 'neutral',
'sentiment': 'neutral',
'summary': 'SEC再次推迟对以太坊ETF期权上市的决议新的截止日期为4月底。',
'key_points': [
'SEC引用需要额外审查时间',
'这是第三次推迟',
'市场反应温和'
],
'trading_advice': 'ETH持仓观望等待ETF期权批准',
'relevant_symbols': ['ETH', 'ETHA'],
'priority': 55,
'created_at': '2026-02-25T11:30:00'
},
{
'id': 3,
'title': 'NVIDIA Surpasses $4 Trillion Market Cap Amid AI Chip Demand',
'source': 'CNBC',
'category': 'stock',
'url': 'https://example.com/article3',
'market_impact': 'high',
'impact_type': 'bullish',
'sentiment': 'positive',
'summary': '英伟达市值突破4万亿美元大关成为全球市值最高的公司AI芯片需求持续爆发。',
'key_points': [
'股价上涨5%至每股1600美元',
'H100芯片供不应求',
'数据中心收入同比增长300%'
],
'trading_advice': '建议继续持有NVDAAI趋势未完',
'relevant_symbols': ['NVDA', 'AMD'],
'priority': 80,
'created_at': '2026-02-25T10:15:00'
}
]
async def main():
print("=" * 70)
print("🧪 测试新闻通知格式")
print("=" * 70)
print()
notifier = get_news_notifier()
print("📊 测试数据:")
for i, article in enumerate(test_articles, 1):
print(f" {i}. [{article['market_impact']}] {article['title'][:50]}...")
print(f" 摘要: {article['summary'][:50]}...")
print(f" 建议: {article['trading_advice']}")
print()
print("📢 发送测试通知...")
result = await notifier.notify_news_batch(test_articles)
if result:
print("✅ 通知发送成功!")
else:
print("❌ 通知发送失败")
print()
print("=" * 70)
return 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)

View File

@ -1,285 +0,0 @@
#!/usr/bin/env python3
"""
实盘交易功能测试脚本
测试 Bitget 实盘交易 API 的各项功能
注意: 此脚本会进行真实交易请确保在测试网环境运行
"""
import sys
import os
from pathlib import Path
# 添加项目路径
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "backend"))
from app.config import get_settings
from app.services.bitget_trading_api import BitgetTradingAPI, get_bitget_trading_api
from app.services.real_trading_service import get_real_trading_service
def print_section(title: str):
"""打印分节标题"""
print("\n" + "=" * 80)
print(f" {title}")
print("=" * 80)
def test_api_connection():
"""测试 API 连接"""
print_section("1. 测试 API 连接")
settings = get_settings()
print(f"\n配置信息:")
print(f" API Key: {settings.bitget_api_key[:10]}..." if settings.bitget_api_key else " API Key: 未配置")
print(f" 测试网: {'是 ✅' if settings.bitget_use_testnet else '否 ⚠️'}")
if not settings.bitget_api_key or not settings.bitget_api_secret:
print("\n❌ API Key 未配置,请在 .env 文件中设置:")
print(" BITGET_API_KEY=your_api_key")
print(" BITGET_API_SECRET=your_api_secret")
print(" BITGET_PASSPHRASE=your_passphrase")
print(" BITGET_USE_TESTNET=true")
return False
# 创建 API 实例
api = BitgetTradingAPI(
api_key=settings.bitget_api_key,
api_secret=settings.bitget_api_secret,
passphrase=settings.bitget_passphrase,
use_testnet=settings.bitget_use_testnet
)
# 测试连接
if api.test_connection():
print("\n✅ API 连接成功!")
return True
else:
print("\n❌ API 连接失败!")
print(" 请检查:")
print(" 1. API Key 和 Secret 是否正确")
print(" 2. 是否在测试网环境")
print(" 3. API 权限是否正确")
return False
def test_query_account():
"""测试查询账户"""
print_section("2. 测试查询账户")
api = get_bitget_trading_api()
if not api:
print("\n❌ 交易 API 未初始化")
return False
# 查询余额
print("\n查询账户余额...")
balance = api.get_balance()
if balance:
usdt_info = balance.get('USDT', {})
print(f"\n✅ USDT 余额:")
print(f" 可用: ${float(usdt_info.get('available', 0)):,.2f}")
print(f" 冻结: ${float(usdt_info.get('frozen', 0)):,.2f}")
print(f" 锁定: ${float(usdt_info.get('locked', 0)):,.2f}")
return True
else:
print("\n❌ 查询余额失败")
return False
def test_query_position():
"""测试查询持仓"""
print_section("3. 测试查询持仓")
api = get_bitget_trading_api()
if not api:
print("\n❌ 交易 API 未初始化")
return False
# 查询持仓
print("\n查询当前持仓...")
positions = api.get_position()
print(f"\n✅ 当前持仓数量: {len(positions)}")
if positions:
for pos in positions:
symbol = pos.get('symbol', 'N/A')
total = pos.get('total', 0)
available = pos.get('available', 0)
hold_side = pos.get('holdSide', 'N/A')
avg_price = pos.get('averageOpenPrice', 0)
print(f"\n {symbol} ({hold_side})")
print(f" 持仓: {total}")
print(f" 可用: {available}")
print(f" 均价: ${avg_price}")
else:
print(" 无持仓")
return True
def test_place_small_order():
"""
测试下单小仓位测试
警告: 此函数会进行真实交易
建议在测试网运行并使用最小仓位
"""
print_section("4. 测试下单 (⚠️ 实盘交易)")
# 确认是否要测试
print("\n⚠️ 警告: 此测试会进行真实下单!")
print(" 建议: 1. 确保在测试网环境")
print(" 2. 使用最小仓位测试")
print(" 3. 先手动撤销测试订单")
api = get_bitget_trading_api()
if not api:
print("\n❌ 交易 API 未初始化")
return False
# 使用 BTCUSDT 进行测试
symbol = "BTCUSDT"
# 查询当前价格
from app.services.bitget_service import bitget_service
current_price = bitget_service.get_current_price(symbol)
if not current_price:
print(f"\n❌ 无法获取 {symbol} 当前价格")
return False
print(f"\n{symbol} 当前价格: ${current_price:,.2f}")
# 使用最小数量1张通常约等于1 USDT
test_size = 1
# 使用市价单测试(快速成交)
print(f"\n准备下测试单:")
print(f" 交易对: {symbol}")
print(f" 方向: 开多 (open_long)")
print(f" 类型: 市价单")
print(f" 数量: {test_size} 张 (约 ${test_size})")
# 取消注释以实际下单
print("\n❗ 下单功能已禁用,避免意外交易")
print(" 如需测试,请手动取消下面的注释并确认:")
print("")
"""
# 实际下单代码(已注释,需手动取消注释)
result = api.place_order(
symbol=symbol,
side='open_long',
order_type='market',
size=test_size
)
if result:
order_id = result.get('orderId')
print(f"\n✅ 下单成功!")
print(f" 订单ID: {order_id}")
print(f" 请在交易所检查订单状态")
# 询问是否立即撤单
print("\n是否立即撤单?(测试用)")
# input("按回车键撤单...")
# 撤单
if api.cancel_order(symbol, order_id=order_id):
print(f"\n✅ 撤单成功")
else:
print(f"\n❌ 撤单失败,请手动在交易所撤单")
return True
else:
print(f"\n❌ 下单失败")
return False
"""
return False
def test_real_trading_service():
"""测试实盘交易服务"""
print_section("5. 测试实盘交易服务")
service = get_real_trading_service()
if not service:
print("\n❌ 实盘交易服务未启用")
print(" 请在 .env 中设置:")
print(" REAL_TRADING_ENABLED=true")
return False
# 查询账户状态
print("\n查询账户状态...")
account = service.get_account_status()
print(f"\n✅ 账户状态:")
print(f" 总余额: ${account['current_balance']:,.2f}")
print(f" 可用: ${account['available']:,.2f}")
print(f" 已用: ${account['used_margin']:,.2f}")
print(f" 持仓: ${account['total_position_value']:,.2f}")
# 同步持仓
print("\n同步交易所持仓...")
positions = service.sync_positions_from_exchange()
print(f"\n✅ 交易所持仓: {len(positions)}")
return True
def main():
"""主函数"""
print("\n" + "🚀" * 40)
print("\nBitget 实盘交易功能测试")
print(f"测试时间: {datetime.now()}")
settings = get_settings()
print("\n⚠️ 重要提醒:")
print(f" 当前环境: {'测试网 ✅' if settings.bitget_use_testnet else '生产网 ⚠️'}")
try:
# 1. 测试 API 连接
if not test_api_connection():
print("\n❌ API 连接测试失败,请检查配置")
return
# 2. 测试查询账户
if not test_query_account():
print("\n❌ 查询账户测试失败")
return
# 3. 测试查询持仓
if not test_query_position():
print("\n❌ 查询持仓测试失败")
return
# 4. 测试下单(需要手动取消注释)
# test_place_small_order()
# 5. 测试实盘交易服务
if not test_real_trading_service():
print("\n⚠️ 实盘交易服务未启用(可忽略)")
print("\n" + "=" * 80)
print(" ✅ 所有测试通过!")
print("=" * 80)
print("\n实盘交易功能已就绪!")
except Exception as e:
print(f"\n❌ 测试出错: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
from datetime import datetime
main()

View File

@ -1,57 +0,0 @@
"""
测试实盘交易 API 端点
"""
import requests
import json
BASE_URL = "http://localhost:8000"
def test_api():
print("=" * 60)
print("测试实盘交易 API")
print("=" * 60)
# 测试 /api/real-trading/status
print("\n1. 测试 /api/real-trading/status")
try:
response = requests.get(f"{BASE_URL}/api/real-trading/status")
print(f" 状态码: {response.status_code}")
print(f" 响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
except Exception as e:
print(f" 错误: {e}")
# 测试 /api/real-trading/account
print("\n2. 测试 /api/real-trading/account")
try:
response = requests.get(f"{BASE_URL}/api/real-trading/account")
print(f" 状态码: {response.status_code}")
print(f" 响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
except Exception as e:
print(f" 错误: {e}")
# 测试 /api/real-trading/positions
print("\n3. 测试 /api/real-trading/positions")
try:
response = requests.get(f"{BASE_URL}/api/real-trading/positions")
print(f" 状态码: {response.status_code}")
print(f" 响应: {json.dumps(response.json(), indent=2, ensure_ascii=False)}")
except Exception as e:
print(f" 错误: {e}")
# 测试 /api/real-trading/orders
print("\n4. 测试 /api/real-trading/orders?status=orders&limit=10")
try:
response = requests.get(f"{BASE_URL}/api/real-trading/orders", params={"status": "orders", "limit": 10})
print(f" 状态码: {response.status_code}")
data = response.json()
print(f" Success: {data.get('success')}")
print(f" Count: {data.get('count')}")
if data.get('orders'):
print(f" Orders: {len(data.get('orders', []))}")
except Exception as e:
print(f" 错误: {e}")
print("\n" + "=" * 60)
if __name__ == "__main__":
test_api()

View File

@ -1,324 +0,0 @@
"""
实盘交易完整流程测试脚本
测试内容
1. 获取账户余额
2. 获取当前持仓
3. 下测试单极小金额
4. 查询订单状态
5. 修改止损止盈
6. 平仓
风险控制
- 使用极小金额测试5-10 USDT
- 测试完成后自动平仓
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'backend'))
import asyncio
import time
from app.services.bitget_trading_api_sdk import get_bitget_trading_api
from app.services.real_trading_service import get_real_trading_service
from app.utils.logger import logger
def print_section(title):
"""打印分隔线"""
print("\n" + "=" * 60)
print(f" {title}")
print("=" * 60)
def test_account_info(trading_api):
"""测试1: 获取账户信息"""
print_section("1. 获取账户信息")
try:
balance = trading_api.get_balance()
print(f"✅ 账户余额:")
for currency, info in balance.items():
if isinstance(info, dict):
available = info.get('available', 0)
frozen = info.get('frozen', 0)
locked = info.get('locked', 0)
if float(available) > 0 or float(frozen) > 0:
print(f" {currency}: 可用={float(available):.2f}, 冻结={float(frozen):.2f}, 锁定={float(locked):.2f}")
return True
except Exception as e:
print(f"❌ 获取账户信息失败: {e}")
return False
def test_get_positions(trading_api):
"""测试2: 获取当前持仓"""
print_section("2. 获取当前持仓")
try:
positions = trading_api.get_position()
print(f"✅ 当前持仓数量: {len(positions)}")
open_positions = [p for p in positions if float(p.get('contracts', 0)) != 0]
if open_positions:
print(f" 活跃持仓:")
for pos in open_positions:
symbol = pos.get('symbol')
side = pos.get('side')
contracts = pos.get('contracts')
entry_price = pos.get('entryPrice')
mark_price = pos.get('markPrice')
unrealized_pnl = pos.get('unrealizedPnl', 0)
print(f" {symbol} {side} | 数量: {contracts} | 入场: {entry_price} | 标记: {mark_price} | 浮盈: {unrealized_pnl}")
else:
print(f" 当前无持仓")
return True
except Exception as e:
print(f"❌ 获取持仓失败: {e}")
return False
def test_place_order(trading_api, symbol="BTCUSDT", side='buy', size=5):
"""测试3: 下测试单"""
print_section("3. 下测试单")
# 生成订单ID
import uuid
client_order_id = f"test_{uuid.uuid4().hex[:8]}"
print(f" 交易对: {symbol}")
print(f" 方向: {side}")
print(f" 数量: {size} USDT")
print(f" 客户端ID: {client_order_id}")
try:
# 先获取当前价格
ticker = trading_api.exchange.fetch_ticker(symbol)
current_price = ticker['last']
print(f" 当前价格: ${current_price:,.2f}")
# 下市价单
print(f" 📝 正在下单...")
result = trading_api.place_order(
symbol=symbol,
side=side,
order_type='market',
size=size,
client_order_id=client_order_id
)
if result:
order_id = result.get('id')
filled_price = result.get('average') or result.get('price')
print(f"✅ 下单成功!")
print(f" 订单ID: {order_id}")
print(f" 成交价格: ${filled_price:,.2f}")
print(f" 成交数量: {size} USDT")
# 等待一下让订单完全成交
time.sleep(2)
return order_id, symbol, side
else:
print(f"❌ 下单失败: 无返回结果")
return None
except Exception as e:
print(f"❌ 下单失败: {e}")
import traceback
print(traceback.format_exc())
return None
def test_get_order(trading_api, order_id, symbol):
"""测试4: 查询订单状态"""
print_section("4. 查询订单状态")
try:
order = trading_api.exchange.fetch_order(order_id, symbol)
print(f"✅ 订单状态: {order.get('status')}")
print(f" 订单ID: {order.get('id')}")
print(f" 价格: {order.get('price')}")
print(f" 已成交: {order.get('filled')}")
print(f" 剩余: {order.get('remaining')}")
return order
except Exception as e:
print(f"❌ 查询订单失败: {e}")
return None
def test_modify_sl_tp(trading_api, symbol, side='buy'):
"""测试5: 修改止损止盈"""
print_section("5. 修改止损止盈")
# 获取当前持仓
try:
positions = trading_api.get_position()
position = None
for pos in positions:
if pos.get('symbol') == symbol and float(pos.get('contracts', 0)) != 0:
position = pos
break
if not position:
print(f"❌ 未找到 {symbol} 的持仓")
return False
# 获取当前价格来设置合理的止损止盈
ticker = trading_api.exchange.fetch_ticker(symbol)
current_price = float(ticker['last'])
if side == 'buy':
stop_loss = current_price * 0.97 # 3% 止损
take_profit = current_price * 1.05 # 5% 止盈
else:
stop_loss = current_price * 1.03 # 3% 止损
take_profit = current_price * 0.95 # 5% 止盈
print(f" 当前价格: ${current_price:,.2f}")
print(f" 设置止损: ${stop_loss:,.2f}")
print(f" 设置止盈: ${take_profit:,.2f}")
# 使用交易所API修改止损止盈这需要根据具体交易所API实现
# 注意: Bitget 可能需要使用不同的API来修改止损止盈
print(f"⚠️ 注意: 修改止损止盈功能需要根据交易所具体API实现")
print(f"✅ 止损止盈参数已计算实际修改需要查看交易所API文档")
return True
except Exception as e:
print(f"❌ 修改止损止盈失败: {e}")
import traceback
print(traceback.format_exc())
return False
def test_close_position(trading_api, symbol, side):
"""测试6: 平仓"""
print_section("6. 平仓测试")
# 获取当前持仓
try:
positions = trading_api.get_position()
position = None
for pos in positions:
if pos.get('symbol') == symbol and float(pos.get('contracts', 0)) != 0:
position = pos
break
if not position:
print(f"❌ 未找到 {symbol} 的持仓,无法平仓")
return False
contracts = float(position.get('contracts', 0))
print(f" 持仓数量: {contracts}")
# 平仓方向与开仓相反
close_side = 'sell' if side == 'buy' else 'buy'
print(f" 平仓方向: {close_side}")
# 生成平仓订单ID
import uuid
client_order_id = f"close_{uuid.uuid4().hex[:8]}"
print(f" 📝 正在平仓...")
result = trading_api.place_order(
symbol=symbol,
side=close_side,
order_type='market',
size=contracts, # 平掉所有持仓
client_order_id=client_order_id,
reduce_only=True # 只平仓,不开新仓
)
if result:
print(f"✅ 平仓成功!")
print(f" 订单ID: {result.get('id')}")
return True
else:
print(f"❌ 平仓失败: 无返回结果")
return False
except Exception as e:
print(f"❌ 平仓失败: {e}")
import traceback
print(traceback.format_exc())
return False
def main():
"""主测试流程"""
print("\n" + "🚀" * 30)
print(" 实盘交易完整流程测试")
print("🚀" * 30)
print("\n⚠️ 警告: 此测试将使用真实资金但金额很小5-10 USDT")
print("⚠️ 请确保:")
print(" 1. Bitget API 配置正确")
print(" 2. 账户有足够的测试资金")
print(" 3. 使用测试网或小额资金")
# 确认开始测试
confirm = input("\n是否开始测试? (输入 'yes' 继续): ")
if confirm.lower() != 'yes':
print("测试已取消")
return
# 获取交易API
trading_api = get_bitget_trading_api()
if not trading_api:
print("❌ Bitget API 未初始化,请检查配置")
return
print(f"\n📡 交易所: {'Bitget 测试网' if trading_api.use_testnet else 'Bitget 正式网'}")
# 测试1: 获取账户信息
if not test_account_info(trading_api):
return
# 测试2: 获取当前持仓
if not test_get_positions(trading_api):
return
# 测试3: 下测试单
symbol = "BTCUSDT"
side = 'buy' # 做多
size = 5 # 5 USDT
order_result = test_place_order(trading_api, symbol, side, size)
if not order_result:
print("\n❌ 下单失败,终止测试")
return
order_id, _, _ = order_result
# 测试4: 查询订单状态
test_get_order(trading_api, order_id, symbol)
# 测试5: 修改止损止盈
test_modify_sl_tp(trading_api, symbol, side)
# 测试6: 平仓
time.sleep(2) # 等待一下
test_close_position(trading_api, symbol, side)
# 最终状态
print_section("测试完成")
print("✅ 所有测试已完成,请检查:")
print(" 1. Bitget 账户中的订单记录")
print(" 2. 余额变化")
print(" 3. 持仓情况")
# 再次获取持仓确认已平仓
test_get_positions(trading_api)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n⚠️ 测试被用户中断")
except Exception as e:
print(f"\n\n❌ 测试出错: {e}")
import traceback
print(traceback.format_exc())

View File

@ -1,200 +0,0 @@
"""
实盘交易简单测试 - 使用 API 直接测试
测试流程
1. 获取账户余额
2. 下一个小单
3. 查询订单
4. 平仓
"""
import requests
import json
import time
BASE_URL = "http://localhost:8000"
def print_section(title):
print("\n" + "=" * 50)
print(f" {title}")
print("=" * 50)
def test_get_account():
"""获取账户信息"""
print_section("1. 获取账户信息")
response = requests.get(f"{BASE_URL}/api/real-trading/account")
data = response.json()
if data.get('success'):
account = data['account']
print(f"✅ 账户余额: ${account['current_balance']:.2f}")
print(f" 可用: ${account['available']:.2f}")
print(f" 已用: ${account['used_margin']:.2f}")
print(f" 持仓价值: ${account['total_position_value']:.2f}")
return account
else:
print(f"❌ 获取账户信息失败: {data.get('message')}")
return None
def test_get_positions():
"""获取持仓"""
print_section("2. 获取当前持仓")
response = requests.get(f"{BASE_URL}/api/real-trading/positions")
data = response.json()
if data.get('success'):
positions = data['positions']
print(f"✅ 持仓数量: {len(positions)}")
for pos in positions:
if pos.get('holding', 0) > 0:
print(f" {pos['symbol']}: {pos['holding']} USDT")
return positions
else:
print(f"❌ 获取持仓失败: {data.get('message')}")
return []
def test_place_small_order():
"""下一个小测试单"""
print_section("3. 下测试单")
# 使用模拟交易API测试更安全
symbol = "BTCUSDT"
# 先获取当前价格
ticker_response = requests.get(f"{BASE_URL}/api/bitget/ticker?symbol={symbol}")
ticker_data = ticker_response.json()
if not ticker_data.get('success'):
print(f"❌ 获取价格失败")
return None
current_price = ticker_data['data'].get('last_price', 0)
print(f" 当前价格: ${current_price:,.2f}")
# 计算测试参数
stop_loss = current_price * 0.95
take_profit = current_price * 1.05
order_data = {
"symbol": symbol,
"action": "buy",
"entry_type": "market",
"entry_price": current_price,
"stop_loss": stop_loss,
"take_profit": take_profit,
"confidence": 60,
"signal_grade": "C",
"position_size": "light"
}
print(f" 测试参数:")
print(f" 止损: ${stop_loss:,.2f}")
print(f" 止盈: ${take_profit:,.2f}")
# 先用模拟交易测试
print(f"\n 📝 正在创建模拟交易订单...")
response = requests.post(f"{BASE_URL}/api/paper-trading/order", json=order_data)
data = response.json()
if data.get('success'):
order = data.get('order')
print(f"✅ 模拟订单创建成功!")
print(f" 订单ID: {order['order_id']}")
print(f" 数量: ${order['quantity']:.2f}")
return order
else:
print(f"❌ 下单失败: {data.get('message')}")
return None
def test_get_orders():
"""获取订单列表"""
print_section("4. 获取订单列表")
response = requests.get(f"{BASE_URL}/api/paper-trading/orders?status=active&limit=10")
data = response.json()
if data.get('success'):
orders = data['orders']
print(f"✅ 活跃订单数量: {len(orders)}")
for order in orders:
print(f" {order['symbol']} {order['side']} | "
f"状态: {order['status']} | "
f"数量: ${order['quantity']:.2f}")
return orders
else:
print(f"❌ 获取订单失败: {data.get('message')}")
return []
def test_close_order(order_id):
"""平仓"""
print_section("5. 平仓测试")
print(f" 正在平仓订单: {order_id}")
response = requests.post(f"{BASE_URL}/api/paper-trading/close", json={
"order_id": order_id,
"reason": "测试平仓"
})
data = response.json()
if data.get('success'):
print(f"✅ 平仓成功!")
print(f" 平仓价格: ${data.get('close_price', 0):,.2f}")
print(f" 盈亏: ${data.get('pnl', 0):.2f}")
return True
else:
print(f"❌ 平仓失败: {data.get('message')}")
return False
def main():
print("\n" + "🧪" * 25)
print(" 实盘交易流程测试 (模拟模式)")
print("🧪" * 25)
print("\n此测试使用模拟交易,不会动用真实资金")
input("\n按 Enter 开始测试...")
# 测试1: 获取账户
test_get_account()
# 测试2: 获取持仓
test_get_positions()
# 测试3: 下单
order = test_place_small_order()
if order:
time.sleep(1)
# 测试4: 查询订单
test_get_orders()
time.sleep(1)
# 测试5: 平仓
test_close_order(order['order_id'])
print_section("测试完成")
print("✅ 所有测试已完成")
print(" 请检查前端页面确认订单状态")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n\n⚠️ 测试被用户中断")
except Exception as e:
print(f"\n\n❌ 测试出错: {e}")
import traceback
traceback.print_exc()

View File

@ -1,648 +0,0 @@
#!/usr/bin/env python3
"""
美股分析脚本新架构版
用法:
python3 scripts/test_stock.py AAPL
python3 scripts/test_stock.py AAPL TSLA NVDA
"""
import sys
import os
# 确保路径正确
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(script_dir)
backend_dir = os.path.join(project_root, 'backend')
sys.path.insert(0, backend_dir)
import asyncio
from app.services.yfinance_service import get_yfinance_service
from app.services.feishu_service import get_feishu_service
from app.services.telegram_service import get_telegram_service
from app.services.fundamental_service import get_fundamental_service
from app.services.news_service import get_news_service
from app.stock_agent.market_signal_analyzer import StockMarketSignalAnalyzer
from app.config import get_settings
from app.utils.logger import logger
async def analyze(symbol: str, send_notification: bool = True):
"""分析单只股票
Args:
symbol: 股票代码
send_notification: 是否发送通知默认True
Returns:
分析结果字典
"""
# 获取服务
yf_service = get_yfinance_service()
market_analyzer = StockMarketSignalAnalyzer() # 使用新的市场信号分析器
fundamental = get_fundamental_service() # 基本面服务
news = get_news_service() # 新闻服务
feishu = get_feishu_service()
telegram = get_telegram_service()
result = {
'symbol': symbol,
'stock_name': '', # 从基本面数据获取
'price': 0,
'signals': [],
'notified': False
}
# 获取配置
settings = get_settings()
threshold = settings.stock_llm_threshold * 100 # 转换为百分比
print(f"\n{'='*60}")
print(f"📊 分析 {symbol}")
print(f"{'='*60}")
try:
# 获取行情
print(f"获取行情...")
ticker = yf_service.get_ticker(symbol)
if not ticker:
print(f"❌ 无法获取 {symbol} 行情")
return result
price = ticker['lastPrice']
change = ticker['priceChangePercent']
result['price'] = price
print(f"价格: ${price:,.2f} ({change:+.2f}%)")
print(f"成交量: {ticker['volume']:,}")
# 获取K线
print(f"获取K线数据...")
data = yf_service.get_multi_timeframe_data(symbol)
if not data:
print(f"❌ 无法获取K线数据")
return result
print(f"时间周期: {', '.join(data.keys())}")
# 获取基本面数据(包含公司名称)
print(f"\n📈 基本面分析中...")
fundamental_data = None
fundamental_summary = ""
stock_name = ""
try:
fundamental_data = fundamental.get_fundamental_data(symbol)
if fundamental_data:
# 传递已获取的数据,避免重复调用
fundamental_summary = fundamental.get_fundamental_summary(symbol, fundamental_data)
# 获取公司名称
stock_name = fundamental_data.get('company_name', '')
result['stock_name'] = stock_name
# 输出基本面详细信息
score = fundamental_data.get('score', {})
print(f" ✓ 基本面数据获取成功")
# 公司信息
company = fundamental_data.get('company_name', 'N/A')
sector = fundamental_data.get('sector', 'N/A')
print(f" 【公司】{company} | {sector}")
# 更新显示
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
print(f"\n{'='*60}")
print(f"📊 分析 {symbol_display} @ ${price:,.2f}")
print(f"{'='*60}")
# 评分
print(f" 【评分】总分: {score.get('total', 0):.0f}/100 ({score.get('rating', 'N/A')}级) | "
f"估值:{score.get('valuation', 0)} 盈利:{score.get('profitability', 0)} "
f"成长:{score.get('growth', 0)} 财务:{score.get('financial_health', 0)}")
# 估值指标
val = fundamental_data.get('valuation', {})
if val.get('pe_ratio'):
pe = val['pe_ratio']
pb = val.get('pb_ratio')
ps = val.get('ps_ratio')
peg = val.get('peg_ratio')
pb_str = f"{pb:.2f}" if pb is not None else "N/A"
ps_str = f"{ps:.2f}" if ps is not None else "N/A"
peg_str = f"{peg:.2f}" if peg is not None else "N/A"
print(f" 【估值】PE:{pe:.2f} | PB:{pb_str} | PS:{ps_str} | PEG:{peg_str}")
# 盈利能力
prof = fundamental_data.get('profitability', {})
if prof.get('return_on_equity'):
roe = prof['return_on_equity']
pm = prof.get('profit_margin')
gm = prof.get('gross_margin')
pm_str = f"{pm:.1f}" if pm is not None else "N/A"
gm_str = f"{gm:.1f}" if gm is not None else "N/A"
print(f" 【盈利】ROE:{roe:.2f}% | 净利率:{pm_str}% | 毛利率:{gm_str}%")
# 成长性
growth = fundamental_data.get('growth', {})
rg = growth.get('revenue_growth')
eg = growth.get('earnings_growth')
if rg is not None or eg is not None:
rg_str = f"{rg:.1f}" if rg is not None else "N/A"
eg_str = f"{eg:.1f}" if eg is not None else "N/A"
print(f" 【成长】营收增长:{rg_str}% | 盈利增长:{eg_str}%")
# 财务健康
fin = fundamental_data.get('financial_health', {})
if fin.get('debt_to_equity'):
de = fin['debt_to_equity']
cr = fin.get('current_ratio')
cr_str = f"{cr:.2f}" if cr is not None else "N/A"
print(f" 【财务】债务股本比:{de:.2f} | 流动比率:{cr_str}")
# 分析师建议
analyst = fundamental_data.get('analyst', {})
tp = analyst.get('target_price')
if tp:
rec = analyst.get('recommendation', 'N/A')
print(f" 【分析师】目标价:${tp:.2f} | 评级:{rec}")
else:
print(f" ⚠️ 无法获取基本面数据")
except Exception as e:
print(f" ⚠️ 获取基本面数据失败: {e}")
# 获取新闻数据
print(f"\n📰 新闻分析...")
news_data = None
try:
news_data = await news.search_stock_news(symbol, stock_name, max_results=5)
if news_data:
print(f" 获取到 {len(news_data)} 条相关新闻")
else:
print(f" 暂无相关新闻")
except Exception as e:
print(f" ⚠️ 获取新闻数据失败: {e}")
# 市场信号分析(使用新架构 - 技术面 + 基本面 + 新闻)
print(f"\n🤖 市场信号分析中...\n")
market_signal = await market_analyzer.analyze(
symbol, data,
symbols=[symbol],
fundamental_data=fundamental_data,
news_data=news_data
)
# 输出结果
summary = market_signal.get('analysis_summary', '')
signals = market_signal.get('signals', [])
result['signals'] = signals
print(f"市场状态: {summary}")
if signals:
print(f"\n🎯 发现 {len(signals)} 个信号:")
for sig in signals:
action = sig.get('action', 'wait')
grade = sig.get('grade', 'D')
conf = sig.get('confidence', 0)
action_text = {'buy': '🟢 做多', 'sell': '🔴 做空'}.get(action, action)
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': ''}.get(grade, '')
print(f"\n{action_text} [{grade}{grade_icon}] {conf}%")
entry = sig.get('entry_price')
sl = sig.get('stop_loss')
tp = sig.get('take_profit')
if entry and sl and tp:
print(f" 入场: ${entry:,.2f}")
print(f" 止损: ${sl:,.2f}")
print(f" 止盈: ${tp:,.2f}")
reason = sig.get('reason', '')
if reason:
# 限制理由长度
short_reason = reason[:80] + "..." if len(reason) > 80 else reason
print(f" 理由: {short_reason}")
# 发送通知(仅发送置信度 >= 阈值的信号)
if send_notification:
best_signal = None
for sig in signals:
if sig.get('confidence', 0) >= threshold and sig.get('grade', 'D') != 'D':
best_signal = sig
break
if best_signal:
# 使用格式化工具格式化通知
from app.utils.signal_formatter import get_signal_formatter
formatter = get_signal_formatter()
card = formatter.format_feishu_card(best_signal, symbol, agent_type='stock')
title = card['title']
content = card['content']
# 发送通知 - 使用 send_card 方法
# 根据信号方向选择颜色
color = "green" if best_signal.get('action') == 'buy' else "red"
await feishu.send_card(title, content, color)
await telegram.send_message(formatter.format_signal_message(best_signal, symbol, agent_type='stock'))
print(f"\n📬 通知已发送:{title}")
result['notified'] = True
else:
print(f"\n⏸️ 置信度不足,不发送通知(阈值: {threshold}%")
else:
print(f"\n⏸️ 无交易信号")
return result
except Exception as e:
print(f"❌ 错误: {e}")
import traceback
traceback.print_exc()
return result
def print_summary_report(results: list, send_notification: bool = True):
"""打印汇总报告并发送通知
Args:
results: 分析结果列表
send_notification: 是否发送通知默认True
"""
from app.config import get_settings
settings = get_settings()
threshold = settings.stock_llm_threshold * 100 # 获取阈值
total = len(results)
with_signals = [r for r in results if r.get('signals')]
notified = [r for r in results if r.get('notified')]
# 区分美股和港股
us_results = [r for r in results if not r['symbol'].endswith('.HK')]
hk_results = [r for r in results if r['symbol'].endswith('.HK')]
# 统计信号
buy_count = 0
sell_count = 0
high_quality_signals = [] # A/B级信号且达到阈值
all_signals = [] # 所有信号
for r in with_signals:
for sig in r.get('signals', []):
sig['symbol'] = r['symbol']
sig['current_price'] = r.get('price', 0)
sig['is_hk'] = r['symbol'].endswith('.HK')
sig['stock_name'] = r.get('stock_name', '')
all_signals.append(sig)
if sig.get('action') == 'buy':
buy_count += 1
elif sig.get('action') == 'sell':
sell_count += 1
# 只统计达到阈值的A/B级信号
if sig.get('grade') in ['A', 'B'] and sig.get('confidence', 0) >= threshold:
high_quality_signals.append(sig)
# 按置信度排序
high_quality_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True)
all_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True)
# 打印汇总
print("\n" + "="*80)
print("📊 股票分析汇总报告")
print("="*80)
print(f"分析总数: {total} 只 (美股: {len(us_results)}, 港股: {len(hk_results)})")
print(f"有信号: {len(with_signals)}")
print(f"已通知: {len(notified)}")
print(f"通知阈值: {threshold}%")
print("")
# 显示高等级信号(达到阈值的)
if high_quality_signals:
print(f"⭐ 高等级信号达到阈值 (A/B级 >= {threshold}%): {len(high_quality_signals)}")
for sig in high_quality_signals[:10]:
symbol = sig['symbol']
stock_name = sig.get('stock_name', '')
market_tag = '[港股]' if sig.get('is_hk') else '[美股]'
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
price = sig.get('current_price', 0)
entry = sig.get('entry_price', 0)
# 构建带名称的股票显示
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
print(f" {market_tag} {symbol_display} {action} [{grade}级] {confidence}% @ ${price:,.2f}")
if entry > 0:
print(f" 入场: ${entry:,.2f}")
print("")
# 显示未达到阈值但质量不错的信号
below_threshold = [s for s in all_signals
if s.get('grade') in ['A', 'B'] and s.get('confidence', 0) < threshold]
if below_threshold:
print(f"⚠️ 以下信号未达到通知阈值 ({threshold}%):")
for sig in below_threshold[:10]:
symbol = sig['symbol']
stock_name = sig.get('stock_name', '')
market_tag = '[港股]' if sig.get('is_hk') else '[美股]'
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
print(f" {market_tag} {symbol_display} {action} {grade}{confidence}%")
print("")
# 统计汇总
print(f"📈 做多信号: {buy_count}")
print(f"📉 做空信号: {sell_count}")
print("="*80)
# 发送汇总通知
if send_notification:
asyncio.run(send_summary_notification(
results, total, with_signals, notified,
buy_count, sell_count, high_quality_signals, all_signals, threshold,
len(us_results), len(hk_results)
))
async def send_summary_notification(
results: list,
total: int,
with_signals: list,
notified: list,
buy_count: int,
sell_count: int,
high_quality_signals: list,
all_signals: list,
threshold: float,
us_count: int = 0,
hk_count: int = 0
):
"""发送汇总报告到飞书和Telegram
Args:
results: 分析结果列表
total: 总数
with_signals: 有信号的股票列表
notified: 已通知的股票列表
buy_count: 做多信号数量
sell_count: 做空信号数量
high_quality_signals: 达到阈值的高等级信号列表
all_signals: 所有信号列表
threshold: 通知阈值
us_count: 美股数量
hk_count: 港股数量
"""
try:
from datetime import datetime
feishu = get_feishu_service()
telegram = get_telegram_service()
now = datetime.now()
# 构建飞书汇总内容
content_parts = [
f"**📊 股票分析汇总报告**",
f"",
f"⏰ 时间: {now.strftime('%Y-%m-%d %H:%M')}",
f"",
f"📊 **分析概况**",
f"• 美股: {us_count} 只 | 港股: {hk_count}",
f"• 发现信号: {len(with_signals)}",
f"• 已发通知: {len(notified)}",
f"• 通知阈值: {threshold:.0f}%",
f"",
]
# 高等级信号(达到阈值的)
if high_quality_signals:
content_parts.append(f"⭐ **高等级信号 (A/B级 ≥ {threshold:.0f}%)**")
for sig in high_quality_signals[:5]:
symbol = sig['symbol']
stock_name = sig.get('stock_name', '')
market_tag = '[港股]' if sig.get('is_hk') else '[美股]'
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
# 构建带名称的股票显示
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
content_parts.append(f"{market_tag} {symbol_display} {action} {grade}{confidence}%")
content_parts.append(f"")
# 信号统计
content_parts.extend([
f"📈 做多信号: {buy_count}",
f"📉 做空信号: {sell_count}",
f"",
f"*⚠️ 仅供参考,不构成投资建议*"
])
content = "\n".join(content_parts)
# 发送飞书
title = f"📊 股票分析汇总 ({now.strftime('%H:%M')})"
color = "blue"
await feishu.send_card(title, content, color)
# 发送 Telegram
telegram_msg = f"📊 *股票分析汇总*\n\n"
telegram_msg += f"时间: {now.strftime('%H:%M')}\n"
telegram_msg += f"美股: {us_count}只 | 港股: {hk_count}\n"
telegram_msg += f"信号: {len(with_signals)}只 | 通知: {len(notified)}\n"
telegram_msg += f"阈值: {threshold:.0f}%\n\n"
if high_quality_signals:
telegram_msg += f"⭐ *高等级信号 (≥{threshold:.0f}%)*\n"
for sig in high_quality_signals[:5]:
symbol = sig['symbol']
stock_name = sig.get('stock_name', '')
market_tag = '[港股]' if sig.get('is_hk') else '[美股]'
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
telegram_msg += f"{market_tag} {symbol_display} {action} {grade}{confidence}%\n"
telegram_msg += "\n"
telegram_msg += f"做多: {buy_count} | 做空: {sell_count}"
await telegram.send_message(telegram_msg)
print(f"\n📬 汇总报告已发送到飞书和Telegram")
except Exception as e:
print(f"❌ 发送汇总通知失败: {e}")
import traceback
traceback.print_exc()
async def main():
# 从命令行参数获取股票代码
if len(sys.argv) < 2:
print("用法: python3 scripts/test_stock.py AAPL [TSLA] [NVDA] ...")
print("\n示例:")
print(" python3 scripts/test_stock.py AAPL")
print(" python3 scripts/test_stock.py AAPL TSLA NVDA")
sys.exit(1)
# 过滤掉无效的参数(如环境变量路径、配置项等)
# 只接受有效的股票代码字母开头1-5个字符或包含数字的港股代码如0700.HK
raw_symbols = sys.argv[1:]
symbols = []
for arg in raw_symbols:
# 跳过包含路径分隔符、等号、或明显不是股票代码的参数
if '/' in arg or '=' in arg or ':' in arg or len(arg) > 10:
logger.debug(f"跳过无效参数: {arg}")
continue
# 接受纯字母的参数(美股股票代码)
if arg.isalpha() and 1 <= len(arg) <= 5:
symbols.append(arg.upper())
# 接受包含数字和点号的参数港股代码如0700.HK
elif '.HK' in arg.upper() and len(arg) <= 10:
symbols.append(arg.upper())
else:
logger.debug(f"跳过非股票代码参数: {arg}")
if not symbols:
print("❌ 未找到有效的股票代码")
print(f"接收到的参数: {raw_symbols}")
print("\n用法: python3 scripts/test_stock.py AAPL [TSLA] [NVDA] ...")
sys.exit(1)
print("="*60)
print("🤖 美股分析脚本")
print("="*60)
print(f"股票: {', '.join(symbols)}")
print(f"时间: {__import__('datetime').datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 收集所有分析结果
results = []
for symbol in symbols:
result = await analyze(symbol.upper(), send_notification=True)
results.append(result)
# 生成汇总报告并发送通知
await send_summary_notification_async(results)
print("\n" + "="*60)
print("✅ 分析完成")
print("="*60)
async def send_summary_notification_async(results: list):
"""异步发送汇总通知"""
from app.config import get_settings
settings = get_settings()
threshold = settings.stock_llm_threshold * 100 # 获取阈值
total = len(results)
with_signals = [r for r in results if r.get('signals')]
notified = [r for r in results if r.get('notified')]
# 区分美股和港股
us_results = [r for r in results if not r['symbol'].endswith('.HK')]
hk_results = [r for r in results if r['symbol'].endswith('.HK')]
# 统计信号
buy_count = 0
sell_count = 0
high_quality_signals = [] # A/B级信号且达到阈值
all_signals = [] # 所有信号
for r in with_signals:
for sig in r.get('signals', []):
sig['symbol'] = r['symbol']
sig['current_price'] = r.get('price', 0)
sig['is_hk'] = r['symbol'].endswith('.HK')
sig['stock_name'] = r.get('stock_name', '')
all_signals.append(sig)
if sig.get('action') == 'buy':
buy_count += 1
elif sig.get('action') == 'sell':
sell_count += 1
# 只统计达到阈值的A/B级信号
if sig.get('grade') in ['A', 'B'] and sig.get('confidence', 0) >= threshold:
high_quality_signals.append(sig)
# 按置信度排序
high_quality_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True)
all_signals.sort(key=lambda x: x.get('confidence', 0), reverse=True)
# 打印汇总
print("\n" + "="*80)
print("📊 股票分析汇总报告")
print("="*80)
print(f"分析总数: {total} 只 (美股: {len(us_results)}, 港股: {len(hk_results)})")
print(f"有信号: {len(with_signals)}")
print(f"已通知: {len(notified)}")
print(f"通知阈值: {threshold}%")
print("")
# 显示高等级信号(达到阈值的)
if high_quality_signals:
print(f"⭐ 高等级信号达到阈值 (A/B级 >= {threshold}%): {len(high_quality_signals)}")
for sig in high_quality_signals[:10]:
symbol = sig['symbol']
stock_name = sig.get('stock_name', '')
market_tag = '[港股]' if sig.get('is_hk') else '[美股]'
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
price = sig.get('current_price', 0)
entry = sig.get('entry_price', 0)
# 构建带名称的股票显示
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
print(f" {market_tag} {symbol_display} {action} [{grade}级] {confidence}% @ ${price:,.2f}")
if entry > 0:
print(f" 入场: ${entry:,.2f}")
print("")
# 显示未达到阈值但质量不错的信号
below_threshold = [s for s in all_signals
if s.get('grade') in ['A', 'B'] and s.get('confidence', 0) < threshold]
if below_threshold:
print(f"⚠️ 以下信号未达到通知阈值 ({threshold}%):")
for sig in below_threshold[:10]:
symbol = sig['symbol']
stock_name = sig.get('stock_name', '')
market_tag = '[港股]' if sig.get('is_hk') else '[美股]'
action = '🟢 做多' if sig.get('action') == 'buy' else '🔴 做空'
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
# 构建带名称的股票显示
symbol_display = f"{stock_name}({symbol})" if stock_name else symbol
print(f" {market_tag} {symbol_display} {action} {grade}{confidence}%")
print("")
# 统计汇总
print(f"📈 做多信号: {buy_count}")
print(f"📉 做空信号: {sell_count}")
print("="*80)
# 发送汇总通知
await send_summary_notification(
results, total, with_signals, notified,
buy_count, sell_count, high_quality_signals, all_signals, threshold,
len(us_results), len(hk_results)
)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\n⚠️ 用户中断")

View File

@ -1,215 +0,0 @@
#!/usr/bin/env python3
"""
测试组合波动率过滤功能
测试场景
1. 低波动率1小时和5分钟都低- 应该跳过分析
2. 高波动率1小时高- 应该允许分析
3. 突发波动1小时低但5分钟高- 应该允许分析
"""
import sys
import pandas as pd
from pathlib import Path
# 添加项目路径
backend_dir = Path(__file__).parent.parent / "backend"
sys.path.insert(0, str(backend_dir))
from app.crypto_agent.crypto_agent import CryptoAgent
def create_test_data_1h_low_volatility():
"""
场景1: 低波动率
1小时K线价格在 50000 - 50040 之间波动波动率约0.08%
5分钟K线价格稳定无突发波动
"""
import numpy as np
# 创建1小时数据20根K线- 价格波动范围很小
base_price = 50000
timestamps_1h = pd.date_range(end='2024-01-01 12:00:00', periods=20, freq='1h')
df_1h = pd.DataFrame({
'timestamp': timestamps_1h,
'open': [base_price + i * 2 for i in range(20)], # 缓慢上涨
'high': [base_price + 20 + i * 2 for i in range(20)], # 最高价只高20
'low': [base_price - 20 + i * 2 for i in range(20)], # 最低价只低20
'close': [base_price + 10 + i * 2 for i in range(20)],
'volume': [1000] * 20
})
# 创建5分钟数据3根K线- 价格稳定
timestamps_5m = pd.date_range(end='2024-01-01 12:00:00', periods=3, freq='5min')
df_5m = pd.DataFrame({
'timestamp': timestamps_5m,
'open': [base_price + 38, base_price + 39, base_price + 40],
'high': [base_price + 42, base_price + 43, base_price + 44],
'low': [base_price + 36, base_price + 37, base_price + 38],
'close': [base_price + 39, base_price + 40, base_price + 41],
'volume': [100] * 3
})
return {
'1h': df_1h,
'5m': df_5m,
'15m': df_1h,
'4h': df_1h
}
def create_test_data_1h_high_volatility():
"""
场景2: 高波动率1小时趋势活跃
1小时K线价格在 50000 - 51000 之间波动波动率约2%
"""
import numpy as np
# 创建1小时数据20根K线
timestamps_1h = pd.date_range(end='2024-01-01 12:00:00', periods=20, freq='1h')
df_1h = pd.DataFrame({
'timestamp': timestamps_1h,
'open': [50000 + i * 50 for i in range(20)],
'high': [50200 + i * 50 for i in range(20)],
'low': [49800 + i * 50 for i in range(20)],
'close': [50100 + i * 50 for i in range(20)],
'volume': [1000] * 20
})
# 创建5分钟数据
timestamps_5m = pd.date_range(end='2024-01-01 12:00:00', periods=3, freq='5min')
df_5m = pd.DataFrame({
'timestamp': timestamps_5m,
'open': [50980, 50985, 50990],
'high': [51000, 51005, 51010],
'low': [50975, 50980, 50985],
'close': [50985, 50990, 50995],
'volume': [100] * 3
})
return {
'1h': df_1h,
'5m': df_5m,
'15m': df_1h,
'4h': df_1h
}
def create_test_data_5m_surge():
"""
场景3: 突发波动1小时低但5分钟高
1小时K线价格在 50000 - 50040 之间波动波动率约0.08%
5分钟K线价格从 50000 突然涨到 50600涨幅约1.2%
"""
import numpy as np
# 创建1小时数据20根K线- 低波动
base_price = 50000
timestamps_1h = pd.date_range(end='2024-01-01 12:00:00', periods=20, freq='1h')
df_1h = pd.DataFrame({
'timestamp': timestamps_1h,
'open': [base_price + i * 2 for i in range(20)],
'high': [base_price + 20 + i * 2 for i in range(20)],
'low': [base_price - 20 + i * 2 for i in range(20)],
'close': [base_price + 10 + i * 2 for i in range(20)],
'volume': [1000] * 20
})
# 创建5分钟数据3根K线- 突发波动
# 第一根K线从正常价格开始然后突然暴涨
timestamps_5m = pd.date_range(end='2024-01-01 12:00:00', periods=3, freq='5min')
df_5m = pd.DataFrame({
'timestamp': timestamps_5m,
'open': [base_price + 38, base_price + 300, base_price + 600], # 突然暴涨
'high': [base_price + 42, base_price + 350, base_price + 650],
'low': [base_price + 36, base_price + 250, base_price + 550],
'close': [base_price + 40, base_price + 600, base_price + 620], # 第一根收盘价接近开盘价,然后暴涨
'volume': [100, 200, 300] # 成交量也放大
})
return {
'1h': df_1h,
'5m': df_5m,
'15m': df_1h,
'4h': df_1h
}
def test_volatility_filter():
"""测试波动率过滤功能"""
print("\n" + "=" * 60)
print("🧪 测试组合波动率过滤功能")
print("=" * 60)
# 创建 CryptoAgent 实例
agent = CryptoAgent()
# 测试场景1: 低波动率
print("\n📋 场景1: 低波动率1小时和5分钟都低")
print("-" * 60)
data1 = create_test_data_1h_low_volatility()
should_analyze1, reason1, vol1 = agent._check_volatility('BTCUSDT', data1)
print(f"结果: {'✅ 允许分析' if should_analyze1 else '⏸️ 跳过分析'}")
print(f"原因: {reason1}")
print(f"1小时波动率: {vol1:.2f}%")
print(f"预期: ⏸️ 跳过分析(波动率过低)")
print(f"测试: {'✅ 通过' if not should_analyze1 else '❌ 失败'}")
# 测试场景2: 高波动率
print("\n📋 场景2: 高波动率1小时趋势活跃")
print("-" * 60)
data2 = create_test_data_1h_high_volatility()
should_analyze2, reason2, vol2 = agent._check_volatility('BTCUSDT', data2)
print(f"结果: {'✅ 允许分析' if should_analyze2 else '⏸️ 跳过分析'}")
print(f"原因: {reason2}")
print(f"1小时波动率: {vol2:.2f}%")
print(f"预期: ✅ 允许分析1小时趋势活跃")
print(f"测试: {'✅ 通过' if should_analyze2 else '❌ 失败'}")
# 测试场景3: 突发波动
print("\n📋 场景3: 突发波动1小时低但5分钟高")
print("-" * 60)
data3 = create_test_data_5m_surge()
# 打印5分钟数据用于调试
df_5m_3 = data3['5m']
print(f"5分钟K线数据:")
for idx, row in df_5m_3.iterrows():
print(f" 开: {row['open']:.2f}, 高: {row['high']:.2f}, 低: {row['low']:.2f}, 收: {row['close']:.2f}")
# 手动计算价格变化
price_start = float(df_5m_3.iloc[0]['close'])
price_end = float(df_5m_3.iloc[-1]['close'])
price_change_5m = abs(price_end - price_start) / price_start * 100
print(f"5分钟价格变化: {price_start:.2f} -> {price_end:.2f} = {price_change_5m:.2f}%")
should_analyze3, reason3, vol3 = agent._check_volatility('BTCUSDT', data3)
print(f"结果: {'✅ 允许分析' if should_analyze3 else '⏸️ 跳过分析'}")
print(f"原因: {reason3}")
print(f"1小时波动率: {vol3:.2f}%")
print(f"预期: ✅ 允许分析5分钟突发波动")
print(f"测试: {'✅ 通过' if should_analyze3 else '❌ 失败'}")
# 总结
print("\n" + "=" * 60)
print("📊 测试总结")
print("=" * 60)
all_passed = (
not should_analyze1 and # 场景1应该跳过
should_analyze2 and # 场景2应该允许
should_analyze3 # 场景3应该允许
)
if all_passed:
print("✅ 所有测试通过!")
print("\n组合波动率过滤功能正常工作:")
print(" • 低波动率时正确跳过分析节省API调用")
print(" • 高波动率时正确触发分析(捕捉趋势机会)")
print(" • 突发波动时正确触发分析(捕捉突发机会)")
else:
print("❌ 部分测试失败,请检查逻辑")
return all_passed
if __name__ == "__main__":
success = test_volatility_filter()
sys.exit(0 if success else 1)

View File

@ -1,83 +0,0 @@
"""
龙回头选股器测试脚本
运行: cd backend && python ../tests/test_pullback_selector.py
"""
import asyncio
import sys
import os
from pathlib import Path
# 添加 backend 目录到 Python 路径
backend_dir = Path(__file__).parent.parent / "backend"
sys.path.insert(0, str(backend_dir))
# 切换到 backend 目录(确保 .env 能被找到)
os.chdir(backend_dir)
from app.config import get_settings
from app.astock_agent.pullback_selector import get_pullback_selector
import logging
# 启用详细日志
logging.getLogger('stock_agent').setLevel(logging.DEBUG)
async def test_pullback_selector():
"""测试龙回头选股器"""
print("=" * 60)
print("龙回头选股器测试")
print("=" * 60)
settings = get_settings()
print(f"\n配置:")
print(f" Tushare Token: {'已配置' if settings.tushare_token else '未配置'}")
print(f" 选股启用: {settings.pullback_selector_enabled}")
print(f" 检查板块数: {settings.pullback_sectors_to_check}")
if not settings.tushare_token:
print("\n❌ 错误: 请先在 .env 中配置 TUSHARE_TOKEN")
return
# 创建选股器
selector = get_pullback_selector()
print(f"\n开始选股...")
print(f"检查板块数: {settings.pullback_sectors_to_check}")
print(f"每个板块最多选: 2 只\n")
# 执行选股
results = selector.select_from_hot_sectors(top_n=settings.pullback_sectors_to_check)
# 显示结果
if results:
print("\n" + "=" * 60)
print("选股结果")
print("=" * 60)
for sector_name, stocks in results.items():
print(f"\n{sector_name}")
for stock in stocks:
print(f" 📈 {stock['name']}({stock['ts_code']})")
print(f" 现价: ¥{stock['close']:.2f} | 回踩: {stock['pullback_pct']:.2f}% | 涨幅: {stock['rise_pct']:.2f}%")
print(f" MA5: ¥{stock['ma5']:.2f} | MA10: ¥{stock['ma10']:.2f} | MA30: ¥{stock['ma30']:.2f} | MA60: ¥{stock['ma60']:.2f}")
print(f" 缩量比: {stock['volume_shrink_ratio']:.0%} | 再放量: {stock['recent_volume_ratio']:.2f}x")
total = sum(len(stocks) for stocks in results.values())
print(f"\n共选出 {total} 只股票")
# 格式化完整消息
print("\n" + "=" * 60)
print("通知消息预览")
print("=" * 60)
print(selector.format_result(results))
else:
print("\n未选出符合条件的股票")
print("\n" + "=" * 60)
print("测试完成")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(test_pullback_selector())