77 lines
2.6 KiB
Python
77 lines
2.6 KiB
Python
import talib
|
||
import numpy as np
|
||
import time,setting
|
||
import telegram_sender
|
||
from datasource import crypto
|
||
import dingding
|
||
import redis_helper
|
||
import discord_sender
|
||
|
||
|
||
# crossover 函数:检测上穿信号
|
||
def crossover(series1, series2):
    """Return a boolean mask marking where ``series1`` crosses above ``series2``.

    A row is True when ``series1`` is strictly greater than ``series2`` on
    that row while, on the previous row, ``series1`` was less than or equal
    to ``series2``.  The first row has no predecessor (``shift(1)`` yields a
    missing value there), so its comparison evaluates to False.

    Both arguments must support ``.shift`` (e.g. pandas Series).
    """
    is_above_now = series1 > series2
    was_at_or_below = series1.shift(1) <= series2.shift(1)
    return is_above_now & was_at_or_below
|
||
|
||
# crossunder 函数:检测下穿信号
|
||
def crossunder(series1, series2):
    """Return a boolean mask marking where ``series1`` crosses below ``series2``.

    A row is True when ``series1`` is strictly less than ``series2`` on that
    row while, on the previous row, ``series1`` was greater than or equal to
    ``series2``.  The first row has no predecessor (``shift(1)`` yields a
    missing value there), so its comparison evaluates to False.

    Both arguments must support ``.shift`` (e.g. pandas Series).
    """
    is_below_now = series1 < series2
    was_at_or_above = series1.shift(1) >= series2.shift(1)
    return is_below_now & was_at_or_above
|
||
|
||
## 检查信号
|
||
def stratergy_run(symbol, interval, df, debug):
    """Evaluate the MACD + Bollinger strategy on ``df`` and emit an alert.

    Signal rules, evaluated per row:

    * Long: MACD moves from ``<= 0`` on the previous row to ``> 0``, the
      row's high is below the upper Bollinger band, and the close is above
      at least one of the 144/200-period EMAs.
    * Short: MACD moves from ``>= 0`` to ``< 0``, the row's low is above the
      lower Bollinger band, and the close is below at least one of the EMAs.

    Only the second-to-last row (``df.iloc[-2]``) is inspected for a signal.

    Args:
        symbol: Instrument identifier; used only in log and alert text.
        interval: Candle interval label; used only in log and alert text.
        df: DataFrame with ``close``, ``high`` and ``low`` columns.  It is
            modified in place: boolean ``Long_Signal`` and ``Short_Signal``
            columns are added.  ``None`` or a frame with fewer than two rows
            is skipped instead of raising.
        debug: When falsy, the alert is sent through ``discord_sender``;
            otherwise the alert text is printed.

    Returns:
        None.
    """
    # Guard: ``df.iloc[-2]`` below needs at least two rows; without this
    # check a single short/missing frame raises and aborts the caller's loop.
    if df is None or len(df) < 2:
        print(f"【{symbol} - {interval}】 skipped: fewer than 2 candles.")
        return

    macd_fast_length = 12
    macd_slow_length = 26
    macd_signal_length = 9
    bollinger_length = 20
    bollinger_mult = 2.0

    close = df['close']

    # EMA 144 / 200 trend filters.
    fast_ema = talib.EMA(close, timeperiod=144)
    slow_ema = talib.EMA(close, timeperiod=200)

    # MACD line; the signal line and histogram are not used by this strategy.
    # NOTE(review): the results are used as pandas Series (``.shift``), which
    # relies on talib returning Series for Series input -- confirm.
    macd, _macd_signal, _macd_hist = talib.MACD(
        close,
        fastperiod=macd_fast_length,
        slowperiod=macd_slow_length,
        signalperiod=macd_signal_length,
    )

    # Bollinger bands; the middle band is not used.
    upperband, _middleband, lowerband = talib.BBANDS(
        close,
        timeperiod=bollinger_length,
        nbdevup=bollinger_mult,
        nbdevdn=bollinger_mult,
        matype=0,
    )

    # Build the per-row trade signals.
    prev_macd = macd.shift(1)
    long_condition = (
        (prev_macd <= 0)
        & (macd > 0)
        & (df['high'] < upperband)
        & ((close > fast_ema) | (close > slow_ema))
    )
    short_condition = (
        (prev_macd >= 0)
        & (macd < 0)
        & (df['low'] > lowerband)
        & ((close < fast_ema) | (close < slow_ema))
    )

    # Assign as plain arrays so the columns are written positionally.
    df['Long_Signal'] = long_condition.to_numpy()
    df['Short_Signal'] = short_condition.to_numpy()

    # NOTE(review): the second-to-last row is used, presumably because the
    # final row is a still-forming candle -- confirm against the data source.
    latest = df.iloc[-2]
    print(f"【{symbol} - {interval}】 is checked!")

    direction = ""
    if latest['Short_Signal']:
        direction = '空'
    if latest['Long_Signal']:
        direction = '多'

    if direction == "":
        return

    message = f"策略:【MACD+BOLL策略V1.1】\r\n"
    message += f"品种: {symbol}\r\n"
    message += f"周期: {interval}\r\n"
    message += f"信号: 【{direction}】\r\n"
    message += f"收盘价: {latest['close']}\r\n"

    if not debug:
        # NOTE(review): SECURITY -- this webhook URL is a credential committed
        # to source.  It should be rotated and loaded from configuration.
        url = 'https://discordapp.com/api/webhooks/1286585288986198048/iQ-yr26ViW45GM48ql8rPu70Iumqcmn_XXAmxAcKGeiQBmTQoDPeq-TmlChvIHkw_HJ-'
        discord_sender.send_message(url,'🦄信号提醒🦄',message)

        print(f"【{symbol} - {interval}】 is singal fired!")
    else:
        print(message)
|
||
|
||
|
||
def run_crypto(interval, debug=False):
    """Run ``stratergy_run`` once for every symbol from ``redis_helper``.

    Prints a start banner, waits 5 seconds, then for each symbol returned by
    ``redis_helper.get_symbols()`` fetches candles with
    ``crypto.get_klines`` and passes them to ``stratergy_run``, sleeping one
    second after each symbol.

    Args:
        interval: Candle interval forwarded to ``crypto.get_klines`` and
            ``stratergy_run``.
        debug: Forwarded unchanged to ``stratergy_run``.
    """
    print('Vegas策略运行.')
    time.sleep(5)

    symbols = redis_helper.get_symbols()
    for symbol in symbols:
        # NOTE(review): the meaning of the third positional argument (True)
        # is defined by crypto.get_klines -- confirm there.
        klines = crypto.get_klines(symbol, interval, True)
        stratergy_run(symbol, interval, klines, debug)

        # Pause between symbols.
        time.sleep(1)