trading-quant/monitors/vegas.py
2024-07-25 00:12:02 +08:00

80 lines
2.8 KiB
Python

from binance.spot import Spot
import talib
import numpy as np
import pandas as pd
import datasource.crypto
import time,setting
import telegram_sender
from datasource import crypto
from datetime import datetime
import dingding
# crossover: detect an upward-cross signal
def crossover(series1, series2):
    """Return a boolean mask marking where ``series1`` crosses above ``series2``.

    A position is flagged when ``series1`` is strictly greater than
    ``series2`` there, while at the previous position it was less than or
    equal to ``series2``.  Both arguments must support ``.shift`` (e.g.
    pandas Series).  The first position has no predecessor, so its shifted
    comparison is against NaN and the result there is False.
    """
    was_at_or_below = series1.shift(1) <= series2.shift(1)
    is_above = series1 > series2
    return is_above & was_at_or_below
# crossunder: detect a downward-cross signal
def crossunder(series1, series2):
    """Return a boolean mask marking where ``series1`` crosses below ``series2``.

    A position is flagged when ``series1`` is strictly less than ``series2``
    there, while at the previous position it was greater than or equal to
    ``series2``.  Both arguments must support ``.shift`` (e.g. pandas
    Series).  The first position has no predecessor, so its shifted
    comparison is against NaN and the result there is False.
    """
    was_at_or_above = series1.shift(1) >= series2.shift(1)
    is_below = series1 < series2
    return is_below & was_at_or_above
## Check for a signal
def stratergy_run(symbol, interval, df, debug):
    """Evaluate the newest row of ``df`` for a Vegas signal and report it.

    The frame is modified in place: EMA columns ``ema13``, ``ema144``,
    ``ema169``, ``ema576`` and ``ema676`` plus the boolean columns
    ``shortResut`` and ``longResut`` are added (the misspelt column names
    are kept so existing readers of the frame keep working).

    Signal rules, with ``upper``/``lower`` being the element-wise max/min of
    ``ema144`` and ``ema169``:

    * short: ``ema13`` crosses under ``lower``; or ``ema13`` is already
      below ``lower``, ``close`` crosses under ``lower`` and the distance
      ``lower - close`` exceeds 0.2 of the candle body ``|open - close|``.
    * long: the mirror image against ``upper``.
    * neither fires while ``ema13`` lies between ``lower`` and ``upper``
      (inclusive).

    Only the last row decides whether a notification is produced.  If both
    flags were set on that row, the long direction wins.

    Args:
        symbol: Instrument name, used only in the output text.
        interval: Kline interval label, used only in the output text.
        df: DataFrame with at least ``open`` and ``close`` columns.  ``None``
            or an empty frame is skipped without raising.
        debug: When falsy, a detected signal is sent through
            ``telegram_sender`` and ``dingding``; otherwise the message is
            printed instead.

    Returns:
        None.
    """
    # Without any rows there is no "latest" candle; df.iloc[-1] would raise.
    if df is None or df.empty:
        print(f"{symbol} - {interval} has no kline data, skipped.")
        return

    ## Compute the EMAs
    close = df['close']
    for period in (13, 144, 169, 576, 676):
        df[f'ema{period}'] = talib.EMA(close, timeperiod=period)

    ema13 = df['ema13']
    upper = np.maximum(df['ema144'], df['ema169'])
    lower = np.minimum(df['ema144'], df['ema169'])

    short_arrangement = ema13 < lower
    long_arrangement = ema13 > upper
    inside_band = (ema13 >= lower) & (ema13 <= upper)

    cross_ratio = 0.2
    # Candle body size, computed once and vectorised.  For float columns a
    # zero body makes the ratios below inf/NaN instead of raising.
    body = (df['open'] - close).abs()

    short_a = crossunder(ema13, lower)
    short_b = (
        short_arrangement
        & crossunder(close, lower)
        & (((lower - close) / body) > cross_ratio)
    )
    df['shortResut'] = (short_a | short_b) & ~inside_band

    long_a = crossover(ema13, upper)
    long_b = (
        long_arrangement
        & crossover(close, upper)
        & (((close - upper) / body) > cross_ratio)
    )
    df['longResut'] = (long_a | long_b) & ~inside_band

    latest = df.iloc[-1]
    direction = ""
    if latest['shortResut']:
        direction = '做空'
    # Checked second on purpose: long overrides short if both are set.
    if latest['longResut']:
        direction = '做多'

    message = "🌟 信 号 提 醒 🌟\r\n\r\n"
    message += "策略:【Vegas趋势跟踪策略】\r\n"
    message += f"品种: {symbol}\r\n"
    message += f"周期: {interval}\r\n"
    # The closing bracket was missing in the original notification text.
    message += f"信号: 【{direction}】\r\n"

    print(f"{symbol} - {interval}】 is checked!")
    if direction != "":
        if not debug:
            telegram_sender.send_message(setting.chat_id, message)
            dingding.send_message(message)
            print(f"{symbol} - {interval}】 is singal fired!")
        else:
            print(message)
def run_crypto(interval, debug=False, symbols=None):
    """Run the Vegas check once for each symbol on the given interval.

    For every symbol the klines are fetched with ``crypto.get_klines`` and
    handed to ``stratergy_run``; the loop then sleeps one second before the
    next symbol (presumably to stay under API rate limits — confirm).

    Args:
        interval: Kline interval passed through to ``crypto.get_klines`` and
            ``stratergy_run``.
        debug: Forwarded to ``stratergy_run``; when truthy, signals are
            printed instead of being sent.
        symbols: Optional iterable of symbol names to scan.  When ``None``
            the built-in watch list below is used, which matches the
            previous hard-coded behaviour.  Callers may pass another source
            here, e.g. the result of ``crypto.get_symbols()`` (assumed to be
            an iterable of symbol strings — verify against that helper).

    Returns:
        None.
    """
    print('Vegas策略运行.')
    # ``is None`` rather than truthiness: an explicitly empty list means
    # "scan nothing", not "fall back to the default".
    if symbols is None:
        symbols = [
            'BTCUSDT', "ETHUSDT", 'LTCUSDT', 'DOGEUSDT', 'FTMUSDT', 'FILUSDT',
            'OPUSDT', 'SOLUSDT', 'BNBUSDT', 'BCHUSDT', 'ETCUSDT', 'ARUSDT',
        ]
    for symbol in symbols:
        df = crypto.get_klines(symbol, interval)
        stratergy_run(symbol, interval, df, debug)
        time.sleep(1)