crypto.ai/schedule_task.py
2025-04-28 14:53:05 +08:00

140 lines
4.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
加密货币分析智能体定时任务
支持以下配置:
- 定时执行:设置每天指定时间执行分析任务
- 周期执行:设置每隔一定时间执行一次分析任务
- 同时支持多个智能体crypto和gold
使用方法:
python schedule_task.py # 使用默认配置执行
python schedule_task.py --agent crypto # 指定执行加密货币智能体
python schedule_task.py --agent gold # 指定执行黄金智能体
python schedule_task.py --both # 同时执行两种智能体
python schedule_task.py --time "12:00" # 设置每天执行的时间
python schedule_task.py --interval 240 # 设置执行间隔(分钟)
"""
import os
import sys
import time
import argparse
import schedule
import subprocess
from datetime import datetime
def parse_arguments():
"""解析命令行参数"""
parser = argparse.ArgumentParser(description='加密货币分析智能体定时任务')
parser.add_argument('--agent', type=str, default='crypto',
choices=['crypto', 'gold'],
help='要执行的智能体类型,默认为加密货币智能体')
parser.add_argument('--both', action='store_true',
help='同时执行两种智能体')
parser.add_argument('--time', type=str, default=None,
help='每天执行任务的时间格式为HH:MM例如 "08:30"')
parser.add_argument('--interval', type=int, default=360,
help='自动执行间隔分钟默认为360分钟6小时')
return parser.parse_args()
def run_agent(agent_type='crypto'):
"""
执行智能体
Args:
agent_type: 智能体类型,可选 'crypto''gold'
"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"[{current_time}] 执行{agent_type}智能体...")
try:
# 构建命令
cmd = ["python", "run.py", f"--agent={agent_type}", "--run-once"]
# 执行命令
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
# 实时输出日志
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
print(output.strip())
# 获取执行结果
_, stderr = process.communicate()
# 检查是否有错误
if process.returncode != 0:
print(f"执行{agent_type}智能体时出错: {stderr}")
else:
print(f"[{current_time}] {agent_type}智能体执行完成")
except Exception as e:
print(f"执行{agent_type}智能体时出错: {e}")
def run_both_agents():
"""执行两种智能体"""
run_agent('crypto')
run_agent('gold')
def main():
"""主函数"""
args = parse_arguments()
# 定义任务函数
if args.both:
job_func = run_both_agents
agent_desc = "加密货币和黄金"
else:
job_func = lambda: run_agent(args.agent)
agent_desc = f"{args.agent}分析"
# 根据参数设置定时任务
if args.time:
# 设置每天固定时间执行
schedule.every().day.at(args.time).do(job_func)
print(f"已设置每天 {args.time} 执行{agent_desc}智能体")
else:
# 设置定时间隔执行
schedule.every(args.interval).minutes.do(job_func)
print(f"已设置每 {args.interval} 分钟执行一次{agent_desc}智能体")
# 立即执行一次
print("立即执行一次任务...")
job_func()
# 循环等待下次执行
while True:
# 检查是否有定时任务需要执行
schedule.run_pending()
# 打印下次执行时间
next_run = schedule.next_run().strftime("%Y-%m-%d %H:%M:%S")
print(f"下次执行时间: {next_run}")
# 等待一段时间再检查
time.sleep(60)
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n定时任务已停止")
except Exception as e:
print(f"定时任务出错: {e}")
import traceback
traceback.print_exc()