import os
import time

import numpy as np
import talib

import setting
import telegram_sender
from datasource import crypto
import dingding
import discord_sender

# NOTE(review): a webhook URL embeds a secret token and should not be kept in
# source control. The DISCORD_WEBHOOK_URL environment variable takes
# precedence; the literal below is retained only as a backward-compatible
# fallback and should be rotated and removed once deployments set the variable.
DISCORD_WEBHOOK_URL = os.environ.get("DISCORD_WEBHOOK_URL") or (
    'https://discordapp.com/api/webhooks/1285867836454998077/'
    'JNAFUyur_ygOJIh6C4beUAVYMpm1TZf4IEeMn8Q1p0TglO1Hjyiu2LQqiU5AxVovWyiO'
)

# Minimum distance between the close and the crossed line, expressed as a
# fraction of the candle body |open - close|, for a price cross to count.
CROSS_RATIO = 0.2


def crossover(series1, series2):
    """Return a boolean mask that is True where series1 crosses above series2.

    A bar qualifies when series1 is strictly greater than series2 on that bar
    and was less than or equal to it on the previous bar.
    """
    return (series1 > series2) & (series1.shift(1) <= series2.shift(1))


def crossunder(series1, series2):
    """Return a boolean mask that is True where series1 crosses below series2.

    A bar qualifies when series1 is strictly less than series2 on that bar
    and was greater than or equal to it on the previous bar.
    """
    return (series1 < series2) & (series1.shift(1) >= series2.shift(1))


def _compute_signals(df, cross_ratio=CROSS_RATIO):
    """Add the EMA columns and the 'shortResut'/'longResut' columns to df in place."""
    close = df['close']
    for period in (13, 144, 169, 576, 676):
        df[f'ema{period}'] = talib.EMA(close, timeperiod=period)

    fast = df['ema13']
    # Upper and lower edge of the band formed by EMA144 and EMA169.
    maxline = np.maximum(df['ema144'], df['ema169'])
    minline = np.minimum(df['ema144'], df['ema169'])

    short_arrangement = fast < minline
    long_arrangement = fast > maxline
    # EMA13 sits inside the band: no signal is allowed on such bars.
    inside_band = (fast >= minline) & (fast <= maxline)

    # Candle body size, computed once and vectorized instead of a row-wise
    # apply. A zero body yields inf/NaN in the divisions below; pandas does not
    # raise, and the comparisons then evaluate exactly as they did before.
    body = (df['open'] - close).abs()

    # Short: EMA13 drops below the band, or (with EMA13 already below it) the
    # close drops below the band by more than cross_ratio of the candle body.
    short_a = crossunder(fast, minline)
    short_b = (
        short_arrangement
        & crossunder(close, minline)
        & (((minline - close) / body) > cross_ratio)
    )
    df['shortResut'] = (short_a | short_b) & ~inside_band

    # Long: mirror image of the short rules against the upper edge.
    long_a = crossover(fast, maxline)
    long_b = (
        long_arrangement
        & crossover(close, maxline)
        & (((close - maxline) / body) > cross_ratio)
    )
    df['longResut'] = (long_a | long_b) & ~inside_band


def _build_message(symbol, interval, direction, close_price):
    """Format the alert text for one symbol/interval."""
    return (
        "策略:【Vegas反转策略】\r\n"
        f"品种: {symbol}\r\n"
        f"周期: {interval}\r\n"
        f"信号: 【{direction}】\r\n"
        f"收盘价: {close_price}\r\n"
    )


def stratergy_run(symbol, interval, df, debug):
    """Evaluate the strategy on the last row of df and emit an alert if it fires.

    Args:
        symbol: Instrument identifier; only used in the printed/sent text.
        interval: Kline interval label; only used in the printed/sent text.
        df: DataFrame with at least 'open' and 'close' columns. It is mutated:
            the EMA columns and 'shortResut'/'longResut' are added.
        debug: When truthy, a fired signal is printed instead of being sent
            to the Discord webhook.

    Returns:
        None.
    """
    # Guard: df.iloc[-1] would raise IndexError on an empty frame.
    if df is None or len(df) == 0:
        print(f"【{symbol} - {interval}】 skipped: no kline data")
        return

    _compute_signals(df)
    latest = df.iloc[-1]

    direction = ""
    if latest['shortResut']:
        direction = '空'
    if latest['longResut']:
        direction = '多'

    message = _build_message(symbol, interval, direction, latest['close'])
    print(f"【{symbol} - {interval}】 is checked!")

    if not direction:
        return

    # NOTE(review): the original source was collapsed onto one line, so the
    # owner of its trailing `else` is ambiguous; it is read here as the debug
    # branch (print the message instead of sending it) - confirm.
    if debug:
        print(message)
    else:
        discord_sender.send_message(DISCORD_WEBHOOK_URL, '🦄信号提醒🦄', message)
        print(f"【{symbol} - {interval}】 is singal fired!")


def run_crypto(interval, debug=False):
    """Run the strategy for every symbol in setting.symbols at the given interval.

    Fetches klines through crypto.get_klines, evaluates each symbol, and
    sleeps one second between symbols.
    """
    for s in setting.symbols:
        df = crypto.get_klines(s, interval)
        stratergy_run(s, interval, df, debug)
        time.sleep(1)