From 020b0f4eb3134c22c09f5dede17da99c755e8f3a Mon Sep 17 00:00:00 2001 From: aaron <> Date: Fri, 27 Feb 2026 21:50:26 +0800 Subject: [PATCH] update --- backend/app/astock_agent/pullback_selector.py | 373 ++++++++++++++++++ backend/app/config.py | 6 +- backend/app/crypto_agent/crypto_agent.py | 60 ++- .../crypto_agent/trading_decision_maker.py | 12 +- backend/app/main.py | 3 +- backend/app/stock_agent/stock_agent.py | 7 +- backend/run_crypto.sh | 2 +- tests/test_pullback_selector.py | 83 ++++ 8 files changed, 529 insertions(+), 17 deletions(-) create mode 100644 backend/app/astock_agent/pullback_selector.py create mode 100644 tests/test_pullback_selector.py diff --git a/backend/app/astock_agent/pullback_selector.py b/backend/app/astock_agent/pullback_selector.py new file mode 100644 index 0000000..d13363d --- /dev/null +++ b/backend/app/astock_agent/pullback_selector.py @@ -0,0 +1,373 @@ +""" +A股龙回头选股器 +策略:热门板块 + MA多头排列 + 量价配合龙回头 +执行时间:每天盘前 9:00 + +【选股条件】 +1. MA 多头排列:MA5 > MA10 > MA30 +2. 从近期高点回调 2-20% +3. 回调期间缩量(成交量 < 上涨期的 80%) +4. 接近 MA10/30/60 支撑位(±10%以内) +5. 近期再度放量(最近2天 > 回调期的 1.2倍) +""" +import pandas as pd +from typing import Dict, List, Any, Optional +from datetime import datetime, timedelta +from app.utils.logger import logger + + +class PullbackStockSelector: + """龙回头选股器""" + + def __init__(self): + """初始化选股器""" + try: + import tushare as ts + self.ts = ts + # 从配置获取 token + from app.config import get_settings + self.settings = get_settings() + self.pro = ts.pro_api(self.settings.tushare_token) + logger.info("龙回头选股器初始化成功") + except ImportError: + logger.error("tushare 未安装") + raise + except Exception as e: + logger.error(f"龙回头选股器初始化失败: {e}") + raise + + def get_hot_concepts(self, limit: int = 10) -> List[tuple]: + """ + 从 Tushare 获取热门概念板块 + + 使用 ths_index 接口获取同花顺概念板块列表 + + Args: + limit: 返回板块数量 + + Returns: + [(概念代码, 概念名称), ...] + """ + try: + # 使用 ths_index 获取同花顺概念板块 + index_df = self.pro.ths_index( + market='A', # A股市场 + fields='ts_code,name' + ) + + if index_df.empty: + logger.warning("未能获取概念板块列表,使用备用列表") + return self._get_fallback_sectors() + + # 筛选科技相关的热门概念(关键词匹配) + tech_keywords = ['人工智能', '芯片', '半导体', '新能源', '汽车', '云计算', + '网络安全', '软件', '数字', '5G', '锂电', '光伏', 'AI', '科技'] + + filtered = [] + for _, row in index_df.iterrows(): + concept_name = row['name'] + concept_code = row['ts_code'] + + # 检查是否包含关键词 + for keyword in tech_keywords: + if keyword in concept_name: + filtered.append((concept_code, concept_name)) + break + + if len(filtered) >= limit * 2: # 多获取一些,后续筛选 + break + + return filtered[:limit] if filtered else self._get_fallback_sectors() + + except Exception as e: + logger.warning(f"获取热门概念失败: {e},使用备用列表") + return self._get_fallback_sectors() + + def _get_fallback_sectors(self) -> List[tuple]: + """备用板块列表(使用已验证的板块代码)""" + return [ + ('884031.TI', '人工智能'), + ('884065.TI', '新能源汽车'), + ('884039.TI', '云计算'), + ('884145.TI', '国产软件'), + ('884192.TI', '5G概念'), + ] + + def select_from_sector(self, sector_code: str, sector_name: str, max_stocks: int = 5) -> List[Dict[str, Any]]: + """ + 从指定板块选出龙回头股票 + + Args: + sector_code: 板块代码 + sector_name: 板块名称 + max_stocks: 最多返回股票数 + + Returns: + 符合条件的股票列表 + """ + try: + # 1. 获取板块成分股 + members_df = self.pro.ths_member( + ts_code=sector_code, + fields='ts_code,name,con_code,name' + ) + if members_df.empty: + logger.warning(f"板块 {sector_name}({sector_code}) 无成分股数据,可能是板块代码不正确或该板块已下线") + return [] + + stock_codes = members_df['con_code'].tolist() + logger.info(f"板块 {sector_name} 共 {len(stock_codes)} 只成分股") + + # 2. 逐个检查股票 + selected_stocks = [] + for i, stock_code in enumerate(stock_codes[:50]): # 最多检查前50只 + result = self._check_stock(stock_code) + if result: + result['sector_name'] = sector_name + selected_stocks.append(result) + logger.info(f" ✓ 找到: {result['name']}({stock_code}) - 回踩 {result['pullback_pct']:.2f}%") + + if len(selected_stocks) >= max_stocks: + break + + return selected_stocks + + except Exception as e: + logger.error(f"从板块选股失败 {sector_name}({sector_code}): {e}") + return [] + + def _check_stock(self, stock_code: str) -> Optional[Dict[str, Any]]: + """ + 检查单只股票是否符合量价配合龙回头条件 + + 【新策略】量价配合龙回头: + 1. MA 多头排列:MA5 > MA10 > MA30(上涨趋势) + 2. 从近期高点回调(寻找龙回头机会) + 3. 回调期间缩量(成交量明显萎缩,主力未离场) + 4. 接近 MA10/30/60 支撑位 + 5. 近期有再度放量迹象(资金重新入场) + + Returns: + 符合条件返回股票信息,否则返回 None + """ + try: + # 获取最近150天的日线数据 + end_date = datetime.now().strftime('%Y%m%d') + start_date = (datetime.now() - timedelta(days=150)).strftime('%Y%m%d') + + df = self.pro.daily( + ts_code=stock_code, + start_date=start_date, + end_date=end_date + ) + + if df.empty or len(df) < 70: + return None + + df = df.sort_values('trade_date').reset_index(drop=True) + df = df.tail(90).reset_index(drop=True) # 取最近90天 + + close = df['close'] + volume = df['vol'] # 成交量(手) + + # ========== 1. 计算 MA 均线 ========== + ma5 = close.rolling(window=5).mean() + ma10 = close.rolling(window=10).mean() + ma30 = close.rolling(window=30).mean() + ma60 = close.rolling(window=60).mean() + + latest = df.iloc[-1] + latest_close = latest['close'] + latest_ma5 = ma5.iloc[-1] + latest_ma10 = ma10.iloc[-1] + latest_ma30 = ma30.iloc[-1] + latest_ma60 = ma60.iloc[-1] + + # 条件1:MA 多头排列 MA5 > MA10 > MA30 + if not (latest_ma5 > latest_ma10 > latest_ma30): + logger.debug(f" ✗ {stock_code}: MA非多头排列 (MA5:{latest_ma5:.2f}, MA10:{latest_ma10:.2f}, MA30:{latest_ma30:.2f})") + return None + + # ========== 2. 找近期高点(最近20天内最高价) ========== + lookback_high = 20 + recent_df = df.tail(lookback_high).reset_index(drop=True) + high_idx_in_recent = recent_df['close'].idxmax() + high_price = recent_df.loc[high_idx_in_recent, 'close'] + high_idx_in_df = df.index[-lookback_high] + high_idx_in_recent + + # 计算从高点的回踩幅度 + pullback_pct = (high_price - latest_close) / high_price * 100 + + # 条件2:回踩幅度 2-20%(放宽) + if not (2 <= pullback_pct <= 20): + logger.debug(f" ✗ {stock_code}: 回踩幅度不符合 (回踩:{pullback_pct:.2f}%, 需要2-20%)") + return None + + # ========== 3. 量能形态分析:放量上涨→缩量回调→再度放量 ========== + + # 上涨期间的成交量(从低点到高点) + rise_period = df.loc[:high_idx_in_df] + rise_volume_avg = rise_period['vol'].mean() + + # 回调期间的成交量(从高点到现在,最近5天) + pullback_start_idx = min(high_idx_in_df + 1, len(df) - 1) + pullback_period = df.loc[pullback_start_idx:][-5:] # 回调期间最近5天 + pullback_volume_avg = pullback_period['vol'].mean() + + # 条件3:回调期间缩量(成交量 < 上涨期间的 80%,放宽) + volume_shrink_ratio = pullback_volume_avg / rise_volume_avg if rise_volume_avg > 0 else 1 + if volume_shrink_ratio >= 0.8: + logger.debug(f" ✗ {stock_code}: 回调未缩量 (缩量比:{volume_shrink_ratio:.2%}, 需要<80%)") + return None + + # 条件4:最近2天再度放量(比回调期间平均成交量增加 20%+,放宽) + recent_2_days_volume = df.tail(2)['vol'].mean() + if recent_2_days_volume < pullback_volume_avg * 1.2: + logger.debug(f" ✗ {stock_code}: 未再度放量 (最近2天/回调期:{recent_2_days_volume/pullback_volume_avg:.2f}, 需要>1.2)") + return None + + # ========== 4. 接近 MA 支撑位 ========== + ma10_diff = abs(latest_close - latest_ma10) / latest_ma10 * 100 + ma30_diff = abs(latest_close - latest_ma30) / latest_ma30 * 100 + ma60_diff = abs(latest_close - latest_ma60) / latest_ma60 * 100 + + # 接近任意一条 MA 线(±10%以内,放宽) + near_ma = (ma10_diff <= 10 or ma30_diff <= 10 or ma60_diff <= 10) + + # 未跌破 MA60(允许跌破 10%,放宽) + above_ma60 = latest_close >= latest_ma60 * 0.90 + + if not (near_ma and above_ma60): + logger.debug(f" ✗ {stock_code}: 未接近MA支撑或已跌破MA60 (MA10差:{ma10_diff:.2f}%, MA30:{ma30_diff:.2f}%, MA60:{ma60_diff:.2f}%)") + return None + + # ========== 5. 计算前期涨幅 ========== + rise_data = df.loc[:high_idx_in_df] + low_price = rise_data['low'].min() + rise_pct = (high_price - low_price) / low_price * 100 + + # 涨幅需 > 10% + if rise_pct < 10: + logger.debug(f" ✗ {stock_code}: 涨幅不足 (涨幅:{rise_pct:.2f}%, 需要>10%)") + return None + + # 获取股票名称 + stock_info = self.pro.stock_basic(ts_code=stock_code, fields='ts_code,name') + stock_name = stock_info.iloc[0]['name'] if not stock_info.empty else stock_code + + return { + 'ts_code': stock_code, + 'name': stock_name, + 'close': latest_close, + 'high': high_price, + 'rise_pct': rise_pct, + 'pullback_pct': pullback_pct, + 'ma5': latest_ma5, + 'ma10': latest_ma10, + 'ma30': latest_ma30, + 'ma60': latest_ma60, + 'volume_shrink_ratio': volume_shrink_ratio, + 'recent_volume_ratio': recent_2_days_volume / pullback_volume_avg, + 'trade_date': latest['trade_date'] + } + + except Exception as e: + logger.debug(f"检查股票 {stock_code} 失败: {e}") + return None + + def select_from_hot_sectors(self, top_n: int = 5) -> Dict[str, List[Dict[str, Any]]]: + """ + 从热门板块选股 + + Args: + top_n: 选择前N个热门板块 + + Returns: + {板块名称: [股票列表]} + """ + try: + # 使用 Tushare API 动态获取热门板块 + hot_sectors = self.get_hot_concepts(limit=top_n * 2) # 多获取一些以备筛选 + + logger.info(f"开始龙回头选股,共 {len(hot_sectors)} 个热门板块") + + results = {} + checked_count = 0 + skipped_count = 0 + + for sector_code, sector_name in hot_sectors: + if checked_count >= top_n: + break + + logger.info(f"检查板块: {sector_name}") + + selected = self.select_from_sector(sector_code, sector_name, max_stocks=2) + + if selected: + results[sector_name] = selected + logger.info(f" ✓ 板块 {sector_name} 选出 {len(selected)} 只") + checked_count += 1 + else: + logger.info(f" - 板块 {sector_name} 未选出") + skipped_count += 1 + # 跳过无数据的板块,继续检查下一个 + if skipped_count >= 5: # 如果连续5个板块都没有数据,就停止 + logger.warning("连续多个板块无数据,停止检查") + break + + return results + + except Exception as e: + logger.error(f"从热门板块选股失败: {e}") + import traceback + logger.error(traceback.format_exc()) + return {} + + def format_result(self, results: Dict[str, List[Dict[str, Any]]]) -> str: + """ + 格式化选股结果 + + Args: + results: 选股结果 + + Returns: + 格式化的文本 + """ + if not results: + return "今日未选出符合条件的龙回头股票" + + lines = [ + "📊 **龙回头选股结果**", + f"", + f"选股时间: {datetime.now().strftime('%Y-%m-%d %H:%M')}", + f"", + ] + + total_stocks = 0 + for sector_name, stocks in results.items(): + lines.append(f"**🏢 {sector_name}**") + for stock in stocks: + total_stocks += 1 + lines.append(f" • {stock['name']}({stock['ts_code']})") + lines.append(f" 现价: ¥{stock['close']:.2f} | 回踩: {stock['pullback_pct']:.2f}% | 涨幅: {stock['rise_pct']:.2f}%") + lines.append(f" MA5: ¥{stock['ma5']:.2f} | MA10: ¥{stock['ma10']:.2f} | MA30: ¥{stock['ma30']:.2f} | MA60: ¥{stock['ma60']:.2f}") + lines.append(f" 缩量比: {stock['volume_shrink_ratio']:.0%} | 再放量: {stock['recent_volume_ratio']:.2f}x") + + lines.append("") + lines.append(f"**共选出 {total_stocks} 只股票**") + lines.append("") + lines.append("*⚠️ 仅供参考,不构成投资建议*") + + return "\n".join(lines) + + +# 全局单例 +_pullback_selector: Optional[PullbackStockSelector] = None + + +def get_pullback_selector() -> PullbackStockSelector: + """获取龙回头选股器单例""" + global _pullback_selector + if _pullback_selector is None: + _pullback_selector = PullbackStockSelector() + return _pullback_selector diff --git a/backend/app/config.py b/backend/app/config.py index 8ec6501..5e9bf65 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -112,7 +112,6 @@ class Settings(BaseSettings): # 加密货币交易智能体配置 crypto_symbols: str = "BTCUSDT,ETHUSDT" # 监控的交易对,逗号分隔 - crypto_analysis_interval: int = 60 # 分析间隔(秒) crypto_llm_threshold: float = 0.70 # 触发 LLM 分析的置信度阈值 # 价格监控模式配置 @@ -198,6 +197,11 @@ class Settings(BaseSettings): dingtalk_astock_webhook: str = "" # A股钉钉通知 Webhook dingtalk_astock_secret: str = "" # A股钉钉通知加签密钥 + # A股龙回头选股配置 + pullback_selector_enabled: bool = True # 是否启用龙回头选股 + pullback_select_time: str = "09:00" # 选股时间(24小时制) + pullback_sectors_to_check: int = 5 # 检查板块数量 + class Config: env_file = find_env_file() case_sensitive = False diff --git a/backend/app/crypto_agent/crypto_agent.py b/backend/app/crypto_agent/crypto_agent.py index 2c0ee9a..b49e8c7 100644 --- a/backend/app/crypto_agent/crypto_agent.py +++ b/backend/app/crypto_agent/crypto_agent.py @@ -525,8 +525,10 @@ class CryptoAgent: if paper_trading_enabled: logger.info(f"\n📊 【模拟交易决策】") positions, account, pending_orders = self._get_trading_state(use_real_trading=False) + # 过滤:只传递当前symbol的挂单给决策器,避免LLM搞混 + pending_orders_for_symbol = [o for o in pending_orders if o.get('symbol') == symbol] paper_decision = await self.decision_maker.make_decision( - market_signal, positions, account, current_price, pending_orders + market_signal, positions, account, current_price, pending_orders_for_symbol ) self._log_trading_decision(paper_decision) # 发送交易决策通知 @@ -540,8 +542,10 @@ class CryptoAgent: # 检查是否开启自动交易 if self.real_trading and self.real_trading.get_auto_trading_status(): positions, account, pending_orders = self._get_trading_state(use_real_trading=True) + # 过滤:只传递当前symbol的挂单给决策器,避免LLM搞混 + pending_orders_for_symbol = [o for o in pending_orders if o.get('symbol') == symbol] real_decision = await self.decision_maker.make_decision( - market_signal, positions, account, current_price, pending_orders + market_signal, positions, account, current_price, pending_orders_for_symbol ) self._log_trading_decision(real_decision) # 发送交易决策通知 @@ -996,7 +1000,10 @@ class CryptoAgent: content_parts.append(f"") content_parts.append(f"📝 **分析理由**:") - content_parts.append(f"{reasoning}") + # HTML转义reasoning,避免特殊字符破坏HTML格式 + import html + escaped_reasoning = html.escape(reasoning) if reasoning else reasoning + content_parts.append(f"{escaped_reasoning}") content = "\n".join(content_parts) @@ -1089,13 +1096,18 @@ class CryptoAgent: position_map = {'heavy': '🔥 重仓', 'medium': '📊 中仓', 'light': '🌱 轻仓', 'micro': '🌱 微仓'} position_display = position_map.get(position_size, position_size) + # HTML转义,避免特殊字符破坏HTML格式 + import html + escaped_reasoning = html.escape(reasoning) if reasoning else '' + escaped_risk = html.escape(risk_analysis) if risk_analysis else '' + content_parts.extend([ f"📊 **仓位**: {position_display}", - f"💭 **决策理由**: {reasoning}", + f"💭 **决策理由**: {escaped_reasoning}", ]) - if risk_analysis: - content_parts.append(f"⚠️ **风险**: {risk_analysis}") + if escaped_risk: + content_parts.append(f"⚠️ **风险**: {escaped_risk}") # 添加价格信息(如果有) quantity = decision.get('quantity', 0) @@ -1486,14 +1498,42 @@ class CryptoAgent: trading_type = "模拟" if paper_trading else "实盘" if not trading_service: - logger.warning(f" {trading_type}交易服务未初始化") + logger.warning(f" {trading_type}交易服务未启用") + return + + # 安全检查:验证要取消的订单是否属于当前symbol + active_orders = trading_service.get_active_orders() + valid_orders = [] + invalid_orders = [] + + for order_id in orders_to_cancel: + # 查找订单 + order = next((o for o in active_orders if o.get('order_id') == order_id), None) + if not order: + logger.warning(f" ⚠️ 订单不存在: {order_id}") + invalid_orders.append(order_id) + continue + + # 检查订单是否属于当前symbol + if order.get('symbol') != symbol: + logger.error(f" ❌ 安全拦截:订单 {order_id} 属于 {order.get('symbol')},不是当前分析标的 {symbol}") + invalid_orders.append(order_id) + continue + + valid_orders.append(order_id) + + if invalid_orders: + logger.error(f" 🚫 拒绝取消 {len(invalid_orders)} 个不属于 {symbol} 的订单") + + if not valid_orders: + logger.warning(f" ⚠️ 没有有效的订单可以取消") return logger.info(f" 🚫 {trading_type}取消挂单: {symbol}") - logger.info(f" 取消订单数量: {len(orders_to_cancel)}") + logger.info(f" 取消订单数量: {len(valid_orders)}") cancelled_count = 0 - for order_id in orders_to_cancel: + for order_id in valid_orders: try: # 取消订单 result = trading_service.cancel_order(order_id) @@ -1505,7 +1545,7 @@ class CryptoAgent: except Exception as e: logger.error(f" ❌ 取消订单异常: {order_id} | {e}") - logger.info(f" 📊 成功取消 {cancelled_count}/{len(orders_to_cancel)} 个订单") + logger.info(f" 📊 成功取消 {cancelled_count}/{len(valid_orders)} 个订单") except Exception as e: logger.error(f"执行取消挂单失败: {e}") diff --git a/backend/app/crypto_agent/trading_decision_maker.py b/backend/app/crypto_agent/trading_decision_maker.py index 241544d..bb67f3f 100644 --- a/backend/app/crypto_agent/trading_decision_maker.py +++ b/backend/app/crypto_agent/trading_decision_maker.py @@ -120,12 +120,19 @@ class TradingDecisionMaker: - **信号方向与挂单方向相反**: - 新信号是 buy,但存在 sell 挂单 → 取消 sell 挂单 - 新信号是 sell,但存在 buy 挂单 → 取消 buy 挂单 + - ⚠️ **绝不取消同向挂单**:buy信号不应取消buy挂单,sell信号不应取消sell挂单 - **价格偏离过大**: - 当前价格距离挂单价超过 3%,建议取消重新挂单 +**🚨 取消订单的严格要求**(违反这些规则会导致严重错误): +1. **只能取消当前交易对的挂单**:你只能看到 {symbol} 的挂单,不要取消其他交易对的订单 +2. **只能取消反向挂单**:buy信号取消sell挂单,sell信号取消buy挂单 +3. **绝不取消同向挂单**:如果信号是sell,不应该取消sell挂单(同向应该保留或加仓) +4. **检查订单ID的symbol**:在填写 orders_to_cancel 前,确认订单ID属于当前分析的交易对 + **输出格式**: - `decision: "CANCEL_PENDING"` -- `orders_to_cancel`: ["order_id_1", "order_id_2"] - 要取消的订单ID列表 +- `orders_to_cancel`: ["order_id_1", "order_id_2"] - 要取消的订单ID列表(必须来自上面的挂单列表) - `reasoning`: "取消原因" ### 6. 观望(HOLD) @@ -474,8 +481,9 @@ class TradingDecisionMaker: # 当前挂单 pending_orders = context.get('pending_orders', []) - prompt_parts.append(f"\n## 当前挂单") + prompt_parts.append(f"\n## 当前挂单(仅 {context['symbol']} 的挂单)") if pending_orders: + prompt_parts.append(f"⚠️ 重要:以下挂单都属于当前交易对 {context['symbol']},取消订单时只能选择这些订单ID") for order in pending_orders: side_icon = "🟢" if order.get('side') == 'long' else "🔴" entry_type = "现价单" if order.get('entry_type') == 'market' else "挂单" diff --git a/backend/app/main.py b/backend/app/main.py index 97d6579..852903c 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -116,7 +116,8 @@ async def start_scheduler(): ) _astock_scheduler.start() - logger.info("📅 定时任务调度器已启动: 每天 15:30 (A股板块异动分析)") + logger.info("📅 定时任务调度器已启动:") + logger.info(" - 每天 15:30 (A股板块异动分析)") async def price_monitor_loop(): diff --git a/backend/app/stock_agent/stock_agent.py b/backend/app/stock_agent/stock_agent.py index cf48fad..aa3a428 100644 --- a/backend/app/stock_agent/stock_agent.py +++ b/backend/app/stock_agent/stock_agent.py @@ -558,8 +558,11 @@ class StockAgent: logger.info(f" Webhook URL: {self.feishu.webhook_url[:50]}...") # 发送到飞书 - await self.feishu.send_card(title, content, color) - logger.info(f" ✅ 飞书通知发送成功") + feishu_success = await self.feishu.send_card(title, content, color) + if feishu_success: + logger.info(f" ✅ 飞书通知发送成功") + else: + logger.warning(f" ⚠️ 飞书通知发送失败(但Telegram会发送)") # 发送到 Telegram await self.telegram.send_message(formatter.format_signal_message(signal, symbol, agent_type='stock')) diff --git a/backend/run_crypto.sh b/backend/run_crypto.sh index a6ad8f1..9bd23cf 100755 --- a/backend/run_crypto.sh +++ b/backend/run_crypto.sh @@ -65,7 +65,7 @@ from app.config import get_settings settings = get_settings() print(f" 监控交易对: {settings.crypto_symbols}") -print(f" 分析间隔: {settings.crypto_analysis_interval}秒") +print(f" 运行频率: 每5分钟整点") print(f" 飞书 Webhook: {'✓ 已配置' if settings.feishu_webhook_url else '❌ 未配置'}") print(f" LLM 服务: {'✓ 已配置' if settings.zhipuai_api_key or settings.deepseek_api_key else '⚠️ 未配置'}") diff --git a/tests/test_pullback_selector.py b/tests/test_pullback_selector.py new file mode 100644 index 0000000..65d0853 --- /dev/null +++ b/tests/test_pullback_selector.py @@ -0,0 +1,83 @@ +""" +龙回头选股器测试脚本 +运行: cd backend && python ../tests/test_pullback_selector.py +""" +import asyncio +import sys +import os +from pathlib import Path + +# 添加 backend 目录到 Python 路径 +backend_dir = Path(__file__).parent.parent / "backend" +sys.path.insert(0, str(backend_dir)) + +# 切换到 backend 目录(确保 .env 能被找到) +os.chdir(backend_dir) + +from app.config import get_settings +from app.astock_agent.pullback_selector import get_pullback_selector +import logging + +# 启用详细日志 +logging.getLogger('stock_agent').setLevel(logging.DEBUG) + + +async def test_pullback_selector(): + """测试龙回头选股器""" + print("=" * 60) + print("龙回头选股器测试") + print("=" * 60) + + settings = get_settings() + print(f"\n配置:") + print(f" Tushare Token: {'已配置' if settings.tushare_token else '未配置'}") + print(f" 选股启用: {settings.pullback_selector_enabled}") + print(f" 检查板块数: {settings.pullback_sectors_to_check}") + + if not settings.tushare_token: + print("\n❌ 错误: 请先在 .env 中配置 TUSHARE_TOKEN") + return + + # 创建选股器 + selector = get_pullback_selector() + + print(f"\n开始选股...") + print(f"检查板块数: {settings.pullback_sectors_to_check}") + print(f"每个板块最多选: 2 只\n") + + # 执行选股 + results = selector.select_from_hot_sectors(top_n=settings.pullback_sectors_to_check) + + # 显示结果 + if results: + print("\n" + "=" * 60) + print("选股结果") + print("=" * 60) + + for sector_name, stocks in results.items(): + print(f"\n【{sector_name}】") + for stock in stocks: + print(f" 📈 {stock['name']}({stock['ts_code']})") + print(f" 现价: ¥{stock['close']:.2f} | 回踩: {stock['pullback_pct']:.2f}% | 涨幅: {stock['rise_pct']:.2f}%") + print(f" MA5: ¥{stock['ma5']:.2f} | MA10: ¥{stock['ma10']:.2f} | MA30: ¥{stock['ma30']:.2f} | MA60: ¥{stock['ma60']:.2f}") + print(f" 缩量比: {stock['volume_shrink_ratio']:.0%} | 再放量: {stock['recent_volume_ratio']:.2f}x") + + total = sum(len(stocks) for stocks in results.values()) + print(f"\n共选出 {total} 只股票") + + # 格式化完整消息 + print("\n" + "=" * 60) + print("通知消息预览") + print("=" * 60) + print(selector.format_result(results)) + + else: + print("\n未选出符合条件的股票") + + print("\n" + "=" * 60) + print("测试完成") + print("=" * 60) + + +if __name__ == "__main__": + asyncio.run(test_pullback_selector())