#!/usr/bin/env python3 """ 钉钉通知快速设置和测试脚本 """ import os import sys from coin_selection_engine import CoinSelectionEngine def setup_dingtalk(): """设置钉钉webhook""" print("🤖 加密货币选币系统 - 钉钉通知设置") print("=" * 50) # 获取当前配置 current_webhook = os.getenv('DINGTALK_WEBHOOK_URL') current_secret = os.getenv('DINGTALK_WEBHOOK_SECRET') if current_webhook: print(f"当前配置的webhook: {current_webhook[:50]}...") if current_secret: print("已配置加签密钥(安全模式)") use_current = input("是否使用当前配置? (y/n): ").lower().strip() if use_current == 'y': return current_webhook, current_secret # 输入新的webhook print("\n📱 请按以下步骤获取钉钉机器人webhook URL:") print("1. 在钉钉群中添加机器人") print("2. 选择'自定义机器人'") print("3. 复制webhook URL") print("4. 如果启用了加签安全,也复制密钥") print() webhook_url = input("请输入钉钉机器人webhook URL: ").strip() if not webhook_url: print("❌ 未输入webhook URL") return None, None if not webhook_url.startswith('https://oapi.dingtalk.com/robot/send'): print("❌ webhook URL格式不正确") return None, None # 询问是否使用加签 use_sign = input("是否启用了加签安全设置? (y/n): ").lower().strip() webhook_secret = None if use_sign == 'y': webhook_secret = input("请输入加签密钥: ").strip() if webhook_secret and not webhook_secret.startswith('SEC'): print("⚠️ 加签密钥通常以'SEC'开头,请确认输入正确") # 保存到环境变量 (临时) os.environ['DINGTALK_WEBHOOK_URL'] = webhook_url if webhook_secret: os.environ['DINGTALK_WEBHOOK_SECRET'] = webhook_secret security_mode = "加签模式" if webhook_secret else "普通模式" print(f"✅ 钉钉通知配置完成 ({security_mode})") return webhook_url, webhook_secret def test_notification(): """测试钉钉通知""" print("\n🧪 测试钉钉通知...") try: # 创建选币引擎实例 engine = CoinSelectionEngine() # 发送测试消息 success = engine.test_dingtalk_notification() if success: print("✅ 钉钉通知测试成功!") return True else: print("❌ 钉钉通知测试失败") return False except Exception as e: print(f"❌ 测试过程中出错: {e}") return False def run_selection_with_notification(): """运行选币并发送通知""" print("\n🎯 开始执行选币...") try: engine = CoinSelectionEngine() selected_coins = engine.run_coin_selection() print(f"\n📊 选币完成,共发现 {len(selected_coins)} 个投资机会") # 打印简要结果 if selected_coins: long_signals = [s for s in selected_coins if s.signal_type == "LONG"] short_signals = [s for s in selected_coins if s.signal_type == "SHORT"] print(f"多头信号: {len(long_signals)}个") print(f"空头信号: {len(short_signals)}个") print("\n前5个信号:") for i, signal in enumerate(selected_coins[:5], 1): direction = "📈多头" if signal.signal_type == "LONG" else "📉空头" print(f"{i}. {signal.symbol} {direction} - {signal.score:.1f}分 ({signal.confidence}信心)") return selected_coins except Exception as e: print(f"❌ 选币过程中出错: {e}") return [] def create_env_file(webhook_url, webhook_secret=None): """创建 .env 文件保存配置""" env_content = f"""# 钉钉机器人配置 DINGTALK_WEBHOOK_URL={webhook_url} """ if webhook_secret: env_content += f"DINGTALK_WEBHOOK_SECRET={webhook_secret}\n" env_content += """ # 选币配置 USE_MARKET_CAP_RANKING=True """ try: with open('.env', 'w', encoding='utf-8') as f: f.write(env_content) print("✅ 配置已保存到 .env 文件") except Exception as e: print(f"⚠️ 保存配置文件失败: {e}") def main(): """主函数""" if len(sys.argv) > 1: command = sys.argv[1].lower() if command == 'test': test_notification() return elif command == 'run': run_selection_with_notification() return elif command == 'setup': webhook, secret = setup_dingtalk() if webhook: create_env_file(webhook, secret) test_notification() return # 交互式菜单 while True: print("\n🤖 加密货币选币系统 - 钉钉通知") print("=" * 40) print("1. 设置钉钉webhook") print("2. 测试钉钉通知") print("3. 执行选币并发送通知") print("4. 退出") print() choice = input("请选择操作 (1-4): ").strip() if choice == '1': webhook, secret = setup_dingtalk() if webhook: create_env_file(webhook, secret) elif choice == '2': test_notification() elif choice == '3': run_selection_with_notification() elif choice == '4': print("👋 再见!") break else: print("❌ 无效选择,请重试") if __name__ == "__main__": main()