trading-quant/monitors/macd_boll.py
2024-11-11 14:39:13 +08:00

76 lines
2.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import talib
import numpy as np
import time,setting
import telegram_sender
from datasource import crypto
import dingding
import discord_sender
# crossover 函数:检测上穿信号
def crossover(series1, series2):
    """Return a boolean Series that is True where ``series1`` crosses above ``series2``.

    A bar is flagged when ``series1 > series2`` on that bar and
    ``series1 <= series2`` on the previous bar.

    Args:
        series1: pandas Series being tested.
        series2: pandas Series to compare against, or a constant level
            (for example ``0`` for a zero-line cross).

    Returns:
        Boolean Series aligned with ``series1``.  The first bar is never
        flagged: its shifted predecessor is NaN and NaN comparisons are False.
    """
    # A constant level has no .shift(); its "previous" value is the level itself.
    previous_level = series2.shift(1) if hasattr(series2, "shift") else series2
    return (series1 > series2) & (series1.shift(1) <= previous_level)
# crossunder 函数:检测下穿信号
def crossunder(series1, series2):
    """Return a boolean Series that is True where ``series1`` crosses below ``series2``.

    A bar is flagged when ``series1 < series2`` on that bar and
    ``series1 >= series2`` on the previous bar.

    Args:
        series1: pandas Series being tested.
        series2: pandas Series to compare against, or a constant level
            (for example ``0`` for a zero-line cross).

    Returns:
        Boolean Series aligned with ``series1``.  The first bar is never
        flagged: its shifted predecessor is NaN and NaN comparisons are False.
    """
    # A constant level has no .shift(); its "previous" value is the level itself.
    previous_level = series2.shift(1) if hasattr(series2, "shift") else series2
    return (series1 < series2) & (series1.shift(1) >= previous_level)
## 检查信号
def stratergy_run(symbol, interval, df, debug):
    """Check one symbol/interval for a MACD + Bollinger-band signal and alert on it.

    A long signal requires, on the same bar: MACD crossing above zero, the
    bar's high below the upper Bollinger band, and the close above EMA(144)
    or EMA(200).  A short signal is the mirror image (MACD crossing below
    zero, low above the lower band, close below either EMA).

    Side effect: adds boolean ``Long_Signal`` / ``Short_Signal`` columns to
    ``df`` in place.

    Args:
        symbol: Instrument name, used only in log/alert text.
        interval: Candle interval label, used only in log/alert text.
        df: DataFrame with ``close``, ``high`` and ``low`` columns.
        debug: When truthy the alert is printed instead of being posted to
            Discord.

    Returns:
        None.
    """
    # The signal is read from df.iloc[-2], so fewer than two rows cannot be
    # evaluated; skip instead of raising IndexError.
    if df is None or len(df) < 2:
        print(f"{symbol} - {interval}: not enough klines, skipped.")
        return

    macd_fast_length = 12
    macd_slow_length = 26
    macd_signal_length = 9
    bollinger_length = 20
    bollinger_mult = 2.0

    close = df['close']

    # Trend filter: EMA 144 and EMA 200.
    fast_ema = talib.EMA(close, timeperiod=144)
    slow_ema = talib.EMA(close, timeperiod=200)

    # Only the MACD line is used; the signal line and histogram are discarded.
    macd, _, _ = talib.MACD(
        close,
        fastperiod=macd_fast_length,
        slowperiod=macd_slow_length,
        signalperiod=macd_signal_length,
    )

    # Only the outer Bollinger bands are used.
    upperband, _, lowerband = talib.BBANDS(
        close,
        timeperiod=bollinger_length,
        nbdevup=bollinger_mult,
        nbdevdn=bollinger_mult,
        matype=0,
    )

    previous_macd = macd.shift(1)
    long_condition = (
        (previous_macd <= 0)
        & (macd > 0)
        & (df['high'] < upperband)
        & ((close > fast_ema) | (close > slow_ema))
    )
    short_condition = (
        (previous_macd >= 0)
        & (macd < 0)
        & (df['low'] > lowerband)
        & ((close < fast_ema) | (close < slow_ema))
    )
    df['Long_Signal'] = long_condition.to_numpy()
    df['Short_Signal'] = short_condition.to_numpy()

    # Second-to-last row; presumably the last fully closed candle (the final
    # row may still be forming) -- confirm against the datasource.
    latest = df.iloc[-2]
    print(f"{symbol} - {interval}】 is checked!")

    # The direction label must be non-empty, otherwise the alert below is
    # never emitted.  Long wins if both flags were ever set, matching the
    # original evaluation order.
    if latest['Long_Signal']:
        direction = "做多"
    elif latest['Short_Signal']:
        direction = "做空"
    else:
        return

    message = (
        "策略:【MACD+BOLL策略V1.1】\r\n"
        f"品种: {symbol}\r\n"
        f"周期: {interval}\r\n"
        f"信号: 【{direction}】\r\n"
        f"收盘价: {latest['close']}\r\n"
    )

    if debug:
        print(message)
        return

    # NOTE(review): this webhook URL embeds a secret token and is committed
    # to source control.  It should be rotated and loaded from configuration
    # or the environment instead of being hard-coded here.
    url = 'https://discordapp.com/api/webhooks/1286585288986198048/iQ-yr26ViW45GM48ql8rPu70Iumqcmn_XXAmxAcKGeiQBmTQoDPeq-TmlChvIHkw_HJ-'
    discord_sender.send_message(url, '🦄信号提醒🦄', message)
    print(f"{symbol} - {interval}】 is singal fired!")
def run_crypto(interval, debug=False):
    """Run the strategy check once for every symbol in ``setting.symbols``.

    Args:
        interval: Candle interval passed to ``crypto.get_klines`` and to
            ``stratergy_run``.
        debug: Forwarded to ``stratergy_run``.

    Returns:
        None.
    """
    print('Vegas策略运行.')
    # Initial pause before the first data request.
    time.sleep(5)
    for symbol in setting.symbols:
        klines = crypto.get_klines(symbol, interval, True)
        stratergy_run(symbol, interval, klines, debug)
        # One-second pause between symbols.
        time.sleep(1)