trading-quant/monitors/macd_boll.py
2024-09-22 11:31:04 +08:00

73 lines
2.3 KiB
Python

import talib
import numpy as np
import time,setting
import telegram_sender
from datasource import crypto
import dingding
import discord_sender
# crossover 函数:检测上穿信号
def crossover(series1, series2):
    """Flag the positions where ``series1`` crosses above ``series2``.

    A position is flagged when ``series1`` is strictly greater than
    ``series2`` there, while at the previous position it was less than or
    equal to it. The first position has no predecessor (the shifted values
    are missing), so it is never flagged.

    Args:
        series1: Series being tested for the upward cross.
        series2: Series acting as the reference line; must support
            ``shift`` and element-wise comparison with ``series1``.

    Returns:
        Element-wise boolean result, True where the upward cross occurs.
    """
    is_above_now = series1 > series2
    previous_1 = series1.shift(1)
    previous_2 = series2.shift(1)
    was_at_or_below = previous_1 <= previous_2
    return is_above_now & was_at_or_below
# crossunder 函数:检测下穿信号
def crossunder(series1, series2):
    """Flag the positions where ``series1`` crosses below ``series2``.

    A position is flagged when ``series1`` is strictly less than
    ``series2`` there, while at the previous position it was greater than or
    equal to it. The first position has no predecessor (the shifted values
    are missing), so it is never flagged.

    Args:
        series1: Series being tested for the downward cross.
        series2: Series acting as the reference line; must support
            ``shift`` and element-wise comparison with ``series1``.

    Returns:
        Element-wise boolean result, True where the downward cross occurs.
    """
    is_below_now = series1 < series2
    previous_1 = series1.shift(1)
    previous_2 = series2.shift(1)
    was_at_or_above = previous_1 >= previous_2
    return is_below_now & was_at_or_above
## 检查信号
def stratergy_run(symbol, interval, df, debug):
    """Check one symbol for a MACD zero-line cross inside the Bollinger Bands.

    A long signal is raised when MACD moves from <= 0 to > 0 while the bar's
    high is still below the upper Bollinger band; a short signal is raised
    when MACD moves from >= 0 to < 0 while the bar's low is still above the
    lower band. The signal is read from the second-to-last row of ``df``
    (presumably the last row is the still-forming candle -- confirm against
    the data source).

    Args:
        symbol: Instrument name; only used in the alert text.
        interval: Kline interval label; only used in the alert text.
        df: DataFrame with 'close', 'high' and 'low' columns. As a side
            effect it gains boolean 'Long_Signal' and 'Short_Signal' columns.
            ``None`` or a frame with fewer than two rows is skipped.
        debug: When truthy the alert is only printed; otherwise it is sent
            to the Discord webhook.

    Returns:
        None.
    """
    # The signal row is df.iloc[-2], so at least two rows are required.
    if df is None or len(df) < 2:
        print(f"{symbol} - {interval}: not enough kline data, skipped.")
        return

    macd_fast_length = 12
    macd_slow_length = 26
    macd_signal_length = 9
    bollinger_length = 20
    bollinger_mult = 2.0

    # MACD line; the signal line and histogram are not used by this strategy.
    macd, _macd_signal, _macd_hist = talib.MACD(
        df['close'],
        fastperiod=macd_fast_length,
        slowperiod=macd_slow_length,
        signalperiod=macd_signal_length,
    )
    # Bollinger Bands; only the outer bands are used.
    upperband, _middleband, lowerband = talib.BBANDS(
        df['close'],
        timeperiod=bollinger_length,
        nbdevup=bollinger_mult,
        nbdevdn=bollinger_mult,
        matype=0,
    )

    # Zero-line crosses of MACD, filtered by price still being inside the band.
    previous_macd = macd.shift(1)
    long_condition = (previous_macd <= 0) & (macd > 0) & (df['high'] < upperband)
    short_condition = (previous_macd >= 0) & (macd < 0) & (df['low'] > lowerband)
    df['Long_Signal'] = np.asarray(long_condition, dtype=bool)
    df['Short_Signal'] = np.asarray(short_condition, dtype=bool)

    latest = df.iloc[-2]
    # Both branches used to assign an empty string, which meant the alert
    # below could never fire. Long is checked last so it wins, as before.
    direction = ""
    if latest['Short_Signal']:
        direction = '做空'
    if latest['Long_Signal']:
        direction = '做多'
    if not direction:
        return

    message = "策略:【MACD+BOLL价格策略】\r\n"
    message += f"品种: {symbol}\r\n"
    message += f"周期: {interval}\r\n"
    message += f"信号: 【{direction}】\r\n"
    message += f"收盘价: {latest['close']}\r\n"

    if debug:
        print(message)
        return

    # NOTE(review): this webhook URL embeds a secret token; it should live in
    # configuration (e.g. the settings module or an environment variable)
    # rather than in source control.
    url = 'https://discordapp.com/api/webhooks/1286585288986198048/iQ-yr26ViW45GM48ql8rPu70Iumqcmn_XXAmxAcKGeiQBmTQoDPeq-TmlChvIHkw_HJ-'
    discord_sender.send_message(url, '🦄信号提醒🦄', message)
    print(f"【{symbol} - {interval}】 is singal fired!")
def run_crypto(interval, debug=False):
    """Run the MACD+BOLL check once over every configured futures symbol.

    For each entry in ``setting.future_sysmbols`` the klines are fetched via
    ``crypto.get_klines`` and handed to ``stratergy_run``.

    Args:
        interval: Kline interval, passed through to the data source and used
            in the alert text.
        debug: Forwarded to ``stratergy_run``; when truthy, alerts are printed
            instead of being sent.

    Returns:
        None.
    """
    # The banner previously named a different strategy ("Vegas"), a
    # copy-paste leftover; this module runs the MACD+BOLL strategy.
    print('MACD+BOLL策略运行.')
    # Initial delay before the scan starts (presumably to stagger this
    # monitor against others -- confirm with the scheduler).
    time.sleep(5)
    for symbol in setting.future_sysmbols:
        # NOTE(review): the meaning of the third positional argument (True)
        # is defined by crypto.get_klines and is not visible here.
        klines = crypto.get_klines(symbol, interval, True)
        stratergy_run(symbol, interval, klines, debug)
        # Pause between symbols (presumably to pace data-source requests).
        time.sleep(1)