"""Post upcoming high-importance economic data events to a Discord webhook."""

import json
import logging
import time
from datetime import datetime, timedelta

import requests

from cryptoai.utils.config_loader import ConfigLoader
from cryptoai.utils.discord_bot import DiscordBot

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Endpoint queried once per calendar day; the date is passed as a query parameter.
EVENTS_API_URL = "https://e0430d16720e4211b5e072c26205c890.z3c.jin10.com/get/data"
EVENTS_API_HEADERS = {
    "x-app-id": "sKKYe29sFuJaeOCJ",
    "x-version": "2.0",
}
# Without a timeout `requests` waits forever on a stalled connection.
REQUEST_TIMEOUT_SECONDS = 10
# Number of consecutive days (starting today) that are scanned for events.
LOOKAHEAD_DAYS = 14
# Events rated below this star level are ignored.
MIN_STAR = 4
# Format of the `pub_time` field as parsed by this module.
PUB_TIME_FORMAT = "%Y-%m-%d %H:%M"


class ImportantEventsMonitor:
    """Collect highly rated events for the coming days and push them to Discord."""

    def __init__(self):
        """Load the Discord webhook URL from the project config and build the bot."""
        self.config = ConfigLoader()
        self.webhook_url = self.config.get_discord_config()['important_events_webhook_url']
        self.discord_bot = DiscordBot(self.webhook_url)

    def run(self):
        """Fetch events for the next ``LOOKAHEAD_DAYS`` days and send one digest.

        Nothing is sent when no event qualifies, so the webhook never receives
        an empty message.
        """
        logger.info("获取重要事件")

        start = datetime.now()
        items = []
        for offset in range(LOOKAHEAD_DAYS):
            items.extend(self._get_important_events(start + timedelta(days=offset)))
        logger.debug("items length %d", len(items))

        if not items:
            logger.info("未获取到重要事件,跳过发送")
            return

        # A single reference instant keeps the urgency labels consistent
        # across all events of one digest.
        now = datetime.now()
        content = "".join(self._format_event(item, now) for item in items)
        logger.debug("%s", content)

        # NOTE(review): the digest is sent as one message; if DiscordBot does
        # not split long content, a large digest may exceed Discord's message
        # size limit -- confirm against DiscordBot.send_message.
        self.discord_bot.send_message(content)

    @staticmethod
    def _urgency_indicator(hours_diff):
        """Return the urgency label for an event that is ``hours_diff`` hours away."""
        if hours_diff < 24:
            return "🔴 今日"  # within 24 hours
        if hours_diff < 48:
            return "🟠 明日"  # within 48 hours
        return "🟢 近期"  # anything later

    @staticmethod
    def _format_event(item, now):
        """Render one event dict as a Markdown section.

        Args:
            item: Event dict; reads ``pub_time``, ``star``, ``country``,
                ``indicator_name``, ``time_period`` and the optional
                ``previous`` / ``consensus`` values.
            now: Reference instant used to compute how close the event is.

        Returns:
            The Markdown text for this event, ending with a separator line.
        """
        pub_time = datetime.strptime(item['pub_time'], PUB_TIME_FORMAT)
        hours_diff = (pub_time - now).total_seconds() / 3600
        urgency_indicator = ImportantEventsMonitor._urgency_indicator(hours_diff)

        star_display = "⭐" * item['star']
        # Five-star events get extra emphasis throughout the section.
        is_five_star = item['star'] == 5
        title_prefix = "🔥 重要 🔥 " if is_five_star else ""
        title_suffix = " 🔥🔥🔥" if is_five_star else ""

        parts = [
            f"### {title_prefix}{urgency_indicator} {item['country']} 经济数据{title_suffix}\n"
        ]

        # Five-star events use a bold indicator title.
        if is_five_star:
            parts.append(f"### **{item['indicator_name']}** ({item['time_period']})\n\n")
        else:
            parts.append(f"### {item['indicator_name']} ({item['time_period']})\n\n")

        # Previous / forecast values are listed only when present.
        previous = item.get('previous')
        if previous is not None:
            parts.append(f"- **前值**: {previous}\n")
        consensus = item.get('consensus')
        if consensus is not None:
            parts.append(f"- **预测值**: {consensus}\n")

        if is_five_star:
            parts.append(f"- **重要程度**: {star_display} (最高级别)\n")
        else:
            parts.append(f"- **重要程度**: {star_display}\n")

        parts.append(f"- **公布时间**: {item['pub_time']}\n\n")

        if is_five_star:
            parts.append("**⚠️ 此为市场高度关注的重要数据,可能引起较大波动 ⚠️**\n\n")

        # Separator between events.
        parts.append("---\n\n")
        return "".join(parts)

    def _get_important_events(self, date: datetime):
        """Return the events on ``date`` rated ``MIN_STAR`` stars or higher.

        Args:
            date: Day to query; only its ``YYYY-MM-DD`` part is sent.

        Returns:
            A list of event dicts. An empty list is returned when the request
            fails, the response is not valid JSON, or the payload reports a
            status other than 200, so one bad day does not abort the scan.
        """
        params = {"date": date.strftime('%Y-%m-%d'), "category": "cj"}
        logger.debug("%s %s", EVENTS_API_URL, params)

        try:
            response = requests.get(
                EVENTS_API_URL,
                params=params,
                headers=EVENTS_API_HEADERS,
                timeout=REQUEST_TIMEOUT_SECONDS,
            )
            data = response.json()
        except (requests.RequestException, ValueError) as exc:
            # ValueError covers a body that is not valid JSON.
            logger.error("获取重要事件失败: %s", exc)
            return []

        if not isinstance(data, dict):
            logger.error("获取重要事件失败: %s", data)
            return []
        if data.get('status') != 200:
            logger.error("获取重要事件失败: %s", data.get('message'))
            return []

        # Tolerate a null/missing event list and events without a star rating.
        return [
            item
            for item in data.get('data') or []
            if (item.get('star') or 0) >= MIN_STAR
        ]


if __name__ == "__main__":
    monitor = ImportantEventsMonitor()
    monitor.run()