trading.ai/setup_dingtalk.py
2025-08-14 10:06:19 +08:00

184 lines
5.7 KiB
Python

#!/usr/bin/env python3
"""
钉钉通知快速设置和测试脚本
"""
import os
import sys
from coin_selection_engine import CoinSelectionEngine
def setup_dingtalk():
"""设置钉钉webhook"""
print("🤖 加密货币选币系统 - 钉钉通知设置")
print("=" * 50)
# 获取当前配置
current_webhook = os.getenv('DINGTALK_WEBHOOK_URL')
current_secret = os.getenv('DINGTALK_WEBHOOK_SECRET')
if current_webhook:
print(f"当前配置的webhook: {current_webhook[:50]}...")
if current_secret:
print("已配置加签密钥(安全模式)")
use_current = input("是否使用当前配置? (y/n): ").lower().strip()
if use_current == 'y':
return current_webhook, current_secret
# 输入新的webhook
print("\n📱 请按以下步骤获取钉钉机器人webhook URL:")
print("1. 在钉钉群中添加机器人")
print("2. 选择'自定义机器人'")
print("3. 复制webhook URL")
print("4. 如果启用了加签安全,也复制密钥")
print()
webhook_url = input("请输入钉钉机器人webhook URL: ").strip()
if not webhook_url:
print("❌ 未输入webhook URL")
return None, None
if not webhook_url.startswith('https://oapi.dingtalk.com/robot/send'):
print("❌ webhook URL格式不正确")
return None, None
# 询问是否使用加签
use_sign = input("是否启用了加签安全设置? (y/n): ").lower().strip()
webhook_secret = None
if use_sign == 'y':
webhook_secret = input("请输入加签密钥: ").strip()
if webhook_secret and not webhook_secret.startswith('SEC'):
print("⚠️ 加签密钥通常以'SEC'开头,请确认输入正确")
# 保存到环境变量 (临时)
os.environ['DINGTALK_WEBHOOK_URL'] = webhook_url
if webhook_secret:
os.environ['DINGTALK_WEBHOOK_SECRET'] = webhook_secret
security_mode = "加签模式" if webhook_secret else "普通模式"
print(f"✅ 钉钉通知配置完成 ({security_mode})")
return webhook_url, webhook_secret
def test_notification():
"""测试钉钉通知"""
print("\n🧪 测试钉钉通知...")
try:
# 创建选币引擎实例
engine = CoinSelectionEngine()
# 发送测试消息
success = engine.test_dingtalk_notification()
if success:
print("✅ 钉钉通知测试成功!")
return True
else:
print("❌ 钉钉通知测试失败")
return False
except Exception as e:
print(f"❌ 测试过程中出错: {e}")
return False
def run_selection_with_notification():
"""运行选币并发送通知"""
print("\n🎯 开始执行选币...")
try:
engine = CoinSelectionEngine()
selected_coins = engine.run_coin_selection()
print(f"\n📊 选币完成,共发现 {len(selected_coins)} 个投资机会")
# 打印简要结果
if selected_coins:
long_signals = [s for s in selected_coins if s.signal_type == "LONG"]
short_signals = [s for s in selected_coins if s.signal_type == "SHORT"]
print(f"多头信号: {len(long_signals)}")
print(f"空头信号: {len(short_signals)}")
print("\n前5个信号:")
for i, signal in enumerate(selected_coins[:5], 1):
direction = "📈多头" if signal.signal_type == "LONG" else "📉空头"
print(f"{i}. {signal.symbol} {direction} - {signal.score:.1f}分 ({signal.confidence}信心)")
return selected_coins
except Exception as e:
print(f"❌ 选币过程中出错: {e}")
return []
def create_env_file(webhook_url, webhook_secret=None):
"""创建 .env 文件保存配置"""
env_content = f"""# 钉钉机器人配置
DINGTALK_WEBHOOK_URL={webhook_url}
"""
if webhook_secret:
env_content += f"DINGTALK_WEBHOOK_SECRET={webhook_secret}\n"
env_content += """
# 选币配置
USE_MARKET_CAP_RANKING=True
"""
try:
with open('.env', 'w', encoding='utf-8') as f:
f.write(env_content)
print("✅ 配置已保存到 .env 文件")
except Exception as e:
print(f"⚠️ 保存配置文件失败: {e}")
def main():
"""主函数"""
if len(sys.argv) > 1:
command = sys.argv[1].lower()
if command == 'test':
test_notification()
return
elif command == 'run':
run_selection_with_notification()
return
elif command == 'setup':
webhook, secret = setup_dingtalk()
if webhook:
create_env_file(webhook, secret)
test_notification()
return
# 交互式菜单
while True:
print("\n🤖 加密货币选币系统 - 钉钉通知")
print("=" * 40)
print("1. 设置钉钉webhook")
print("2. 测试钉钉通知")
print("3. 执行选币并发送通知")
print("4. 退出")
print()
choice = input("请选择操作 (1-4): ").strip()
if choice == '1':
webhook, secret = setup_dingtalk()
if webhook:
create_env_file(webhook, secret)
elif choice == '2':
test_notification()
elif choice == '3':
run_selection_with_notification()
elif choice == '4':
print("👋 再见!")
break
else:
print("❌ 无效选择,请重试")
if __name__ == "__main__":
main()