diff --git a/backend/app/api/stock.py b/backend/app/api/stock.py index 4533117..5d1989b 100644 --- a/backend/app/api/stock.py +++ b/backend/app/api/stock.py @@ -9,6 +9,58 @@ from app.utils.logger import logger router = APIRouter() +@router.post("/sector/check") +async def trigger_sector_check(): + """ + 手动触发板块异动检查 + + Returns: + 检查结果 + """ + try: + from app.main import _astock_monitor_instance + from app.config import get_settings + + if not _astock_monitor_instance: + # 创建临时监控实例 + from app.astock_agent import SectorMonitor + settings = get_settings() + monitor = SectorMonitor( + change_threshold=settings.astock_change_threshold, + top_n=settings.astock_top_n, + enable_notifier=False # 手动触发不发送通知 + ) + result = await monitor.check_once() + return result + else: + result = await _astock_monitor_instance.check_once() + return result + except Exception as e: + logger.error(f"手动触发板块检查失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.get("/sector/stats") +async def get_sector_stats(): + """ + 获取板块监控统计信息 + + Returns: + 统计信息 + """ + try: + from app.main import _astock_monitor_instance + + if not _astock_monitor_instance: + return {"error": "板块监控未运行"} + + stats = _astock_monitor_instance.get_stats() + return stats + except Exception as e: + logger.error(f"获取板块统计失败: {e}") + raise HTTPException(status_code=500, detail=str(e)) + + @router.get("/quote/{stock_code}") async def get_quote(stock_code: str): """ diff --git a/backend/app/astock_agent/__init__.py b/backend/app/astock_agent/__init__.py new file mode 100644 index 0000000..fd15411 --- /dev/null +++ b/backend/app/astock_agent/__init__.py @@ -0,0 +1,16 @@ +""" +A 股板块异动监控 Agent +提供 Tushare 数据源版本 +""" +from .sector_monitor import SectorMonitor +from .tushare_client import TushareClient, get_tushare_client +from .tushare_sector_analyzer import TushareSectorAnalyzer +from .tushare_stock_selector import TushareStockSelector + +__all__ = [ + 'SectorMonitor', + 'TushareClient', + 'get_tushare_client', + 'TushareSectorAnalyzer', + 'TushareStockSelector', +] diff --git a/backend/app/astock_agent/akshare_client.py b/backend/app/astock_agent/akshare_client.py new file mode 100644 index 0000000..eda7364 --- /dev/null +++ b/backend/app/astock_agent/akshare_client.py @@ -0,0 +1,234 @@ +""" +Akshare 数据封装 +提供 A 股板块、个股行情数据获取接口 +支持概念板块和行业板块 +""" +import os +import time +import akshare as ak +import pandas as pd +from typing import Dict, List, Optional +from datetime import datetime, timedelta +from app.utils.logger import logger + + +# 禁用全局代理设置 +os.environ.pop('HTTP_PROXY', None) +os.environ.pop('HTTPS_PROXY', None) +os.environ.pop('http_proxy', None) +os.environ.pop('https_proxy', None) + +# Monkey patch requests 以禁用代理 +import requests +_original_session_init = requests.Session.__init__ + + +def _patched_session_init(self, *args, **kwargs): + _original_session_init(self, *args, **kwargs) + self.trust_env = False + self.proxies = {'http': None, 'https': None} + + +requests.Session.__init__ = _patched_session_init + + +class AkshareClient: + """Akshare 数据客户端""" + + # 缓存数据,避免频繁请求 + _cache = {} + _cache_time = {} + _last_request_time = 0 + + def __init__(self): + """初始化客户端""" + self.cache_ttl = 60 # 缓存60秒 + self.request_delay = 1.0 # 请求间隔(秒) + self.max_retries = 3 # 最大重试次数 + + def _get_cached(self, key: str, fetch_func) -> pd.DataFrame: + """获取缓存数据,支持重试""" + now = datetime.now() + + # 检查缓存 + if key in self._cache: + cache_time = self._cache_time.get(key) + if cache_time and (now - cache_time).seconds < self.cache_ttl: + logger.debug(f"使用缓存数据: {key}") + return self._cache[key] + + # 请求限流 + elapsed = now.timestamp() - self._last_request_time + if elapsed < self.request_delay: + time.sleep(self.request_delay - elapsed) + + # 重试逻辑 + last_error = None + for attempt in range(self.max_retries): + try: + self._last_request_time = time.time() + df = fetch_func() + + if df is not None and not df.empty: + self._cache[key] = df + self._cache_time[key] = now + logger.debug(f"获取数据成功: {key}") + return df + + except Exception as e: + last_error = e + error_msg = str(e) + + # 判断错误类型 + if 'Connection' in error_msg or 'RemoteDisconnected' in error_msg: + # 连接错误,指数退避重试 + if attempt < self.max_retries - 1: + wait_time = (2 ** attempt) * 2 # 2, 4, 8秒 + logger.warning( + f"获取数据失败 {key} (尝试 {attempt + 1}/{self.max_retries}): {e}," + f"等待 {wait_time}秒后重试..." + ) + time.sleep(wait_time) + continue + + # 其他错误或重试次数用尽 + logger.error(f"获取数据失败 {key}: {e}") + break + + return pd.DataFrame() + + def get_concept_spot(self) -> pd.DataFrame: + """ + 获取概念板块行情(实时) + + Returns: + 概念板块行情数据 + """ + def fetch(): + # stock_board_concept_name_em - 东方财富概念板块行情 + return ak.stock_board_concept_name_em() + + return self._get_cached('concept_spot', fetch) + + def get_industry_spot(self) -> pd.DataFrame: + """ + 获取行业板块行情(实时) + + Returns: + 行业板块行情数据 + """ + def fetch(): + # stock_board_industry_name_em - 东方财富行业板块行情 + return ak.stock_board_industry_name_em() + + return self._get_cached('industry_spot', fetch) + + def get_concept_stocks(self, sector_name: str) -> pd.DataFrame: + """ + 获取概念板块成分股 + + Args: + sector_name: 板块名称 + + Returns: + 成分股数据 + """ + def fetch(): + # stock_board_concept_cons_em - 概念板块成分股 + df = ak.stock_board_concept_cons_em(symbol=sector_name) + return df if df is not None else pd.DataFrame() + + return self._get_cached(f'concept_stocks_{sector_name}', fetch) + + def get_industry_stocks(self, sector_name: str) -> pd.DataFrame: + """ + 获取行业板块成分股 + + Args: + sector_name: 板块名称 + + Returns: + 成分股数据 + """ + def fetch(): + # stock_board_industry_cons_em - 行业板块成分股 + df = ak.stock_board_industry_cons_em(symbol=sector_name) + return df if df is not None else pd.DataFrame() + + return self._get_cached(f'industry_stocks_{sector_name}', fetch) + + def get_stock_spot(self) -> pd.DataFrame: + """ + 获取 A 股实时行情 + + Returns: + A 股实时行情数据 + """ + def fetch(): + return ak.stock_zh_a_spot_em() + + return self._get_cached('stock_spot', fetch) + + def get_stock_fund_flow(self, symbol: str) -> pd.DataFrame: + """ + 获取个股资金流向 + + Args: + symbol: 股票代码 + + Returns: + 资金流向数据 + """ + def fetch(): + return ak.stock_individual_fund_flow( + stock=symbol, + market="sh" if symbol.startswith('6') else "sz" + ) + + return self._get_cached(f'fund_flow_{symbol}', fetch) + + def get_stock_info(self, symbol: str) -> Dict: + """ + 获取个股基本信息 + + Args: + symbol: 股票代码 + + Returns: + 股票信息字典 + """ + try: + info = ak.stock_individual_info_em(symbol=symbol) + return { + 'name': info.get('股票简称', ''), + 'industry': info.get('行业', ''), + 'market_cap': info.get('总市值', ''), + 'float_cap': info.get('流通市值', ''), + } + except Exception as e: + logger.error(f"获取股票信息失败 {symbol}: {e}") + return {} + + def get_limit_list_stocks(self) -> pd.DataFrame: + """ + 获取涨停板股票 + + Returns: + 涨停板股票列表 + """ + def fetch(): + return ak.stock_zt_pool_em(date=datetime.now().strftime('%Y%m%d')) + + return self._get_cached('limit_list', fetch) + + +# 全局单例 +_akshare_client: Optional[AkshareClient] = None + + +def get_akshare_client() -> AkshareClient: + """获取 Akshare 客户端单例""" + global _akshare_client + if _akshare_client is None: + _akshare_client = AkshareClient() + return _akshare_client diff --git a/backend/app/astock_agent/notifier.py b/backend/app/astock_agent/notifier.py new file mode 100644 index 0000000..2108c1a --- /dev/null +++ b/backend/app/astock_agent/notifier.py @@ -0,0 +1,338 @@ +""" +钉钉通知模块 +格式化并发送板块异动通知 +""" +import json +import hmac +import hashlib +import base64 +import time +import requests +from typing import Dict, List +from datetime import datetime +from urllib.parse import quote +from app.utils.logger import logger + + +class DingTalkNotifier: + """钉钉通知器""" + + def __init__(self, webhook: str, secret: str = None): + """ + 初始化通知器 + + Args: + webhook: 钉钉机器人 Webhook URL + secret: 加签密钥(可选) + """ + self.webhook = webhook + self.secret = secret + + def _sign(self, timestamp: int) -> str: + """ + 生成签名 + + Args: + timestamp: 时间戳(毫秒) + + Returns: + 签名字符串 + """ + if not self.secret: + return "" + + secret_enc = self.secret.encode('utf-8') + string_to_sign = f'{timestamp}\n{self.secret}' + string_to_sign_enc = string_to_sign.encode('utf-8') + hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() + sign = base64.b64encode(hmac_code).decode('utf-8') + return sign + + def _build_url(self) -> str: + """ + 构建带签名的 Webhook URL + + Returns: + 完整的 Webhook URL + """ + if not self.secret: + return self.webhook + + timestamp = int(time.time() * 1000) + sign = self._sign(timestamp) + sign_encoded = quote(sign, safe='') + + return f"{self.webhook}×tamp={timestamp}&sign={sign_encoded}" + + def send_sector_alert(self, sector_data: Dict, top_stocks: List[Dict], reason: str = "") -> bool: + """ + 发送板块异动提醒 + + Args: + sector_data: 板块数据 + top_stocks: 龙头股列表 + reason: 异动原因 + + Returns: + 是否发送成功 + """ + try: + # 构建消息卡片 + card = self._format_sector_card(sector_data, top_stocks, reason) + + # 构建请求数据 + data = { + "msgtype": "markdown", + "markdown": { + "title": f"🔥 {sector_data['name']} 异动提醒", + "text": card + } + } + + # 构建带签名的 URL + url = self._build_url() + + # 发送请求 + headers = {"Content-Type": "application/json;charset=utf-8"} + response = requests.post( + url, + data=json.dumps(data), + headers=headers, + timeout=10 + ) + + result = response.json() + if result.get("errcode") == 0: + logger.info(f"钉钉通知发送成功: {sector_data['name']}") + return True + else: + logger.error(f"钉钉通知发送失败: {result}") + return False + + except Exception as e: + logger.error(f"发送钉钉通知异常: {e}") + return False + + def _format_sector_card(self, sector_data: Dict, top_stocks: List[Dict], reason: str) -> str: + """ + 格式化板块异动卡片 + + Args: + sector_data: 板块数据 + top_stocks: 龙头股列表 + reason: 异动原因 + + Returns: + Markdown 格式的消息内容 + """ + lines = [] + + # 标题 + lines.append("### 🔥 A股板块异动提醒") + lines.append("") + + # 基本信息 + change_pct = sector_data['change_pct'] + change_icon = "📈" if change_pct > 0 else "📉" + lines.append(f"**异动板块**: {sector_data['name']} {change_icon} {change_pct:+.2f}%") + lines.append(f"**异动时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + lines.append(f"**异动类型**: 涨幅突增 | {reason if reason else '资金集中流入'}") + lines.append("") + + # 板块概况 + lines.append("#### 📊 板块概况") + lines.append(f"- 涨幅: {change_pct:+.2f}%") + lines.append(f"- 涨跌额: {sector_data.get('change_amount', 0):+.2f}") + + if sector_data.get('amount', 0) > 0: + amount = sector_data['amount'] + if amount >= 100000: + amount_str = f"{amount/100000:.1f}亿" + else: + amount_str = f"{amount/10000:.1f}万" + lines.append(f"- 成交额: {amount_str}") + + if sector_data.get('leading_stock'): + lines.append(f"- 领涨股: {sector_data['leading_stock']}") + + lines.append("") + + # 龙头股 + if top_stocks: + lines.append("#### 🏆 龙头股 Top " + str(len(top_stocks))) + lines.append("") + + for idx, stock in enumerate(top_stocks, 1): + # 价格格式化 + price = stock['price'] + change_pct = stock['change_pct'] + + # 涨跌幅图标 + if change_pct >= 9.9: + change_icon = "🚀" + elif change_pct >= 5: + change_icon = "⚡" + elif change_pct > 0: + change_icon = "📈" + elif change_pct > -3: + change_icon = "➖" + else: + change_icon = "📉" + + lines.append(f"**{idx}. {stock['name']} ({stock['code']})**") + lines.append(f" 现价: ¥{price:.2f} ({change_icon} {change_pct:+.2f}%)") + lines.append(f" 成交额: {self._format_amount(stock['amount'])}") + lines.append(f" 换手率: {stock['turnover']:.2f}%") + lines.append(f" 涨速: {stock['speed_level']}") + + if stock.get('volume_ratio', 1) > 2: + lines.append(f" 量比: {stock['volume_ratio']:.2f} 🔥") + + lines.append("") + + lines.append("---") + lines.append(f"📊 综合评分: {top_stocks[0]['score']:.1f}分") + + return "\n".join(lines) + + def _format_amount(self, amount: float) -> str: + """ + 格式化成交额 + + Args: + amount: 成交额(元) + + Returns: + 格式化后的字符串 + """ + if amount >= 100000000: + return f"{amount/100000000:.2f}亿" + elif amount >= 10000: + return f"{amount/10000:.2f}万" + else: + return f"{amount:.0f}元" + + def send_summary(self, total_sectors: int, total_stocks: int) -> bool: + """ + 发送监控汇总 + + Args: + total_sectors: 异动板块总数 + total_stocks: 龙头股总数 + + Returns: + 是否发送成功 + """ + try: + lines = [] + lines.append("### 📋 A股板块监控汇总") + lines.append("") + lines.append(f"**监控时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + lines.append("") + lines.append("#### 📊 今日统计") + lines.append(f"- 异动板块: {total_sectors} 个") + lines.append(f"- 龙头股: {total_stocks} 只") + lines.append("") + lines.append("---") + lines.append(f"⏰ 下次更新: {datetime.now().strftime('%H:%M')}") + + card = "\n".join(lines) + + # 构建请求数据 + data = { + "msgtype": "markdown", + "markdown": { + "title": "📋 A股板块监控汇总", + "text": card + } + } + + # 构建带签名的 URL + url = self._build_url() + + # 发送请求 + headers = {"Content-Type": "application/json;charset=utf-8"} + response = requests.post( + url, + data=json.dumps(data), + headers=headers, + timeout=10 + ) + + result = response.json() + if result.get("errcode") == 0: + logger.info(f"钉钉汇总发送成功") + return True + else: + logger.error(f"钉钉汇总发送失败: {result}") + return False + + except Exception as e: + logger.error(f"发送钉钉汇总异常: {e}") + return False + + def send_error(self, error_msg: str) -> bool: + """ + 发送错误通知 + + Args: + error_msg: 错误信息 + + Returns: + 是否发送成功 + """ + try: + lines = [] + lines.append("### ❌ A股板块监控异常") + lines.append("") + lines.append(f"**时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + lines.append("") + lines.append(f"```\n{error_msg}\n```") + + card = "\n".join(lines) + + # 构建请求数据 + data = { + "msgtype": "markdown", + "markdown": { + "title": "❌ A股板块监控异常", + "text": card + } + } + + # 构建带签名的 URL + url = self._build_url() + + # 发送请求 + headers = {"Content-Type": "application/json;charset=utf-8"} + response = requests.post( + url, + data=json.dumps(data), + headers=headers, + timeout=10 + ) + + result = response.json() + return result.get("errcode") == 0 + + except Exception as e: + logger.error(f"发送错误通知异常: {e}") + return False + + +# 全局单例 +_notifier: DingTalkNotifier = None + + +def get_dingtalk_notifier() -> DingTalkNotifier: + """获取钉钉通知器单例""" + global _notifier + if _notifier is None: + from app.config import get_settings + settings = get_settings() + # 优先使用A股专用配置,否则使用通用配置 + webhook = settings.dingtalk_astock_webhook or settings.dingtalk_webhook_url + secret = settings.dingtalk_astock_secret or settings.dingtalk_secret + if webhook: + _notifier = DingTalkNotifier(webhook, secret) + return _notifier diff --git a/backend/app/astock_agent/sector_analyzer.py b/backend/app/astock_agent/sector_analyzer.py new file mode 100644 index 0000000..7206312 --- /dev/null +++ b/backend/app/astock_agent/sector_analyzer.py @@ -0,0 +1,200 @@ +""" +板块异动分析 +检测板块涨跌幅、量能、资金流向异动 +""" +import pandas as pd +from typing import Dict, List, Optional, Tuple +from datetime import datetime +from app.utils.logger import logger +from app.utils.error_handler import notify_error +from .akshare_client import get_akshare_client + + +class SectorChangeAnalyzer: + """板块异动分析器""" + + def __init__(self, change_threshold: float = 2.0): + """ + 初始化异动分析器 + + Args: + change_threshold: 涨跌幅阈值(%) + """ + self.change_threshold = change_threshold + self.akshare = get_akshare_client() + + def detect_sector_changes(self) -> List[Dict]: + """ + 检测异动板块(使用概念板块) + + Returns: + 异动板块列表 + """ + try: + # 获取概念板块行情 + df = self.akshare.get_concept_spot() + if df.empty: + logger.warning("概念板块行情数据为空") + return [] + + # 转换数据类型(概念板块返回的列名) + df['涨跌幅'] = pd.to_numeric(df['涨跌幅'], errors='coerce') + df['涨跌额'] = pd.to_numeric(df['涨跌额'], errors='coerce') + df['最新价'] = pd.to_numeric(df['最新价'], errors='coerce') + df['成交额'] = pd.to_numeric(df['成交额'], errors='coerce') + + # 筛选异动板块 + hot_sectors = df[df['涨跌幅'] >= self.change_threshold].copy() + + if hot_sectors.empty: + return [] + + # 排序:涨幅优先,然后成交额 + hot_sectors = hot_sectors.sort_values( + by=['涨跌幅', '成交额'], + ascending=[False, False] + ) + + # 转换为结果列表 + results = [] + for _, row in hot_sectors.iterrows(): + results.append({ + 'name': row['板块名称'], + 'change_pct': float(row['涨跌幅']), + 'change_amount': float(row.get('涨跌额', 0)), + 'volume': float(row.get('成交量', 0)) if '成交量' in row else 0.0, + 'amount': float(row.get('成交额', 0)), + 'leading_stock': row.get('领涨股', ''), + 'ups': int(row.get('上涨家数', 0)), + 'downs': int(row.get('下跌家数', 0)), + 'timestamp': datetime.now() + }) + + logger.info(f"检测到 {len(results)} 个异动概念板块") + return results + + except Exception as e: + error_msg = f"检测板块异动失败: {e}" + logger.error(error_msg) + + # 如果是连接错误,发送通知 + if 'Connection' in str(e) or 'RemoteDisconnected' in str(e): + notify_error( + title="A股板块监控 - 数据源连接失败", + message=f"akshare 概念板块 API 连接失败\n\n错误: {e}\n\n可能原因:\n- eastmoney API 服务不稳定\n- 网络连接问题\n- 建议:稍后自动重试或考虑使用 tushare", + level="warning" + ) + + return [] + + def analyze_sector_momentum(self, sector_name: str) -> Dict: + """ + 分析板块动能 + + Args: + sector_name: 板块名称 + + Returns: + 板块动能分析结果 + """ + try: + # 获取板块成分股 + stocks_df = self.akshare.get_concept_stocks(sector_name) + if stocks_df.empty: + return {} + + # 获取实时行情 + spot_df = self.akshare.get_stock_spot() + if spot_df.empty: + return {} + + # 合并数据 + merged = pd.merge( + stocks_df, + spot_df, + on='代码', + how='inner' + ) + + if merged.empty: + return {} + + # 计算统计 + total_stocks = len(merged) + up_stocks = len(merged[merged['涨跌幅'] > 0]) + down_stocks = len(merged[merged['涨跌幅'] < 0]) + avg_change = merged['涨跌幅'].mean() + max_change = merged['涨跌幅'].max() + + # 计算总成交额 + total_amount = merged['成交额'].sum() if '成交额' in merged.columns else 0 + + # 找出涨幅最大的股票 + if not merged.empty: + top_stock = merged.loc[merged['涨跌幅'].idxmax()] + else: + top_stock = None + + return { + 'sector_name': sector_name, + 'total_stocks': total_stocks, + 'up_stocks': up_stocks, + 'down_stocks': down_stocks, + 'up_down_ratio': f"{up_stocks}:{down_stocks}", + 'avg_change': float(avg_change) if pd.notna(avg_change) else 0, + 'max_change': float(max_change) if pd.notna(max_change) else 0, + 'total_amount': float(total_amount), + 'top_stock': { + 'code': top_stock['代码'] if top_stock is not None else '', + 'name': top_stock['名称'] if top_stock is not None else '', + 'change': float(top_stock['涨跌幅']) if top_stock is not None else 0, + } if top_stock is not None else None + } + + except Exception as e: + logger.error(f"分析板块动能失败 {sector_name}: {e}") + return {} + + def get_hot_reason(self, sector_name: str, top_stocks: List[Dict]) -> str: + """ + 推测异动原因(基于龙头股分析) + + Args: + sector_name: 板块名称 + top_stocks: 龙头股列表 + + Returns: + 异动原因描述 + """ + try: + if not top_stocks: + return "板块整体异动" + + # 简单的原因分析 + reasons = [] + + # 检查是否有涨停股 + limit_up_count = sum(1 for s in top_stocks if s.get('change_pct', 0) >= 9.9) + if limit_up_count > 0: + reasons.append(f"{limit_up_count}只个股涨停") + + # 检查平均涨幅 + avg_change = sum(s.get('change_pct', 0) for s in top_stocks) / len(top_stocks) + if avg_change >= 7: + reasons.append("板块全线爆发") + + # 检查是否集中在某个龙头 + if len(top_stocks) >= 2: + top1_change = top_stocks[0].get('change_pct', 0) + top2_change = top_stocks[1].get('change_pct', 0) + if top1_change - top2_change > 3: + reasons.append(f"{top_stocks[0].get('name', '')}龙头领涨") + + if reasons: + return ",".join(reasons) + else: + return "资金集中流入" + + except Exception as e: + logger.error(f"推测异动原因失败: {e}") + return "板块异动" diff --git a/backend/app/astock_agent/sector_monitor.py b/backend/app/astock_agent/sector_monitor.py new file mode 100644 index 0000000..0432818 --- /dev/null +++ b/backend/app/astock_agent/sector_monitor.py @@ -0,0 +1,250 @@ +""" +板块异动监控主程序 +协调各个模块,实现监控流程 +""" +import asyncio +from typing import List, Dict, Optional +from datetime import datetime +from app.utils.logger import logger +from app.config import get_settings +from .tushare_client import get_tushare_client +from .tushare_sector_analyzer import TushareSectorAnalyzer +from .tushare_stock_selector import TushareStockSelector +from .notifier import get_dingtalk_notifier + + +class SectorMonitor: + """板块异动监控器""" + + def __init__( + self, + change_threshold: float = 2.0, + top_n: int = 3, + enable_notifier: bool = True + ): + """ + 初始化监控器 + + Args: + change_threshold: 涨跌幅阈值(%) + top_n: 每个板块返回前N只龙头股 + enable_notifier: 是否启用钉钉通知 + """ + self.change_threshold = change_threshold + self.top_n = top_n + self.enable_notifier = enable_notifier + + # 获取 Tushare 客户端 + settings = get_settings() + ts_client = get_tushare_client(settings.tushare_token) + if not ts_client: + logger.warning("Tushare token 未配置,板块监控可能无法正常工作") + + # 初始化各个模块 + self.analyzer = TushareSectorAnalyzer(ts_client, change_threshold=change_threshold) + self.selector = TushareStockSelector(ts_client, top_n=top_n) + self.notifier = get_dingtalk_notifier() if enable_notifier else None + + # 统计信息 + self.stats = { + 'total_checks': 0, + 'total_hot_sectors': 0, + 'total_stocks': 0, + 'last_check_time': None, + 'last_hot_count': 0 + } + + async def check_once(self) -> Dict: + """ + 执行一次检查 + + Returns: + 检查结果统计 + """ + try: + logger.info("开始板块异动检查...") + start_time = datetime.now() + + # 1. 检测异动板块 + hot_sectors = self.analyzer.detect_sector_changes() + + if not hot_sectors: + logger.info("未检测到异动板块") + self.stats['total_checks'] += 1 + self.stats['last_check_time'] = datetime.now() + self.stats['last_hot_count'] = 0 + return { + 'hot_sectors': 0, + 'stocks': 0, + 'notified': 0 + } + + logger.info(f"检测到 {len(hot_sectors)} 个异动板块") + + # 2. 对每个异动板块进行深度分析 + results = [] + total_stocks = 0 + + for sector in hot_sectors: + sector_name = sector['name'] + ts_code = sector['ts_code'] + + # 筛选龙头股(Tushare 版本需要 ts_code) + top_stocks = self.selector.select_leading_stocks(ts_code, sector_name) + + if not top_stocks: + logger.warning(f"板块 {sector_name} 未找到龙头股") + continue + + # 分析异动原因 + reason = self.analyzer.get_hot_reason(sector_name, top_stocks) + + # 发送钉钉通知 + notified = False + if self.notifier: + notified = self.notifier.send_sector_alert( + sector_data=sector, + top_stocks=top_stocks, + reason=reason + ) + + results.append({ + 'sector': sector, + 'stocks': top_stocks, + 'reason': reason, + 'notified': notified + }) + + total_stocks += len(top_stocks) + logger.info( + f"板块 {sector_name}: {len(top_stocks)} 只龙头股, " + f"原因: {reason}, 通知: {'成功' if notified else '失败'}" + ) + + # 更新统计 + self.stats['total_checks'] += 1 + self.stats['total_hot_sectors'] += len(hot_sectors) + self.stats['total_stocks'] += total_stocks + self.stats['last_check_time'] = datetime.now() + self.stats['last_hot_count'] = len(hot_sectors) + + elapsed = (datetime.now() - start_time).total_seconds() + logger.info( + f"检查完成: {len(hot_sectors)} 个异动板块, " + f"{total_stocks} 只龙头股, 耗时 {elapsed:.2f}秒" + ) + + return { + 'hot_sectors': len(hot_sectors), + 'stocks': total_stocks, + 'notified': sum(1 for r in results if r['notified']), + 'results': results + } + + except Exception as e: + logger.error(f"板块异动检查失败: {e}") + # 发送错误通知 + if self.notifier: + self.notifier.send_error(str(e)) + return { + 'hot_sectors': 0, + 'stocks': 0, + 'notified': 0, + 'error': str(e) + } + + async def run_periodic(self, interval_minutes: int = 30, max_runs: int = None): + """ + 周期性运行监控 + + Args: + interval_minutes: 检查间隔(分钟) + max_runs: 最大运行次数(None表示无限运行) + """ + logger.info( + f"启动周期性监控: 间隔 {interval_minutes}分钟, " + f"阈值 {self.change_threshold}%, Top{self.top_n}" + ) + + run_count = 0 + + try: + while True: + # 检查是否达到最大运行次数 + if max_runs and run_count >= max_runs: + logger.info(f"已达到最大运行次数 {max_runs},停止监控") + break + + # 执行检查 + await self.check_once() + run_count += 1 + + # 等待下一次检查 + if interval_minutes > 0: + logger.info(f"等待 {interval_minutes} 分钟后进行下次检查...") + await asyncio.sleep(interval_minutes * 60) + else: + break + + except asyncio.CancelledError: + logger.info("监控任务被取消") + + except Exception as e: + logger.error(f"周期性监控异常: {e}") + if self.notifier: + self.notifier.send_error(f"周期性监控异常: {e}") + + def get_stats(self) -> Dict: + """ + 获取统计信息 + + Returns: + 统计信息字典 + """ + return { + **self.stats, + 'avg_stocks_per_check': ( + self.stats['total_stocks'] / self.stats['total_checks'] + if self.stats['total_checks'] > 0 else 0 + ) + } + + def send_summary_report(self) -> bool: + """ + 发送汇总报告 + + Returns: + 是否发送成功 + """ + if not self.notifier: + return False + + return self.notifier.send_summary( + total_sectors=self.stats['total_hot_sectors'], + total_stocks=self.stats['total_stocks'] + ) + + +# 快捷函数 +async def quick_check( + change_threshold: float = 2.0, + top_n: int = 3, + enable_notifier: bool = True +) -> Dict: + """ + 快捷检查函数 + + Args: + change_threshold: 涨跌幅阈值(%) + top_n: 每个板块返回前N只龙头股 + enable_notifier: 是否启用钉钉通知 + + Returns: + 检查结果 + """ + monitor = SectorMonitor( + change_threshold=change_threshold, + top_n=top_n, + enable_notifier=enable_notifier + ) + return await monitor.check_once() diff --git a/backend/app/astock_agent/stock_selector.py b/backend/app/astock_agent/stock_selector.py new file mode 100644 index 0000000..655ee2e --- /dev/null +++ b/backend/app/astock_agent/stock_selector.py @@ -0,0 +1,192 @@ +""" +龙头股筛选 +从异动板块中筛选出龙头股 +""" +import pandas as pd +from typing import Dict, List +from datetime import datetime +from app.utils.logger import logger +from .akshare_client import get_akshare_client + + +class StockSelector: + """龙头股筛选器""" + + def __init__(self, top_n: int = 3): + """ + 初始化筛选器 + + Args: + top_n: 返回前 N 只龙头股 + """ + self.top_n = top_n + self.akshare = get_akshare_client() + + def select_leading_stocks(self, sector_name: str) -> List[Dict]: + """ + 筛选板块龙头股 + + Args: + sector_name: 板块名称 + + Returns: + 龙头股列表(已排序) + """ + try: + # 获取成分股 + stocks_df = self.akshare.get_concept_stocks(sector_name) + if stocks_df.empty: + logger.warning(f"获取板块 {sector_name} 成分股失败") + return [] + + # 获取实时行情 + spot_df = self.akshare.get_stock_spot() + if spot_df.empty: + logger.warning("获取实时行情失败") + return [] + + # 合并数据 + merged = pd.merge( + stocks_df[['代码', '名称']], + spot_df, + on='代码', + how='inner' + ) + + if merged.empty: + return [] + + # 数据类型转换 + merged['最新价'] = pd.to_numeric(merged['最新价'], errors='coerce') + merged['涨跌幅'] = pd.to_numeric(merged['涨跌幅'], errors='coerce') + merged['涨跌额'] = pd.to_numeric(merged['涨跌额'], errors='coerce') + merged['成交量'] = pd.to_numeric(merged['成交量'], errors='coerce') + merged['成交额'] = pd.to_numeric(merged['成交额'], errors='coerce') + merged['换手率'] = pd.to_numeric(merged['换手率'], errors='coerce') + merged['振幅'] = pd.to_numeric(merged['振幅'], errors='coerce') + merged['量比'] = pd.to_numeric(merged['量比'], errors='coerce') + + # 过滤:只保留有成交额的股票 + merged = merged[merged['成交额'] > 0].copy() + + if merged.empty: + return [] + + # 计算综合评分 + merged['score'] = merged.apply(self._calculate_score, axis=1) + + # 排序:按综合得分 + merged = merged.sort_values('score', ascending=False) + + # 取前 N 只 + top_stocks = merged.head(self.top_n) + + # 转换结果 + results = [] + for _, row in top_stocks.iterrows(): + # 计算涨速等级 + change_pct = row['涨跌幅'] + if change_pct >= 5: + speed_level = "⚡⚡⚡ 极快" + elif change_pct >= 3: + speed_level = "⚡⚡ 快速" + elif change_pct >= 1: + speed_level = "⚡ 较快" + else: + speed_level = "🐌 平稳" + + results.append({ + 'code': row['代码'], + 'name': row['名称'], + 'price': float(row['最新价']), + 'change_pct': float(row['涨跌幅']), + 'change_amount': float(row['涨跌额']), + 'amount': float(row['成交额']), + 'turnover': float(row['换手率']), + 'volume_ratio': float(row.get('量比', 1)), + 'amplitude': float(row.get('振幅', 0)), + 'score': float(row['score']), + 'speed_level': speed_level, + }) + + logger.info(f"板块 {sector_name} 龙头股筛选完成,Top {len(results)}") + return results + + except Exception as e: + logger.error(f"筛选龙头股失败 {sector_name}: {e}") + return [] + + def _calculate_score(self, row: pd.Series) -> float: + """ + 计算综合得分 + + 评分维度: + - 涨跌幅 (40%) + - 成交额 (30%) + - 涨速 (20%) + - 换手率 (10%) + + Args: + row: 股票数据行 + + Returns: + 综合得分 + """ + score = 0.0 + + # 1. 涨跌幅得分 (40分) - 涨幅越高得分越高 + change_pct = row['涨跌幅'] + if change_pct >= 7: + score += 40 # 涨停级别 + elif change_pct >= 5: + score += 35 + elif change_pct >= 3: + score += 30 + elif change_pct >= 2: + score += 25 + elif change_pct >= 1: + score += 20 + elif change_pct > 0: + score += 15 + else: + score += max(0, 10 + change_pct * 5) # 下跌也有基础分 + + # 2. 成交额得分 (30分) - 成交额越大得分越高 + amount = row['成交额'] + if amount >= 100000: # 10亿以上 + score += 30 + elif amount >= 50000: # 5亿以上 + score += 25 + elif amount >= 10000: # 1亿以上 + score += 20 + elif amount >= 5000: # 5000万以上 + score += 15 + elif amount >= 1000: # 1000万以上 + score += 10 + else: + score += 5 + + # 3. 涨速得分 (20分) - 简化用涨幅代替 + if change_pct >= 5: + score += 20 + elif change_pct >= 3: + score += 15 + elif change_pct >= 1: + score += 10 + else: + score += 5 + + # 4. 换手率得分 (10分) - 适中换手率加分 + turnover = row['换手率'] + if 5 <= turnover <= 15: + score += 10 # 适中换手率 + elif 15 < turnover <= 25: + score += 8 # 活跃但不过热 + elif turnover > 25: + score += 5 # 过热可能回调 + elif turnover > 0: + score += 3 # 有成交即可 + else: + score += 0 + + return score diff --git a/backend/app/astock_agent/tushare_client.py b/backend/app/astock_agent/tushare_client.py new file mode 100644 index 0000000..402b61d --- /dev/null +++ b/backend/app/astock_agent/tushare_client.py @@ -0,0 +1,346 @@ +""" +Tushare 数据封装 +提供 A 股板块、个股行情数据获取接口(使用同花顺系列接口) +""" +import time +import tushare as ts +import pandas as pd +from typing import Dict, List, Optional +from datetime import datetime, timedelta +from app.utils.logger import logger + + +class TushareClient: + """Tushare 数据客户端(同花顺系列接口)""" + + # 缓存数据,避免频繁请求 + _cache = {} + _cache_time = {} + _last_request_time = 0 + + def __init__(self, token: str): + """ + 初始化客户端 + + Args: + token: Tushare token + """ + self.token = token + ts.set_token(token) + self.pro = ts.pro_api() + self.cache_ttl = 300 # 缓存5分钟 + self.request_delay = 0.5 # 请求间隔(秒)- tushare 有频率限制 + + def _get_cached(self, key: str, fetch_func) -> pd.DataFrame: + """获取缓存数据,支持重试""" + now = datetime.now() + + # 检查缓存 + if key in self._cache: + cache_time = self._cache_time.get(key) + if cache_time and (now - cache_time).seconds < self.cache_ttl: + logger.debug(f"使用缓存数据: {key}") + return self._cache[key] + + # 请求限流 + elapsed = now.timestamp() - self._last_request_time + if elapsed < self.request_delay: + time.sleep(self.request_delay - elapsed) + + # 重试逻辑 + max_retries = 3 + for attempt in range(max_retries): + try: + self._last_request_time = time.time() + df = fetch_func() + + if df is not None and not df.empty: + self._cache[key] = df + self._cache_time[key] = now + logger.debug(f"获取数据成功: {key}") + return df + + except Exception as e: + error_msg = str(e) + # 指数退避重试 + if attempt < max_retries - 1: + wait_time = (2 ** attempt) * 2 + logger.warning( + f"获取数据失败 {key} (尝试 {attempt + 1}/{max_retries}): {e}," + f"等待 {wait_time}秒后重试..." + ) + time.sleep(wait_time) + continue + + logger.error(f"获取数据失败 {key}: {e}") + break + + return pd.DataFrame() + + def get_concept_sectors(self) -> pd.DataFrame: + """ + 获取概念板块列表 + + 使用 ths_index 接口,type="N" 代表概念板块 + + Returns: + 概念板块列表 + """ + def fetch(): + # ths_index - 获取同花顺概念指数列表 + return self.pro.ths_index(type='N') + + return self._get_cached('concept_sectors', fetch) + + def get_sector_daily(self, ts_code: str, start_date: str = None, end_date: str = None) -> pd.DataFrame: + """ + 获取板块日线行情 + + Args: + ts_code: 板块指数代码(如 885823.TI) + start_date: 开始日期 (YYYYMMDD) + end_date: 结束日期 (YYYYMMDD) + + Returns: + 板块日线数据 + """ + if not start_date: + start_date = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d') + if not end_date: + end_date = datetime.now().strftime('%Y%m%d') + + def fetch(): + # ths_daily - 获取板块指数历史行情 + return self.pro.ths_daily( + ts_code=ts_code, + start_date=start_date, + end_date=end_date + ) + + return self._get_cached(f'sector_daily_{ts_code}_{end_date}', fetch) + + def get_sector_members(self, ts_code: str) -> pd.DataFrame: + """ + 获取板块成分股 + + Args: + ts_code: 板块指数代码(如 885823.TI) + + Returns: + 成分股列表 + """ + def fetch(): + # ths_member - 获取板块成分股 + return self.pro.ths_member(ts_code=ts_code) + + return self._get_cached(f'sector_members_{ts_code}', fetch) + + def get_stock_daily(self, ts_code: str, start_date: str = None, end_date: str = None) -> pd.DataFrame: + """ + 获取个股日线行情 + + Args: + ts_code: 股票代码(如 000001.SZ) + start_date: 开始日期 (YYYYMMDD) + end_date: 结束日期 (YYYYMMDD) + + Returns: + 日线数据 + """ + if not start_date: + start_date = (datetime.now() - timedelta(days=30)).strftime('%Y%m%d') + if not end_date: + end_date = datetime.now().strftime('%Y%m%d') + + def fetch(): + # daily - 获取日线行情 + return self.pro.daily( + ts_code=ts_code, + start_date=start_date, + end_date=end_date + ) + + return self._get_cached(f'stock_daily_{ts_code}_{end_date}', fetch) + + def get_stock_daily_basic(self, ts_codes: List[str], trade_date: str = None) -> pd.DataFrame: + """ + 获取个股每日指标(包含换手率、量比等) + + Args: + ts_codes: 股票代码列表 + trade_date: 交易日期 (YYYYMMDD) + + Returns: + 每日指标数据 + """ + if not ts_codes: + return pd.DataFrame() + + from datetime import datetime, timedelta + + if not trade_date: + trade_date = datetime.now().strftime('%Y%m%d') + + def fetch(): + # daily_basic - 获取每日指标 + codes_str = ','.join(ts_codes[:300]) # 限制单次查询数量 + + # 尝试获取最近3天的数据(以防当天数据未更新) + all_data = [] + for i in range(3): + try_date = (datetime.now() - timedelta(days=i)).strftime('%Y%m%d') + df = self.pro.daily_basic( + ts_code=codes_str, + trade_date=try_date, + fields='ts_code,trade_date,turnover_rate,volume_ratio,pe,pb' + ) + if not df.empty: + all_data.append(df) + # 如果找到数据就不再尝试更早的日期 + break + + if all_data: + return pd.concat(all_data, ignore_index=True) + return pd.DataFrame() + + return self._get_cached(f'stock_daily_basic_{trade_date}', fetch) + + def get_stock_basic(self) -> pd.DataFrame: + """ + 获取股票基本信息列表 + + Returns: + 股票基本信息 + """ + def fetch(): + # stock_basic - 获取股票基本信息 + return self.pro.stock_basic( + exchange='', + list_status='L', + fields='ts_code,symbol,name,area,industry,list_date' + ) + + return self._get_cached('stock_basic', fetch) + + def get_realtime_data(self, ts_codes: List[str]) -> pd.DataFrame: + """ + 获取实时行情数据(使用最新的日线数据) + + 注意:tushare 不提供真正的实时数据,这里返回最新的日线数据 + 注意:amount 字段单位是千元,需要 * 1000 转换为元 + + Args: + ts_codes: 股票代码列表 + + Returns: + 实时行情数据(amount 单位为千元) + """ + if not ts_codes: + return pd.DataFrame() + + # 获取今天的日期 + today = datetime.now().strftime('%Y%m%d') + yesterday = (datetime.now() - timedelta(days=10)).strftime('%Y%m%d') + + def fetch(): + # 使用 daily 接口获取最近数据 + codes_str = ','.join(ts_codes[:100]) # 限制单次查询数量 + df = self.pro.daily( + ts_code=codes_str, + start_date=yesterday, + end_date=today + ) + # 只返回每个股票的最新一天数据 + if not df.empty: + df = df.sort_values('trade_date').groupby('ts_code').tail(1) + return df + + return self._get_cached(f'realtime_{today}', fetch) + + def get_hot_sectors(self, threshold: float = 2.0) -> pd.DataFrame: + """ + 获取异动板块(一次性获取所有板块的最新行情) + + Args: + threshold: 涨跌幅阈值(%) + + Returns: + 异动板块数据 + """ + try: + # 1. 获取所有概念板块 + sectors_df = self.get_concept_sectors() + if sectors_df.empty: + logger.warning("获取概念板块列表失败") + return pd.DataFrame() + + logger.info(f"获取到 {len(sectors_df)} 个概念板块") + + # 2. 获取今天的日期 + today = datetime.now().strftime('%Y%m%d') + yesterday = (datetime.now() - timedelta(days=10)).strftime('%Y%m%d') + + # 3. 批量获取板块行情(为了效率,限制数量) + hot_sectors = [] + max_sectors = 100 # 最多检查100个板块 + + for idx, row in sectors_df.head(max_sectors).iterrows(): + ts_code = row['ts_code'] + name = row.get('name', '') + + try: + # 获取板块最新行情 + daily_df = self.pro.ths_daily( + ts_code=ts_code, + start_date=yesterday, + end_date=today + ) + + if daily_df.empty: + continue + + # 获取最新一天的数据 + latest = daily_df.sort_values('trade_date').iloc[-1] + + # 检查涨跌幅 - 注意列名是 pct_change 不是 pct_chg + change_pct = float(latest.get('pct_change', 0)) + if change_pct >= threshold: + hot_sectors.append({ + 'ts_code': ts_code, + 'name': name, + 'change_pct': change_pct, + 'change': float(latest.get('change', 0)), # 涨跌额 + 'close': float(latest.get('close', 0)), + 'amount': float(latest.get('amount', 0)), # 成交额(元) + 'volume': float(latest.get('vol', 0)), # 成交量(手) + 'turnover_rate': float(latest.get('turnover_rate', 0)), # 换手率 + 'trade_date': str(latest.get('trade_date', '')) + }) + + except Exception as e: + logger.debug(f"获取板块 {name} 行情失败: {e}") + continue + + result_df = pd.DataFrame(hot_sectors) + if not result_df.empty: + result_df = result_df.sort_values('change_pct', ascending=False) + + return result_df + + except Exception as e: + logger.error(f"获取异动板块失败: {e}") + return pd.DataFrame() + + +# 全局单例 +_tushare_client: Optional[TushareClient] = None + + +def get_tushare_client(token: str = None) -> Optional[TushareClient]: + """获取 Tushare 客户端单例""" + global _tushare_client + if _tushare_client is None: + if not token: + return None + _tushare_client = TushareClient(token) + return _tushare_client diff --git a/backend/app/astock_agent/tushare_sector_analyzer.py b/backend/app/astock_agent/tushare_sector_analyzer.py new file mode 100644 index 0000000..1815f13 --- /dev/null +++ b/backend/app/astock_agent/tushare_sector_analyzer.py @@ -0,0 +1,189 @@ +""" +板块异动分析(Tushare 版本) +检测板块涨跌幅、量能、资金流向异动 +""" +import pandas as pd +from typing import Dict, List, Optional, Tuple +from datetime import datetime +from app.utils.logger import logger +from app.utils.error_handler import notify_error + + +class TushareSectorAnalyzer: + """板块异动分析器(使用 Tushare 同花顺接口)""" + + def __init__(self, tushare_client, change_threshold: float = 2.0): + """ + 初始化异动分析器 + + Args: + tushare_client: TushareClient 实例 + change_threshold: 涨跌幅阈值(%) + """ + self.change_threshold = change_threshold + self.ts_client = tushare_client + + def detect_sector_changes(self) -> List[Dict]: + """ + 检测异动板块 + + Returns: + 异动板块列表 + """ + try: + # 使用 tushare 获取异动板块(一次性获取) + df = self.ts_client.get_hot_sectors(threshold=self.change_threshold) + + if df.empty: + logger.info("未检测到异动板块") + return [] + + # 转换为结果列表 + results = [] + for _, row in df.iterrows(): + # 成交额转换为万元 + amount_wan = row['amount'] / 10000 if row['amount'] > 0 else 0 + + # 格式化成交额显示 + if amount_wan >= 100000: + amount_str = f"{amount_wan/100000:.1f}亿" + elif amount_wan >= 10000: + amount_str = f"{amount_wan/10000:.1f}万" + else: + amount_str = f"{amount_wan:.0f}元" + + results.append({ + 'name': row['name'], + 'ts_code': row['ts_code'], + 'change_pct': float(row['change_pct']), + 'change': float(row.get('change', 0)), # 涨跌额 + 'close': float(row['close']), + 'amount': float(row['amount']), + 'amount_str': amount_str, + 'volume': float(row['volume']), + 'turnover_rate': float(row.get('turnover_rate', 0)), # 换手率 + 'trade_date': row['trade_date'], + 'timestamp': datetime.now() + }) + + logger.info(f"检测到 {len(results)} 个异动概念板块(Tushare)") + return results + + except Exception as e: + error_msg = f"Tushare 检测板块异动失败: {e}" + logger.error(error_msg) + + # 发送通知 + notify_error( + title="A股板块监控 - Tushare 数据获取失败", + message=f"错误: {e}\n\n可能原因:\n- Tushare token 未配置或无效\n- API 频率限制\n- 网络连接问题", + level="warning" + ) + + return [] + + def get_sector_stocks(self, ts_code: str, sector_name: str) -> List[Dict]: + """ + 获取板块成分股 + + Args: + ts_code: 板块指数代码 + sector_name: 板块名称 + + Returns: + 成分股列表 + """ + try: + # 获取成分股 + members_df = self.ts_client.get_sector_members(ts_code) + + if members_df.empty: + logger.warning(f"板块 {sector_name} 成分股数据为空") + return [] + + # 获取成分股的行情数据 + stock_codes = members_df['ts_code'].tolist() + + # 限制数量,避免请求过多 + if len(stock_codes) > 50: + stock_codes = stock_codes[:50] + + # 获取实时行情 + realtime_df = self.ts_client.get_realtime_data(stock_codes) + + if realtime_df.empty: + logger.warning(f"板块 {sector_name} 成分股行情为空") + return [] + + # 合并数据 + merged = pd.merge( + members_df, + realtime_df, + on='ts_code', + how='inner' + ) + + if merged.empty: + return [] + + # 转换结果 + results = [] + for _, row in merged.iterrows(): + results.append({ + 'code': row['ts_code'], + 'name': row.get('name', row.get('member_name', '')), + 'price': float(row.get('close', 0)), + 'change_pct': float(row.get('pct_chg', 0)), + 'change_amount': float(row.get('change', 0)), + 'amount': float(row.get('amount', 0)), + 'volume': float(row.get('vol', 0)), + }) + + return results + + except Exception as e: + logger.error(f"获取板块 {sector_name} 成分股失败: {e}") + return [] + + def get_hot_reason(self, sector_name: str, top_stocks: List[Dict]) -> str: + """ + 推测异动原因(基于龙头股分析) + + Args: + sector_name: 板块名称 + top_stocks: 龙头股列表 + + Returns: + 异动原因描述 + """ + try: + if not top_stocks: + return "板块整体异动" + + reasons = [] + + # 检查是否有涨停股 + limit_up_count = sum(1 for s in top_stocks if s.get('change_pct', 0) >= 9.9) + if limit_up_count > 0: + reasons.append(f"{limit_up_count}只个股涨停") + + # 检查平均涨幅 + avg_change = sum(s.get('change_pct', 0) for s in top_stocks) / len(top_stocks) + if avg_change >= 7: + reasons.append("板块全线爆发") + + # 检查是否集中在某个龙头 + if len(top_stocks) >= 2: + top1_change = top_stocks[0].get('change_pct', 0) + top2_change = top_stocks[1].get('change_pct', 0) + if top1_change - top2_change > 3: + reasons.append(f"{top_stocks[0].get('name', '')}龙头领涨") + + if reasons: + return ",".join(reasons) + else: + return "资金集中流入" + + except Exception as e: + logger.error(f"推测异动原因失败: {e}") + return "板块异动" diff --git a/backend/app/astock_agent/tushare_stock_selector.py b/backend/app/astock_agent/tushare_stock_selector.py new file mode 100644 index 0000000..6a86269 --- /dev/null +++ b/backend/app/astock_agent/tushare_stock_selector.py @@ -0,0 +1,244 @@ +""" +龙头股筛选(Tushare 版本) +从异动板块中筛选出龙头股 +""" +import pandas as pd +from typing import Dict, List +from datetime import datetime +from app.utils.logger import logger + + +class TushareStockSelector: + """龙头股筛选器(使用 Tushare)""" + + def __init__(self, tushare_client, top_n: int = 3): + """ + 初始化筛选器 + + Args: + tushare_client: TushareClient 实例 + top_n: 返回前 N 只龙头股 + """ + self.top_n = top_n + self.ts_client = tushare_client + + def select_leading_stocks(self, ts_code: str, sector_name: str) -> List[Dict]: + """ + 筛选板块龙头股 + + Args: + ts_code: 板块指数代码 + sector_name: 板块名称 + + Returns: + 龙头股列表(已排序) + """ + try: + # 获取成分股 + members_df = self.ts_client.get_sector_members(ts_code) + if members_df.empty: + logger.warning(f"获取板块 {sector_name} 成分股失败") + return [] + + # ths_member 返回的是 con_code(成分股代码),需要用这个来查行情 + stock_codes = members_df['con_code'].tolist() + + # 限制数量,避免请求过多 + if len(stock_codes) > 50: + stock_codes = stock_codes[:50] + + # 获取实时行情 + realtime_df = self.ts_client.get_realtime_data(stock_codes) + if realtime_df.empty: + logger.warning(f"获取板块 {sector_name} 成分股行情失败") + return [] + + # 获取每日指标(换手率、量比) + from datetime import datetime + trade_date = datetime.now().strftime('%Y%m%d') + basic_df = self.ts_client.get_stock_daily_basic(stock_codes, trade_date) + + # 合并数据 - 注意:ths_member 的 con_code 对应 daily 的 ts_code + members_df = members_df.rename(columns={'con_code': 'stock_code'}) + realtime_df = realtime_df.rename(columns={'ts_code': 'stock_code'}) + + if not basic_df.empty: + basic_df = basic_df.rename(columns={'ts_code': 'stock_code'}) + merged = pd.merge( + members_df[['stock_code', 'con_name']], + realtime_df, + on='stock_code', + how='inner' + ) + merged = pd.merge( + merged, + basic_df[['stock_code', 'turnover_rate', 'volume_ratio']], + on='stock_code', + how='left' + ) + else: + merged = pd.merge( + members_df[['stock_code', 'con_name']], + realtime_df, + on='stock_code', + how='inner' + ) + + if merged.empty: + return [] + + # 数据类型转换 - daily 接口返回 pct_chg 不是 pct_change + merged['close'] = pd.to_numeric(merged['close'], errors='coerce') + merged['pct_chg'] = pd.to_numeric(merged['pct_chg'], errors='coerce') + merged['change'] = pd.to_numeric(merged['change'], errors='coerce') + merged['vol'] = pd.to_numeric(merged['vol'], errors='coerce') + # 注意:daily 接口的 amount 单位是千元,需要转换为元 + merged['amount'] = pd.to_numeric(merged['amount'], errors='coerce') * 1000 + + # 换手率和量比填充默认值 + if 'turnover_rate' in merged.columns: + merged['turnover_rate'] = pd.to_numeric(merged['turnover_rate'], errors='coerce').fillna(0) + else: + merged['turnover_rate'] = 0.0 + + if 'volume_ratio' in merged.columns: + merged['volume_ratio'] = pd.to_numeric(merged['volume_ratio'], errors='coerce').fillna(1.0) + else: + merged['volume_ratio'] = 1.0 + + # 过滤:只保留有成交额的股票 + merged = merged[merged['amount'] > 0].copy() + + if merged.empty: + return [] + + # 计算综合评分 + merged['score'] = merged.apply(self._calculate_score, axis=1) + + # 排序:按综合得分 + merged = merged.sort_values('score', ascending=False) + + # 取前 N 只 + top_stocks = merged.head(self.top_n) + + # 转换结果 + results = [] + for _, row in top_stocks.iterrows(): + # 计算涨速等级 + change_pct = row['pct_chg'] + if change_pct >= 5: + speed_level = "⚡⚡⚡ 极快" + elif change_pct >= 3: + speed_level = "⚡⚡ 快速" + elif change_pct >= 1: + speed_level = "⚡ 较快" + else: + speed_level = "🐌 平稳" + + # 计算振幅 + amplitude = 0.0 + if 'high' in row and 'low' in row and row['low'] > 0: + amplitude = (row['high'] - row['low']) / row['low'] * 100 + + results.append({ + 'code': row['stock_code'], + 'name': row['con_name'], + 'price': float(row['close']), + 'change_pct': float(row['pct_chg']), + 'change_amount': float(row['change']), + 'amount': float(row['amount']), + 'turnover': float(row.get('turnover_rate', 0)), + 'volume_ratio': float(row.get('volume_ratio', 1.0)), + 'amplitude': amplitude, + 'score': float(row['score']), + 'speed_level': speed_level, + }) + + logger.info(f"板块 {sector_name} 龙头股筛选完成,Top {len(results)}") + return results + + except Exception as e: + logger.error(f"筛选龙头股失败 {sector_name}: {e}") + return [] + + def _calculate_score(self, row: pd.Series) -> float: + """ + 计算综合得分 + + 评分维度: + - 涨跌幅 (40%) + - 成交额 (30%) + - 涨速 (20%) + - 换手率 (10%) + + Args: + row: 股票数据行 + + Returns: + 综合得分 + """ + score = 0.0 + + # 1. 涨跌幅得分 (40分) - 涨幅越高得分越高 + change_pct = row['pct_chg'] + if change_pct >= 7: + score += 40 # 涨停级别 + elif change_pct >= 5: + score += 35 + elif change_pct >= 3: + score += 30 + elif change_pct >= 2: + score += 25 + elif change_pct >= 1: + score += 20 + elif change_pct > 0: + score += 15 + else: + score += max(0, 10 + change_pct * 5) # 下跌也有基础分 + + # 2. 成交额得分 (30分) - 成交额越大得分越高 + # 注意:amount 已在 select_leading_stocks 中从千元转换为元 + amount = row['amount'] # 单位是元 + if amount >= 1000000000: # 10亿以上 + score += 30 + elif amount >= 500000000: # 5亿以上 + score += 25 + elif amount >= 100000000: # 1亿以上 + score += 20 + elif amount >= 50000000: # 5000万以上 + score += 15 + elif amount >= 10000000: # 1000万以上 + score += 10 + else: + score += 5 + + # 3. 涨速得分 (20分) - 简化用涨幅代替 + if change_pct >= 5: + score += 20 + elif change_pct >= 3: + score += 15 + elif change_pct >= 1: + score += 10 + else: + score += 5 + + # 4. 换手率得分 (10分) - 使用真实换手率数据 + turnover_rate = row.get('turnover_rate', 0) + if turnover_rate >= 15: + score += 10 # 换手率极高,资金活跃 + elif turnover_rate >= 10: + score += 9 + elif turnover_rate >= 7: + score += 8 + elif turnover_rate >= 5: + score += 7 + elif turnover_rate >= 3: + score += 6 + elif turnover_rate >= 1: + score += 4 + elif turnover_rate >= 0.5: + score += 2 + else: + score += 1 # 换手率较低 + + return score diff --git a/backend/app/config.py b/backend/app/config.py index 1b1fe62..af9f18f 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -97,6 +97,7 @@ class Settings(BaseSettings): feishu_crypto_webhook_url: str = "https://open.feishu.cn/open-apis/bot/v2/hook/8a1dcf69-6753-41e2-a393-edc4f7822db0" # 加密货币通知 feishu_stock_webhook_url: str = "https://open.feishu.cn/open-apis/bot/v2/hook/408ab727-0dcd-4c7a-bde7-4aad38cbf807" # 股票通知 feishu_news_webhook_url: str = "https://open.feishu.cn/open-apis/bot/v2/hook/c7fd0db7-d295-451c-b943-130278a6cd9d" # 新闻智能体通知 + feishu_error_webhook_url: str = "https://open.feishu.cn/open-apis/bot/v2/hook/ba6952c9-3b0c-4bc1-8a43-ceaacb27b043" # 系统异常通知 feishu_enabled: bool = True # 是否启用飞书通知 # Telegram 机器人配置 @@ -183,6 +184,15 @@ class Settings(BaseSettings): stock_analysis_interval: int = 300 # 分析间隔(秒,默认5分钟) stock_llm_threshold: float = 0.70 # 触发 LLM 分析的置信度阈值 + # A股智能体配置 + astock_monitor_enabled: bool = True # 是否启用A股智能体 + astock_change_threshold: float = 2.0 # 涨跌幅阈值(%),超过此值触发异动 + astock_top_n: int = 3 # 每个板块返回前N只龙头股 + astock_check_interval: int = 30 # 检查间隔(分钟) + # 钉钉通知配置(A股专用) + dingtalk_astock_webhook: str = "" # A股钉钉通知 Webhook + dingtalk_astock_secret: str = "" # A股钉钉通知加签密钥 + class Config: env_file = find_env_file() case_sensitive = False diff --git a/backend/app/main.py b/backend/app/main.py index 4a97968..97d6579 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -17,10 +17,106 @@ import os # 后台任务 _price_monitor_task = None -_report_task = None _stock_agent_task = None _crypto_agent_task = None _news_agent_task = None +_astock_monitor_task = None +_astock_scheduler = None +_astock_monitor_instance = None + + +async def is_trading_day() -> bool: + """检查今天是否为A股交易日""" + try: + from datetime import datetime + from app.config import get_settings + from app.astock_agent.tushare_client import TushareClient + + settings = get_settings() + token = settings.tushare_token + if not token: + logger.warning("Tushare token 未配置,使用简单的周末判断") + # 简单判断:周一到周五是交易日(不包含节假日) + return datetime.now().weekday() < 5 + + client = TushareClient(token=token) + pro = client.pro + + # 获取今天的日期 + today = datetime.now().strftime("%Y%m%d") + + # 查询交易日历(最近3天) + df = pro.trade_cal( + exchange='SSE', + start_date=(datetime.now().replace(day=datetime.now().day-2)).strftime("%Y%m%d") if datetime.now().day > 2 else today, + end_date=today + ) + + if df is not None and not df.empty: + # 检查今天是否为交易日 + today_cal = df[df['cal_date'] == today] + if not today_cal.empty: + is_open = today_cal.iloc[0]['is_open'] + logger.info(f"交易日历查询: 今天 {today} {'是' if is_open == 1 else '不是'}交易日") + return is_open == 1 + + # Fallback: 简单周末判断 + is_weekday = datetime.now().weekday() < 5 + logger.warning(f"交易日历查询失败,使用简单判断: 今天 {'是' if is_weekday else '不是'}工作日") + return is_weekday + + except Exception as e: + logger.error(f"检查交易日失败: {e}") + # Fallback: 简单周末判断 + return datetime.now().weekday() < 5 + + +async def run_scheduled_astock_monitor(): + """定时运行A股板块异动监控(每天 15:30)""" + global _astock_monitor_instance + if not _astock_monitor_instance: + logger.warning("A股监控实例未初始化") + return + + try: + # 检查今天是否为交易日 + if not await is_trading_day(): + logger.info("📅 今天不是交易日,跳过板块异动分析") + return + + logger.info("🔔 开始执行定时板块异动分析...") + result = await _astock_monitor_instance.check_once() + + hot_sectors = result.get('hot_sectors', 0) + stocks = result.get('stocks', 0) + notified = result.get('notified', 0) + + logger.info(f"✅ 定时板块分析完成: {hot_sectors}个异动板块, {stocks}只龙头股, {notified}条通知") + except Exception as e: + logger.error(f"定时板块分析失败: {e}") + + +async def start_scheduler(): + """启动定时任务调度器""" + from apscheduler.schedulers.asyncio import AsyncIOScheduler + from apscheduler.triggers.cron import CronTrigger + + global _astock_scheduler + + # 创建调度器 + _astock_scheduler = AsyncIOScheduler(timezone='Asia/Shanghai') + + # 添加定时任务:每天 15:30 运行板块异动分析 + _astock_scheduler.add_job( + run_scheduled_astock_monitor, + trigger=CronTrigger(hour=15, minute=30), + id='daily_astock_monitor', + name='A股板块异动分析', + replace_existing=True + ) + + _astock_scheduler.start() + logger.info("📅 定时任务调度器已启动: 每天 15:30 (A股板块异动分析)") async def price_monitor_loop(): @@ -360,66 +456,10 @@ async def _print_system_status(): logger.info("=" * 60 + "\n") -async def periodic_report_loop(): - """定时报告循环 - 每4小时发送一次模拟交易报告""" - from datetime import datetime - from app.services.paper_trading_service import get_paper_trading_service - from app.services.telegram_service import get_telegram_service - from app.config import get_settings - - logger.info("定时报告任务已启动") - - # 计算距离下一个整4小时的等待时间 - def get_seconds_until_next_4h(): - now = datetime.now() - current_hour = now.hour - # 下一个4小时整点: 0, 4, 8, 12, 16, 20 - next_4h = ((current_hour // 4) + 1) * 4 - if next_4h >= 24: - next_4h = 0 - # 需要等到明天 - from datetime import timedelta - next_time = now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1) - else: - next_time = now.replace(hour=next_4h, minute=0, second=0, microsecond=0) - - wait_seconds = (next_time - now).total_seconds() - return int(wait_seconds), next_time - - while True: - try: - # 计算等待时间 - wait_seconds, next_time = get_seconds_until_next_4h() - logger.info(f"下次报告时间: {next_time.strftime('%Y-%m-%d %H:%M')},等待 {wait_seconds // 3600}小时{(wait_seconds % 3600) // 60}分钟") - - # 等待到下一个4小时整点 - await asyncio.sleep(wait_seconds) - - # 检查是否启用 Telegram 通知 - settings = get_settings() - if not settings.telegram_enabled: - logger.info("Telegram 通知已禁用,跳过4小时报告发送") - continue - - # 生成并发送报告 - paper_trading = get_paper_trading_service() - telegram = get_telegram_service() - - report = paper_trading.generate_report(hours=4) - await telegram.send_message(report, parse_mode="HTML") - logger.info("已发送4小时模拟交易报告到 Telegram") - - except Exception as e: - logger.error(f"定时报告循环出错: {e}") - import traceback - logger.error(traceback.format_exc()) - await asyncio.sleep(60) # 出错后等待1分钟再重试 - - @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" - global _price_monitor_task, _report_task, _stock_agent_task, _crypto_agent_task, _news_agent_task + global _price_monitor_task, _stock_agent_task, _crypto_agent_task, _news_agent_task, _astock_monitor_task # 启动时执行 logger.info("应用启动") @@ -430,10 +470,16 @@ async def lifespan(app: FastAPI): # 初始化飞书错误通知 try: - from app.services.feishu_service import get_feishu_service - feishu_service = get_feishu_service() + from app.services.feishu_service import FeishuService + from app.config import get_settings + settings = get_settings() + # 使用专用的系统异常 webhook + feishu_error_service = FeishuService( + webhook_url=settings.feishu_error_webhook_url, + service_type="error" + ) init_error_notifier( - feishu_service=feishu_service, + feishu_service=feishu_error_service, enabled=True, # 启用异常通知 cooldown=300 # 5分钟冷却时间 ) @@ -447,9 +493,6 @@ async def lifespan(app: FastAPI): _price_monitor_task = asyncio.create_task(price_monitor_loop()) logger.info("后台价格监控任务已创建") - _report_task = asyncio.create_task(periodic_report_loop()) - logger.info("定时报告任务已创建") - # 启动加密货币智能体 if getattr(settings, 'crypto_symbols', '') and settings.crypto_symbols.strip(): try: @@ -499,6 +542,24 @@ async def lifespan(app: FastAPI): logger.error(f"新闻智能体启动失败: {e}") logger.error(f"提示: 请确保已安装 feedparser 和 beautifulsoup4 (pip install feedparser beautifulsoup4)") + # 启动A股智能体 + if getattr(settings, 'astock_monitor_enabled', True): + try: + from app.astock_agent import SectorMonitor + sector_monitor = SectorMonitor( + change_threshold=settings.astock_change_threshold, + top_n=settings.astock_top_n, + enable_notifier=bool(settings.dingtalk_astock_webhook) + ) + # 保存实例供定时任务使用 + _astock_monitor_instance = sector_monitor + logger.info(f"A股智能体已初始化") + except Exception as e: + logger.error(f"A股智能体初始化失败: {e}") + + # 启动定时任务调度器 + await start_scheduler() + # 显示系统状态摘要 await _print_system_status() @@ -513,14 +574,6 @@ async def lifespan(app: FastAPI): pass logger.info("后台价格监控任务已停止") - if _report_task: - _report_task.cancel() - try: - await _report_task - except asyncio.CancelledError: - pass - logger.info("定时报告任务已停止") - # 停止加密货币智能体 if _crypto_agent_task: _crypto_agent_task.cancel() @@ -551,6 +604,20 @@ async def lifespan(app: FastAPI): logger.error(f"停止新闻智能体失败: {e}") logger.info("新闻智能体已停止") + # 停止A股智能体 + global _astock_scheduler + if _astock_scheduler: + _astock_scheduler.shutdown(wait=False) + logger.info("A股定时任务已停止") + + if _astock_monitor_task: + _astock_monitor_task.cancel() + try: + await _astock_monitor_task + except asyncio.CancelledError: + pass + logger.info("A股智能体已停止") + logger.info("应用关闭") diff --git a/backend/app/utils/error_handler.py b/backend/app/utils/error_handler.py index 62457aa..17965e4 100644 --- a/backend/app/utils/error_handler.py +++ b/backend/app/utils/error_handler.py @@ -215,3 +215,65 @@ def init_error_notifier(feishu_service, enabled: bool = True, cooldown: int = 30 handler.set_cooldown(cooldown) logger.info("错误通知器初始化完成") + + +def notify_error(title: str, message: str, level: str = "error"): + """ + 手动触发错误通知(用于已捕获但需要通知的错误) + + Args: + title: 错误标题 + message: 错误消息 + level: 错误级别 (error, warning, info) + """ + handler = get_exception_handler() + + if not handler.feishu_service or not handler.enabled: + logger.debug(f"错误通知未启用或飞书服务未设置: {title}") + return + + # 检查冷却时间 + if handler.last_error_time: + time_since_last = (datetime.now() - handler.last_error_time).total_seconds() + if time_since_last < handler.error_cooldown: + logger.debug(f"错误通知冷却中,跳过: {title}") + return + + try: + # 根据级别选择图标 + icons = { + "error": "🚨", + "warning": "⚠️", + "info": "ℹ️" + } + icon = icons.get(level, "📌") + + # 构建消息 + formatted_message = f"""{icon} **{title}** + +{message} + +**时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}""" + + # 发送飞书通知 + import asyncio + try: + loop = asyncio.get_event_loop() + if loop.is_running(): + asyncio.run_coroutine_threadsafe( + handler.feishu_service.send_text(formatted_message), + loop + ) + else: + asyncio.run(handler.feishu_service.send_text(formatted_message)) + + logger.info(f"✅ 已发送错误通知: {title}") + handler.last_error_time = datetime.now() + + except RuntimeError: + asyncio.run(handler.feishu_service.send_text(formatted_message)) + logger.info(f"✅ 已发送错误通知: {title}") + handler.last_error_time = datetime.now() + + except Exception as e: + logger.error(f"发送手动错误通知失败: {e}") diff --git a/backend/requirements.txt b/backend/requirements.txt index 43b4ac0..3716871 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -29,3 +29,8 @@ websockets>=12.0 # WebSocket 支持,用于实时价格更新 feedparser>=6.0.10 beautifulsoup4>=4.12.0 lxml>=4.9.0 + +# A股板块监控依赖 +akshare>=1.12.0 +apscheduler>=3.10.0 # 定时任务 + diff --git a/backend/test_exception_handler.py b/backend/test_exception_handler.py new file mode 100644 index 0000000..5d8b37e --- /dev/null +++ b/backend/test_exception_handler.py @@ -0,0 +1,53 @@ +""" +测试全局异常处理器 +""" +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) + +import asyncio +from app.utils.error_handler import setup_global_exception_handler, init_error_notifier +from app.services.feishu_service import get_feishu_service + + +async def test_exception_handler(): + """测试异常处理器""" + print("=" * 60) + print("测试全局异常处理器") + print("=" * 60) + print() + + # 初始化 + setup_global_exception_handler() + print("✅ 全局异常处理器已安装") + print() + + # 初始化飞书通知 + try: + feishu = get_feishu_service() + init_error_notifier(feishu_service=feishu, enabled=True, cooldown=10) + print("✅ 飞书错误通知已启用(冷却时间10秒)") + except Exception as e: + print(f"❌ 飞书错误通知初始化失败: {e}") + return + print() + + # 模拟一个未捕获的异常 + print("-" * 60) + print("将触发一个测试异常...") + print("-" * 60) + print() + + # 这个异常会被全局异常处理器捕获 + # 注意:这会触发飞书通知 + raise ValueError("这是一个测试异常,用于验证全局异常处理器是否正常工作") + + +if __name__ == "__main__": + try: + asyncio.run(test_exception_handler()) + except SystemExit: + # 异常被处理后,程序可能会退出 + pass + print() + print("测试结束") diff --git a/backend/test_limit_up_approach.py b/backend/test_limit_up_approach.py new file mode 100644 index 0000000..d665a1e --- /dev/null +++ b/backend/test_limit_up_approach.py @@ -0,0 +1,100 @@ +""" +测试A股异动监控 - 使用涨停板数据作为替代方案 +当 eastmoney API 不可用时,使用涨停板数据分析 +""" +import asyncio +import sys +import os + +# 添加项目根目录到 Python 路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +import akshare as ak +import pandas as pd +from app.utils.logger import logger + + +def test_limit_up_approach(): + """使用涨停板数据发现异动板块""" + print("=" * 60) + print("涨停板数据分析测试") + print("=" * 60) + print() + + try: + # 获取涨停板数据 + print("获取涨停板数据...") + df = ak.stock_zt_pool_em(date='20260227') + + if df.empty: + print("没有涨停数据") + return + + print(f"获取到 {len(df)} 只涨停股") + print() + + # 按行业分组统计 + print("-" * 60) + print("按行业统计涨停股数:") + print("-" * 60) + + # 获取行业列 + industry_col = '所属行业' + if industry_col not in df.columns: + # 尝试其他可能的列名 + for col in df.columns: + if '行业' in col or '板块' in col: + industry_col = col + break + + if industry_col in df.columns: + # 按行业分组 + industry_stats = df.groupby(industry_col).agg({ + '代码': 'count', + '涨跌幅': 'mean', + '最新价': 'mean' + }).rename(columns={'代码': '涨停数', '涨跌幅': '平均涨幅', '最新价': '平均价格'}) + + # 排序 + industry_stats = industry_stats.sort_values('涨停数', ascending=False) + + # 显示Top 10 + for idx, (industry, row) in enumerate(industry_stats.head(10).iterrows(), 1): + print(f"{idx}. {industry}: {int(row['涨停数'])}只涨停, 平均涨幅 {row['平均涨幅']:.2f}%") + + print() + print("-" * 60) + print("涨停股详情 (Top 5):") + print("-" * 60) + + # 显示涨幅最大的5只 + top_5 = df.nlargest(5, '涨跌幅') + for idx, row in top_5.iterrows(): + print(f"{int(row['序号'])}. {row['名称']} ({row['代码']})") + print(f" 涨幅: {row['涨跌幅']:.2f}% | 价格: {row['最新价']:.2f}") + print(f" 行业: {row.get(industry_col, 'N/A')}") + print(f" 封板时间: {row.get('最后封板时间', 'N/A')}") + print() + + return True + + except Exception as e: + print(f"测试失败: {e}") + import traceback + traceback.print_exc() + return False + + +if __name__ == "__main__": + success = test_limit_up_approach() + if success: + print("=" * 60) + print("测试成功!涨停板API可用") + print("=" * 60) + print() + print("建议: 可以使用涨停板数据作为板块异动监控的替代方案") + print(" - 按行业统计涨停股数") + print(" - 发现涨停集中的板块即为异动板块") + print(" - 涨停股本身就是最好的龙头股候选") + else: + print("测试失败") diff --git a/backend/test_sector_monitor.py b/backend/test_sector_monitor.py new file mode 100644 index 0000000..886af54 --- /dev/null +++ b/backend/test_sector_monitor.py @@ -0,0 +1,129 @@ +""" +测试A股板块异动监控 +运行一次检查并打印结果,不发送钉钉通知 +""" +import asyncio +import sys +import os + +# 添加项目根目录到 Python 路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from app.astock_agent import SectorMonitor +from app.utils.logger import logger + + +async def test_sector_monitor(): + """测试板块异动监控""" + print("=" * 60) + print("A股板块异动监控测试") + print("=" * 60) + print() + + # 配置参数 + change_threshold = 2.0 # 涨跌幅阈值 2% + top_n = 3 # 每个板块返回前3只龙头股 + + print(f"配置参数:") + print(f" - 涨跌幅阈值: {change_threshold}%") + print(f" - 龙头股数量: Top{top_n}") + print(f" - 钉钉通知: 已禁用(测试模式)") + print() + + # 创建监控器(不启用通知) + monitor = SectorMonitor( + change_threshold=change_threshold, + top_n=top_n, + enable_notifier=False # 测试时不发送通知 + ) + + print("开始检查板块异动...") + print("-" * 60) + print() + + # 执行一次检查 + result = await monitor.check_once() + + # 打印结果 + print() + print("-" * 60) + print("检查结果:") + print(f" - 异动板块数: {result['hot_sectors']}") + print(f" - 龙头股总数: {result['stocks']}") + print() + + # 显示详细信息 + if 'results' in result and result['results']: + print("=" * 60) + print("异动板块详情:") + print("=" * 60) + print() + + for idx, item in enumerate(result['results'], 1): + sector = item['sector'] + stocks = item['stocks'] + reason = item['reason'] + + # 板块信息 + change_icon = "📈" if sector['change_pct'] > 0 else "📉" + print(f"{idx}. 【{sector['name']}】{change_icon} {sector['change_pct']:+.2f}%") + print(f" - 涨跌额: {sector['change_amount']:+.2f}") + print(f" - 上涨家数: {sector['ups']}") + print(f" - 下跌家数: {sector['downs']}") + print(f" - 异动原因: {reason}") + print() + + # 龙头股信息 + print(f" 🏆 龙头股 Top {len(stocks)}:") + for s_idx, stock in enumerate(stocks, 1): + change_pct = stock['change_pct'] + if change_pct >= 5: + speed_icon = "⚡⚡⚡" + elif change_pct >= 3: + speed_icon = "⚡⚡" + elif change_pct >= 1: + speed_icon = "⚡" + else: + speed_icon = "🐌" + + # 成交额格式化 + amount = stock['amount'] + if amount >= 100000: + amount_str = f"{amount/100000:.1f}亿" + elif amount >= 10000: + amount_str = f"{amount/10000:.1f}万" + else: + amount_str = f"{amount:.0f}元" + + print(f" {s_idx}. {stock['name']} ({stock['code']})") + print(f" 价格: ¥{stock['price']:.2f} | 涨跌幅: {change_pct:+.2f}% {speed_icon}") + print(f" 成交额: {amount_str} | 换手率: {stock['turnover']:.2f}%") + print(f" 评分: {stock['score']:.1f} | 涨速: {stock['speed_level']}") + print() + + # 显示统计信息 + print("=" * 60) + print("统计信息:") + print("=" * 60) + stats = monitor.get_stats() + print(f" - 总检查次数: {stats['total_checks']}") + print(f" - 总异动板块: {stats['total_hot_sectors']}") + print(f" - 总龙头股数: {stats['total_stocks']}") + if stats['total_checks'] > 0: + print(f" - 平均龙头股: {stats['avg_stocks_per_check']:.1f}") + print(f" - 最后检查时间: {stats['last_check_time']}") + print() + print("=" * 60) + print("测试完成!") + print("=" * 60) + + +if __name__ == "__main__": + try: + asyncio.run(test_sector_monitor()) + except KeyboardInterrupt: + print("\n\n测试被用户中断") + except Exception as e: + print(f"\n\n测试失败: {e}") + import traceback + traceback.print_exc() diff --git a/backend/test_tushare_api_fields.py b/backend/test_tushare_api_fields.py new file mode 100644 index 0000000..1fc43af --- /dev/null +++ b/backend/test_tushare_api_fields.py @@ -0,0 +1,113 @@ +""" +测试 Tushare 接口返回数据 +检查各个接口返回的实际字段 +""" +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# 直接使用 token +token = '0ed6419a00d8923dc19c0b58fc92d94c9a0696949ab91a13aa58a0cc' + +def test_tushare_api(): + """测试 Tushare API 返回的数据""" + print("=" * 60) + print("Tushare API 数据测试") + print("=" * 60) + print() + + import tushare as ts + ts.set_token(token) + pro = ts.pro_api() + + from datetime import datetime, timedelta + yesterday = (datetime.now() - timedelta(days=5)).strftime('%Y%m%d') + today = datetime.now().strftime('%Y%m%d') + + # 1. 测试概念板块列表 + print("1. 测试 ths_index (概念板块列表)") + print("-" * 40) + sectors_df = pro.ths_index(type='N') + print(f"返回列: {sectors_df.columns.tolist()}") + print(f"前3行数据:") + print(sectors_df.head(3)) + print() + + # 获取第一个板块的代码 + if not sectors_df.empty: + first_sector_code = sectors_df.iloc[0]['ts_code'] + first_sector_name = sectors_df.iloc[0]['name'] + print(f"使用第一个板块测试: {first_sector_name} ({first_sector_code})") + print() + + # 2. 测试板块日线数据 + print("2. 测试 ths_daily (板块日线数据)") + print("-" * 40) + + daily_df = pro.ths_daily( + ts_code=first_sector_code, + start_date=yesterday, + end_date=today + ) + print(f"返回列: {daily_df.columns.tolist()}") + print(f"最新一天数据:") + if not daily_df.empty: + latest = daily_df.sort_values('trade_date').iloc[-1] + print(latest) + print() + + # 3. 测试成分股数据 + print("3. 测试 ths_member (成分股数据)") + print("-" * 40) + members_df = pro.ths_member(ts_code=first_sector_code) + print(f"返回列: {members_df.columns.tolist()}") + print(f"前5个成分股:") + print(members_df.head(5)) + print() + + # 4. 测试个股日线数据 + if not members_df.empty: + first_stock_code = members_df.iloc[0]['con_code'] + print(f"使用第一个成分股测试: {first_stock_code}") + print() + + print("4. 测试 daily (个股日线数据)") + print("-" * 40) + stock_daily_df = pro.daily( + ts_code=first_stock_code, + start_date=yesterday, + end_date=today + ) + print(f"返回列: {stock_daily_df.columns.tolist()}") + if not stock_daily_df.empty: + print(f"最新一天数据:") + latest_stock = stock_daily_df.sort_values('trade_date').iloc[-1] + print(latest_stock) + print() + + # 5. 测试个股每日指标 + print("5. 测试 daily_basic (个股每日指标)") + print("-" * 40) + basic_df = pro.daily_basic( + ts_code=first_stock_code, + trade_date=today, + fields='ts_code,trade_date,turnover_rate,volume_ratio,pe,pb' + ) + print(f"返回列: {basic_df.columns.tolist()}") + if not basic_df.empty: + print(f"数据:") + print(basic_df) + print() + + print("=" * 60) + print("测试完成") + print("=" * 60) + + +if __name__ == "__main__": + try: + test_tushare_api() + except Exception as e: + print(f"\n测试失败: {e}") + import traceback + traceback.print_exc() diff --git a/backend/test_tushare_detailed.py b/backend/test_tushare_detailed.py new file mode 100644 index 0000000..2cacd90 --- /dev/null +++ b/backend/test_tushare_detailed.py @@ -0,0 +1,87 @@ +""" +测试A股板块异动监控 - Tushare 版本(详细调试) +""" +import sys +import os +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from app.config import get_settings + + +def test_tushare_detailed(): + """详细测试 Tushare 板块数据""" + print("=" * 60) + print("Tushare 板块数据详细测试") + print("=" * 60) + print() + + settings = get_settings() + token = settings.tushare_token + + from app.astock_agent.tushare_client import TushareClient + + ts_client = TushareClient(token=token) + + # 1. 测试获取概念板块列表 + print("1. 获取概念板块列表...") + sectors_df = ts_client.get_concept_sectors() + print(f" ✅ 获取到 {len(sectors_df)} 个概念板块") + print(f" 示例板块:") + for idx, row in sectors_df.head(5).iterrows(): + print(f" - {row['name']}: {row['ts_code']}") + print() + + # 2. 测试获取单个板块行情 + print("2. 获取单个板块行情(以人工智能为例)...") + ai_sectors = sectors_df[sectors_df['name'].str.contains('人工智能', na=False)] + + if not ai_sectors.empty: + ai_code = ai_sectors.iloc[0]['ts_code'] + print(f" 找到人工智能板块: {ai_code}") + + daily_df = ts_client.get_sector_daily(ai_code) + if not daily_df.empty: + print(f" ✅ 获取到 {len(daily_df)} 天行情数据") + latest = daily_df.sort_values('trade_date').iloc[-1] + print(f" 最新行情 ({latest['trade_date']}):") + print(f" - 收盘价: {latest['close']:.2f}") + print(f" - 涨跌幅: {latest.get('pct_chg', 'N/A')}%") + else: + print(f" ❌ 行情数据为空") + else: + print(f" ⚠️ 未找到人工智能板块") + print() + + # 3. 测试获取板块成分股 + print("3. 获取板块成分股...") + if not ai_sectors.empty: + members_df = ts_client.get_sector_members(ai_code) + if not members_df.empty: + print(f" ✅ 获取到 {len(members_df)} 个成分股") + print(f" 前5个成分股:") + for idx, row in members_df.head(5).iterrows(): + print(f" - {row.get('name', row.get('member_name', 'N/A'))}: {row['ts_code']}") + else: + print(f" ❌ 成分股数据为空") + print() + + # 4. 测试获取异动板块(降低阈值) + print("4. 检测异动板块(阈值 0%)...") + hot_df = ts_client.get_hot_sectors(threshold=0.0) + + if hot_df.empty: + print(" ⚠️ 未检测到上涨的板块") + else: + print(f" ✅ 检测到 {len(hot_df)} 个上涨板块") + print(f" 涨幅最高的 5 个板块:") + for idx, row in hot_df.head(5).iterrows(): + print(f" {idx}. {row['name']}: {row['change_pct']:+.2f}%") + print() + + print("=" * 60) + print("测试完成!") + print("=" * 60) + + +if __name__ == "__main__": + test_tushare_detailed() diff --git a/backend/test_tushare_sector_monitor.py b/backend/test_tushare_sector_monitor.py new file mode 100644 index 0000000..2d6e671 --- /dev/null +++ b/backend/test_tushare_sector_monitor.py @@ -0,0 +1,166 @@ +""" +测试A股板块异动监控 - Tushare 版本 +运行一次检查并打印结果并发送钉钉通知 +""" +import asyncio +import sys +import os + +# 添加项目根目录到 Python 路径 +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +from app.config import get_settings +from app.utils.logger import logger + + +def test_tushare_sector_monitor(): + """测试板块异动监控(Tushare 版本)""" + print("=" * 60) + print("A股板块异动监控测试 - Tushare 版本") + print("=" * 60) + print() + + # 获取配置 + settings = get_settings() + token = settings.tushare_token + + if not token: + print("❌ Tushare token 未配置!") + print("请在 .env 文件中设置 TUSHARE_TOKEN") + return + + print(f"✅ Tushare token 已配置: {token[:10]}...") + print() + + # 导入 Tushare 客户端和通知器 + from app.astock_agent.tushare_client import TushareClient + from app.astock_agent.tushare_sector_analyzer import TushareSectorAnalyzer + from app.astock_agent.tushare_stock_selector import TushareStockSelector + from app.astock_agent.notifier import DingTalkNotifier + + # 检查钉钉配置 + webhook = settings.dingtalk_astock_webhook or settings.dingtalk_webhook_url + secret = settings.dingtalk_astock_secret or settings.dingtalk_secret + notifier = None + + if webhook: + notifier = DingTalkNotifier(webhook, secret) + print(f"✅ 钉钉通知已配置") + else: + print("⚠️ 钉钉通知未配置,仅打印结果") + print() + + # 配置参数 + change_threshold = 2.0 # 涨跌幅阈值 2% + top_n = 3 # 每个板块返回前3只龙头股 + + print(f"配置参数:") + print(f" - 涨跌幅阈值: {change_threshold}%") + print(f" - 龙头股数量: Top{top_n}") + print() + + # 创建 Tushare 客户端 + print("初始化 Tushare 客户端...") + ts_client = TushareClient(token=token) + print("✅ Tushare 客户端初始化成功") + print() + + # 创建分析器 + print("-" * 60) + print("开始检查板块异动...") + print("-" * 60) + print() + + analyzer = TushareSectorAnalyzer(ts_client, change_threshold=change_threshold) + + # 执行检查 + hot_sectors = analyzer.detect_sector_changes() + + if not hot_sectors: + print("未检测到异动板块") + return + + print(f"检测到 {len(hot_sectors)} 个异动板块:") + print() + + # 显示异动板块详情 + notified_count = 0 + for idx, sector in enumerate(hot_sectors, 1): + print(f"{idx}. 【{sector['name']}】") + print(f" 代码: {sector['ts_code']}") + print(f" 涨跌幅: {sector['change_pct']:+.2f}%") + print(f" 收盘价: {sector['close']:.2f}") + print(f" 成交额: {sector['amount_str']}") + print(f" 交易日期: {sector['trade_date']}") + print() + + # 获取龙头股 + print(f" 正在获取龙头股...") + selector = TushareStockSelector(ts_client, top_n=top_n) + top_stocks = selector.select_leading_stocks(sector['ts_code'], sector['name']) + + if top_stocks: + print(f" 🏆 龙头股 Top {len(top_stocks)}:") + for stock in top_stocks: + # 成交额格式化 + amount = stock['amount'] + if amount >= 100000000: + amount_str = f"{amount/100000000:.2f}亿" + elif amount >= 10000: + amount_str = f"{amount/10000:.2f}万" + else: + amount_str = f"{amount:.0f}元" + + print(f" {stock['name']} ({stock['code']})") + print(f" 价格: {stock['price']:.2f} | 涨跌幅: {stock['change_pct']:+.2f}% {stock['speed_level']}") + print(f" 成交额: {amount_str} | 评分: {stock['score']:.1f}") + else: + print(f" ⚠️ 未找到龙头股") + print() + + # 发送钉钉通知 + if notifier and top_stocks: + # 使用实际的板块数据 + sector_for_notify = { + 'name': sector['name'], + 'change_pct': sector['change_pct'], + 'change_amount': sector.get('change', 0), # 涨跌额 + 'amount': sector['amount'], + 'leading_stock': top_stocks[0]['name'] if top_stocks else '', + 'ts_code': sector['ts_code'], + } + + # 分析异动原因 + reason = analyzer.get_hot_reason(sector['name'], top_stocks) + + # 发送通知 + print(f" 📲 发送钉钉通知...") + success = notifier.send_sector_alert( + sector_data=sector_for_notify, + top_stocks=top_stocks, + reason=reason + ) + if success: + print(f" ✅ 通知发送成功") + notified_count += 1 + else: + print(f" ❌ 通知发送失败") + print() + + print("=" * 60) + print("测试完成!") + print(f"检测到 {len(hot_sectors)} 个异动板块") + if notifier: + print(f"发送通知 {notified_count}/{len(hot_sectors)}") + print("=" * 60) + + +if __name__ == "__main__": + try: + test_tushare_sector_monitor() + except KeyboardInterrupt: + print("\n\n测试被用户中断") + except Exception as e: + print(f"\n\n测试失败: {e}") + import traceback + traceback.print_exc()