import time

import numpy as np
import talib

import setting
import telegram_sender
from datasource import crypto
import dingding
import redis_helper
import discord_sender

# Indicator parameters used by the MACD + Bollinger strategy.
MACD_FAST_LENGTH = 12
MACD_SLOW_LENGTH = 26
MACD_SIGNAL_LENGTH = 9
BOLLINGER_LENGTH = 20
BOLLINGER_MULT = 2.0
FAST_EMA_LENGTH = 144
SLOW_EMA_LENGTH = 200

# NOTE(review): this webhook URL is a credential committed to source control.
# It should be moved to configuration (e.g. the `setting` module or an
# environment variable) and the exposed webhook rotated.
DISCORD_WEBHOOK_URL = 'https://discordapp.com/api/webhooks/1286585288986198048/iQ-yr26ViW45GM48ql8rPu70Iumqcmn_XXAmxAcKGeiQBmTQoDPeq-TmlChvIHkw_HJ-'


def crossover(series1, series2):
    """Return a boolean mask that is True where series1 crosses above series2.

    A cross-above happens on a row where series1 is greater than series2 while
    on the previous row series1 was less than or equal to series2.
    """
    return (series1 > series2) & (series1.shift(1) <= series2.shift(1))


def crossunder(series1, series2):
    """Return a boolean mask that is True where series1 crosses below series2.

    A cross-below happens on a row where series1 is less than series2 while
    on the previous row series1 was greater than or equal to series2.
    """
    return (series1 < series2) & (series1.shift(1) >= series2.shift(1))


def stratergy_run(symbol, interval, df, debug):
    """Check one symbol for a MACD + Bollinger signal and report it.

    Adds boolean ``Long_Signal`` and ``Short_Signal`` columns to ``df`` (in
    place), then inspects the second-to-last row (presumably the most recent
    closed candle -- confirm against the data source). When that row carries
    a signal, a message is sent to Discord, or only printed when ``debug`` is
    truthy.

    Args:
        symbol: Instrument name, used only in log output and the message.
        interval: Kline interval label, used only in log output and the message.
        df: Frame with at least ``close``, ``high`` and ``low`` columns.
        debug: When truthy, print the message instead of sending it.
    """
    # iloc[-2] below needs at least two rows; skip instead of crashing the
    # whole symbol loop when the data source returned nothing usable.
    if df is None or len(df) < 2:
        print(f"【{symbol} - {interval}】 skipped: not enough kline data.")
        return

    close = df['close']

    # EMA 144 / 200 trend filters.
    fast_ema = talib.EMA(close, timeperiod=FAST_EMA_LENGTH)
    slow_ema = talib.EMA(close, timeperiod=SLOW_EMA_LENGTH)

    # MACD line (the signal line and histogram are not used by the rules).
    macd, _macd_signal, _macd_hist = talib.MACD(
        close,
        fastperiod=MACD_FAST_LENGTH,
        slowperiod=MACD_SLOW_LENGTH,
        signalperiod=MACD_SIGNAL_LENGTH,
    )

    # Bollinger bands.
    upperband, _middleband, lowerband = talib.BBANDS(
        close,
        timeperiod=BOLLINGER_LENGTH,
        nbdevup=BOLLINGER_MULT,
        nbdevdn=BOLLINGER_MULT,
        matype=0,
    )

    prev_macd = macd.shift(1)

    # Each comparison must be parenthesised: `|` and `&` bind tighter than
    # `>` / `<`, so `a > b | a > c` would parse as the chained comparison
    # `a > (b | a) > c` and fail on float Series.
    long_condition = (
        (prev_macd <= 0)
        & (macd > 0)
        & (df['high'] < upperband)
        & ((close > fast_ema) | (close > slow_ema))
    )
    short_condition = (
        (prev_macd >= 0)
        & (macd < 0)
        & (df['low'] > lowerband)
        & ((close < fast_ema) | (close < slow_ema))
    )

    # Comparisons against NaN (indicator warm-up rows) are False, so the
    # stored columns are plain booleans.
    df['Long_Signal'] = np.asarray(long_condition, dtype=bool)
    df['Short_Signal'] = np.asarray(short_condition, dtype=bool)

    latest = df.iloc[-2]
    print(f"【{symbol} - {interval}】 is checked!")

    # Long is checked last so it takes precedence, as in the original order.
    direction = ""
    if latest['Short_Signal']:
        direction = '空'
    if latest['Long_Signal']:
        direction = '多'

    if not direction:
        return

    message = (
        f"策略:【MACD+BOLL策略V1.1】\r\n"
        f"品种: {symbol}\r\n"
        f"周期: {interval}\r\n"
        f"信号: 【{direction}】\r\n"
        f"收盘价: {latest['close']}\r\n"
    )

    if debug:
        print(message)
        return

    discord_sender.send_message(DISCORD_WEBHOOK_URL, '🦄信号提醒🦄', message)
    print(f"【{symbol} - {interval}】 is singal fired!")


def run_crypto(interval, debug=False):
    """Run the strategy once over every symbol returned by redis_helper.

    Waits 5 seconds before starting and 1 second after each symbol.

    Args:
        interval: Kline interval passed to ``crypto.get_klines``.
        debug: Forwarded to ``stratergy_run``; when truthy, signals are
            printed instead of being sent.
    """
    print('Vegas策略运行.')
    time.sleep(5)
    for s in redis_helper.get_symbols():
        # NOTE(review): the meaning of the third positional argument (True)
        # is defined by crypto.get_klines, which is not visible here.
        df = crypto.get_klines(s, interval, True)
        stratergy_run(s, interval, df, debug)
        time.sleep(1)