diff --git a/db.py b/db.py deleted file mode 100644 index a1d29c5..0000000 --- a/db.py +++ /dev/null @@ -1,323 +0,0 @@ -#数据库工具 - -import datetime -import threading -import time -import traceback -import warnings -from contextlib import contextmanager -from functools import wraps -from urllib.parse import urlparse, parse_qsl - -import pymysql -from dbutils.pooled_db import PooledDB - -warnings.filterwarnings("ignore") - -from setting import dbUrl - - -def __parseresult_to_dict(parsed): - #解析连接字符串 - path_parts = parsed.path[1:].split('?') - query = parsed.query - connect_kwargs = {'db': path_parts[0]} - if parsed.username: - connect_kwargs['user'] = parsed.username - if parsed.password: - connect_kwargs['password'] = parsed.password - if parsed.hostname: - connect_kwargs['host'] = parsed.hostname - if parsed.port: - connect_kwargs['port'] = parsed.port - - # Adjust parameters for MySQL. - if 'password' in connect_kwargs: - connect_kwargs['passwd'] = connect_kwargs.pop('password') - - - - # Get additional connection args from the query string - qs_args = parse_qsl(query, keep_blank_values=True) - for key, value in qs_args: - if value.lower() == 'false': - value = False - elif value.lower() == 'true': - value = True - elif value.isdigit(): - value = int(value) - elif '.' in value and all(p.isdigit() for p in value.split('.', 1)): - try: - value = float(value) - except ValueError: - pass - elif value.lower() in ('null', 'none'): - value = None - connect_kwargs[key] = value - if 'maxsize' in connect_kwargs: - connect_kwargs['maxconnections'] = connect_kwargs.pop('maxsize') - return connect_kwargs - - -def __create_pool(url): - #创建连接池 - parsed = urlparse(url) - connect_kwargs = __parseresult_to_dict(parsed) - return PooledDB(pymysql, 1, **connect_kwargs) - - -# 数据库连接 -global setting, transaction_map, pool - -if "transaction_map" not in globals(): - global transaction_map - transaction_map = {} - -if "pool" not in globals(): - global pool - pool = __create_pool(dbUrl) - - -def __get_connection(): - #获取数据库链接 - tid = threading.get_ident() - if tid in transaction_map: - return transaction_map.get(tid) - else: - return pool.connection() - - -def __close_connection(conn): - #归还数据库链接 - tid = threading.get_ident() - if tid in transaction_map: - return - else: - conn.close() - - -@contextmanager -def dbp(): - #with 数据库方法块 - f = __get_connection() - yield f - __close_connection(f) - - -def execute_sql(sql,params=None): - #执行sql - with dbp() as db: - c = db.cursor() - c.execute(sql,params) - db.commit() - c.close() - - -def execute_sql_list(sqls): - #批量执行sql语句 - with dbp() as db: - c = db.cursor() - for sql in sqls: - c.execute(sql) - db.commit() - c.close() - - -def __get_obj_list_sql(obj_list, table, replace=True): - #获取对象插入sql以及对应参数 - if obj_list: - obj = obj_list[0] - keys=list(map(lambda x: f"`{x}`",obj.keys())) - values = list(map(lambda x:"%s",obj.keys())) - if replace: - sql = f"""replace INTO `{table}` ({",".join(keys)}) VALUES ({",".join(values)})""" - else: - sql = f"""insert INTO `{table}` ({",".join(keys)}) VALUES ({",".join(values)})""" - params = [] - for obj in obj_list: - params.append(tuple(obj.values())) - return sql, params - else: - return "", [] - - -def __get_obj_update_sql(obj, table, key): - #获取对象插入sql以及对应参数 - key_sql=f"where {key}='{obj[key]}'" - del obj[key] - keys=list(map(lambda x: f"`{x}`=%s",obj.keys())) - sql = f"""update `{table}` set {",".join(keys)} """ + key_sql - params =tuple(obj.values()) - return sql, params - -def sql_to_dict(sql,params=None): - #查询sql,输出dict 列表 - with dbp() as db: - c = db.cursor() - c.execute(sql,params) - ncols = len(c.description) - colnames = [c.description[i][0] for i in range(ncols)] - db_list = c.fetchall() - ret_list = [] - for row in db_list: - d = Map() - for i in range(ncols): - if isinstance(row[i],bytes) and len(row[i])==1: - d[colnames[i]] = True if row[i] == b'\x01' else False - else: - d[colnames[i]] = row[i] - ret_list.append(d) - c.close() - return ret_list - - -def start_transaction(): - #开始事务 - conn = __get_connection() - conn.autocommit = False - tid = threading.get_ident() - transaction_map[tid] = conn - return tid - - -def end_transaction(rockback=False): - #结束事务 - tid = threading.get_ident() - conn = transaction_map.pop(tid) - try: - if rockback: - conn.rollback() - else: - conn.commit() - finally: - conn.close() - - -@contextmanager -def transaction_code(): - #with 事务方法块 - f = start_transaction() - try: - yield f - end_transaction() - except Exception: - traceback.print_exc() - end_transaction(True) - - -# 事务 -def transaction(target_function): - #事务注解 - @wraps(target_function) - def wrapper(*args, **kwargs): - start_transaction() - ret = target_function(*args, **kwargs) - end_transaction() - return ret - - return wrapper - - -def insert(obj, table): - #插入对象 - (sql, params) = __get_obj_list_sql([obj], table) - with dbp() as db: - c = db.cursor() - c.execute(sql, params[0]) - db.commit() - lid=c.lastrowid - c.close() - return lid - -def update(obj, table,key="id"): - #插入对象 - (sql, params) = __get_obj_update_sql(obj, table,key) - with dbp() as db: - c = db.cursor() - c.execute(sql, params) - db.commit() - c.close() - -def inserts(obj_list, table): - #批量插入对象 - if obj_list: - (sql, params) = __get_obj_list_sql(obj_list, table) - with dbp() as db: - c = db.cursor() - c.executemany(sql, params) - db.commit() - c.close() - -def get(table,id,idstr="id"): - if isinstance(id,str):id=f"'{id}'" - db_data=sql_to_dict(f"select * from {table} where {idstr}={id}") - if db_data: - return db_data[0] - return None - -def get_list(table,where=None): - if not where:return sql_to_dict(f"select * from {table}") - return sql_to_dict(f"select * from {table} where {where}") - -class Map(dict): - """ - Example: - m = Map({'first_name': 'Eduardo'}, last_name='Pool', age=24, sports=['Soccer']) - """ - - def __init__(self, *args, **kwargs): - super(Map, self).__init__(*args, **kwargs) - for arg in args: - if isinstance(arg, dict): - for k, v in arg.items(): - self[k] = v - - if kwargs: - for k, v in kwargs.items(): - self[k] = v - - def __getattr__(self, attr): - return self.get(attr) - - def __setattr__(self, key, value): - self.__setitem__(key, value) - - def __setitem__(self, key, value): - super(Map, self).__setitem__(key, value) - self.__dict__.update({key: value}) - - def __delattr__(self, item): - self.__delitem__(item) - - def __delitem__(self, key): - super(Map, self).__delitem__(key) - del self.__dict__[key] - - -def __update_setting(): - global setting - s = sql_to_dict("select name,value from setting") - for i in s: - setting[i["name"]] = i["value"] - - -def __update_setting_thread(): - while True: - __update_setting() - time.sleep(5) - - -# 系统设置 -if "setting" not in vars(): - setting = Map() - __update_setting() - threading.Thread(target=__update_setting_thread, daemon=True).start() - - -def get_table_desc(table): - datas=sql_to_dict(f"show full fields from `{table}`") - ret_data=[] - for v in datas: - ret_data.append(Map({"name":v.Field,"type":v.Type,"commnet":v.Comment})) - return ret_data - diff --git a/main.py b/main.py index 9999986..2197900 100644 --- a/main.py +++ b/main.py @@ -2,31 +2,16 @@ import schedule import bn import setting import time -import signals.ema_arrangement as maa -import signals.macd as macd +import monitors.vegas as vegas import monitors.large_transfer as lt - -symbols = bn.symbols() - -for s in symbols: - #15m - schedule.every(15).minutes.do(maa.run, symbol=s, interval='15m') - schedule.every(15).minutes.do(macd.run, symbol=s, interval='15m') - - #1h - schedule.every(1).hours.do(maa.run, symbol=s, interval='1h') - schedule.every(1).hours.do(macd.run, symbol=s, interval='1h') - - - #4h - schedule.every(4).hours.do(maa.run, symbol=s, interval='4h') - schedule.every(4).hours.do(macd.run, symbol=s, interval='4h') - # 监控 -schedule.every(setting.whaleAlert_minutes).minutes.do(lt.run) +# schedule.every(setting.whaleAlert_minutes).minutes.do(lt.run) +# schedule.every().hour.at(":00").do(vegas.run) -print(f'Running... ChatID: {setting.chat_id}') -while True: - schedule.run_pending() - time.sleep(1) \ No newline at end of file +# print(f'Running... ChatID: {setting.chat_id}') +# while True: +# schedule.run_pending() +# time.sleep(1) + +vegas.run('1h') \ No newline at end of file diff --git a/monitors/large_transfer.py b/monitors/large_transfer.py index e190ee2..9036dfc 100644 --- a/monitors/large_transfer.py +++ b/monitors/large_transfer.py @@ -3,7 +3,7 @@ import requests import setting from datetime import datetime, timedelta import time -import tg +import telegram_sender import traceback import logging import json @@ -41,10 +41,10 @@ def run(): content = f"🚨大额转入提醒🚨\r\n\r\n {amount_format} #{ts['symbol']} ({amount_usd_format} USD) 从 {ts['from']['owner']} 转入 #{ts['to']['owner']}" print(content) - tg.send_message(setting.chat_id, content) + telegram_sender.send_message(setting.chat_id, content) else: print(data) - tg.send_message(setting.chat_id, data['message']) + telegram_sender.send_message(setting.chat_id, data['message']) except: logging.error(traceback.format_exc()) \ No newline at end of file diff --git a/monitors/vegas.py b/monitors/vegas.py new file mode 100644 index 0000000..831a2b2 --- /dev/null +++ b/monitors/vegas.py @@ -0,0 +1,107 @@ +from binance.spot import Spot +import talib +import numpy as np +import pandas as pd +import time,setting +import telegram_sender + + +api_key = "HCpeel8g6fsTK2630b7BvGBcS09Z3qfXkLVcAY2JkpaiMm1J6DWRvoQZBQlElDJg" +api_secret= "TySs6onlHOTrGzV8fMdDxLKTWWYnQ4rCHVAmjrcHby17acKflmo7xVTWVsbqtxe7" + +client = Spot(api_key, api_secret) + +## 获取所有现货交易对 +def get_symbols(): + data = client.exchange_info()["symbols"] + + # 创建DataFrame + columns = ['symbol', 'status', 'baseAsset', 'quoteAsset'] + df = pd.DataFrame(data, columns=columns) + + # 过滤出在架交易对 + df_in_trade = df[df['status'] == 'TRADING'] + df_in_USDT = df_in_trade[df_in_trade['quoteAsset'] == 'USDT'] + + return df_in_USDT['symbol'] + +## 根据交易对和周期获取数据集 +def get_dataFrame(symbol,interval): + # 获取 k 线数据 + data = client.klines(symbol, interval,limit=500) + + # 将数据转换为DataFrame + columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume', + 'close_time', 'quote_asset_volume', 'number_of_trades', + 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'] + + df = pd.DataFrame(data, columns=columns) + + # 转化成 float + df['open'] = df['open'].astype('float64') + df['high'] = df['high'].astype('float64') + df['low'] = df['low'].astype('float64') + df['close'] = df['close'].astype('float64') + df['volume'] = df['volume'].astype('float64') + + # 将时间戳转换为日期时间格式 + df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms') + df['close_time'] = pd.to_datetime(df['close_time'], unit='ms') + + return df + +# crossover 函数:检测上穿信号 +def crossover(series1, series2): + return (series1 > series2) & (series1.shift(1) <= series2.shift(1)) + +# crossunder 函数:检测下穿信号 +def crossunder(series1, series2): + return (series1 < series2) & (series1.shift(1) >= series2.shift(1)) + +## 检查信号 +def check_signal(symbol, interval,df): + ## 计算 ema + df['ema13'] = talib.EMA(df['close'], timeperiod=13) + df['ema144'] = talib.EMA(df['close'], timeperiod=144) + df['ema169'] = talib.EMA(df['close'], timeperiod=169) + df['ema576'] = talib.EMA(df['close'], timeperiod=576) + df['ema676'] = talib.EMA(df['close'], timeperiod=676) + + maxline = np.maximum(df['ema144'], df['ema169']) + minline = np.minimum(df['ema144'], df['ema169']) + + shortArragement = df['ema13'] < minline + longArrangement = df['ema13'] > maxline + confuse = (df['ema13'] >= minline) & (df['ema13'] <= maxline) + crossRatio = 0.2 + + shortA = crossunder(df['ema13'], minline) + shortB = shortArragement & crossunder(df['close'], minline) & (((minline - df['close']) / df.apply(lambda x: abs(x['open'] - x['close']), axis=1)) > crossRatio) + df['shortResut'] = (shortA | shortB) & (confuse == False) + + longA = crossover(df['ema13'], maxline) + longB = longArrangement & crossover(df['close'], maxline) & (((df['close'] - maxline) / df.apply(lambda x: abs(x['open'] - x['close']), axis=1)) > crossRatio) + df['longResut'] = (longA | longB) & (confuse == False) + + latest = df.iloc[-1] + + direction = "" + if latest['shortResut']==True: + direction = '空' + if latest['longResut']==True: + direction = '多' + + message = f"信号提醒\r\n\r\n品种: {symbol}\r\n周期: {interval}\r\n信号: {direction}\r\n当前价格:{latest['open']}\r\n\r\n{latest['timestamp']}" + if direction != "": + telegram_sender.send_message(setting.chat_id, message) + + +def run(interval): + symbols= get_symbols() + for s in symbols: + df = get_dataFrame(s, interval) + check_signal(s,interval, df) + + time.sleep(1) + + diff --git a/setting.py b/setting.py index 6cdd0d4..8a5009e 100644 --- a/setting.py +++ b/setting.py @@ -1,8 +1,5 @@ import os -dbUrl = os.getenv("DB_URL", "mysql://tradinguser:BciA3^owC3UFlC7q@wpoolsjp.rwlb.singapore.rds.aliyuncs.com:3306/tradingdata?charset=utf8mb4&maxsize=50") - - # telegram bot key telegram_bot_key='5863718864:AAFijN65_SbbGQ0WDBggzKJw2SIcZVTVrPw' diff --git a/signals/ema_arrangement.py b/signals/ema_arrangement.py deleted file mode 100644 index dd0c54b..0000000 --- a/signals/ema_arrangement.py +++ /dev/null @@ -1,57 +0,0 @@ -import talib -import numpy as np -import bn -import tg -import datetime -import setting -import db -import signals.signal_builder as signal_builder -flags = {} - -def check_ema_arrange(data): - # 提取收盘价 - close_prices = np.array([float(entry[4]) for entry in data]) - - # 计算移动平均线 - ema7 = talib.EMA(close_prices, timeperiod=7) - ema30 = talib.EMA(close_prices, timeperiod=30) - ema100 = talib.EMA(close_prices, timeperiod=100) - ema200 = talib.EMA(close_prices, timeperiod=200) - - bullish = ema7[-1] > ema30[-1] > ema100[-1] > ema200[-1] - bearish = ema7[-1] < ema30[-1] < ema100[-1] < ema200[-1] - - return bullish, bearish - - -def run(symbol, interval): - # 获取kline数据 - data = bn.klines(symbol, interval) - - ticker_price = bn.ticker_price(symbol) - - bullish, bearish = check_ema_arrange(data) - - flag_name = symbol + '_' + interval - text = "" - data = {} - if bullish and (flag_name not in flags or flags[flag_name] == False): - flags[flag_name] = True - text = signal_builder.signal_text(symbol, interval, "EMA排列","【多】") - - data = {"type" : 1, "symbol": symbol, "interval" : interval, "signal": 1} - - if bearish and (flag_name not in flags or flags[flag_name] == True): - flags[flag_name] = False - text = signal_builder.signal_text(symbol, interval, "EMA排列","【空】") - - data = {"type" : 1, "symbol": symbol, "interval" : interval, "signal": 3} - if text != "": - signals = db.get_list('signals', f'symbol="{symbol}" and `interval`="{interval}"') - for s in signals: - print(s) - db.execute_sql(f'delete from signals where `id`={s["id"]}') - - db.insert(data, 'signals') - print(text) - tg.send_message(setting.chat_id, text) \ No newline at end of file diff --git a/signals/macd.py b/signals/macd.py deleted file mode 100644 index e8e7c8c..0000000 --- a/signals/macd.py +++ /dev/null @@ -1,76 +0,0 @@ -import talib -import numpy as np -import bn -import tg -import datetime -import setting -import db -import signals.signal_builder as signal_builder - - -def check_macd(data): - close_prices = np.array([float(entry[4]) for entry in data]) - - # 自定义参数 - fast_period = 12 - slow_period = 26 - signal_period = 9 - - # print(close_prices[-1], close_prices[-2], close_prices[-3], close_prices[-4]) - # 计算MACD指标 macd - DIF; macdsignal - DEA; macdhist- 柱状线 - macd, macd_signal, macd_hist = talib.MACD(close_prices, fastperiod=fast_period, slowperiod=slow_period, - signalperiod=signal_period) - - # print("MACD:", round(macd[-1], 2)) - # print("Signal:", round(macd_signal[-1], 2)) - # print("Histogram:", round(macd_hist[-1], 2)) - # print("----------") - # print("MACD:", round(macd[-2], 2)) - # print("Signal:", round(macd_signal[-2], 2)) - # print("Histogram:", round(macd_hist[-2], 2)) - - return macd, macd_signal, macd_hist - - -def generate_signals(macd, macd_signal, macd_hist): - # 1: 多 DIF > 0 and DEA>0 and MACD>0 - if macd[-1] > 0 and macd_signal[-1] > 0 and macd_hist[-1] > 0: - signal = 1 - # 2: 强多 DIF > 0 and DEA>0 and DIF>DEA AND MACD>0 - elif macd[-1] > 0 and macd_signal[-1] > 0 and macd[-1] > macd_signal[-1] and macd_hist[-1] > 0: - signal = 2 - # 3:空 - elif macd[-1] < 0 and macd_signal[-1] < 0 and macd[-1] < macd_signal[-1]: - signal = 3 - # 4: 强空 - elif macd[-1] < 0 and macd_signal[-1] < 0 and macd[-1] < macd_signal[-1] and macd_hist[-1] < 0: - signal = 2 - else: - signal = 0 - return signal - - -def run(symbol, interval): - # 获取kline数据 - data = bn.klines(symbol, interval) - macd, macd_signal, macd_hist = check_macd(data) - signal = generate_signals(macd, macd_signal, macd_hist) - print(signal) - text = "" - data = {} - if signal == 1 or signal == 2: - text = signal_builder.signal_text(symbol, interval, "MACD", "【多】" if signal == 1 else "【强多】") - data = {"type": 2, "symbol": symbol, "interval": interval, "signal": signal} - - if signal == 3 or signal == 4: - text = signal_builder.signal_text(symbol, interval, "MACD", "【空】" if signal == 3 else "【强空】") - data = {"type": 2, "symbol": symbol, "interval": interval, "signal": signal} - if text != "": - signals = db.get_list('signals', f'symbol="{symbol}" and `interval`="{interval}"') - for s in signals: - print(s) - if s["signal"] != data["signal"]: - db.execute_sql(f'delete from signals where `id`={s["id"]}') - db.insert(data, 'signals') - print(text) - tg.send_message(setting.chat_id, text) diff --git a/strategy_test.py b/strategy_test.py deleted file mode 100644 index 3c2e3c0..0000000 --- a/strategy_test.py +++ /dev/null @@ -1,10 +0,0 @@ -import monitors.large_transfer as lt -import bn -import signals.ema_arrangement as maa -import signals.macd as macd - - -symbols = bn.symbols() - -for s in symbols: - macd.run(s, '15m') \ No newline at end of file diff --git a/tg.py b/telegram_sender.py similarity index 100% rename from tg.py rename to telegram_sender.py diff --git a/test.py b/test.py index 93addc3..9cf1a88 100644 --- a/test.py +++ b/test.py @@ -1,14 +1,17 @@ -import bn -import pandas as pd -import mplfinance as mpf -import datetime as dt -import monitors.large_transfer as lt -from binance.spot import Spot -import signals.ema_arrangement as maa -from binance.cm_futures import CMFutures -maa.run('BTCUSDT', '1h') +import requests -# print(len(bn.symbols())) -# print(bn.ticker_price('BTCUSD_PERP')) +# telegram bot key +telegram_bot_key='5863718864:AAFijN65_SbbGQ0WDBggzKJw2SIcZVTVrPw' + +#chatid +chat_id = "@cyber4trading" + +url = f'https://api.telegram.org/bot{telegram_bot_key}/sendMessage' +formData = { + "chat_id" : chat_id, + "text" : '12312' +} + +requests.post(url, data= formData) \ No newline at end of file