This commit is contained in:
aaron 2026-03-11 00:01:18 +08:00
parent 1809a45a5b
commit d242a5978f
18 changed files with 2458 additions and 52 deletions

212
backend/app/api/astock.py Normal file
View File

@ -0,0 +1,212 @@
"""
A股相关 API 路由
"""
from fastapi import APIRouter, HTTPException, BackgroundTasks
from typing import Dict, Any
from app.utils.logger import logger
from app.config import get_settings
router = APIRouter()
# 全局变量,用于访问智能体实例
_astock_agent_instance = None
def set_astock_agent(agent):
"""设置智能体实例(由 main.py 调用)"""
global _astock_agent_instance
_astock_agent_instance = agent
@router.get("/status")
async def get_astock_status() -> Dict[str, Any]:
"""
获取A股智能体状态
Returns:
智能体状态信息
"""
try:
if _astock_agent_instance is None:
return {
"enabled": False,
"message": "A股智能体未启用"
}
settings = get_settings()
return {
"enabled": True,
"running": _astock_agent_instance.running,
"selector_type": "short_term_thematic",
"description": "短期题材选股器(题材轮动 + 技术面确认 + 风险控制)",
"config": {
"min_market_cap": settings.astock_min_market_cap if hasattr(settings, 'astock_min_market_cap') else 50,
"max_market_cap": settings.astock_max_market_cap if hasattr(settings, 'astock_max_market_cap') else 500,
"change_threshold": settings.astock_change_threshold,
"top_n": settings.astock_top_n
}
}
except Exception as e:
logger.error(f"获取A股状态失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/select")
async def trigger_selection(background_tasks: BackgroundTasks) -> Dict[str, Any]:
"""
手动触发选股
Returns:
选股任务状态
"""
try:
if _astock_agent_instance is None:
raise HTTPException(status_code=400, detail="A股智能体未启用")
# 在后台执行选股任务
background_tasks.add_task(_astock_agent_instance.run_once)
return {
"success": True,
"message": "选股任务已提交,正在后台执行",
"note": "请查看通知或日志获取结果"
}
except HTTPException:
raise
except Exception as e:
logger.error(f"触发选股失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/select/sync")
async def trigger_selection_sync() -> Dict[str, Any]:
"""
手动触发选股同步执行
Returns:
选股结果
"""
try:
if _astock_agent_instance is None:
raise HTTPException(status_code=400, detail="A股智能体未启用")
# 同步执行选股
result = await _astock_agent_instance.run_once()
return {
"success": True,
"result": result
}
except HTTPException:
raise
except Exception as e:
logger.error(f"触发选股失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/config")
async def get_astock_config() -> Dict[str, Any]:
"""
获取A股选股配置
Returns:
配置信息
"""
try:
settings = get_settings()
return {
"selector": {
"type": "short_term_thematic",
"description": "短期题材选股器",
"strategy": "题材轮动 + 技术面确认 + 风险控制"
},
"screening": {
"min_market_cap": 50, # 最小市值(亿)
"max_market_cap": 500, # 最大市值(亿)
"min_turnover": 3.0, # 最小换手率(%
"max_turnover": 15.0, # 最大换手率(%
"sector_change_threshold": 2.0, # 板块涨幅阈值(%
"volume_ratio_threshold": 1.2 # 量比阈值
},
"risk_control": {
"max_drawdown": 10.0, # 最大回撤(%
"hard_stop_loss": -7.0, # 硬止损(%
"max_single_position": 20, # 单票最大仓位(%
"max_sector_position": 40, # 单行业最大仓位(%
"max_total_position": 80 # 总仓位最大值(%
},
"schedule": {
"enabled": settings.astock_monitor_enabled,
"time": "15:30", # 盘后运行
"timezone": "Asia/Shanghai"
},
"notifications": {
"dingtalk": settings.dingtalk_enabled,
"telegram": settings.telegram_enabled
}
}
except Exception as e:
logger.error(f"获取配置失败: {e}")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/sectors")
async def get_hot_sectors(limit: int = 10) -> Dict[str, Any]:
"""
获取当前异动板块
Args:
limit: 返回板块数量
Returns:
异动板块列表
"""
try:
from app.astock_agent.tushare_client import get_tushare_client
from app.config import get_settings
settings = get_settings()
ts_client = get_tushare_client(settings.tushare_token)
if not ts_client:
raise HTTPException(status_code=400, detail="Tushare客户端未初始化")
# 获取异动板块
sectors_df = ts_client.get_hot_sectors(threshold=2.0)
if sectors_df.empty:
return {
"success": True,
"count": 0,
"sectors": []
}
# 转换为列表格式
sectors = []
for _, row in sectors_df.head(limit).iterrows():
sectors.append({
"code": row['ts_code'],
"name": row['name'],
"change_pct": float(row['change_pct']),
"amount": float(row['amount']),
"amount_yi": float(row['amount']) / 100000000, # 转换为亿
"close": float(row['close'])
})
return {
"success": True,
"count": len(sectors),
"sectors": sectors
}
except HTTPException:
raise
except Exception as e:
logger.error(f"获取异动板块失败: {e}")
raise HTTPException(status_code=500, detail=str(e))

View File

@ -6,6 +6,8 @@ from .sector_monitor import SectorMonitor
from .tushare_client import TushareClient, get_tushare_client
from .tushare_sector_analyzer import TushareSectorAnalyzer
from .tushare_stock_selector import TushareStockSelector
from .short_term_thematic_selector import ShortTermThematicSelector, get_thematic_selector
from .astock_agent import AStockAgent, get_astock_agent
__all__ = [
'SectorMonitor',
@ -13,4 +15,8 @@ __all__ = [
'get_tushare_client',
'TushareSectorAnalyzer',
'TushareStockSelector',
'ShortTermThematicSelector',
'get_thematic_selector',
'AStockAgent',
'get_astock_agent',
]

View File

@ -0,0 +1,206 @@
"""
A股智能体 - 主控制器
负责执行每日选股并发送通知
"""
import asyncio
from typing import Dict, Any, Optional
from datetime import datetime, time
from app.utils.logger import logger
from app.config import get_settings
from app.services.dingtalk_service import get_dingtalk_service
from app.services.telegram_service import get_telegram_service
from app.astock_agent.tushare_client import get_tushare_client
from app.astock_agent.short_term_thematic_selector import get_thematic_selector
class AStockAgent:
"""A股智能体"""
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
"""单例模式"""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""初始化智能体"""
if AStockAgent._initialized:
return
AStockAgent._initialized = True
self.settings = get_settings()
# 初始化Tushare客户端
self.ts_client = get_tushare_client(self.settings.tushare_token)
if not self.ts_client:
logger.error("Tushare客户端初始化失败请检查配置")
raise Exception("Tushare客户端初始化失败")
# 初始化选股器
self.selector = get_thematic_selector(self.ts_client)
# 初始化通知服务
self.dingtalk = get_dingtalk_service()
self.telegram = get_telegram_service()
# 运行状态
self.running = False
self._task = None
logger.info("A股智能体初始化完成")
async def run_once(self) -> Dict[str, Any]:
"""
执行一次选股
Returns:
选股结果
"""
try:
logger.info("\n" + "=" * 60)
logger.info("📊 开始执行短期题材选股")
logger.info("=" * 60)
# 执行选股
result = self.selector.select_stocks(max_stocks=10)
# 输出日志
self._log_result(result)
# 发送通知
await self._send_notifications(result)
return result
except Exception as e:
logger.error(f"选股执行失败: {e}")
import traceback
logger.error(traceback.format_exc())
return {}
def _log_result(self, result: Dict[str, Any]):
"""输出选股结果到日志"""
if not result or result.get('total_stocks', 0) == 0:
logger.info("\n📊 今日未选出符合条件的股票")
return
logger.info(f"\n📊 选股完成,共选出 {result['total_stocks']} 只股票")
if result.get('summary'):
summary = result['summary']
logger.info(f" - 总仓位: {summary.get('position_percent', 0):.1f}%")
logger.info(f" - 涉及板块: {summary.get('sector_count', 0)}")
for stock in result.get('stocks', []):
logger.info(f" - {stock['name']}({stock['ts_code']}): {stock['close']:.2f}元, "
f"仓位:{stock['position']*100:.1f}%, 评分:{stock['score']:.1f}")
async def _send_notifications(self, result: Dict[str, Any]):
"""发送选股通知"""
try:
# 格式化输出文本
text = self.selector.format_output_text(result)
# 发送到钉钉
if self.settings.dingtalk_enabled:
await self.dingtalk.send_markdown(
"📊 短期题材选股结果",
text
)
logger.info("✅ 钉钉通知已发送")
# 发送到Telegram
if self.settings.telegram_enabled:
await self.telegram.send_message(text)
logger.info("✅ Telegram通知已发送")
except Exception as e:
logger.error(f"发送通知失败: {e}")
async def run_daily(self, run_time: str = "15:30"):
"""
每日定时运行
Args:
run_time: 运行时间HH:MM格式24小时制
"""
self.running = True
logger.info("\n" + "=" * 60)
logger.info("🚀 A股智能体已启动")
logger.info(f"⏰ 运行时间: 每天 {run_time}(盘后)")
logger.info("=" * 60)
# 解析运行时间
hour, minute = map(int, run_time.split(':'))
while self.running:
try:
# 计算下次运行时间
now = datetime.now()
next_run = now.replace(
hour=hour,
minute=minute,
second=0,
microsecond=0
)
# 如果今天的运行时间已过,设置为明天
if now >= next_run:
from datetime import timedelta
next_run = next_run + timedelta(days=1)
wait_seconds = (next_run - now).total_seconds()
logger.info(f"⏳ 等待下次运行: {next_run.strftime('%Y-%m-%d %H:%M:%S')} "
f"(等待 {wait_seconds/3600:.1f} 小时)")
# 等待到运行时间
await asyncio.sleep(wait_seconds)
# 执行选股
await self.run_once()
except Exception as e:
logger.error(f"定时运行出错: {e}")
import traceback
logger.error(traceback.format_exc())
# 等待1小时后重试
await asyncio.sleep(3600)
def stop(self):
"""停止运行"""
self.running = False
logger.info("A股智能体已停止")
# 全局单例
_astock_agent: Optional[AStockAgent] = None
def get_astock_agent() -> AStockAgent:
"""获取A股智能体单例"""
global _astock_agent
if _astock_agent is None:
_astock_agent = AStockAgent()
return _astock_agent
async def main():
"""测试入口"""
agent = get_astock_agent()
# 执行一次选股
result = await agent.run_once()
# 输出结果
print("\n" + "=" * 60)
print(agent.selector.format_output_text(result))
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,802 @@
"""
A股短期题材选股器
策略题材轮动 + 资金异动 + MA多头排列 + 量能配合
执行时间每天盘后输出
选股策略
1. 题材筛选资金异动板块成交量放大成交额增加
2. 个股筛选
- 市值 30-1000亿流动性好有炒作空间
- 换手率 1%-20%资金活跃
- 排除ST退市风险股
- MA趋势向上MA5 > MA20适合震荡市场
- 量能配合量比1.0
风险控制最大回撤10%
- 硬止损-7%单只股票最大损失
- 技术止损跌破20日均线
- 时间止损持仓>30天未启动
- 仓位管理
* 单票最大20%
* 单行业最大40%
* 总仓位最大80%
数据源
- Tushare API行情基本面资金流
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from app.utils.logger import logger
from app.config import get_settings
class ShortTermThematicSelector:
"""短期题材选股器"""
def __init__(self, tushare_client, strict_mode: bool = False):
"""
初始化选股器
Args:
tushare_client: TushareClient实例
strict_mode: 严格模式True使用原策略False放宽条件
"""
self.ts_client = tushare_client
self.strict_mode = strict_mode
# 选股参数(严格模式 vs 宽松模式)
if strict_mode:
# 严格模式:原策略
self.min_market_cap = 50
self.max_market_cap = 500
self.min_turnover = 3.0
self.max_turnover = 15.0
self.sector_change_threshold = 2.0
self.volume_ratio_threshold = 1.2
else:
# 宽松模式:适应当前市场
self.min_market_cap = 30 # 降低市值下限
self.max_market_cap = 1000 # 提高市值上限
self.min_turnover = 1.0 # 降低换手率下限
self.max_turnover = 20.0 # 提高换手率上限
self.sector_change_threshold = 1.5 # 降低板块涨幅要求
self.volume_ratio_threshold = 0.6 # 放宽量比要求原1.0现0.6
# 风险控制参数
self.max_drawdown = 10.0 # 最大回撤(%
self.hard_stop_loss = -7.0 # 硬止损(%
self.max_single_position = 0.20 # 单票最大仓位
self.max_sector_position = 0.40 # 单行业最大仓位
self.max_total_position = 0.80 # 总仓位最大值
def select_stocks(self, max_stocks: int = 10) -> Dict[str, Any]:
"""
执行选股
Args:
max_stocks: 最多返回股票数
Returns:
选股结果字典
"""
try:
logger.info("=" * 60)
logger.info(f"📊 短期题材选股开始 ({'严格模式' if self.strict_mode else '宽松模式'})")
logger.info("=" * 60)
# 1. 获取异动板块
logger.info("\n【第一步】筛选异动板块...")
hot_sectors = self._get_hot_sectors()
if hot_sectors.empty:
logger.warning("未找到异动板块")
return self._empty_result()
logger.info(f"找到 {len(hot_sectors)} 个异动板块")
for _, sector in hot_sectors.head(5).iterrows():
logger.info(f" - {sector['name']}: {sector['change_pct']:+.2f}%, 成交额: {sector['amount']/100000000:.2f}亿")
# 2. 从异动板块中筛选个股
logger.info("\n【第二步】从异动板块中筛选个股...")
all_selected = []
for idx, sector in hot_sectors.iterrows():
sector_code = sector['ts_code']
sector_name = sector['name']
sector_change = sector['change_pct']
logger.info(f"\n检查板块: {sector_name} ({sector_code})")
# 获取该板块的成分股
members_df = self.ts_client.get_sector_members(sector_code)
if members_df.empty:
logger.warning(f" 无法获取板块成分股")
continue
stock_codes = members_df['con_code'].tolist()
logger.info(f" 板块成分股: {len(stock_codes)}")
# 筛选该板块的个股
sector_stocks = self._select_stocks_from_sector(
stock_codes, sector_name, sector_change
)
if sector_stocks:
all_selected.extend(sector_stocks)
logger.info(f" ✓ 选出 {len(sector_stocks)}")
if len(all_selected) >= max_stocks * 2: # 多选一些备用
break
if not all_selected:
logger.warning("未选出符合条件的股票")
return self._empty_result()
# 3. 综合评分和排序
logger.info("\n【第三步】综合评分和排序...")
all_selected = self._rank_stocks(all_selected)
# 4. 应用仓位管理
logger.info("\n【第四步】计算仓位配置...")
final_stocks = self._allocate_positions(all_selected[:max_stocks])
# 5. 生成输出
result = self._format_result(final_stocks, hot_sectors)
logger.info("\n" + "=" * 60)
logger.info(f"✅ 选股完成,共选出 {len(final_stocks)} 只股票")
logger.info("=" * 60)
return result
except Exception as e:
logger.error(f"选股失败: {e}")
import traceback
logger.error(traceback.format_exc())
return self._empty_result()
def _get_hot_sectors(self) -> pd.DataFrame:
"""
获取异动板块基于成交量和资金异动
策略
1. 优先选择热门概念板块AI新能源芯片等
2. 关键指标成交量放大成交额增加
3. 辅助指标涨幅可选
Returns:
异动板块列表
"""
try:
sectors_df = self.ts_client.get_concept_sectors()
if sectors_df.empty:
return pd.DataFrame()
today = datetime.now().strftime('%Y%m%d')
yesterday = (datetime.now() - timedelta(days=10)).strftime('%Y%m%d')
# 热门板块关键词(优先选择这些)
hot_keywords = [
'人工智能', 'AI', '算力', 'CPO', 'AIGC',
'新能源汽车', '锂电', '储能', '充电桩', '汽车',
'半导体', '芯片', '集成电路',
'机器人', '工业4.0',
'5G', '6G', '通信',
'数字经济', '云计算', '大数据', '物联网',
'军工', '航空',
'生物医药', '医药', '医疗',
'消费电子',
'光伏', '风电', '氢能',
'智能电网', '电力',
'元宇宙', '虚拟现实',
]
hot_sectors = []
checked_codes = set()
# 1. 优先检查热门概念板块
logger.info("优先检查热门概念板块的资金异动...")
for keyword in hot_keywords:
# 查找包含关键词的板块
matching_sectors = sectors_df[sectors_df['name'].str.contains(keyword, na=False)]
for _, row in matching_sectors.iterrows():
ts_code = row['ts_code']
name = row['name']
if ts_code in checked_codes:
continue
checked_codes.add(ts_code)
try:
# 获取板块行情最近10天
daily_df = self.ts_client.pro.ths_daily(
ts_code=ts_code,
start_date=yesterday,
end_date=today
)
if daily_df.empty or len(daily_df) < 5:
continue
daily_df = daily_df.sort_values('trade_date')
# 获取最新2天数据
latest = daily_df.iloc[-1]
prev = daily_df.iloc[-2]
# 计算成交量和成交额
# ths_daily API 返回: vol(手), avg_price(元/股)
# 成交额(元) = vol * avg_price * 100
latest_vol = float(latest.get('vol', 0))
latest_avg_price = float(latest.get('avg_price', 0))
latest_amount = latest_vol * latest_avg_price * 100 # 转换为元
prev_vol = float(prev.get('vol', 0))
prev_avg_price = float(prev.get('avg_price', 0))
prev_amount = prev_vol * prev_avg_price * 100
# 计算成交量放大倍数
vol_ratio = latest_vol / prev_vol if prev_vol > 0 else 1
# 计算成交额放大倍数
amount_ratio = latest_amount / prev_amount if prev_amount > 0 else 1
# 涨跌幅
change_pct = float(latest.get('pct_change', 0))
# 判断资金异动:
# 1. 成交量放大 >= 1.2倍(宽松)或 2倍严格
# 2. 成交额明显增加(>= 10%
# 3. 有一定涨幅辅助判断(可选)
vol_threshold = 1.2 if not self.strict_mode else 2.0
amount_threshold = 1.1 if not self.strict_mode else 1.5
is_volume_surge = vol_ratio >= vol_threshold
is_amount_surge = amount_ratio >= amount_threshold
has_min_change = change_pct >= 0.5 # 至少有一点涨幅
if (is_volume_surge or is_amount_surge) and has_min_change:
hot_sectors.append({
'ts_code': ts_code,
'name': name,
'change_pct': change_pct,
'amount': latest_amount,
'close': float(latest.get('close', 0)),
'vol_ratio': vol_ratio,
'amount_ratio': amount_ratio,
'is_hot_sector': True
})
logger.info(f"{name}: 涨{change_pct:+.2f}%, 量比{vol_ratio:.2f}x, 额比{amount_ratio:.2f}x")
except Exception as e:
logger.debug(f"获取板块 {name} 行情失败: {e}")
continue
# 2. 如果热门板块不够,继续检查其他板块
if len(hot_sectors) < 5:
logger.info("热门板块数量不足,继续检查其他板块的资金异动...")
max_check = 200
for idx, row in sectors_df.iterrows():
ts_code = row['ts_code']
name = row.get('name', '')
if ts_code in checked_codes:
continue
checked_codes.add(ts_code)
try:
daily_df = self.ts_client.pro.ths_daily(
ts_code=ts_code,
start_date=yesterday,
end_date=today
)
if daily_df.empty or len(daily_df) < 5:
continue
daily_df = daily_df.sort_values('trade_date')
latest = daily_df.iloc[-1]
prev = daily_df.iloc[-2]
latest_vol = float(latest.get('vol', 0))
latest_avg_price = float(latest.get('avg_price', 0))
latest_amount = latest_vol * latest_avg_price * 100
prev_vol = float(prev.get('vol', 0))
prev_avg_price = float(prev.get('avg_price', 0))
prev_amount = prev_vol * prev_avg_price * 100
vol_ratio = latest_vol / prev_vol if prev_vol > 0 else 1
amount_ratio = latest_amount / prev_amount if prev_amount > 0 else 1
change_pct = float(latest.get('pct_change', 0))
# 非热门板块需要更强的异动信号
if vol_ratio >= 2.0 and amount_ratio >= 1.5 and change_pct >= 1.0:
hot_sectors.append({
'ts_code': ts_code,
'name': name,
'change_pct': change_pct,
'amount': latest_amount,
'close': float(latest.get('close', 0)),
'vol_ratio': vol_ratio,
'amount_ratio': amount_ratio,
'is_hot_sector': False
})
logger.info(f"{name}: 涨{change_pct:+.2f}%, 量比{vol_ratio:.2f}x, 额比{amount_ratio:.2f}x")
except Exception as e:
logger.debug(f"获取板块 {name} 行情失败: {e}")
continue
if len(hot_sectors) >= 10:
break
result_df = pd.DataFrame(hot_sectors)
if not result_df.empty:
# 热门板块排在前面,按成交额放大倍数排序
result_df = result_df.sort_values(['is_hot_sector', 'amount_ratio'], ascending=[False, False])
logger.info(f"共找到 {len(result_df)} 个资金异动板块(热门: {result_df['is_hot_sector'].sum()} 个)")
logger.info(f"平均量比: {result_df['vol_ratio'].mean():.2f}x, 平均额比: {result_df['amount_ratio'].mean():.2f}x")
return result_df
except Exception as e:
logger.error(f"获取异动板块失败: {e}")
import traceback
logger.debug(traceback.format_exc())
return pd.DataFrame()
def _select_stocks_from_sector(
self,
stock_codes: List[str],
sector_name: str,
sector_change: float
) -> List[Dict[str, Any]]:
"""
从板块中筛选个股
Args:
stock_codes: 股票代码列表
sector_name: 板块名称
sector_change: 板块涨跌幅
Returns:
符合条件的股票列表
"""
selected = []
# 批量获取行情数据
realtime_df = self.ts_client.get_realtime_data(stock_codes)
if realtime_df.empty:
logger.warning(f"板块 {sector_name} 无法获取实时行情数据")
return []
logger.info(f" 获取到 {len(realtime_df)} 只股票的行情数据(请求了 {len(stock_codes)} 只)")
# 获取每日指标
from datetime import datetime
trade_date = datetime.now().strftime('%Y%m%d')
basic_df = self.ts_client.get_stock_daily_basic(stock_codes, trade_date)
# 获取历史数据(计算技术指标)
logger.debug(f" 开始检查 {len(stock_codes)} 只成分股...")
checked_count = 0
passed_count = 0
for stock_code in stock_codes: # 检查所有成分股
try:
checked_count += 1
if checked_count % 10 == 0:
logger.debug(f" 进度: {checked_count}/{len(stock_codes)}, 已通过: {passed_count}")
result = self._check_single_stock(
stock_code, sector_name, sector_change,
realtime_df, basic_df
)
if result:
selected.append(result)
passed_count += 1
except Exception as e:
logger.debug(f"检查股票 {stock_code} 失败: {e}")
continue
logger.info(f" 检查完成: {checked_count} 只,通过筛选: {passed_count}")
return selected
def _check_single_stock(
self,
stock_code: str,
sector_name: str,
sector_change: float,
realtime_df: pd.DataFrame,
basic_df: pd.DataFrame
) -> Optional[Dict[str, Any]]:
"""
检查单只股票是否符合条件
Args:
stock_code: 股票代码
sector_name: 所属板块
sector_change: 板块涨跌幅
realtime_df: 实时行情数据
basic_df: 每日指标数据
Returns:
符合条件返回股票信息否则返回None
"""
# 获取实时行情
stock_data = realtime_df[realtime_df['ts_code'] == stock_code]
if stock_data.empty:
logger.debug(f" ⚠️ {stock_code}: 无实时行情数据")
return None
row = stock_data.iloc[0]
# 基本数据
close = float(row['close'])
pct_chg = float(row['pct_chg'])
amount = float(row['amount']) * 1000 # 转换为元
vol = float(row['vol'])
# 获取股票名称
name = row.get('name', '')
logger.debug(f" 🔍 {name}({stock_code}): 价格={close:.2f}, 涨跌幅={pct_chg:+.2f}%")
# 过滤ST股票
if 'ST' in name or '退' in name:
logger.debug(f"{name}({stock_code}): ST/退市股,跳过")
return None
# 获取每日指标
basic_data = basic_df[basic_df['ts_code'] == stock_code]
if not basic_data.empty:
turnover = float(basic_data.iloc[0].get('turnover_rate', 0))
# 换手率过滤(只有有数据时才检查)
if turnover > 0 and not (self.min_turnover <= turnover <= self.max_turnover):
logger.debug(f"{name}({stock_code}): 换手率不符合 ({turnover:.2f}%)")
return None
else:
turnover = 0.0
# 获取历史数据计算技术指标
end_date = datetime.now().strftime('%Y%m%d')
start_date = (datetime.now() - timedelta(days=60)).strftime('%Y%m%d')
try:
daily_df = self.ts_client.pro.daily(
ts_code=stock_code,
start_date=start_date,
end_date=end_date
)
if daily_df.empty or len(daily_df) < 30:
return None
daily_df = daily_df.sort_values('trade_date').reset_index(drop=True)
close_series = daily_df['close']
vol_series = daily_df['vol']
# 计算均线
ma5 = close_series.rolling(window=5).mean().iloc[-1]
ma10 = close_series.rolling(window=10).mean().iloc[-1]
ma20 = close_series.rolling(window=20).mean().iloc[-1]
ma5_vol = vol_series.rolling(window=5).mean().iloc[-1]
# MA趋势检查MA5 > MA20要求短期在长期趋势之上
# 在震荡修复阶段允许MA5略低于MA10但必须高于MA20
if not (ma5 > ma20):
logger.debug(f"{name}({stock_code}): MA5不在MA20之上 (MA5={ma5:.2f}, MA10={ma10:.2f}, MA20={ma20:.2f})")
return None
# 量能检查
volume_ratio = vol / ma5_vol if ma5_vol > 0 else 0
if volume_ratio < self.volume_ratio_threshold:
logger.debug(f"{name}({stock_code}): 量能不足 (量比: {volume_ratio:.2f})")
return None
except Exception as e:
logger.debug(f"{name}({stock_code}): 计算技术指标失败: {e}")
return None
# 估算市值(使用成交额和换手率)
if turnover > 0:
market_cap = amount / (turnover / 100) # 元
market_cap_yi = market_cap / 100000000 # 转换为亿
# 市值过滤
if not (self.min_market_cap <= market_cap_yi <= self.max_market_cap):
logger.debug(f"{name}({stock_code}): 市值不符合 ({market_cap_yi:.2f}亿)")
return None
else:
market_cap_yi = 0
# 通过所有筛选条件
logger.info(f"{name}({stock_code}): 符合条件")
return {
'ts_code': stock_code,
'name': name,
'close': close,
'pct_chg': pct_chg,
'amount': amount,
'turnover': turnover,
'volume_ratio': volume_ratio,
'market_cap_yi': market_cap_yi,
'sector': sector_name,
'sector_change': sector_change,
'ma5': ma5,
'ma10': ma10,
'ma20': ma20,
}
def _rank_stocks(self, stocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
综合评分和排序
评分维度
- 板块强度 (40%)
- 个股涨幅 (30%)
- 量能表现 (30%)
Args:
stocks: 股票列表
Returns:
排序后的股票列表
"""
for stock in stocks:
score = 0.0
# 1. 板块强度 (40分)
sector_change = stock['sector_change']
if sector_change >= 5:
score += 40
elif sector_change >= 3:
score += 35
elif sector_change >= 2:
score += 30
elif sector_change >= 1:
score += 25
elif sector_change > 0:
score += 20
else:
score += 10
# 2. 个股涨幅 (30分)
pct_chg = stock['pct_chg']
if pct_chg >= 7:
score += 30
elif pct_chg >= 5:
score += 26
elif pct_chg >= 3:
score += 22
elif pct_chg >= 1:
score += 18
elif pct_chg > 0:
score += 12
else:
score += 5
# 3. 量能表现 (30分)
volume_ratio = stock['volume_ratio']
if volume_ratio >= 2.5:
score += 30
elif volume_ratio >= 2.0:
score += 26
elif volume_ratio >= 1.5:
score += 22
elif volume_ratio >= 1.2:
score += 18
else:
score += 10
# 4. 换手率 (10分)
turnover = stock.get('turnover', 0)
if 8 <= turnover <= 12: # 最理想的换手率范围
score += 10
elif 5 <= turnover < 8 or 12 < turnover <= 15:
score += 8
elif 3 <= turnover < 5:
score += 6
else:
score += 4
stock['score'] = score
# 按得分排序
return sorted(stocks, key=lambda x: x['score'], reverse=True)
def _allocate_positions(self, stocks: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
仓位分配
策略
- 优先级高的股票获得更大仓位
- 根据得分动态分配仓位
- 确保风险分散
Args:
stocks: 股票列表
Returns:
添加了仓位信息的股票列表
"""
if not stocks:
return []
total_score = sum(s['score'] for s in stocks)
for stock in stocks:
# 根据得分比例分配仓位
score_ratio = stock['score'] / total_score if total_score > 0 else 1.0 / len(stocks)
# 基础仓位(按得分比例)
base_position = score_ratio * self.max_total_position
# 调整:最高得分股票仓位不超过最大单票仓位
if base_position > self.max_single_position:
base_position = self.max_single_position
# 仓位范围5% - 20%
position = max(0.05, min(base_position, self.max_single_position))
stock['position'] = position
stock['stop_loss'] = close * (1 - 0.07) # 硬止损-7%
stock['target_profit'] = close * (1 + 0.15) # 目标止盈+15%
return stocks
def _format_result(self, stocks: List[Dict[str, Any]], sectors: pd.DataFrame) -> Dict[str, Any]:
"""
格式化选股结果
Args:
stocks: 股票列表
sectors: 异动板块列表
Returns:
格式化的结果字典
"""
return {
'date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'total_stocks': len(stocks),
'total_sectors': len(sectors),
'stocks': stocks,
'sectors': sectors.head(10).to_dict('records'),
'summary': self._generate_summary(stocks)
}
def _generate_summary(self, stocks: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
生成汇总信息
Args:
stocks: 股票列表
Returns:
汇总信息字典
"""
if not stocks:
return {}
total_position = sum(s['position'] for s in stocks)
sectors = list(set(s['sector'] for s in stocks))
return {
'total_position': total_position,
'position_percent': total_position * 100,
'sector_count': len(sectors),
'sectors': sectors,
'avg_score': sum(s['score'] for s in stocks) / len(stocks),
}
def _empty_result(self) -> Dict[str, Any]:
"""返回空结果"""
return {
'date': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'total_stocks': 0,
'total_sectors': 0,
'stocks': [],
'sectors': [],
'summary': {}
}
def format_output_text(self, result: Dict[str, Any]) -> str:
"""
格式化输出文本
Args:
result: 选股结果
Returns:
格式化的文本
"""
if not result or result['total_stocks'] == 0:
return "📊 **短期题材选股结果**\n\n今日未选出符合条件的股票\n\n*⚠️ 仅供参考,不构成投资建议*"
lines = [
"📊 **短期题材选股结果**",
"",
f"选股时间: {result['date']}",
f"选出股票: {result['total_stocks']}",
f"异动板块: {result['total_sectors']}",
"",
]
# 汇总信息
if result.get('summary'):
summary = result['summary']
lines.extend([
"**💼 仓位配置**",
f"总仓位: {summary['position_percent']:.1f}%",
f"涉及板块: {summary['sector_count']}",
f"平均得分: {summary['avg_score']:.1f}",
"",
])
# 异动板块
if result.get('sectors'):
lines.append("**🔥 异动板块 Top5**")
for sector in result['sectors'][:5]:
vol_ratio = sector.get('vol_ratio', 0)
amount_ratio = sector.get('amount_ratio', 0)
vol_icon = "🔥" if vol_ratio >= 2.0 else "📊"
lines.append(f"- {sector['name']}: {sector['change_pct']:+.2f}% | 量比{vol_ratio:.2f}x {vol_icon} | 额比{amount_ratio:.2f}x")
lines.append("")
# 选出股票
lines.append("**🏆 选出股票**")
for idx, stock in enumerate(result['stocks'], 1):
lines.extend([
f"",
f"**{idx}. {stock['name']} ({stock['ts_code']})**",
f" 现价: ¥{stock['close']:.2f} ({stock['pct_chg']:+.2f}%)",
f" 板块: {stock['sector']} ({stock['sector_change']:+.2f}%)",
f" 换手率: {stock['turnover']:.2f}% | 量比: {stock['volume_ratio']:.2f}",
f" 市值: {stock['market_cap_yi']:.2f}亿 | 评分: {stock['score']:.1f}",
f" MA5: ¥{stock['ma5']:.2f} | MA10: ¥{stock['ma10']:.2f} | MA20: ¥{stock['ma20']:.2f}",
f" 建议仓位: {stock['position']*100:.1f}%",
f" 止损价: ¥{stock['stop_loss']:.2f} (-7%)",
f" 目标价: ¥{stock['target_profit']:.2f} (+15%)",
])
lines.extend([
"",
"---",
"",
"**⚠️ 风险提示**",
f"- 硬止损: {self.hard_stop_loss}%(单只股票最大损失)",
f"- 技术止损: 跌破20日均线",
f"- 时间止损: 持仓>30天未启动",
f"- 单票最大: {self.max_single_position*100}%",
f"- 单行业最大: {self.max_sector_position*100}%",
"",
"*⚠️ 仅供参考,不构成投资建议*"
])
return "\n".join(lines)
# 全局单例
_thematic_selector: Optional[ShortTermThematicSelector] = None
def get_thematic_selector(tushare_client=None) -> ShortTermThematicSelector:
"""获取短期题材选股器单例"""
global _thematic_selector
if _thematic_selector is None:
if tushare_client is None:
from app.astock_agent.tushare_client import get_tushare_client
from app.config import get_settings
settings = get_settings()
tushare_client = get_tushare_client(settings.tushare_token)
_thematic_selector = ShortTermThematicSelector(tushare_client)
return _thematic_selector

View File

@ -183,27 +183,34 @@ class TushareClient:
def fetch():
# daily_basic - 获取每日指标
codes_str = ','.join(ts_codes[:300]) # 限制单次查询数量
# 尝试获取最近3天的数据以防当天数据未更新
# 分批处理以支持超过300只股票的情况
all_data = []
for i in range(3):
try_date = (datetime.now() - timedelta(days=i)).strftime('%Y%m%d')
df = self.pro.daily_basic(
ts_code=codes_str,
trade_date=try_date,
fields='ts_code,trade_date,turnover_rate,volume_ratio,pe,pb'
)
if not df.empty:
all_data.append(df)
# 如果找到数据就不再尝试更早的日期
break
batch_size = 300
for i in range(0, min(len(ts_codes), 900), batch_size): # 最多处理900只
batch_codes = ts_codes[i:i+batch_size]
# 尝试获取最近3天的数据以防当天数据未更新
for j in range(3):
try_date = (datetime.now() - timedelta(days=j)).strftime('%Y%m%d')
df = self.pro.daily_basic(
ts_code=','.join(batch_codes),
trade_date=try_date,
fields='ts_code,trade_date,turnover_rate,volume_ratio,pe,pb'
)
if not df.empty:
all_data.append(df)
# 如果找到数据就不再尝试更早的日期
break
if all_data:
return pd.concat(all_data, ignore_index=True)
return pd.DataFrame()
return self._get_cached(f'stock_daily_basic_{trade_date}', fetch)
# 创建包含股票代码的缓存键
codes_key = '_'.join(sorted(ts_codes[:20]))
cache_key = f'stock_daily_basic_{trade_date}_{codes_key}'
return self._get_cached(cache_key, fetch)
def get_stock_basic(self) -> pd.DataFrame:
"""
@ -242,20 +249,51 @@ class TushareClient:
today = datetime.now().strftime('%Y%m%d')
yesterday = (datetime.now() - timedelta(days=10)).strftime('%Y%m%d')
# 创建包含股票代码的缓存键
codes_key = '_'.join(sorted(ts_codes[:20])) # 使用前20只代码创建唯一键
cache_key = f'realtime_{today}_{codes_key}'
def fetch():
# 使用 daily 接口获取最近数据
codes_str = ','.join(ts_codes[:100]) # 限制单次查询数量
df = self.pro.daily(
ts_code=codes_str,
start_date=yesterday,
end_date=today
)
# 只返回每个股票的最新一天数据
if not df.empty:
df = df.sort_values('trade_date').groupby('ts_code').tail(1)
return df
# 分批处理以支持超过100只股票的情况
all_dfs = []
batch_size = 100
for i in range(0, min(len(ts_codes), 500), batch_size): # 最多处理500只
batch_codes = ts_codes[i:i+batch_size]
codes_str = ','.join(batch_codes)
df = self.pro.daily(
ts_code=codes_str,
start_date=yesterday,
end_date=today
)
if not df.empty:
all_dfs.append(df)
return self._get_cached(f'realtime_{today}', fetch)
# 合并所有批次的数据
if all_dfs:
combined_df = pd.concat(all_dfs, ignore_index=True)
# 只返回每个股票的最新一天数据
combined_df = combined_df.sort_values('trade_date').groupby('ts_code').tail(1)
# 获取股票基本信息(包含股票名称)
stock_basic = self.pro.stock_basic(
exchange='',
list_status='L',
fields='ts_code,symbol,name,area,industry,list_date'
)
# 合并股票名称
if not stock_basic.empty:
combined_df = combined_df.merge(
stock_basic[['ts_code', 'name']],
on='ts_code',
how='left'
)
return combined_df
return pd.DataFrame()
return self._get_cached(cache_key, fetch)
def get_hot_sectors(self, threshold: float = 2.0) -> pd.DataFrame:
"""

View File

@ -135,6 +135,7 @@ class Settings(BaseSettings):
paper_trading_max_orders: int = 10 # 最大持仓+挂单总数
paper_trading_auto_close_opposite: bool = False # 是否自动平掉反向持仓(智能策略)
paper_trading_breakeven_threshold: float = 1 # 保本止损触发阈值盈利百分比0表示禁用
paper_trading_order_timeout_hours: float = 4 # 挂单超时时间小时默认4小时
# 移动止损配置
paper_trading_trailing_stop_enabled: bool = True # 是否启用移动止损

View File

@ -241,6 +241,54 @@ class CryptoAgent:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送订单平仓通知: {result.get('order_id')}")
async def _notify_expired_orders_cancelled(self, cancelled_orders: List[Dict[str, Any]]):
"""
发送超时订单取消通知
Args:
cancelled_orders: 被取消的订单列表
"""
if not cancelled_orders:
return
title = f"⏰ 已自动取消 {len(cancelled_orders)} 个超时挂单"
# 构建订单列表内容
order_lines = []
for order in cancelled_orders[:5]: # 最多显示5个
side_icon = "🟢" if order['side'] == 'long' else "🔴"
order_lines.append(
f"{side_icon} **{order['symbol']}** ({order['side']})\n"
f" 入场价: ${order['entry_price']:.2f} | "
f"已挂单: {order['age_hours']:.1f}小时"
)
if len(cancelled_orders) > 5:
order_lines.append(f"\n... 还有 {len(cancelled_orders) - 5} 个订单")
content_parts = [
f"⏰ **挂单超时自动取消**",
f"📊 **取消数量**: {len(cancelled_orders)}",
f"⚙️ **超时阈值**: {self.paper_trading.order_timeout_hours} 小时",
"",
"**取消的订单**:",
]
content_parts.extend(order_lines)
content_parts.append("\n💡 挂单超时自动取消,释放仓位供新信号使用")
content = "\n".join(content_parts)
# 发送通知
if self.settings.feishu_enabled:
await self.feishu_paper.send_card(title, content, "orange")
if self.settings.telegram_enabled:
message = f"{title}\n\n{content}"
await self.telegram.send_message(message)
if self.settings.dingtalk_enabled:
await self.dingtalk.send_action_card(title, content)
logger.info(f"已发送超时订单取消通知: {len(cancelled_orders)} 个订单")
def _get_seconds_until_next_5min(self) -> int:
"""计算距离下一个5分钟整点的秒数"""
now = datetime.now()
@ -311,6 +359,13 @@ class CryptoAgent:
logger.info(f"⏰ 定时任务执行 [{run_time.strftime('%Y-%m-%d %H:%M:%S')}]")
logger.info("=" * 60)
# 检查并取消超时挂单(在分析开始前)
cancelled = self.paper_trading.check_and_cancel_expired_orders()
if cancelled:
logger.info(f"🔄 已自动取消 {len(cancelled)} 个超时挂单")
# 发送超时取消通知
await self._notify_expired_orders_cancelled(cancelled)
for symbol in self.symbols:
await self.analyze_symbol(symbol)

View File

@ -24,7 +24,7 @@ from app.services.news_service import get_news_service
class MarketSignalAnalyzer:
"""市场信号分析器 - 只关注市场,输出客观信号"""
# 纯市场分析系统提示词(日内交易优化版
# 纯市场分析系统提示词(日内交易优化版 + 多级别反转检测
MARKET_ANALYSIS_PROMPT = """你是一位专业的加密货币**日内交易员**和技术分析师。你的任务是综合分析**趋势方向、K线数据、量价关系、技术指标和新闻舆情**,给出**适合日内快进快出**的交易信号。
## 🎯 日内交易核心定位
@ -33,6 +33,33 @@ class MarketSignalAnalyzer:
- 时限单笔持仓不超过 4 小时
- 策略捕捉短期波动不过夜持仓
## 🔄 多级别趋势分析(重要!)
当检测到**小级别反转但大级别未反转**这通常是**重要的交易机会**
### 识别级别背离
- **15分钟/30分钟** 已转势 **1小时/4小时** 还没反应
- 这说明大资金可能正在调仓或反转但均线系统滞后
- **这种情况下可以提前布局而不是等待大级别确认**
### 反转信号的处理
1. **强反转信号**30分钟 vs 4小时背离
- 优先考虑**反手操作**平掉旧仓位开新方向仓位
- 或考虑**顺势短线**跟随小级别趋势快进快出
2. **启动信号**大级别为震荡小级别启动
- 可以顺势入场跟随小级别趋势
- 但要警惕假突破设置好止损
3. **谨慎信号**小级别反转但大级别趋势强劲
- 可能只是回调不要盲目反手
- 等待更多确认信号
### 多级别趋势判断原则
- **顺势优先**当各级别趋势一致时顺势交易
- **小级别反转**小级别反转 + 大级别震荡 可以尝试短线跟随
- **级别背离**小级别反转 + 大级别强劲 谨慎可能是假突破
- **强反转信号**多个小级别同时反转 + 大级别走弱 **重点考虑反手**
## 🚨 铁律(违反即失败)
1. **盈亏比第一**所有交易必须满足盈亏比 1:1.2
- 盈亏比 = (目标盈利 - 入场价) / (入场价 - 止损价)
@ -799,29 +826,8 @@ class MarketSignalAnalyzer:
bb_lower = df['bb_lower'].iloc[-1]
context_parts.append(f"布林带: 上轨 {bb_upper:.2f}, 下轨 {bb_lower:.2f}")
# 均线系统(使用 30m 作为日内主周期)
context_parts.append(f"\n## 均线系统 (30m 日内主趋势)")
df_30m = data.get('30m')
if df_30m is not None and len(df_30m) > 0:
latest = df_30m.iloc[-1]
context_parts.append(f"EMA5: {latest.get('ma5', 'N/A')}")
context_parts.append(f"EMA10: {latest.get('ma10', 'N/A')}")
context_parts.append(f"EMA20: {latest.get('ma20', 'N/A')}")
context_parts.append(f"EMA50: {latest.get('ma50', 'N/A')}")
# 判断均线排列
ma5 = latest.get('ma5', 0)
ma10 = latest.get('ma10', 0)
ma20 = latest.get('ma20', 0)
ma50 = latest.get('ma50', 0)
if all([ma5, ma10, ma20, ma50]):
if ma5 > ma10 > ma20 > ma50:
context_parts.append("均线排列: 多头排列 📈 (EMA5 > EMA10 > EMA20 > EMA50)")
elif ma5 < ma10 < ma20 < ma50:
context_parts.append("均线排列: 空头排列 📉 (EMA5 < EMA10 < EMA20 < EMA50)")
else:
context_parts.append("均线排列: 交织,方向不明")
# 多级别趋势分析(检测小级别反转)
context_parts.append(self._analyze_multi_timeframe_trend(data))
# 量比分析
df_5m = data.get('5m')
@ -1110,6 +1116,158 @@ class MarketSignalAnalyzer:
return "\n".join(prompt_parts)
def _analyze_multi_timeframe_trend(self, data: Dict[str, Any]) -> str:
"""
多级别趋势分析 - 检测小级别反转信号
目的识别小级别15m/30m已经反转但大级别1h/4h还未反应的情况
这样可以提前捕捉反转信号而不是等待均线系统确认
"""
context_parts = ["\n## 🔄 多级别趋势分析(检测反转信号)"]
# 定义各级别
timeframes = {
'5m': ('超短线', 5),
'15m': ('短线', 15),
'30m': ('日内', 30),
'1h': ('小时', 60),
'4h': ('趋势', 240)
}
trend_status = {} # 存储各级别趋势状态
# 分析各级别趋势
for tf, (tf_name, minutes) in timeframes.items():
df = data.get(tf)
if df is None or len(df) < 10:
continue
latest = df.iloc[-1]
prev = df.iloc[-2]
# 1. 均线趋势判断
ma5 = latest.get('ma5', 0)
ma10 = latest.get('ma10', 0)
ma20 = latest.get('ma20', 0)
ma_trend = None
if ma5 and ma10 and ma20:
if ma5 > ma10 > ma20:
ma_trend = 'bull'
elif ma5 < ma10 < ma20:
ma_trend = 'bear'
else:
ma_trend = 'neutral'
# 2. MACD 趋势判断
macd_trend = None
if 'macd' in df.columns and 'macd_signal' in df.columns:
macd = df['macd'].iloc[-1]
signal = df['macd_signal'].iloc[-1]
hist = df.get('macd_hist', pd.Series([0])).iloc[-1]
if macd > 0 and signal > 0:
macd_trend = 'bull'
elif macd < 0 and signal < 0:
macd_trend = 'bear'
else:
macd_trend = 'neutral'
# 3. 价格动量最近3根K线
close_3 = df['close'].iloc[-3]
close_2 = df['close'].iloc[-2]
close_1 = df['close'].iloc[-1]
price_momentum = 'up' if close_1 > close_3 else 'down' if close_1 < close_3 else 'flat'
# 综合判断趋势
if ma_trend == 'bull' and (macd_trend == 'bull' or price_momentum == 'up'):
trend = 'bull'
elif ma_trend == 'bear' and (macd_trend == 'bear' or price_momentum == 'down'):
trend = 'bear'
elif price_momentum == 'up' and macd_trend == 'bull':
trend = 'bull'
elif price_momentum == 'down' and macd_trend == 'bear':
trend = 'bear'
else:
trend = 'neutral'
trend_status[tf] = {
'name': tf_name,
'trend': trend,
'ma_trend': ma_trend,
'macd_trend': macd_trend,
'momentum': price_momentum,
'price': float(latest['close']),
'change_3': ((close_1 - close_3) / close_3 * 100) if close_3 > 0 else 0
}
# 生成多级别趋势报告
if not trend_status:
context_parts.append("⚠️ 数据不足,无法进行多级别分析")
return "\n".join(context_parts)
# 检测反转信号
reversal_signals = []
# 1. 小级别反转但大级别未反转
if ('15m' in trend_status and '1h' in trend_status and
trend_status['15m']['trend'] != trend_status['1h']['trend'] and
trend_status['15m']['trend'] != 'neutral'):
small_tf = trend_status['15m']
large_tf = trend_status['1h']
reversal_type = "🔄 反转信号" if large_tf['trend'] != 'neutral' else "⚡ 启动信号"
reversal_signals.append(
f"{reversal_type}: 15分钟[{small_tf['trend']}] vs 1小时[{large_tf['trend']}]"
)
reversal_signals.append(
f" 15分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}"
)
# 2. 30分钟反转但4小时未反转更强的反转信号
if ('30m' in trend_status and '4h' in trend_status and
trend_status['30m']['trend'] != trend_status['4h']['trend'] and
trend_status['30m']['trend'] != 'neutral'):
small_tf = trend_status['30m']
large_tf = trend_status['4h']
reversal_type = "🔄 强反转" if large_tf['trend'] != 'neutral' else "⚡ 趋势启动"
reversal_signals.append(
f"{reversal_type}: 30分钟[{small_tf['trend']}] vs 4小时[{large_tf['trend']}]"
)
reversal_signals.append(
f" 30分钟变动: {small_tf['change_3']:+.2f}% | 价格: ${small_tf['price']:.2f}"
)
# 添加各级别趋势详情
context_parts.append("\n各级别趋势状态:")
for tf in ['5m', '15m', '30m', '1h', '4h']:
if tf in trend_status:
status = trend_status[tf]
trend_icon = {'bull': '📈', 'bear': '📉', 'neutral': '➡️'}.get(status['trend'], '')
context_parts.append(
f" {tf} ({status['name']}): {trend_icon} {status['trend']} "
f"| 动量: {status['change_3']:+.2f}% | 价格: ${status['price']:.2f}"
)
# 添加反转信号
if reversal_signals:
context_parts.append("\n⚠️ 检测到级别背离/反转信号:")
context_parts.extend(reversal_signals)
context_parts.append("\n💡 提示: 小级别已反转但大级别滞后,可考虑:")
context_parts.append(" - 反手操作(平掉旧仓位,开新方向仓位)")
context_parts.append(" - 顺势短线(跟随小级别趋势,快进快出)")
context_parts.append(" - 等待大级别确认(避免假突破)")
else:
context_parts.append("\n✅ 各级别趋势一致,无反转信号")
return "\n".join(context_parts)
def _parse_llm_response(self, response: str, symbol: str) -> Dict[str, Any]:
"""解析 LLM 响应"""
try:

View File

@ -128,6 +128,14 @@ class TradingDecisionMaker:
2. **REDUCE减仓** - 如果趋势不明但需降低风险
3. **HOLD观望** - 如果反转信号不强
**特殊情况检测到多级别反转信号优先级最高**
- 如果信号中包含 **"🔄 强反转"** **"⚡ 趋势启动"** 标记
- 说明小级别已反转但大级别滞后这是**提前布局的机会**
- **优先选择FLIP_POSITION反手操作**
- 平掉当前持仓
- 开立新方向仓位如果信号质量足够
- 理由抓住小级别反转机会避免等待大级别确认而错失最佳点位
#### 情况C无持仓 + 有同向挂单
**优先选择OPEN新增挂单- 金字塔式布局**

View File

@ -9,7 +9,7 @@ from fastapi.responses import FileResponse
from contextlib import asynccontextmanager
from app.config import get_settings
from app.utils.logger import logger
from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals, system, real_trading, news
from app.api import chat, stock, skills, llm, auth, admin, paper_trading, stocks, signals, system, real_trading, news, astock
from app.utils.error_handler import setup_global_exception_handler, init_error_notifier
from app.utils.system_status import get_system_monitor
import os
@ -550,7 +550,8 @@ async def lifespan(app: FastAPI):
# 启动A股智能体
if getattr(settings, 'astock_monitor_enabled', True):
try:
from app.astock_agent import SectorMonitor
from app.astock_agent import SectorMonitor, AStockAgent
# 初始化板块监控(保留原有功能)
sector_monitor = SectorMonitor(
change_threshold=settings.astock_change_threshold,
top_n=settings.astock_top_n,
@ -558,6 +559,16 @@ async def lifespan(app: FastAPI):
)
# 保存实例供定时任务使用
_astock_monitor_instance = sector_monitor
# 初始化短期题材选股器(新功能)
try:
astock_agent = AStockAgent()
# 设置智能体实例到 API 模块
astock.set_astock_agent(astock_agent)
logger.info(f"A股智能体已初始化短期题材选股器")
except Exception as e:
logger.warning(f"A股短期题材选股器初始化失败: {e}可能缺少Tushare配置")
logger.info(f"A股智能体已初始化")
except Exception as e:
logger.error(f"A股智能体初始化失败: {e}")
@ -623,6 +634,15 @@ async def lifespan(app: FastAPI):
pass
logger.info("A股智能体已停止")
# 停止A股短期题材选股器
try:
from app.astock_agent import get_astock_agent
astock_agent = get_astock_agent()
astock_agent.stop()
logger.info("A股短期题材选股器已停止")
except:
pass
logger.info("应用关闭")
@ -654,6 +674,7 @@ app.include_router(llm.router, tags=["LLM模型"])
app.include_router(paper_trading.router, tags=["交易"])
app.include_router(real_trading.router, tags=["实盘交易"])
app.include_router(stocks.router, prefix="/api/stocks", tags=["美股分析"])
app.include_router(astock.router, prefix="/api/astock", tags=["A股分析"])
app.include_router(signals.router, tags=["信号管理"])
app.include_router(news.router, tags=["新闻管理"])
app.include_router(system.router, prefix="/api/system", tags=["系统状态"])

View File

@ -50,6 +50,9 @@ class PaperTradingService:
self.weak_trend_ratio = self.settings.paper_trading_weak_trend_ratio
self.sideways_tp_percent = self.settings.paper_trading_sideways_tp_percent
# 订单超时配置
self.order_timeout_hours = self.settings.paper_trading_order_timeout_hours # 挂单超时时间(小时)
# 确保表已创建
self._ensure_table_exists()
@ -678,6 +681,14 @@ class PaperTradingService:
if getattr(db_order, 'trailing_stop_triggered', 0) == 1:
status = OrderStatus.CLOSED_TS # 移动止盈
# === 最终安全校验:确保亏损的订单不会被标记为止盈 ===
# 如果实际亏损,强制将状态改为止损,避免被错误标记为移动止盈
if pnl_percent < 0:
if status in [OrderStatus.CLOSED_TP, OrderStatus.CLOSED_TS, OrderStatus.CLOSED_BE]:
logger.warning(f"订单亏损但被标记为 {status.value},强制修正为止损: "
f"{db_order.order_id} | {db_order.symbol} | 盈亏: {pnl_percent:+.2f}%")
status = OrderStatus.CLOSED_SL
# 更新订单
db_order.status = status
db_order.exit_price = exit_price
@ -1264,6 +1275,48 @@ class PaperTradingService:
'message': '取消挂单失败'
}
def check_and_cancel_expired_orders(self) -> List[Dict[str, Any]]:
"""
检查并取消超时未成交的挂单
Returns:
被取消的订单列表
"""
cancelled_orders = []
current_time = get_beijing_time()
timeout_threshold = timedelta(hours=self.order_timeout_hours)
# 遍历所有活跃订单
for order_id, order in list(self.active_orders.items()):
# 只检查挂单状态的订单
if order.status != OrderStatus.PENDING:
continue
# 计算订单已存在时间
order_age = current_time - order.created_at
# 如果超过超时时间,自动取消
if order_age > timeout_threshold:
logger.info(f"订单超时自动取消: {order_id} | {order.symbol} | "
f"已挂单: {order_age.total_seconds() / 3600:.1f}小时 | "
f"超时阈值: {self.order_timeout_hours}小时")
result = self._cancel_pending_order(order)
if result:
cancelled_orders.append({
'order_id': order_id,
'symbol': order.symbol,
'side': order.side.value,
'entry_price': order.entry_price,
'created_at': order.created_at.strftime('%Y-%m-%d %H:%M:%S'),
'age_hours': order_age.total_seconds() / 3600
})
if cancelled_orders:
logger.info(f"已自动取消 {len(cancelled_orders)} 个超时挂单")
return cancelled_orders
def update_order(self, order_id: str, entry_price: float = None,
stop_loss: float = None, take_profit: float = None) -> Dict[str, Any]:
"""
@ -1815,6 +1868,7 @@ class PaperTradingService:
OrderStatus.CLOSED_TP,
OrderStatus.CLOSED_SL,
OrderStatus.CLOSED_BE,
OrderStatus.CLOSED_TS,
OrderStatus.CLOSED_MANUAL
]),
PaperOrder.closed_at >= cutoff_time
@ -2121,6 +2175,7 @@ class PaperTradingService:
OrderStatus.CLOSED_TP,
OrderStatus.CLOSED_SL,
OrderStatus.CLOSED_BE,
OrderStatus.CLOSED_TS,
OrderStatus.CLOSED_MANUAL
]),
PaperOrder.closed_at >= start_date

View File

@ -0,0 +1,70 @@
#!/usr/bin/env python3
"""
检查 Tushare ths_daily API 返回的数据字段
"""
import asyncio
import sys
import os
from datetime import datetime, timedelta
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.config import get_settings
from app.astock_agent.tushare_client import get_tushare_client
async def check_api_fields():
"""检查API字段"""
print("\n" + "=" * 80)
print("🔍 检查 ths_daily API 返回字段")
print("=" * 80)
settings = get_settings()
ts_client = get_tushare_client(settings.tushare_token)
# 获取智能电网板块
sectors_df = ts_client.get_concept_sectors()
smart_grid = sectors_df[sectors_df['name'] == '智能电网']
if smart_grid.empty:
print("未找到智能电网板块")
return
ts_code = smart_grid.iloc[0]['ts_code']
print(f"\n板块代码: {ts_code}")
today = datetime.now().strftime('%Y%m%d')
yesterday = (datetime.now() - timedelta(days=10)).strftime('%Y%m%d')
daily_df = ts_client.pro.ths_daily(
ts_code=ts_code,
start_date=yesterday,
end_date=today
)
if daily_df.empty:
print("未获取到数据")
return
print(f"\n获取到 {len(daily_df)} 条数据")
print("\n数据列:")
print(daily_df.columns.tolist())
print("\n最近3天的数据:")
print(daily_df.tail(3).to_string())
print("\n" + "=" * 80)
print("\n字段分析:")
for col in daily_df.columns:
print(f" {col}: {daily_df[col].dtype}")
if col in ['volume', 'amount', 'vol', 'amt']:
print(f" 最新值: {daily_df[col].iloc[-1]}")
print(f" 前一日: {daily_df[col].iloc[-2] if len(daily_df) > 1 else 'N/A'}")
async def main():
await check_api_fields()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,225 @@
#!/usr/bin/env python3
"""
A股短期题材选股 - 调试版本
用于诊断为什么没有选出股票
"""
import asyncio
import sys
import os
from datetime import datetime, timedelta
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.utils.logger import logger
from app.config import get_settings
from app.astock_agent.tushare_client import get_tushare_client
async def debug_tushare_connection():
"""测试Tushare连接"""
print("\n" + "=" * 60)
print("📊 测试1: Tushare连接测试")
print("=" * 60)
try:
settings = get_settings()
print(f"Token配置: {'已配置' if settings.tushare_token else '未配置'}")
if not settings.tushare_token:
print("❌ Tushare Token未配置请在.env文件中设置TUSHARE_TOKEN")
return False
ts_client = get_tushare_client(settings.tushare_token)
print(f"✅ Tushare客户端初始化成功")
# 测试基本API调用
print("\n测试API调用...")
# 测试获取概念板块
sectors_df = ts_client.get_concept_sectors()
print(f"概念板块数量: {len(sectors_df)}")
if not sectors_df.empty:
print(f"示例板块: {sectors_df.head(3)['name'].tolist()}")
else:
print("❌ 无法获取概念板块列表")
return False
return True
except Exception as e:
print(f"❌ Tushare连接失败: {e}")
import traceback
print(traceback.format_exc())
return False
async def debug_hot_sectors():
"""测试异动板块获取"""
print("\n" + "=" * 60)
print("📊 测试2: 异动板块检测")
print("=" * 60)
try:
settings = get_settings()
ts_client = get_tushare_client(settings.tushare_token)
sectors_df = ts_client.get_concept_sectors()
print(f"总概念板块数: {len(sectors_df)}")
# 检查异动板块
today = datetime.now().strftime('%Y%m%d')
print(f"当前日期: {today}")
# 检查是否是交易日
weekday = datetime.now().weekday()
if weekday >= 5:
print(f"⚠️ 当前是周末(周{weekday}),可能没有最新数据")
else:
print(f"✅ 当前是工作日(周{weekday}")
# 手动检查几个热门板块
print("\n检查前10个板块的行情...")
check_count = min(10, len(sectors_df))
hot_count = 0
for idx, row in sectors_df.head(check_count).iterrows():
ts_code = row['ts_code']
name = row['name']
try:
yesterday = (datetime.now() - timedelta(days=10)).strftime('%Y%m%d')
daily_df = ts_client.pro.ths_daily(
ts_code=ts_code,
start_date=yesterday,
end_date=today
)
if not daily_df.empty:
latest = daily_df.sort_values('trade_date').iloc[-1]
change_pct = float(latest.get('pct_change', 0))
trade_date = str(latest.get('trade_date', ''))
status = "🔥" if change_pct >= 2.0 else "📊"
print(f" {status} {name}: {change_pct:+.2f}% (日期: {trade_date})")
if change_pct >= 2.0:
hot_count += 1
else:
print(f" ⚠️ {name}: 无数据")
except Exception as e:
print(f"{name}: 查询失败 ({e})")
print(f"\n找到 {hot_count} 个涨幅≥2%的板块")
if hot_count == 0:
print("\n⚠️ 没有找到符合条件的异动板块,可能原因:")
print(" 1. 当前不是交易日(周末或节假日)")
print(" 2. 盘中时段数据未更新")
print(" 3. 市场整体表现平淡")
return hot_count > 0
except Exception as e:
print(f"❌ 异动板块检测失败: {e}")
import traceback
print(traceback.format_exc())
return False
async def debug_stock_screening():
"""测试个股筛选"""
print("\n" + "=" * 60)
print("📊 测试3: 个股筛选条件分析")
print("=" * 60)
print("\n筛选条件回顾:")
print(" 1. 市值: 50-500亿")
print(" 2. 换手率: 3%-15%")
print(" 3. 涨跌幅: -5% 到 +8%")
print(" 4. MA多头排列: MA5 > MA10 > MA20")
print(" 5. 量能配合: 量比 > 1.2")
print(" 6. 20日动量 > 0")
print(" 7. 距离高点回撤 < 15%")
print("\n⚠️ 如果没有选出股票,可能是因为:")
print(" 1. 市场整体不符合技术形态没有MA多头排列的股票")
print(" 2. 筛选条件较严格(可以尝试放宽参数)")
print(" 3. 数据时间窗口问题需要30天以上历史数据")
# 建议放宽的条件
print("\n建议放宽的参数(在当前市场环境下):")
print(" - 换手率: 1%-15% (降低下限)")
print(" - 涨跌幅: -7% 到 +10% (扩大范围)")
print(" - 市值: 30-500亿 (降低下限)")
async def debug_selector_run():
"""尝试运行选股器并显示详细信息"""
print("\n" + "=" * 60)
print("📊 测试4: 运行选股器(详细日志)")
print("=" * 60)
try:
from app.astock_agent.short_term_thematic_selector import get_thematic_selector
settings = get_settings()
ts_client = get_tushare_client(settings.tushare_token)
selector = get_thematic_selector(ts_client)
# 运行选股,启用详细日志
import logging
logging.basicConfig(level=logging.DEBUG)
result = selector.select_stocks(max_stocks=10)
print(f"\n选股结果: {result['total_stocks']}")
if result['total_stocks'] == 0:
print("\n❌ 未选出股票")
print("\n请查看上方详细日志,分析哪个环节过滤掉了股票")
else:
print("\n✅ 选股成功!")
print(selector.format_output_text(result))
except Exception as e:
print(f"❌ 选股器运行失败: {e}")
import traceback
print(traceback.format_exc())
async def main():
"""主函数"""
print("\n" + "=" * 60)
print("🔍 A股选股器诊断工具")
print("=" * 60)
# 运行所有测试
step1_ok = await debug_tushare_connection()
if not step1_ok:
print("\n❌ Tushare连接失败请检查配置")
return 1
step2_ok = await debug_hot_sectors()
if not step2_ok:
print("\n⚠️ 没有找到异动板块,这是正常的(取决于市场情况)")
await debug_stock_screening()
print("\n" + "=" * 60)
print("📋 诊断完成")
print("=" * 60)
print("\n如果所有测试通过但未选出股票,说明当前市场条件不符合策略要求。")
print("这是正常的,策略不会在市场条件不符合时强行选股。")
print("\n建议:")
print(" 1. 在交易日15:00后运行确保有完整数据")
print(" 2. 或者放宽筛选条件以适应当前市场环境")
print("")
return 0
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)

203
backend/diagnose_astock.py Normal file
View File

@ -0,0 +1,203 @@
#!/usr/bin/env python3
"""
A股短期题材选股 - 详细诊断版本
显示每个股票的筛选过程
"""
import asyncio
import sys
import os
from datetime import datetime, timedelta
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.utils.logger import logger
from app.config import get_settings
from app.astock_agent.tushare_client import get_tushare_client
async def diagnose_sector_stocks():
"""诊断板块成分股的筛选过程"""
print("\n" + "=" * 60)
print("🔍 A股选股详细诊断")
print("=" * 60)
settings = get_settings()
ts_client = get_tushare_client(settings.tushare_token)
# 1. 测试获取一个板块的成分股
print("\n【测试】获取智能电网板块成分股...")
try:
# 使用智能电网板块代码(与选股器一致)
sector_code = "885311.TI"
members_df = ts_client.get_sector_members(sector_code)
if members_df.empty:
print("❌ 无法获取板块成分股")
return
stock_codes = members_df['con_code'].tolist()[:10] # 只测试前10只
print(f"✓ 获取到 {len(stock_codes)} 只成分股测试前10只")
print(f"股票代码: {stock_codes}")
# 2. 获取这些股票的实时行情
print("\n【测试】获取实时行情...")
today = datetime.now().strftime('%Y%m%d')
yesterday = (datetime.now() - timedelta(days=10)).strftime('%Y%m%d')
all_stocks_data = []
for stock_code in stock_codes:
try:
daily_df = ts_client.pro.daily(
ts_code=stock_code,
start_date=yesterday,
end_date=today
)
if daily_df.empty:
print(f" ⚠️ {stock_code}: 无历史数据")
continue
daily_df = daily_df.sort_values('trade_date')
latest = daily_df.iloc[-1]
stock_info = {
'ts_code': stock_code,
'name': latest.get('name', ''),
'close': float(latest['close']),
'pct_chg': float(latest['pct_chg']),
'vol': float(latest['vol']),
'amount': float(latest['amount']) * 1000,
'trade_date': str(latest['trade_date'])
}
all_stocks_data.append(stock_info)
except Exception as e:
print(f"{stock_code}: 获取失败 - {e}")
continue
print(f"\n✓ 成功获取 {len(all_stocks_data)} 只股票的行情")
# 3. 获取每日指标
print("\n【测试】获取每日指标(换手率等)...")
basic_df = ts_client.pro.daily_basic(
ts_code=','.join(stock_codes),
trade_date=all_stocks_data[0]['trade_date'],
fields='ts_code,trade_date,turnover_rate,pe,pb'
)
if basic_df.empty:
print("⚠️ 无法获取每日指标数据")
else:
print(f"✓ 获取到 {len(basic_df)} 只股票的每日指标")
# 4. 逐个检查筛选条件
print("\n【测试】逐个检查筛选条件...")
print("=" * 80)
for stock_info in all_stocks_data:
stock_code = stock_info['ts_code']
name = stock_info['name']
close = stock_info['close']
pct_chg = stock_info['pct_chg']
vol = stock_info['vol']
amount = stock_info['amount']
print(f"\n🔍 {name}({stock_code}):")
print(f" 日期: {stock_info['trade_date']}")
print(f" 现价: ¥{close:.2f}, 涨跌幅: {pct_chg:+.2f}%")
# 检查1: ST股票
if 'ST' in name or '退' in name:
print(f" ❌ ST/退市股,被过滤")
continue
print(f" ✓ 不是ST/退市股")
# 检查2: 换手率
basic_row = basic_df[basic_df['ts_code'] == stock_code]
if not basic_row.empty:
turnover = float(basic_row.iloc[0].get('turnover_rate', 0))
print(f" 换手率: {turnover:.2f}%")
if 1.0 <= turnover <= 20.0:
print(f" ✓ 换手率符合")
else:
print(f" ❌ 换手率不符合需要1%-20%")
continue
else:
print(f" ⚠️ 无换手率数据")
turnover = 0
# 检查3: MA多头排列
try:
start_date = (datetime.now() - timedelta(days=60)).strftime('%Y%m%d')
daily_df = ts_client.pro.daily(
ts_code=stock_code,
start_date=start_date,
end_date=today
)
if daily_df.empty or len(daily_df) < 30:
print(f" ❌ 历史数据不足需要30天以上无法计算MA")
continue
daily_df = daily_df.sort_values('trade_date').reset_index(drop=True)
close_series = daily_df['close']
vol_series = daily_df['vol']
ma5 = close_series.rolling(window=5).mean().iloc[-1]
ma10 = close_series.rolling(window=10).mean().iloc[-1]
ma20 = close_series.rolling(window=20).mean().iloc[-1]
print(f" MA5: ¥{ma5:.2f}, MA10: ¥{ma10:.2f}, MA20: ¥{ma20:.2f}")
if ma5 > ma20:
print(f" ✓ MA趋势符合MA5 > MA20")
else:
print(f" ❌ MA趋势不符合需要 MA5 > MA20")
continue
# 检查4: 量能
ma5_vol = vol_series.rolling(window=5).mean().iloc[-1]
volume_ratio = vol / ma5_vol if ma5_vol > 0 else 1
print(f" 量比: {volume_ratio:.2f}")
if volume_ratio >= 0.7:
print(f" ✓ 量能符合≥0.7")
else:
print(f" ❌ 量能不足量比需要≥0.7")
continue
# 检查5: 市值
if turnover > 0:
market_cap = amount / (turnover / 100)
market_cap_yi = market_cap / 100000000
print(f" 市值: {market_cap_yi:.2f}亿")
if 30 <= market_cap_yi <= 1000:
print(f" ✓ 市值符合")
else:
print(f" ❌ 市值不符合需要30-1000亿")
continue
print(f" ✅✅✅ {name}({stock_code}) 通过所有筛选条件!")
except Exception as e:
print(f" ❌ 计算技术指标失败: {e}")
import traceback
print(traceback.format_exc())
except Exception as e:
print(f"❌ 诊断失败: {e}")
import traceback
traceback.print_exc()
async def main():
"""主函数"""
await diagnose_sector_stocks()
print("\n" + "=" * 60)
print("诊断完成")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""
诊断板块资金异动检测
检查为什么没有找到异动板块
"""
import asyncio
import sys
import os
from datetime import datetime, timedelta
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.utils.logger import logger
from app.config import get_settings
from app.astock_agent.tushare_client import get_tushare_client
async def diagnose_sector_detection():
"""诊断板块检测"""
print("\n" + "=" * 80)
print("🔍 板块资金异动诊断")
print("=" * 80)
settings = get_settings()
ts_client = get_tushare_client(settings.tushare_token)
# 获取热门概念板块
print("\n【第一步】获取热门概念板块...")
hot_concept_sectors = [
'人工智能', '新能源汽车', '芯片', '半导体', '5G',
'智能电网', '物联网', '云计算', '大数据', '区块链'
]
sectors_df = ts_client.get_concept_sectors()
# 找到热门板块
hot_sectors_codes = []
for hot_name in hot_concept_sectors:
matches = sectors_df[sectors_df['name'].str.contains(hot_name, na=False)]
if not matches.empty:
for _, row in matches.iterrows():
hot_sectors_codes.append({
'ts_code': row['ts_code'],
'name': row['name']
})
print(f"✓ 找到 {len(hot_sectors_codes)} 个热门板块")
# 检查这些板块的资金异动
print("\n【第二步】检查板块资金异动(量比、额比、涨幅)...")
print("=" * 80)
today = datetime.now().strftime('%Y%m%d')
yesterday = (datetime.now() - timedelta(days=10)).strftime('%Y%m%d')
# 宽松模式的阈值
vol_threshold = 1.5
amount_threshold = 1.3
min_change = 0.5
qualified_sectors = []
for sector_info in hot_sectors_codes[:15]: # 只检查前15个
ts_code = sector_info['ts_code']
name = sector_info['name']
try:
daily_df = ts_client.pro.ths_daily(
ts_code=ts_code,
start_date=yesterday,
end_date=today
)
if daily_df.empty or len(daily_df) < 2:
print(f" ⚠️ {name}: 数据不足")
continue
daily_df = daily_df.sort_values('trade_date')
latest = daily_df.iloc[-1]
prev = daily_df.iloc[-2]
latest_vol = float(latest.get('vol', 0))
latest_avg_price = float(latest.get('avg_price', 0))
latest_amount = latest_vol * latest_avg_price * 100 # 估算成交额
prev_vol = float(prev.get('vol', 0))
prev_avg_price = float(prev.get('avg_price', 0))
prev_amount = prev_vol * prev_avg_price * 100
change_pct = float(latest.get('pct_change', 0))
# 计算量比和额比
vol_ratio = latest_vol / prev_vol if prev_vol > 0 else 0
amount_ratio = latest_amount / prev_amount if prev_amount > 0 else 0
# 判断是否符合条件
is_volume_surge = vol_ratio >= vol_threshold
is_amount_surge = amount_ratio >= amount_threshold
has_min_change = change_pct >= min_change
is_qualified = (is_volume_surge or is_amount_surge) and has_min_change
# 显示结果
status = "" if is_qualified else ""
vol_status = "🔥" if is_volume_surge else "📊"
amount_status = "🔥" if is_amount_surge else "📊"
change_status = "" if has_min_change else ""
print(f" {status} {name}")
print(f" 涨跌幅: {change_pct:+.2f}% {change_status}")
print(f" 量比: {vol_ratio:.2f}x {vol_status} (需要≥{vol_threshold})")
print(f" 额比: {amount_ratio:.2f}x {amount_status} (需要≥{amount_threshold})")
if is_qualified:
qualified_sectors.append({
'name': name,
'change_pct': change_pct,
'vol_ratio': vol_ratio,
'amount_ratio': amount_ratio
})
except Exception as e:
print(f"{name}: 查询失败 ({e})")
continue
print("\n" + "=" * 80)
print(f"【结果】找到 {len(qualified_sectors)} 个符合资金异动条件的板块")
if len(qualified_sectors) == 0:
print("\n⚠️ 没有板块符合条件,可能原因:")
print(" 1. 市场整体资金流入不足(量比、额比都未达标)")
print(" 2. 板块涨幅不够需要≥0.5%")
print(" 3. 阈值设置过高当前量比≥1.5额比≥1.3")
print("\n建议放宽阈值:")
print(" - 量比阈值: 1.5 → 1.2")
print(" - 额比阈值: 1.3 → 1.1")
print(" - 最小涨幅: 0.5% → 0.3%")
else:
print("\n✅ 符合条件的板块:")
for idx, sector in enumerate(qualified_sectors, 1):
print(f" {idx}. {sector['name']}: {sector['change_pct']:+.2f}%, "
f"量比{sector['vol_ratio']:.2f}x, 额比{sector['amount_ratio']:.2f}x")
print("=" * 80)
async def main():
await diagnose_sector_detection()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,60 @@
#!/usr/bin/env python3
"""
A股短期题材选股 - 手动执行脚本
"""
import asyncio
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.utils.logger import logger
from app.astock_agent.astock_agent import get_astock_agent
async def main():
"""手动执行选股"""
# 解析命令行参数
strict_mode = '--strict' in sys.argv or '-s' in sys.argv
try:
print("\n" + "=" * 60)
mode_text = "严格模式" if strict_mode else "宽松模式(适应当前市场)"
print(f"📊 A股短期题材选股 - 手动执行 [{mode_text}]")
print("=" * 60)
if not strict_mode:
print("\n💡 使用宽松模式:")
print(" - 市值: 30-1000亿原50-500亿")
print(" - 换手率: 1%-20%原3%-15%")
print(" - 板块涨幅: ≥1.5%原2%")
print(" - 量比: ≥1.0原1.2")
print("\n使用 --strict 或 -s 参数启用严格模式")
# 获取智能体实例
agent = get_astock_agent()
# 设置模式
agent.selector.strict_mode = strict_mode
# 执行选股
result = await agent.run_once()
# 输出结果
print("\n" + "=" * 60)
print(agent.selector.format_output_text(result))
print("=" * 60 + "\n")
return 0
except Exception as e:
logger.error(f"选股执行失败: {e}")
import traceback
logger.error(traceback.format_exc())
return 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)

View File

@ -0,0 +1,130 @@
#!/usr/bin/env python3
"""
测试单个股票的筛选逻辑
"""
import asyncio
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.utils.logger import logger
from app.config import get_settings
from app.astock_agent.tushare_client import get_tushare_client
from app.astock_agent.short_term_thematic_selector import get_thematic_selector
async def test_single_stock():
"""测试单个股票"""
print("\n" + "=" * 80)
print("🔍 测试单个股票筛选")
print("=" * 80)
settings = get_settings()
ts_client = get_tushare_client(settings.tushare_token)
selector = get_thematic_selector(ts_client)
# 测试股票代码(从诊断脚本中找到的通过股票)
test_stock = "000682.SZ"
# 获取该股票所属的板块
sectors_df = ts_client.get_concept_sectors()
smart_grid = sectors_df[sectors_df['name'] == '智能电网']
if smart_grid.empty:
print("未找到智能电网板块")
return
sector_code = smart_grid.iloc[0]['ts_code']
sector_name = smart_grid.iloc[0]['name']
# 获取板块成分股
members_df = ts_client.get_sector_members(sector_code)
stock_codes = members_df['con_code'].tolist()
if test_stock not in stock_codes:
print(f"{test_stock} 不在智能电网板块中")
return
print(f"\n测试股票: {test_stock}")
print(f"所属板块: {sector_name} ({sector_code})")
print(f"板块成分股数量: {len(stock_codes)}")
print(f"测试股票在板块中的位置: {stock_codes.index(test_stock) + 1}/{len(stock_codes)}")
# 获取实时行情 - 检查更多股票
check_count = min(200, len(stock_codes))
print(f"\n获取前 {check_count} 只股票的实时行情...")
realtime_df = ts_client.get_realtime_data(stock_codes[:check_count])
if realtime_df.empty:
print("无法获取实时行情")
return
print(f"实时行情数据获取成功,共 {len(realtime_df)} 只股票")
# 检查目标股票是否在行情数据中
if test_stock not in realtime_df['ts_code'].values:
print(f"{test_stock} 不在行情数据中")
print(f"行情数据中的股票: {realtime_df['ts_code'].tolist()[:10]}")
return
stock_row = realtime_df[realtime_df['ts_code'] == test_stock].iloc[0]
print(f"\n{test_stock} 行情数据:")
print(f" 现价: {stock_row['close']}")
print(f" 涨跌幅: {stock_row['pct_chg']}%")
print(f" 成交量: {stock_row['vol']}")
print(f" 成交额: {stock_row['amount']}千元")
# 获取每日指标
trade_date = realtime_df.iloc[0]['trade_date']
basic_df = ts_client.get_stock_daily_basic([test_stock], str(trade_date))
print(f"\n每日指标数据: {'' if not basic_df.empty else ''}")
if not basic_df.empty:
basic_row = basic_df[basic_df['ts_code'] == test_stock]
if not basic_row.empty:
print(f" 换手率: {basic_row.iloc[0]['turnover_rate']}%")
# 调用选股器的内部检查函数
print("\n开始筛选检查...")
print("=" * 80)
# 检查所有股票
passed_stocks = []
for idx, stock_code in enumerate(stock_codes[:check_count], 1):
try:
result = selector._check_single_stock(
stock_code=stock_code,
sector_name=sector_name,
sector_change=2.77,
realtime_df=realtime_df,
basic_df=basic_df
)
if result:
passed_stocks.append((stock_code, result.get('name', '')))
print(f" ✓ [{idx}] {stock_code}: {result.get('name', '')}")
except Exception as e:
print(f" ✗ [{idx}] {stock_code}: 检查失败 - {e}")
print("=" * 80)
print(f"\n检查了 {check_count} 只股票,通过筛选: {len(passed_stocks)}")
if passed_stocks:
print("\n✅ 通过的股票:")
for stock_code, name in passed_stocks[:20]: # 只显示前20只
print(f" - {stock_code}: {name}")
if len(passed_stocks) > 20:
print(f" ... 还有 {len(passed_stocks) - 20}")
else:
print("\n❌ 没有股票通过筛选")
async def main():
await test_single_stock()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -243,6 +243,11 @@
color: #ffc107;
}
.status-badge.closed_ts {
background: rgba(75, 192, 192, 0.1);
color: #4bc0c0;
}
.status-badge.closed_manual {
background: rgba(255, 165, 0, 0.1);
color: orange;
@ -2159,6 +2164,7 @@
'closed_tp': '止盈',
'closed_sl': '止损',
'closed_be': '保本止损',
'closed_ts': '移动止盈',
'closed_manual': '手动平仓'
};
return map[status] || status;