#!/usr/bin/env python # -*- coding: utf-8 -*- """ 加密货币分析智能体定时任务 支持以下配置: - 定时执行:设置每天指定时间执行分析任务 - 周期执行:设置每隔一定时间执行一次分析任务 - 同时支持多个智能体:crypto和gold 使用方法: python schedule_task.py # 使用默认配置执行 python schedule_task.py --agent crypto # 指定执行加密货币智能体 python schedule_task.py --agent gold # 指定执行黄金智能体 python schedule_task.py --both # 同时执行两种智能体 python schedule_task.py --time "12:00" # 设置每天执行的时间 python schedule_task.py --interval 240 # 设置执行间隔(分钟) """ import os import sys import time import argparse import schedule import subprocess from datetime import datetime def parse_arguments(): """解析命令行参数""" parser = argparse.ArgumentParser(description='加密货币分析智能体定时任务') parser.add_argument('--agent', type=str, default='crypto', choices=['crypto', 'gold'], help='要执行的智能体类型,默认为加密货币智能体') parser.add_argument('--both', action='store_true', help='同时执行两种智能体') parser.add_argument('--time', type=str, default=None, help='每天执行任务的时间,格式为HH:MM,例如 "08:30"') parser.add_argument('--interval', type=int, default=360, help='自动执行间隔(分钟),默认为360分钟(6小时)') return parser.parse_args() def run_agent(agent_type='crypto'): """ 执行智能体 Args: agent_type: 智能体类型,可选 'crypto' 或 'gold' """ current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{current_time}] 执行{agent_type}智能体...") try: # 构建命令 cmd = ["python", "run.py", f"--agent={agent_type}", "--run-once"] # 执行命令 process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) # 实时输出日志 while True: output = process.stdout.readline() if output == '' and process.poll() is not None: break if output: print(output.strip()) # 获取执行结果 _, stderr = process.communicate() # 检查是否有错误 if process.returncode != 0: print(f"执行{agent_type}智能体时出错: {stderr}") else: print(f"[{current_time}] {agent_type}智能体执行完成") except Exception as e: print(f"执行{agent_type}智能体时出错: {e}") def run_both_agents(): """执行两种智能体""" run_agent('crypto') run_agent('gold') def main(): """主函数""" args = parse_arguments() # 定义任务函数 if args.both: job_func = run_both_agents agent_desc = "加密货币和黄金" else: job_func = lambda: run_agent(args.agent) agent_desc = f"{args.agent}分析" # 根据参数设置定时任务 if args.time: # 设置每天固定时间执行 schedule.every().day.at(args.time).do(job_func) print(f"已设置每天 {args.time} 执行{agent_desc}智能体") else: # 设置定时间隔执行 schedule.every(args.interval).minutes.do(job_func) print(f"已设置每 {args.interval} 分钟执行一次{agent_desc}智能体") # 立即执行一次 print("立即执行一次任务...") job_func() # 循环等待下次执行 while True: # 检查是否有定时任务需要执行 schedule.run_pending() # 打印下次执行时间 next_run = schedule.next_run().strftime("%Y-%m-%d %H:%M:%S") print(f"下次执行时间: {next_run}") # 等待一段时间再检查 time.sleep(60) if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n定时任务已停止") except Exception as e: print(f"定时任务出错: {e}") import traceback traceback.print_exc()