This commit is contained in:
aaron 2025-10-01 11:02:24 +08:00
parent 138a3007fd
commit 6ea3aa0b8a
4 changed files with 401 additions and 48 deletions

View File

@ -13,6 +13,7 @@ services:
environment: environment:
- FLASK_ENV=production - FLASK_ENV=production
- PYTHONPATH=/app - PYTHONPATH=/app
- TZ=Asia/Shanghai
# MySQL连接配置 (可通过环境变量覆盖) # MySQL连接配置 (可通过环境变量覆盖)
- MYSQL_HOST=${MYSQL_HOST:-cd-cynosdbmysql-grp-7kdd8qe4.sql.tencentcdb.com} - MYSQL_HOST=${MYSQL_HOST:-cd-cynosdbmysql-grp-7kdd8qe4.sql.tencentcdb.com}
- MYSQL_PORT=${MYSQL_PORT:-26558} - MYSQL_PORT=${MYSQL_PORT:-26558}
@ -37,6 +38,7 @@ services:
- ./crontab:/app/crontab - ./crontab:/app/crontab
environment: environment:
- PYTHONPATH=/app - PYTHONPATH=/app
- TZ=Asia/Shanghai
- LOG_LEVEL=${LOG_LEVEL:-INFO} - LOG_LEVEL=${LOG_LEVEL:-INFO}
- MARKET_SCAN_STOCKS=${MARKET_SCAN_STOCKS:-200} - MARKET_SCAN_STOCKS=${MARKET_SCAN_STOCKS:-200}
# MySQL连接配置 # MySQL连接配置

View File

@ -336,7 +336,9 @@ class MySQLDatabaseManager:
def get_latest_signals(self, strategy_name: str = None, limit: int = 100) -> pd.DataFrame: def get_latest_signals(self, strategy_name: str = None, limit: int = 100) -> pd.DataFrame:
"""获取最新信号""" """获取最新信号"""
try: try:
sql = "SELECT * FROM latest_signals_view" sql = """
SELECT * FROM latest_signals_view
"""
params = [] params = []
if strategy_name: if strategy_name:
@ -479,4 +481,64 @@ class MySQLDatabaseManager:
except Exception as e: except Exception as e:
logger.error(f"清理旧数据失败: {e}") logger.error(f"清理旧数据失败: {e}")
raise
def clear_signals(self, days: int = 7, strategy_name: str = None) -> int:
"""
清空指定范围的信号数据
Args:
days: 清空最近多少天的数据默认7天
strategy_name: 指定策略名称如果为None则清空所有策略
Returns:
删除的信号数量
"""
try:
with pymysql.connect(**self.connection_params) as conn:
cursor = conn.cursor()
cutoff_date = date.today() - timedelta(days=days)
# 构建删除SQL
if strategy_name:
# 清空指定策略的信号
sql = """
DELETE ss FROM stock_signals ss
JOIN strategies s ON ss.strategy_id = s.id
WHERE s.strategy_name = %s AND ss.signal_date >= %s
"""
cursor.execute(sql, (strategy_name, cutoff_date))
else:
# 清空所有策略的信号
sql = "DELETE FROM stock_signals WHERE signal_date >= %s"
cursor.execute(sql, (cutoff_date,))
deleted_count = cursor.rowcount
# 同时清理相关的回踩提醒
if strategy_name:
# 清空指定策略的回踩提醒通过signal_id关联
sql = """
DELETE pa FROM pullback_alerts pa
JOIN stock_signals ss ON pa.signal_id = ss.id
JOIN strategies s ON ss.strategy_id = s.id
WHERE s.strategy_name = %s AND pa.pullback_date >= %s
"""
cursor.execute(sql, (strategy_name, cutoff_date))
else:
# 清空所有回踩提醒
sql = "DELETE FROM pullback_alerts WHERE pullback_date >= %s"
cursor.execute(sql, (cutoff_date,))
deleted_alerts = cursor.rowcount
conn.commit()
logger.info(f"信号清空完成: 删除了 {deleted_count} 条信号和 {deleted_alerts} 条回踩提醒 "
f"(范围: {days}天, 策略: {strategy_name or '全部'})")
return deleted_count
except Exception as e:
logger.error(f"清空信号失败: {e}")
raise raise

View File

@ -7,7 +7,7 @@ AI 智能选股大师 MySQL版本 Web 展示界面
import sys import sys
from pathlib import Path from pathlib import Path
from flask import Flask, render_template, jsonify, request from flask import Flask, render_template, jsonify, request
from datetime import datetime, date, timedelta, timezone from datetime import datetime, date, timedelta
import pandas as pd import pandas as pd
# 添加项目根目录到路径 # 添加项目根目录到路径
@ -26,6 +26,16 @@ app.secret_key = 'trading_ai_mysql_secret_key_2023'
db_manager = MySQLDatabaseManager() db_manager = MySQLDatabaseManager()
config_loader = ConfigLoader() config_loader = ConfigLoader()
# 全局分析状态跟踪
analysis_status = {
'is_running': False,
'start_time': None,
'stock_count': 0,
'progress': 0,
'current_stock': '',
'estimated_completion': None
}
@app.route('/') @app.route('/')
def index(): def index():
@ -63,18 +73,9 @@ def signals():
# 确保scan_time是datetime类型 # 确保scan_time是datetime类型
signals_df['scan_time'] = pd.to_datetime(signals_df['scan_time']) signals_df['scan_time'] = pd.to_datetime(signals_df['scan_time'])
# 转换为东八区时间 # Docker容器中已设置正确时区直接使用数据库时间
china_tz = timezone(timedelta(hours=8)) # 创建小时级别的分组键
signals_df['scan_hour'] = signals_df['scan_time'].dt.floor('h')
# 如果是naive datetime假设是UTC时间
if signals_df['scan_time'].dt.tz is None:
signals_df['scan_time'] = signals_df['scan_time'].dt.tz_localize('UTC')
# 转换为东八区时间
signals_df['scan_time_china'] = signals_df['scan_time'].dt.tz_convert(china_tz)
# 创建小时级别的分组键(基于东八区时间)
signals_df['scan_hour'] = signals_df['scan_time_china'].dt.floor('h')
# 按扫描小时分组 # 按扫描小时分组
for scan_hour, group in signals_df.groupby('scan_hour'): for scan_hour, group in signals_df.groupby('scan_hour'):
@ -100,15 +101,9 @@ def signals():
# 重新按扫描小时分组分页后的数据 # 重新按扫描小时分组分页后的数据
paginated_grouped = {} paginated_grouped = {}
for signal in paginated_signals: for signal in paginated_signals:
# 转换为东八区时间进行分组 # Docker容器中已设置正确时区直接使用
china_tz = timezone(timedelta(hours=8))
scan_time = pd.to_datetime(signal['scan_time']) scan_time = pd.to_datetime(signal['scan_time'])
# 如果是naive datetime假设是UTC时间 scan_hour = scan_time.floor('h')
if scan_time.tz is None:
scan_time = scan_time.tz_localize('UTC')
# 转换为东八区时间并按小时分组
scan_hour = scan_time.tz_convert(china_tz).floor('h')
if scan_hour not in paginated_grouped: if scan_hour not in paginated_grouped:
paginated_grouped[scan_hour] = [] paginated_grouped[scan_hour] = []
@ -173,44 +168,171 @@ def api_stats():
return jsonify({'success': False, 'error': str(e)}) return jsonify({'success': False, 'error': str(e)})
@app.route('/api/clear_signals', methods=['POST'])
def api_clear_signals():
"""API接口 - 清空信号数据"""
try:
# 获取清空范围参数
days = request.json.get('days', 7) if request.is_json else int(request.form.get('days', 7))
strategy_name = request.json.get('strategy_name', '') if request.is_json else request.form.get('strategy_name', '')
# 调用数据库管理器的清空方法
deleted_count = db_manager.clear_signals(
days=days,
strategy_name=strategy_name if strategy_name else None
)
logger.info(f"清空信号完成: 删除了 {deleted_count} 条信号记录 (范围: {days}天, 策略: {strategy_name or '全部'})")
return jsonify({
'success': True,
'message': f'成功清空 {deleted_count} 条信号记录',
'deleted_count': deleted_count
})
except Exception as e:
logger.error(f"清空信号失败: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/run_analysis', methods=['POST'])
def api_run_analysis():
"""API接口 - 立即运行市场分析"""
global analysis_status
try:
import subprocess
import threading
from datetime import datetime
# 检查是否已有分析在运行
if analysis_status['is_running']:
return jsonify({
'success': False,
'error': '已有分析任务正在运行,请等待完成后再试'
})
# 获取分析参数
stock_count = request.json.get('stock_count', 200) if request.is_json else int(request.form.get('stock_count', 200))
def run_analysis_background():
"""后台运行分析任务"""
global analysis_status
try:
# 更新分析状态
analysis_status.update({
'is_running': True,
'start_time': datetime.now(),
'stock_count': stock_count,
'progress': 0,
'current_stock': '准备中...',
'estimated_completion': None
})
logger.info(f"开始后台市场分析: 扫描 {stock_count} 只股票")
# 执行市场分析命令
result = subprocess.run([
sys.executable, 'market_scanner.py', str(stock_count)
],
cwd=str(project_root),
capture_output=True,
text=True,
timeout=1800 # 30分钟超时
)
if result.returncode == 0:
logger.info(f"后台市场分析完成: 扫描 {stock_count} 只股票")
analysis_status['progress'] = 100
analysis_status['current_stock'] = '分析完成'
else:
logger.error(f"后台市场分析失败: {result.stderr}")
except subprocess.TimeoutExpired:
logger.error("后台市场分析超时 (30分钟)")
except Exception as e:
logger.error(f"后台市场分析异常: {e}")
finally:
# 重置分析状态
analysis_status['is_running'] = False
# 启动后台线程执行分析
analysis_thread = threading.Thread(target=run_analysis_background)
analysis_thread.daemon = True
analysis_thread.start()
logger.info(f"市场分析任务已启动: 扫描 {stock_count} 只股票 (后台执行)")
return jsonify({
'success': True,
'message': f'市场分析任务已启动,正在后台扫描 {stock_count} 只股票',
'stock_count': stock_count,
'start_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
except Exception as e:
logger.error(f"启动市场分析失败: {e}")
analysis_status['is_running'] = False
return jsonify({'success': False, 'error': str(e)})
@app.route('/api/analysis_status')
def api_analysis_status():
"""API接口 - 获取市场分析状态"""
global analysis_status
try:
# 计算运行时间
running_time = None
if analysis_status['is_running'] and analysis_status['start_time']:
running_time = (datetime.now() - analysis_status['start_time']).total_seconds()
# 估算进度(基于时间的粗略估算)
estimated_progress = 0
if analysis_status['is_running'] and running_time:
# 假设分析200只股票需要约5分钟根据时间估算进度
stock_count = analysis_status.get('stock_count', 200)
estimated_time = (stock_count / 200) * 300 # 每200只股票约300秒
estimated_progress = min(95, int((running_time / estimated_time) * 100))
analysis_status['progress'] = estimated_progress
return jsonify({
'success': True,
'data': {
'is_running': analysis_status['is_running'],
'start_time': analysis_status['start_time'].strftime('%Y-%m-%d %H:%M:%S') if analysis_status['start_time'] else None,
'stock_count': analysis_status['stock_count'],
'progress': analysis_status['progress'],
'current_stock': analysis_status['current_stock'],
'running_time': int(running_time) if running_time else 0,
'estimated_completion': analysis_status['estimated_completion']
}
})
except Exception as e:
logger.error(f"获取分析状态失败: {e}")
return jsonify({'success': False, 'error': str(e)})
@app.template_filter('datetime_format') @app.template_filter('datetime_format')
def datetime_format(value, format='%Y-%m-%d %H:%M'): def datetime_format(value, format='%Y-%m-%d %H:%M'):
"""日期时间格式化过滤器 - 智能处理时区转换""" """日期时间格式化过滤器 - Docker容器时区已设置"""
if value is None: if value is None:
return '' return ''
if isinstance(value, str): if isinstance(value, str):
try: try:
# 解析ISO格式时间字符串 value = datetime.fromisoformat(value.replace('Z', '+00:00') if 'Z' in value else value)
if 'Z' in value:
value = datetime.fromisoformat(value.replace('Z', '+00:00'))
elif '+' not in value and 'T' in value:
# 假设是UTC时间
value = datetime.fromisoformat(value).replace(tzinfo=timezone.utc)
else:
value = datetime.fromisoformat(value)
except: except:
return value return value
# 如果是naive datetime假设是UTC时间 # Docker容器中已设置正确时区直接格式化
if isinstance(value, datetime) and value.tzinfo is None: if isinstance(value, datetime):
value = value.replace(tzinfo=timezone.utc) return value.strftime(format)
# 智能时区转换:检查是否已经是东八区时间 return str(value)
if isinstance(value, datetime) and value.tzinfo is not None:
china_tz = timezone(timedelta(hours=8))
# 如果已经是东八区时间,直接使用;否则转换
if value.utcoffset() == timedelta(hours=8):
# 已经是东八区时间,无需转换
pass
else:
# 转换为东八区时间
value = value.astimezone(china_tz)
return value.strftime(format)
@app.template_filter('percentage') @app.template_filter('percentage')

View File

@ -6,7 +6,35 @@
<!-- 页面标题和筛选 --> <!-- 页面标题和筛选 -->
<div class="row mb-4"> <div class="row mb-4">
<div class="col-12"> <div class="col-12">
<div class="d-flex justify-content-end align-items-center"> <div class="d-flex justify-content-between align-items-center">
<!-- 操作按钮组 -->
<div class="d-flex gap-2 align-items-center">
<button id="clearSignalsBtn" class="btn btn-outline-danger btn-sm">
<i class="fas fa-trash me-1"></i>清空信号
</button>
<button id="runAnalysisBtn" class="btn btn-outline-success btn-sm">
<i class="fas fa-play me-1"></i>立即分析
</button>
<!-- 分析状态显示 -->
<div id="analysisStatus" class="ms-3" style="display: none;">
<div class="d-flex align-items-center">
<div class="spinner-border spinner-border-sm text-primary me-2" role="status">
<span class="visually-hidden">分析中...</span>
</div>
<div class="analysis-status-text">
<small class="text-muted fw-bold">正在分析中...</small>
<div class="mt-1">
<div class="progress" style="width: 150px; height: 6px;">
<div id="analysisProgress" class="progress-bar bg-success" role="progressbar" style="width: 0%"></div>
</div>
<small id="analysisInfo" class="text-muted">启动中...</small>
</div>
</div>
</div>
</div>
</div>
<!-- 筛选表单 --> <!-- 筛选表单 -->
<form method="GET" class="d-flex gap-2 align-items-center"> <form method="GET" class="d-flex gap-2 align-items-center">
<select name="strategy" class="form-select form-select-sm"> <select name="strategy" class="form-select form-select-sm">
@ -269,6 +297,145 @@
urlParams.set('timeframe', 'daily'); urlParams.set('timeframe', 'daily');
window.location.search = urlParams.toString(); window.location.search = urlParams.toString();
} }
// 清空信号按钮
$('#clearSignalsBtn').on('click', function() {
if (confirm('确认要清空最近7天的信号数据吗此操作不可恢复')) {
const btn = $(this);
const originalText = btn.html();
btn.prop('disabled', true).html('<i class="fas fa-spinner fa-spin me-1"></i>清空中...');
$.ajax({
url: '/api/clear_signals',
method: 'POST',
contentType: 'application/json',
data: JSON.stringify({
days: 7,
strategy_name: ''
}),
success: function(response) {
if (response.success) {
alert(`清空成功!删除了 ${response.deleted_count} 条信号记录`);
location.reload();
} else {
alert('清空失败:' + response.error);
}
},
error: function(xhr, status, error) {
alert('清空失败:网络错误或服务器异常');
console.error('清空信号失败:', error);
},
complete: function() {
btn.prop('disabled', false).html(originalText);
}
});
}
});
// 全局变量
let analysisStatusInterval = null;
// 检查分析状态
function checkAnalysisStatus() {
$.ajax({
url: '/api/analysis_status',
method: 'GET',
success: function(response) {
if (response.success && response.data) {
updateAnalysisStatusUI(response.data);
}
},
error: function(xhr, status, error) {
console.error('获取分析状态失败:', error);
}
});
}
// 更新分析状态UI
function updateAnalysisStatusUI(status) {
const $statusDiv = $('#analysisStatus');
const $progress = $('#analysisProgress');
const $info = $('#analysisInfo');
const $runBtn = $('#runAnalysisBtn');
if (status.is_running) {
// 显示分析状态
$statusDiv.show();
$runBtn.prop('disabled', true).html('<i class="fas fa-cog fa-spin me-1"></i>分析中...');
// 更新进度条
$progress.css('width', status.progress + '%');
// 更新信息文本
const runningTime = Math.floor(status.running_time / 60);
const remainingTime = Math.max(0, Math.ceil((status.stock_count / 200 * 5) - (status.running_time / 60)));
$info.text(`进度: ${status.progress}% | 已运行: ${runningTime}分钟 | 预计剩余: ${remainingTime}分钟`);
// 开始轮询(如果还没开始)
if (!analysisStatusInterval) {
analysisStatusInterval = setInterval(checkAnalysisStatus, 3000); // 每3秒检查一次
}
} else {
// 隐藏分析状态
$statusDiv.hide();
$runBtn.prop('disabled', false).html('<i class="fas fa-play me-1"></i>立即分析');
// 停止轮询
if (analysisStatusInterval) {
clearInterval(analysisStatusInterval);
analysisStatusInterval = null;
}
// 如果刚完成分析,刷新页面
if (status.progress === 100) {
setTimeout(() => {
location.reload();
}, 2000);
}
}
}
// 立即分析按钮
$('#runAnalysisBtn').on('click', function() {
const stockCount = prompt('请输入要扫描的股票数量 (默认200):', '200');
if (stockCount === null) return; // 用户取消
const count = parseInt(stockCount) || 200;
if (count <= 0 || count > 1000) {
alert('股票数量必须在 1-1000 之间');
return;
}
if (confirm(`确认要立即分析 ${count} 只热门股票吗?分析可能需要几分钟时间`)) {
$.ajax({
url: '/api/run_analysis',
method: 'POST',
contentType: 'application/json',
data: JSON.stringify({
stock_count: count
}),
success: function(response) {
if (response.success) {
// 立即开始状态轮询
checkAnalysisStatus();
alert(`分析任务已启动!正在后台扫描 ${response.stock_count} 只股票\n\n启动时间: ${response.start_time}\n\n页面将自动显示分析进度`);
} else {
alert('启动分析失败:' + response.error);
}
},
error: function(xhr, status, error) {
alert('启动分析失败:网络错误或服务器异常');
console.error('启动分析失败:', error);
}
});
}
});
// 页面加载时检查是否有正在运行的分析
checkAnalysisStatus();
}); });
</script> </script>
{% endblock %} {% endblock %}