import sqlite3 from datetime import datetime, timezone, timedelta import logging # 设置东八区时区 BEIJING_TZ = timezone(timedelta(hours=8)) def utc_to_beijing(utc_time_str): """将UTC时间字符串转换为东八区时间字符串""" try: # 解析UTC时间字符串 utc_dt = datetime.fromisoformat(utc_time_str.replace('Z', '+00:00')) if utc_dt.tzinfo is None: utc_dt = utc_dt.replace(tzinfo=timezone.utc) # 转换为东八区时间 beijing_dt = utc_dt.astimezone(BEIJING_TZ) return beijing_dt.strftime('%Y-%m-%d %H:%M:%S') except: return utc_time_str def get_utc_time(): """获取UTC时间""" return datetime.now(timezone.utc) class DatabaseManager: def __init__(self, db_path="trading.db"): self.db_path = db_path self.init_database() def init_database(self): """初始化数据库表结构""" conn = sqlite3.connect(self.db_path) cursor = conn.cursor() # 币种基础信息表 cursor.execute(''' CREATE TABLE IF NOT EXISTS coins ( id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT UNIQUE NOT NULL, base_asset TEXT NOT NULL, quote_asset TEXT NOT NULL, is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ''') # K线数据表 cursor.execute(''' CREATE TABLE IF NOT EXISTS klines ( id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT NOT NULL, timeframe TEXT NOT NULL, open_time TIMESTAMP NOT NULL, close_time TIMESTAMP NOT NULL, open_price REAL NOT NULL, high_price REAL NOT NULL, low_price REAL NOT NULL, close_price REAL NOT NULL, volume REAL NOT NULL, quote_volume REAL NOT NULL, trades_count INTEGER, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(symbol, timeframe, open_time) ) ''') # 选币结果表 cursor.execute(''' CREATE TABLE IF NOT EXISTS coin_selections ( id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT NOT NULL, score REAL NOT NULL, reason TEXT NOT NULL, entry_price REAL NOT NULL, stop_loss REAL NOT NULL, take_profit REAL NOT NULL, timeframe TEXT NOT NULL, selection_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, status TEXT DEFAULT 'active', actual_entry_price REAL, exit_price REAL, exit_time TIMESTAMP, pnl_percentage REAL, notes TEXT, strategy_type TEXT NOT NULL DEFAULT '中线', holding_period INTEGER NOT NULL DEFAULT 7, risk_reward_ratio REAL NOT NULL DEFAULT 2.0, expiry_time TIMESTAMP, is_expired BOOLEAN DEFAULT FALSE, action_suggestion TEXT DEFAULT '等待回调买入' ) ''') # 修改score字段为可空(如果表已存在) try: # SQLite不能直接修改列约束,所以我们需要检查并处理 cursor.execute("PRAGMA table_info(coin_selections)") columns = cursor.fetchall() # 检查是否需要重建表结构 score_column = next((col for col in columns if col[1] == 'score'), None) if score_column and score_column[3] == 1: # NOT NULL constraint exists # 备份现有数据 cursor.execute("ALTER TABLE coin_selections RENAME TO coin_selections_backup") # 重新创建表(score字段改为可空) cursor.execute(''' CREATE TABLE coin_selections ( id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT NOT NULL, score REAL, reason TEXT NOT NULL, entry_price REAL NOT NULL, stop_loss REAL NOT NULL, take_profit REAL NOT NULL, timeframe TEXT NOT NULL, selection_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, status TEXT DEFAULT 'active', actual_entry_price REAL, exit_price REAL, exit_time TIMESTAMP, pnl_percentage REAL, notes TEXT, strategy_type TEXT NOT NULL DEFAULT '中线', holding_period INTEGER NOT NULL DEFAULT 7, risk_reward_ratio REAL NOT NULL DEFAULT 2.0, expiry_time TIMESTAMP, is_expired BOOLEAN DEFAULT FALSE, action_suggestion TEXT DEFAULT '等待回调买入', signal_type TEXT DEFAULT 'LONG', direction TEXT DEFAULT 'BUY', qualified_factors INTEGER NOT NULL DEFAULT 3 ) ''') # 迁移数据 cursor.execute(''' INSERT INTO coin_selections SELECT * FROM coin_selections_backup ''') # 删除备份表 cursor.execute("DROP TABLE coin_selections_backup") except Exception as e: # 如果出错,可能是表不存在或已经正确,继续执行 pass try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN strategy_type TEXT NOT NULL DEFAULT '中线'") except: pass try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN holding_period INTEGER NOT NULL DEFAULT 7") except: pass try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN risk_reward_ratio REAL NOT NULL DEFAULT 2.0") except: pass try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN expiry_time TIMESTAMP") except: pass try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN is_expired BOOLEAN DEFAULT FALSE") except: pass try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN action_suggestion TEXT DEFAULT '等待回调买入'") except: pass # 添加多空支持字段 try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN signal_type TEXT DEFAULT 'LONG'") except: pass try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN direction TEXT DEFAULT 'BUY'") except: pass # 添加新的符合因子字段 try: cursor.execute("ALTER TABLE coin_selections ADD COLUMN qualified_factors INTEGER NOT NULL DEFAULT 3") except: pass # 技术指标表 cursor.execute(''' CREATE TABLE IF NOT EXISTS technical_indicators ( id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT NOT NULL, timeframe TEXT NOT NULL, indicator_time TIMESTAMP NOT NULL, ma20 REAL, ma50 REAL, ma200 REAL, rsi REAL, macd REAL, macd_signal REAL, macd_hist REAL, bb_upper REAL, bb_middle REAL, bb_lower REAL, volume_ma REAL, fib_618 REAL, fib_382 REAL, support_level REAL, resistance_level REAL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(symbol, timeframe, indicator_time) ) ''') # 创建索引 cursor.execute('CREATE INDEX IF NOT EXISTS idx_klines_symbol_time ON klines(symbol, timeframe, open_time)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_selections_symbol_time ON coin_selections(symbol, selection_time)') cursor.execute('CREATE INDEX IF NOT EXISTS idx_indicators_symbol_time ON technical_indicators(symbol, timeframe, indicator_time)') conn.commit() conn.close() logging.info("数据库初始化完成") def get_connection(self): """获取数据库连接""" return sqlite3.connect(self.db_path) def insert_coin_selection(self, symbol, qualified_factors, reason, entry_price, stop_loss, take_profit, timeframe, strategy_type, holding_period, risk_reward_ratio, expiry_hours, action_suggestion, signal_type="LONG", direction="BUY"): """插入选币结果 - 支持多空方向""" conn = self.get_connection() cursor = conn.cursor() # 计算过期时间 expiry_time = datetime.now(timezone.utc) + timedelta(hours=expiry_hours) # 将qualified_factors转换为分数(兼容旧系统) score = qualified_factors * 25.0 if qualified_factors else 75.0 # 3/4 = 75分,4/4 = 100分 cursor.execute(''' INSERT INTO coin_selections (symbol, score, qualified_factors, reason, entry_price, stop_loss, take_profit, timeframe, strategy_type, holding_period, risk_reward_ratio, expiry_time, action_suggestion, signal_type, direction) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', (symbol, score, qualified_factors, reason, entry_price, stop_loss, take_profit, timeframe, strategy_type, holding_period, risk_reward_ratio, expiry_time, action_suggestion, signal_type, direction)) selection_id = cursor.lastrowid conn.commit() conn.close() return selection_id def get_active_selections(self, limit=20, offset=0): """获取活跃的选币结果,自动标记过期的 - 支持分页""" conn = self.get_connection() cursor = conn.cursor() # 首先标记过期的选币 cursor.execute(''' UPDATE coin_selections SET is_expired = TRUE, status = 'expired' WHERE expiry_time < datetime('now') AND status = 'active' ''') cursor.execute(''' SELECT * FROM coin_selections WHERE status = 'active' AND is_expired = FALSE ORDER BY selection_time DESC, qualified_factors DESC LIMIT ? OFFSET ? ''', (limit, offset)) results = cursor.fetchall() conn.commit() # 提交过期状态更新 conn.close() return results def check_and_expire_selections(self): """检查并标记过期的选币""" conn = self.get_connection() cursor = conn.cursor() cursor.execute(''' UPDATE coin_selections SET is_expired = TRUE, status = 'expired' WHERE expiry_time < datetime('now') AND status = 'active' ''') expired_count = cursor.rowcount conn.commit() conn.close() if expired_count > 0: logging.info(f"标记了{expired_count}个过期的选币结果") return expired_count def update_selection_status(self, selection_id, status, exit_price=None, pnl_percentage=None): """更新选币结果状态""" conn = self.get_connection() cursor = conn.cursor() if exit_price and pnl_percentage: cursor.execute(''' UPDATE coin_selections SET status = ?, exit_price = ?, exit_time = CURRENT_TIMESTAMP, pnl_percentage = ? WHERE id = ? ''', (status, exit_price, pnl_percentage, selection_id)) else: cursor.execute(''' UPDATE coin_selections SET status = ? WHERE id = ? ''', (status, selection_id)) conn.commit() conn.close()