trading.ai/scripts/data_collector.py
2025-09-19 21:15:09 +08:00

104 lines
3.2 KiB
Python
Executable File

#!/usr/bin/env python3
"""
数据采集服务
定时运行策略扫描
"""
import time
import schedule
import sys
from pathlib import Path
# 添加项目根目录到路径
current_dir = Path(__file__).parent
project_root = current_dir.parent
sys.path.insert(0, str(project_root))
from src.strategy.kline_pattern_strategy import KLinePatternStrategy
from src.utils.config_loader import ConfigLoader
from src.data.data_fetcher import ADataFetcher
from src.utils.notification import NotificationManager
from src.database.database_manager import DatabaseManager
from loguru import logger
def run_strategy():
"""运行策略扫描"""
try:
logger.info('开始运行策略扫描...')
# 初始化各个组件
config_loader = ConfigLoader()
config = config_loader.config
# 初始化数据获取器
data_fetcher = ADataFetcher()
# 初始化通知管理器
notification_config = config.get('notification', {})
notification_manager = NotificationManager(notification_config)
# 初始化数据库管理器
db_manager = DatabaseManager()
# 初始化策略
strategy_config = config.get('strategy', {}).get('kline_pattern', {})
strategy = KLinePatternStrategy(
data_fetcher=data_fetcher,
notification_manager=notification_manager,
config=strategy_config,
db_manager=db_manager
)
# 执行市场K线形态扫描
logger.info("开始扫描市场K线形态...")
logger.info("⚠️ 注意: 这可能需要较长时间,请耐心等待")
# 获取扫描股票数量配置
scan_count = strategy.config.get('scan_stocks_count', 20)
logger.info(f"扫描股票数量: {scan_count}")
results = strategy.scan_market(max_stocks=scan_count)
if results:
logger.info(f"📈 市场扫描结果 (发现 {len(results)} 只股票有信号):")
for stock_code, stock_results in results.items():
total_signals = sum(len(signals) for signals in stock_results.values())
logger.info(f"股票: {stock_code} (共{total_signals}个信号)")
for timeframe, signals in stock_results.items():
if signals:
logger.info(f" {timeframe}: {len(signals)}个信号")
# 只显示最新的信号
latest_signal = signals[-1]
logger.info(f" 最新: {latest_signal['date']} 突破价格 {latest_signal['breakout_price']:.2f}")
else:
logger.info("未发现任何K线形态信号")
logger.info('策略扫描完成')
except Exception as e:
logger.error(f'策略运行失败: {e}')
def main():
"""主函数"""
logger.info('数据采集服务已启动')
# 每天9点和15点运行策略
schedule.every().day.at('09:00').do(run_strategy)
schedule.every().day.at('15:00').do(run_strategy)
# # 立即运行一次(用于测试)
# logger.info('立即执行一次策略扫描...')
# run_strategy()
# 主循环
while True:
schedule.run_pending()
time.sleep(60)
if __name__ == '__main__':
main()