trading.ai/quick_test.py
aaron 283901df18 Initial commit: A股量化交易系统
主要功能:
- K线形态策略: 两阳+阴+阳突破形态识别
- 信号时间修复: 使用K线时间而非发送时间
- 换手率约束: 最后阳线换手率不超过40%
- 汇总通知: 钉钉webhook单次发送所有信号
- 数据获取: 支持AKShare数据源
- 舆情分析: 北向资金、热门股票等

技术特性:
- 支持日线/周线/月线多时间周期
- EMA20趋势确认
- 实体比例验证
- 突破价格确认
- 流动性约束检查

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-16 15:59:48 +08:00

99 lines
3.1 KiB
Python

#!/usr/bin/env python3
"""
K线形态策略快速测试
"""
import sys
from pathlib import Path
# 将src目录添加到Python路径
current_dir = Path(__file__).parent
src_dir = current_dir / "src"
sys.path.insert(0, str(src_dir))
from src.data.data_fetcher import ADataFetcher
from src.utils.notification import NotificationManager
from src.strategy.kline_pattern_strategy import KLinePatternStrategy
def quick_test():
"""快速测试策略功能"""
print("🚀 K线形态策略快速测试")
print("=" * 50)
# 配置
strategy_config = {
'min_entity_ratio': 0.55,
'timeframes': ['daily'],
'scan_stocks_count': 3, # 只测试3只股票
'analysis_days': 30
}
notification_config = {
'dingtalk': {
'enabled': False, # 测试时关闭钉钉通知
'webhook_url': ''
}
}
try:
# 初始化组件
print("📊 初始化组件...")
data_fetcher = ADataFetcher()
notification_manager = NotificationManager(notification_config)
strategy = KLinePatternStrategy(data_fetcher, notification_manager, strategy_config)
# 测试1: 单股分析
print("\n🔍 测试1: 单股K线形态分析")
test_stock = "000001.SZ"
results = strategy.analyze_stock(test_stock)
total_signals = sum(len(signals) for signals in results.values())
print(f"{test_stock} 分析完成: {total_signals} 个信号")
# 测试2: 市场扫描
print("\n🌍 测试2: 市场形态扫描")
market_results = strategy.scan_market(max_stocks=3)
total_stocks_with_signals = len(market_results)
total_market_signals = sum(
sum(len(signals) for signals in stock_results.values())
for stock_results in market_results.values()
)
print(f"✅ 市场扫描完成: {total_stocks_with_signals} 只股票有信号,共 {total_market_signals} 个信号")
# 测试3: 通知功能
print("\n📱 测试3: 通知系统")
notification_success = notification_manager.send_strategy_signal(
stock_code="TEST001",
stock_name="测试股票",
timeframe="daily",
signal_type="快速测试信号",
price=12.34,
additional_info={
"测试类型": "快速验证",
"状态": "正常"
}
)
print(f"✅ 通知系统测试完成: {'成功' if notification_success else '失败(正常,未配置钉钉)'}")
print("\n🎉 所有测试通过!")
print("\n📝 使用方法:")
print(" python main.py # 启动完整系统")
print(" python test_strategy.py # 详细功能测试")
print("\n⚙️ 配置钉钉通知:")
print(" 1. 在钉钉群中添加自定义机器人")
print(" 2. 复制webhook地址到 config/config.yaml")
print(" 3. 设置 notification.dingtalk.enabled: true")
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
return True
if __name__ == "__main__":
success = quick_test()
sys.exit(0 if success else 1)