diff --git a/docker-compose.yml b/docker-compose.yml index 437d9f0..761a7e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -13,6 +13,7 @@ services: environment: - FLASK_ENV=production - PYTHONPATH=/app + - TZ=Asia/Shanghai # MySQL连接配置 (可通过环境变量覆盖) - MYSQL_HOST=${MYSQL_HOST:-cd-cynosdbmysql-grp-7kdd8qe4.sql.tencentcdb.com} - MYSQL_PORT=${MYSQL_PORT:-26558} @@ -37,6 +38,7 @@ services: - ./crontab:/app/crontab environment: - PYTHONPATH=/app + - TZ=Asia/Shanghai - LOG_LEVEL=${LOG_LEVEL:-INFO} - MARKET_SCAN_STOCKS=${MARKET_SCAN_STOCKS:-200} # MySQL连接配置 diff --git a/src/database/mysql_database_manager.py b/src/database/mysql_database_manager.py index 4ab4efd..691cf82 100644 --- a/src/database/mysql_database_manager.py +++ b/src/database/mysql_database_manager.py @@ -336,7 +336,9 @@ class MySQLDatabaseManager: def get_latest_signals(self, strategy_name: str = None, limit: int = 100) -> pd.DataFrame: """获取最新信号""" try: - sql = "SELECT * FROM latest_signals_view" + sql = """ + SELECT * FROM latest_signals_view + """ params = [] if strategy_name: @@ -479,4 +481,64 @@ class MySQLDatabaseManager: except Exception as e: logger.error(f"清理旧数据失败: {e}") + raise + + def clear_signals(self, days: int = 7, strategy_name: str = None) -> int: + """ + 清空指定范围的信号数据 + + Args: + days: 清空最近多少天的数据,默认7天 + strategy_name: 指定策略名称,如果为None则清空所有策略 + + Returns: + 删除的信号数量 + """ + try: + with pymysql.connect(**self.connection_params) as conn: + cursor = conn.cursor() + cutoff_date = date.today() - timedelta(days=days) + + # 构建删除SQL + if strategy_name: + # 清空指定策略的信号 + sql = """ + DELETE ss FROM stock_signals ss + JOIN strategies s ON ss.strategy_id = s.id + WHERE s.strategy_name = %s AND ss.signal_date >= %s + """ + cursor.execute(sql, (strategy_name, cutoff_date)) + else: + # 清空所有策略的信号 + sql = "DELETE FROM stock_signals WHERE signal_date >= %s" + cursor.execute(sql, (cutoff_date,)) + + deleted_count = cursor.rowcount + + # 同时清理相关的回踩提醒 + if strategy_name: + # 清空指定策略的回踩提醒(通过signal_id关联) + sql = """ + DELETE pa FROM pullback_alerts pa + JOIN stock_signals ss ON pa.signal_id = ss.id + JOIN strategies s ON ss.strategy_id = s.id + WHERE s.strategy_name = %s AND pa.pullback_date >= %s + """ + cursor.execute(sql, (strategy_name, cutoff_date)) + else: + # 清空所有回踩提醒 + sql = "DELETE FROM pullback_alerts WHERE pullback_date >= %s" + cursor.execute(sql, (cutoff_date,)) + + deleted_alerts = cursor.rowcount + + conn.commit() + + logger.info(f"信号清空完成: 删除了 {deleted_count} 条信号和 {deleted_alerts} 条回踩提醒 " + f"(范围: {days}天, 策略: {strategy_name or '全部'})") + + return deleted_count + + except Exception as e: + logger.error(f"清空信号失败: {e}") raise \ No newline at end of file diff --git a/web/mysql_app.py b/web/mysql_app.py index 363957a..5484d65 100644 --- a/web/mysql_app.py +++ b/web/mysql_app.py @@ -7,7 +7,7 @@ AI 智能选股大师 MySQL版本 Web 展示界面 import sys from pathlib import Path from flask import Flask, render_template, jsonify, request -from datetime import datetime, date, timedelta, timezone +from datetime import datetime, date, timedelta import pandas as pd # 添加项目根目录到路径 @@ -26,6 +26,16 @@ app.secret_key = 'trading_ai_mysql_secret_key_2023' db_manager = MySQLDatabaseManager() config_loader = ConfigLoader() +# 全局分析状态跟踪 +analysis_status = { + 'is_running': False, + 'start_time': None, + 'stock_count': 0, + 'progress': 0, + 'current_stock': '', + 'estimated_completion': None +} + @app.route('/') def index(): @@ -63,18 +73,9 @@ def signals(): # 确保scan_time是datetime类型 signals_df['scan_time'] = pd.to_datetime(signals_df['scan_time']) - # 转换为东八区时间 - china_tz = timezone(timedelta(hours=8)) - - # 如果是naive datetime,假设是UTC时间 - if signals_df['scan_time'].dt.tz is None: - signals_df['scan_time'] = signals_df['scan_time'].dt.tz_localize('UTC') - - # 转换为东八区时间 - signals_df['scan_time_china'] = signals_df['scan_time'].dt.tz_convert(china_tz) - - # 创建小时级别的分组键(基于东八区时间) - signals_df['scan_hour'] = signals_df['scan_time_china'].dt.floor('h') + # Docker容器中已设置正确时区,直接使用数据库时间 + # 创建小时级别的分组键 + signals_df['scan_hour'] = signals_df['scan_time'].dt.floor('h') # 按扫描小时分组 for scan_hour, group in signals_df.groupby('scan_hour'): @@ -100,15 +101,9 @@ def signals(): # 重新按扫描小时分组分页后的数据 paginated_grouped = {} for signal in paginated_signals: - # 转换为东八区时间进行分组 - china_tz = timezone(timedelta(hours=8)) - + # Docker容器中已设置正确时区,直接使用 scan_time = pd.to_datetime(signal['scan_time']) - # 如果是naive datetime,假设是UTC时间 - if scan_time.tz is None: - scan_time = scan_time.tz_localize('UTC') - # 转换为东八区时间并按小时分组 - scan_hour = scan_time.tz_convert(china_tz).floor('h') + scan_hour = scan_time.floor('h') if scan_hour not in paginated_grouped: paginated_grouped[scan_hour] = [] @@ -173,44 +168,171 @@ def api_stats(): return jsonify({'success': False, 'error': str(e)}) +@app.route('/api/clear_signals', methods=['POST']) +def api_clear_signals(): + """API接口 - 清空信号数据""" + try: + # 获取清空范围参数 + days = request.json.get('days', 7) if request.is_json else int(request.form.get('days', 7)) + strategy_name = request.json.get('strategy_name', '') if request.is_json else request.form.get('strategy_name', '') + + # 调用数据库管理器的清空方法 + deleted_count = db_manager.clear_signals( + days=days, + strategy_name=strategy_name if strategy_name else None + ) + + logger.info(f"清空信号完成: 删除了 {deleted_count} 条信号记录 (范围: {days}天, 策略: {strategy_name or '全部'})") + + return jsonify({ + 'success': True, + 'message': f'成功清空 {deleted_count} 条信号记录', + 'deleted_count': deleted_count + }) + except Exception as e: + logger.error(f"清空信号失败: {e}") + return jsonify({'success': False, 'error': str(e)}) + + +@app.route('/api/run_analysis', methods=['POST']) +def api_run_analysis(): + """API接口 - 立即运行市场分析""" + global analysis_status + + try: + import subprocess + import threading + from datetime import datetime + + # 检查是否已有分析在运行 + if analysis_status['is_running']: + return jsonify({ + 'success': False, + 'error': '已有分析任务正在运行,请等待完成后再试' + }) + + # 获取分析参数 + stock_count = request.json.get('stock_count', 200) if request.is_json else int(request.form.get('stock_count', 200)) + + def run_analysis_background(): + """后台运行分析任务""" + global analysis_status + + try: + # 更新分析状态 + analysis_status.update({ + 'is_running': True, + 'start_time': datetime.now(), + 'stock_count': stock_count, + 'progress': 0, + 'current_stock': '准备中...', + 'estimated_completion': None + }) + + logger.info(f"开始后台市场分析: 扫描 {stock_count} 只股票") + + # 执行市场分析命令 + result = subprocess.run([ + sys.executable, 'market_scanner.py', str(stock_count) + ], + cwd=str(project_root), + capture_output=True, + text=True, + timeout=1800 # 30分钟超时 + ) + + if result.returncode == 0: + logger.info(f"后台市场分析完成: 扫描 {stock_count} 只股票") + analysis_status['progress'] = 100 + analysis_status['current_stock'] = '分析完成' + else: + logger.error(f"后台市场分析失败: {result.stderr}") + + except subprocess.TimeoutExpired: + logger.error("后台市场分析超时 (30分钟)") + except Exception as e: + logger.error(f"后台市场分析异常: {e}") + finally: + # 重置分析状态 + analysis_status['is_running'] = False + + # 启动后台线程执行分析 + analysis_thread = threading.Thread(target=run_analysis_background) + analysis_thread.daemon = True + analysis_thread.start() + + logger.info(f"市场分析任务已启动: 扫描 {stock_count} 只股票 (后台执行)") + + return jsonify({ + 'success': True, + 'message': f'市场分析任务已启动,正在后台扫描 {stock_count} 只股票', + 'stock_count': stock_count, + 'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') + }) + + except Exception as e: + logger.error(f"启动市场分析失败: {e}") + analysis_status['is_running'] = False + return jsonify({'success': False, 'error': str(e)}) + + +@app.route('/api/analysis_status') +def api_analysis_status(): + """API接口 - 获取市场分析状态""" + global analysis_status + + try: + # 计算运行时间 + running_time = None + if analysis_status['is_running'] and analysis_status['start_time']: + running_time = (datetime.now() - analysis_status['start_time']).total_seconds() + + # 估算进度(基于时间的粗略估算) + estimated_progress = 0 + if analysis_status['is_running'] and running_time: + # 假设分析200只股票需要约5分钟,根据时间估算进度 + stock_count = analysis_status.get('stock_count', 200) + estimated_time = (stock_count / 200) * 300 # 每200只股票约300秒 + estimated_progress = min(95, int((running_time / estimated_time) * 100)) + analysis_status['progress'] = estimated_progress + + return jsonify({ + 'success': True, + 'data': { + 'is_running': analysis_status['is_running'], + 'start_time': analysis_status['start_time'].strftime('%Y-%m-%d %H:%M:%S') if analysis_status['start_time'] else None, + 'stock_count': analysis_status['stock_count'], + 'progress': analysis_status['progress'], + 'current_stock': analysis_status['current_stock'], + 'running_time': int(running_time) if running_time else 0, + 'estimated_completion': analysis_status['estimated_completion'] + } + }) + + except Exception as e: + logger.error(f"获取分析状态失败: {e}") + return jsonify({'success': False, 'error': str(e)}) + + @app.template_filter('datetime_format') def datetime_format(value, format='%Y-%m-%d %H:%M'): - """日期时间格式化过滤器 - 智能处理时区转换""" + """日期时间格式化过滤器 - Docker容器时区已设置""" if value is None: return '' if isinstance(value, str): try: - # 解析ISO格式时间字符串 - if 'Z' in value: - value = datetime.fromisoformat(value.replace('Z', '+00:00')) - elif '+' not in value and 'T' in value: - # 假设是UTC时间 - value = datetime.fromisoformat(value).replace(tzinfo=timezone.utc) - else: - value = datetime.fromisoformat(value) + value = datetime.fromisoformat(value.replace('Z', '+00:00') if 'Z' in value else value) except: return value - # 如果是naive datetime,假设是UTC时间 - if isinstance(value, datetime) and value.tzinfo is None: - value = value.replace(tzinfo=timezone.utc) + # Docker容器中已设置正确时区,直接格式化 + if isinstance(value, datetime): + return value.strftime(format) - # 智能时区转换:检查是否已经是东八区时间 - if isinstance(value, datetime) and value.tzinfo is not None: - china_tz = timezone(timedelta(hours=8)) - - # 如果已经是东八区时间,直接使用;否则转换 - if value.utcoffset() == timedelta(hours=8): - # 已经是东八区时间,无需转换 - pass - else: - # 转换为东八区时间 - value = value.astimezone(china_tz) - - return value.strftime(format) + return str(value) @app.template_filter('percentage') diff --git a/web/templates/signals.html b/web/templates/signals.html index 1f7348f..5ec84d3 100644 --- a/web/templates/signals.html +++ b/web/templates/signals.html @@ -6,7 +6,35 @@
-
+
+ +
+ + + + + +
+