#!/usr/bin/env python3 """ 简单的数据库集成测试 """ import sys from pathlib import Path # 添加src目录到路径 current_dir = Path(__file__).parent src_dir = current_dir / "src" sys.path.insert(0, str(src_dir)) from loguru import logger from src.database.database_manager import DatabaseManager from src.utils.config_loader import ConfigLoader from src.data.data_fetcher import ADataFetcher from src.utils.notification import NotificationManager from src.strategy.kline_pattern_strategy import KLinePatternStrategy def main(): """简单测试""" logger.remove() logger.add(sys.stdout, level="INFO", format="{time:HH:mm:ss} | {level} | {message}") print("🧪 简单数据库集成测试") print("=" * 50) try: # 初始化组件 logger.info("初始化组件...") config_loader = ConfigLoader() config = config_loader.load_config() data_fetcher = ADataFetcher() notification_manager = NotificationManager(config.get('notification', {})) db_manager = DatabaseManager() # 初始化策略 kline_config = config.get('strategy', {}).get('kline_pattern', {}) strategy = KLinePatternStrategy( data_fetcher=data_fetcher, notification_manager=notification_manager, config=kline_config, db_manager=db_manager ) logger.info(f"策略ID: {strategy.strategy_id}") # 测试小规模扫描 logger.info("开始小规模扫描...") test_stocks = ["000001.SZ", "000002.SZ"] results = strategy.scan_market( stock_list=test_stocks, max_stocks=2, use_hot_stocks=False ) # 检查数据库 logger.info("检查数据库记录...") latest_signals = db_manager.get_latest_signals(limit=10) logger.info(f"数据库中的信号数: {len(latest_signals)}") if not latest_signals.empty: logger.info("信号示例:") for _, signal in latest_signals.head(3).iterrows(): logger.info(f" {signal['stock_code']} - {signal['breakout_price']:.2f}元") logger.info("✅ 测试完成") except Exception as e: logger.error(f"❌ 测试失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": main()