import schedule import time import logging from datetime import datetime, timezone, timedelta from coin_selection_engine import CoinSelectionEngine import threading # 东八区时区 BEIJING_TZ = timezone(timedelta(hours=8)) def get_beijing_time(): """获取当前东八区时间用于显示""" return datetime.now(BEIJING_TZ) class ScheduledCoinSelector: def __init__(self, api_key=None, secret=None, db_path="trading.db"): """初始化定时选币器""" self.engine = CoinSelectionEngine(api_key, secret, db_path) self.is_running = False self.scheduler_thread = None # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('coin_selection.log'), logging.StreamHandler() ] ) self.logger = logging.getLogger(__name__) def scheduled_selection_job(self): """定时选币任务""" self.logger.info("开始执行定时选币任务...") try: # 执行选币 selected_coins = self.engine.run_coin_selection() if selected_coins: self.logger.info(f"定时选币完成,共选出{len(selected_coins)}个币种") # 打印选币结果 print("\n" + "="*60) print(f"定时选币结果 - {get_beijing_time().strftime('%Y-%m-%d %H:%M:%S CST')}") print("="*60) for i, signal in enumerate(selected_coins, 1): print(f"{i}. {signal.symbol} - 评分: {signal.score:.1f}") print(f" 入场: ${signal.entry_price:.4f}") print(f" 止损: ${signal.stop_loss:.4f}") print(f" 止盈: ${signal.take_profit:.4f}") print(f" 理由: {signal.reason}") print("-" * 40) else: self.logger.warning("本次定时选币未找到符合条件的币种") except Exception as e: self.logger.error(f"定时选币任务执行失败: {e}") def start_scheduler(self): """启动定时器""" if self.is_running: self.logger.warning("定时器已经在运行中") return self.logger.info("启动定时选币器...") # 设置定时任务 # 每天8点和20点执行选币 schedule.every().day.at("08:00").do(self.scheduled_selection_job) schedule.every().day.at("20:00").do(self.scheduled_selection_job) # 也可以设置每隔4小时执行一次 # schedule.every(4).hours.do(self.scheduled_selection_job) self.is_running = True # 在单独的线程中运行调度器 def run_scheduler(): while self.is_running: schedule.run_pending() time.sleep(60) # 每分钟检查一次 self.scheduler_thread = threading.Thread(target=run_scheduler, daemon=True) self.scheduler_thread.start() self.logger.info("定时选币器已启动,计划每日8:00和20:00执行") def stop_scheduler(self): """停止定时器""" if not self.is_running: self.logger.warning("定时器未在运行") return self.logger.info("停止定时选币器...") self.is_running = False schedule.clear() if self.scheduler_thread and self.scheduler_thread.is_alive(): self.scheduler_thread.join(timeout=5) self.logger.info("定时选币器已停止") def run_once(self): """立即执行一次选币""" self.logger.info("立即执行选币...") self.scheduled_selection_job() def get_next_run_time(self): """获取下次执行时间""" jobs = schedule.get_jobs() if jobs: next_run = min(job.next_run for job in jobs) # 转换为东八区时间显示 beijing_time = next_run.replace(tzinfo=timezone.utc).astimezone(BEIJING_TZ) return beijing_time.strftime('%Y-%m-%d %H:%M:%S CST') return "无计划任务" def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description='加密货币定时选币器') parser.add_argument('--api-key', help='Binance API Key') parser.add_argument('--secret', help='Binance Secret Key') parser.add_argument('--db-path', default='trading.db', help='数据库路径') parser.add_argument('--run-once', action='store_true', help='立即执行一次选币') parser.add_argument('--daemon', action='store_true', help='以守护进程方式运行') args = parser.parse_args() # 创建定时选币器 selector = ScheduledCoinSelector(args.api_key, args.secret, args.db_path) if args.run_once: # 立即执行一次 selector.run_once() elif args.daemon: # 启动定时器并保持运行 selector.start_scheduler() try: print("定时选币器正在运行...") print(f"下次执行时间: {selector.get_next_run_time()}") print("按 Ctrl+C 停止") while True: time.sleep(1) except KeyboardInterrupt: print("\n收到停止信号...") selector.stop_scheduler() print("定时选币器已停止") else: print("请使用 --run-once 立即执行或 --daemon 启动定时器") print("示例:") print(" python scheduler.py --run-once") print(" python scheduler.py --daemon") if __name__ == "__main__": main()