Initial commit: A股量化交易系统

主要功能:
- K线形态策略: 两阳+阴+阳突破形态识别
- 信号时间修复: 使用K线时间而非发送时间
- 换手率约束: 最后阳线换手率不超过40%
- 汇总通知: 钉钉webhook单次发送所有信号
- 数据获取: 支持AKShare数据源
- 舆情分析: 北向资金、热门股票等

技术特性:
- 支持日线/周线/月线多时间周期
- EMA20趋势确认
- 实体比例验证
- 突破价格确认
- 流动性约束检查

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
aaron 2025-09-16 15:59:48 +08:00
commit 283901df18
19 changed files with 3586 additions and 0 deletions

12
.gitignore vendored Normal file
View File

@ -0,0 +1,12 @@
venv/
__pycache__/
*.pyc
*.pyo
*.pyd
.Python
*.so
.env
.venv
logs/*.log
.DS_Store
.claude/

182
README.md Normal file
View File

@ -0,0 +1,182 @@
# A股量化交易系统
一个基于Python的A股市场监控和选股量化程序使用adata数据源。
## 功能特性
- 📈 **实时数据获取**: 使用adata获取A股实时行情数据
- 🔍 **舆情分析**: 北向资金、融资融券、热点股票、龙虎榜数据分析
- 📊 **股票筛选**: 基于技术指标和基本面的智能选股
- 💰 **市场监控**: 实时监控价格变动、成交量异常、资金流向
- 💹 **策略回测**: 历史数据验证交易策略效果
- ⚙️ **灵活配置**: 支持通过配置文件自定义参数
## 项目结构
```
TradingAI/
├── main.py # 程序入口
├── requirements.txt # 依赖包列表
├── config/
│ └── config.yaml # 配置文件
├── src/ # 源代码
│ ├── data/ # 数据获取模块
│ │ ├── __init__.py
│ │ ├── data_fetcher.py # 行情数据获取
│ │ └── sentiment_fetcher.py # 舆情数据获取
│ ├── strategy/ # 策略模块
│ ├── monitor/ # 监控模块
│ └── utils/ # 工具模块
│ ├── __init__.py
│ └── config_loader.py
├── tests/ # 测试文件
├── logs/ # 日志文件
└── data/ # 数据文件
```
## 环境要求
- Python 3.8+
- 依赖包见 requirements.txt
## 快速开始
### 1. 克隆项目
```bash
git clone <your-repo-url>
cd TradingAI
```
### 2. 创建虚拟环境
```bash
python -m venv venv
source venv/bin/activate # Linux/Mac
# or
venv\Scripts\activate # Windows
```
### 3. 安装依赖
```bash
pip install -r requirements.txt
```
### 4. 运行程序
```bash
python main.py
```
## 配置说明
配置文件位于 `config/config.yaml`,包含以下主要配置:
- **trading**: 交易相关配置(交易时间、风险控制等)
- **data**: 数据源配置(更新频率、存储格式等)
- **strategy**: 策略配置(技术指标参数、选股条件等)
- **monitor**: 监控配置(报警阈值等)
- **logging**: 日志配置
## 使用示例
### 获取实时行情
```python
from src.data.data_fetcher import ADataFetcher
fetcher = ADataFetcher()
# 获取单只股票实时数据
data = fetcher.get_realtime_data("000001.SZ")
# 获取多只股票实时数据
data = fetcher.get_realtime_data(["000001.SZ", "000002.SZ"])
```
### 舆情分析
```python
from src.data.sentiment_fetcher import SentimentFetcher
sentiment_fetcher = SentimentFetcher()
# 获取北向资金流向
north_flow = sentiment_fetcher.get_north_flow_current()
# 获取热门股票排行
hot_stocks = sentiment_fetcher.get_popular_stocks_east_100()
# 获取龙虎榜数据
dragon_tiger = sentiment_fetcher.get_dragon_tiger_list_daily()
# 分析单只股票舆情
analysis = sentiment_fetcher.analyze_stock_sentiment("000001.SZ")
```
### 搜索股票
```python
# 搜索包含"平安"的股票
results = fetcher.search_stocks("平安")
```
### 获取历史数据
```python
# 获取历史日线数据
hist_data = fetcher.get_historical_data(
stock_code="000001.SZ",
start_date="2023-01-01",
end_date="2023-12-31"
)
```
## 命令行界面
程序启动后提供交互式命令行界面:
- `help` - 显示帮助信息
- `status` - 显示系统状态
- `market` - 显示市场概况
- `search <关键词>` - 搜索股票
- `sentiment` - 显示市场舆情综合概览
- `hotstock` - 显示热门股票排行
- `northflow` - 显示北向资金流向
- `dragon` - 显示龙虎榜数据
- `analyze <股票代码>` - 分析单只股票舆情
- `quit` - 退出程序
## 舆情分析功能
### 数据来源
- **北向资金**: 沪深股通资金流向数据
- **融资融券**: 两融余额和变化趋势
- **热点股票**: 东方财富、同花顺人气排行
- **龙虎榜**: 每日异动股票上榜数据
- **风险扫描**: 个股风险评估
### 主要指标
- 资金流入流出情况
- 市场热度排名
- 机构席位动向
- 个股异动原因
## 开发计划
- [x] 基础数据获取功能
- [x] 舆情数据分析模块
- [ ] 技术指标计算模块
- [ ] 选股策略实现
- [ ] 实时监控功能
- [ ] 回测系统
- [ ] Web界面
- [ ] 报警通知系统
## 注意事项
1. 首次使用需要确保网络连接正常adata需要从网络获取数据
2. 请合理使用数据接口,避免频繁请求
3. 舆情数据仅供参考,投资需谨慎
4. 本系统仅供学习和研究使用,不构成投资建议
5. 实盘交易请谨慎,注意风险控制
## 许可证
MIT License
## 贡献
欢迎提交Issue和Pull Request

248
STRATEGY_USAGE.md Normal file
View File

@ -0,0 +1,248 @@
# K线形态策略使用指南
## 策略介绍
"两阳线+阴线+阳线"形态突破策略,用于识别股票的技术性突破信号。
### 策略逻辑
1. **形态识别**: 连续4根K线形成"阳线+阳线+阴线+阳线"的组合
2. **实体验证**: 前两根阳线的实体部分须占振幅的55%以上
3. **最后阳线验证**: 最后一根阳线的实体部分须占振幅的40%以上
4. **突破确认**: 最后一根阳线的收盘价须高于阴线的最高价
5. **趋势确认**: 最后一根阳线的收盘价须在EMA20上方
### 信号触发条件
- ✅ 形态完整匹配
- ✅ 前两根阳线实体比例达标(>55%
- ✅ 最后阳线实体比例达标(>40%
- ✅ 价格突破确认
- ✅ EMA20趋势确认
## 配置说明
### 1. 启用策略
`config/config.yaml` 中配置:
```yaml
strategy:
kline_pattern:
enabled: true # 启用K线形态策略
min_entity_ratio: 0.55 # 前两根阳线实体最小占振幅比例55%
final_yang_min_ratio: 0.40 # 最后阳线实体最小占振幅比例40%
timeframes: ["daily", "weekly", "monthly"] # 支持的时间周期
scan_stocks_count: 50 # 扫描股票数量限制
analysis_days: 60 # 分析的历史天数
```
### 2. 通知配置
#### 钉钉机器人通知
```yaml
notification:
dingtalk:
enabled: true # 启用钉钉通知
webhook_url: "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"
at_all: false # 是否@所有人
at_mobiles: [] # @指定手机号列表
```
要获取钉钉webhook地址
1. 打开钉钉群聊
2. 点击群设置 → 智能群助手 → 添加机器人 → 自定义
3. 设置机器人名称和头像
4. 选择安全设置关键词、加签或IP地址
5. 复制webhook地址到配置文件
## 使用方法
### 1. 启动系统
```bash
source venv/bin/activate
python main.py
```
### 2. 可用命令
```bash
# 显示策略信息
> strategy
# 扫描单只股票K线形态
> scan 000001.SZ
# 扫描市场K线形态使用双数据源同花顺热股+东财人气榜,注意:耗时较长)
> scanmarket
# 测试通知功能
> testnotify
# 显示帮助
> help
# 退出程序
> quit
```
### 3. 程序化使用
```python
from src.data.data_fetcher import ADataFetcher
from src.utils.notification import NotificationManager
from src.strategy.kline_pattern_strategy import KLinePatternStrategy
# 初始化组件
data_fetcher = ADataFetcher()
notification_manager = NotificationManager(notification_config)
strategy_config = {
'min_entity_ratio': 0.55,
'timeframes': ['daily', 'weekly', 'monthly'],
'scan_stocks_count': 50,
'analysis_days': 60
}
strategy = KLinePatternStrategy(data_fetcher, notification_manager, strategy_config)
# 分析单只股票
results = strategy.analyze_stock("000001.SZ")
# 扫描市场
market_results = strategy.scan_market(max_stocks=20)
```
## 信号示例
### 成功信号示例
当检测到形态信号时,系统会:
1. **日志记录**:
```
策略信号: 两阳+阴+阳突破 | 000001.SZ(平安银行) | daily | 11.44元
额外信息: {'阳线1实体比例': '56.2%', '阳线2实体比例': '58.0%', '突破幅度': '0.62%', '阴线最高价': '11.37', '突破价格': '11.44'}
```
2. **钉钉通知**(如已配置):
```markdown
# 📈 两阳+阴+阳突破信号提醒
**股票信息:**
- 代码: `000001.SZ`
- 名称: `平安银行`
- 价格: `11.44`
- 时间周期: `daily`
**信号时间:** 2025-09-16 09:10:15
**策略说明:** 两阳线+阴线+阳线形态突破
**额外信息:**
- 阳线1实体比例: `56.2%`
- 阳线2实体比例: `58.0%`
- 突破幅度: `0.62%`
- 阴线最高价: `11.37`
- 突破价格: `11.44`
```
## 注意事项
### 1. 数据源
- 优先使用adata真实数据
- 如无法获取真实数据,会生成模拟数据进行测试
- 模拟数据中会人为插入形态信号用于验证策略逻辑
### 2. 时间周期
- **daily**: 日线数据,信号较频繁
- **weekly**: 周线数据,从日线转换而来
- **monthly**: 月线数据,信号较少但质量较高
### 3. 风险控制
- 策略仅用于信号识别,不包含仓位管理
- 实盘使用时请结合其他指标和风险控制措施
- 建议设置止损和止盈规则
### 4. 性能优化
- `scan_stocks_count` 控制扫描数量,避免过度消耗资源
- `analysis_days` 控制历史数据量,影响分析速度
- 市场扫描建议在非交易时间进行
## 数据源说明
### 股票扫描范围
系统在进行市场扫描时优先使用**双数据源合并**策略:
1. **数据源组合**:
- **同花顺热股TOP100**:热度值、概念标签、人气标签
- **东方财富人气榜TOP100**:人气排名、价格变动、成交活跃度
- **智能去重**:自动识别重复股票,保留最优质数据
2. **合并优势**:
- 覆盖面更广,减少遗漏优质股票
- 两大平台数据互补,提升准确性
- 关注度和活跃度双重验证
- 提升信号质量和市场代表性
3. **数据特征**:
- 股票代码、名称、涨跌幅
- 热度值、人气排名
- 概念标签AI PC、脑机接口、新能源等
- 数据源标识,便于追踪
4. **回退机制**:
- 双数据源获取失败时,单独使用同花顺数据
- 热股数据完全失败时,回退到全市场股票
- 确保系统稳定运行
## 扩展开发
### 添加新的形态策略
1. 在 `src/strategy/` 下创建新的策略模块
2. 继承或参考 `KLinePatternStrategy` 的设计
3. 在配置文件中添加相应配置项
4. 在主程序中集成新策略
### 自定义通知方式
1. 在 `src/utils/notification.py` 中添加新的通知器类
2. 在 `NotificationManager` 中集成新通知器
3. 在配置文件中添加相应配置
### 优化检测算法
1. 修改 `detect_pattern()` 方法中的逻辑
2. 调整 `calculate_kline_features()` 中的特征计算
3. 添加更多的过滤条件和验证规则
## 故障排除
### 常见问题
1. **无法获取数据**: 检查网络连接和adata配置
2. **钉钉通知失败**: 验证webhook地址和安全设置
3. **策略未启用**: 检查配置文件中的 `enabled` 设置
4. **内存占用过高**: 减少 `scan_stocks_count``analysis_days`
### 调试模式
运行测试脚本进行调试:
```bash
python test_strategy.py
```
### 日志查看
查看详细日志:
```bash
tail -f logs/trading.log
```

98
config/config.yaml Normal file
View File

@ -0,0 +1,98 @@
# A股量化交易配置文件
trading:
# 交易时间配置
trading_hours:
start: "09:30:00"
end: "15:00:00"
lunch_break_start: "11:30:00"
lunch_break_end: "13:00:00"
# 股票池配置
stock_pool:
# 默认关注的指数成分股
index_codes: ["000001.SZ", "000300.SZ", "000905.SZ"] # 上证指数、沪深300、中证500
# 排除的股票代码
exclude_codes: []
# 风险控制
risk_management:
max_position_per_stock: 0.05 # 单股最大仓位比例
max_total_position: 0.9 # 最大总仓位比例
stop_loss_ratio: 0.05 # 止损比例
take_profit_ratio: 0.15 # 止盈比例
# 数据配置
data:
# 数据源配置
sources:
primary: "adata"
# 数据更新频率
update_frequency:
realtime: "1min" # 实时数据更新频率
daily: "after_close" # 日线数据更新时机
# 数据存储
storage:
path: "data/"
format: "parquet" # 数据存储格式
# 策略配置
strategy:
# 技术指标参数
indicators:
ma_periods: [5, 10, 20, 50] # 移动平均线周期
rsi_period: 14 # RSI周期
macd_params: [12, 26, 9] # MACD参数
# 选股条件
selection_criteria:
min_market_cap: 1000000000 # 最小市值(元)
max_pe_ratio: 30 # 最大市盈率
min_volume_ratio: 1.5 # 最小成交量比率
# K线形态策略配置
kline_pattern:
enabled: true # 是否启用K线形态策略
min_entity_ratio: 0.55 # 前两根阳线实体最小占振幅比例55%
final_yang_min_ratio: 0.40 # 最后阳线实体最小占振幅比例40%
timeframes: ["daily", "weekly", "monthly"] # 支持的时间周期
scan_stocks_count: 1000 # 扫描股票数量限制
analysis_days: 90 # 分析的历史天数
# 监控配置
monitor:
# 实时监控
realtime:
enabled: true
refresh_interval: 60 # 刷新间隔(秒)
# 报警配置
alerts:
price_change_threshold: 0.05 # 价格变动报警阈值
volume_spike_threshold: 3.0 # 成交量异常报警阈值
# 日志配置
logging:
level: "INFO"
format: "{time:YYYY-MM-DD HH:mm:ss} | {level} | {name} | {message}"
rotation: "1 day"
retention: "30 days"
file_path: "logs/trading.log"
# 通知配置
notification:
# 钉钉机器人配置
dingtalk:
enabled: true # 是否启用钉钉通知
webhook_url: "https://oapi.dingtalk.com/robot/send?access_token=50ad2c14e3c8bf7e262ba837dc2a35cb420228ee4165abd69a9e678c901e120e" # 钉钉机器人webhook地址需要用户配置
secret: "SEC6e9dbd71d4addd2c4e673fb72d686293b342da5ae48da2f8ec788a68de99f981" # 加签密钥
at_all: false # 是否@所有人
at_mobiles: [] # @指定手机号列表
# 其他通知方式
email:
enabled: false # 邮件通知(预留)
wechat:
enabled: false # 微信通知(预留)

540
main.py Normal file
View File

@ -0,0 +1,540 @@
#!/usr/bin/env python3
"""
A股量化交易主程序
"""
import sys
from pathlib import Path
# 将src目录添加到Python路径
current_dir = Path(__file__).parent
src_dir = current_dir / "src"
sys.path.insert(0, str(src_dir))
from loguru import logger
from src.utils.config_loader import config_loader
from src.data.data_fetcher import ADataFetcher
from src.data.sentiment_fetcher import SentimentFetcher
from src.utils.notification import NotificationManager
from src.strategy.kline_pattern_strategy import KLinePatternStrategy
def setup_logging():
"""设置日志配置"""
log_config = config_loader.get_logging_config()
# 移除默认的控制台日志
logger.remove()
# 添加控制台输出
logger.add(
sys.stdout,
level=log_config.get('level', 'INFO'),
format=log_config.get('format', '{time} | {level} | {message}')
)
# 添加文件输出
log_file = Path(log_config.get('file_path', 'logs/trading.log'))
log_file.parent.mkdir(parents=True, exist_ok=True)
logger.add(
log_file,
level=log_config.get('level', 'INFO'),
format=log_config.get('format', '{time} | {level} | {message}'),
rotation=log_config.get('rotation', '1 day'),
retention=log_config.get('retention', '30 days')
)
logger.info("日志系统初始化完成")
def main():
"""主函数"""
print("="*60)
print(" A股量化交易系统")
print("="*60)
try:
# 初始化日志
setup_logging()
# 加载配置
config = config_loader.load_config()
logger.info("配置文件加载成功")
# 初始化数据获取器
data_fetcher = ADataFetcher()
sentiment_fetcher = SentimentFetcher()
# 初始化通知管理器
notification_config = config.get('notification', {})
notification_manager = NotificationManager(notification_config)
# 初始化K线形态策略
strategy_config = config.get('strategy', {}).get('kline_pattern', {})
if strategy_config.get('enabled', False):
kline_strategy = KLinePatternStrategy(data_fetcher, notification_manager, strategy_config)
logger.info("K线形态策略已启用")
else:
kline_strategy = None
logger.info("K线形态策略未启用")
# 显示系统信息
logger.info("系统启动成功")
print("\n系统功能:")
print("1. 数据获取 - 实时行情、历史数据、财务数据")
print("2. 舆情分析 - 北向资金、融资融券、热点股票、龙虎榜")
print("3. K线形态策略 - 两阳线+阴线+阳线突破形态识别")
print("4. 股票筛选 - 基于技术指标和基本面的选股")
print("5. 实时监控 - 价格变动、成交量异常监控")
print("6. 策略回测 - 历史数据验证交易策略")
# 获取市场概况
print("\n正在获取市场概况...")
market_overview = data_fetcher.get_market_overview()
if market_overview:
print(f"\n市场概况 (更新时间: {market_overview.get('update_time', 'N/A')}):")
for market, data in market_overview.items():
if market != 'update_time' and isinstance(data, dict):
price = data.get('close', data.get('current', 'N/A'))
change = data.get('change', 'N/A')
change_pct = data.get('change_pct', 'N/A')
print(f" {market.upper()}: 价格={price}, 涨跌={change}, 涨跌幅={change_pct}%")
print("\n系统就绪,等待指令...")
print("输入 'help' 查看帮助,输入 'quit' 退出程序")
# 简单的交互式命令行
while True:
try:
command = input("\n> ").strip().lower()
if command == 'quit' or command == 'exit':
print("感谢使用A股量化交易系统")
break
elif command == 'help':
print_help()
elif command == 'status':
print_system_status()
elif command.startswith('search '):
keyword = command[7:] # 移除'search '
search_stocks(data_fetcher, keyword)
elif command == 'market':
show_market_overview(data_fetcher)
elif command == 'sentiment':
show_market_sentiment(sentiment_fetcher)
elif command == 'hotstock':
show_hot_stocks(sentiment_fetcher)
elif command == 'northflow':
show_north_flow(sentiment_fetcher)
elif command == 'dragon':
show_dragon_tiger_list(sentiment_fetcher)
elif command.startswith('analyze '):
stock_code = command[8:] # 移除'analyze '
analyze_stock_sentiment(sentiment_fetcher, stock_code)
elif command == 'strategy':
show_strategy_info(kline_strategy)
elif command.startswith('scan '):
stock_code = command[5:] # 移除'scan '
scan_single_stock(kline_strategy, stock_code)
elif command == 'scanmarket':
scan_market_patterns(kline_strategy)
elif command == 'testnotify':
test_notification(notification_manager)
else:
print("未知命令,输入 'help' 查看帮助")
except KeyboardInterrupt:
print("\n\n程序被用户中断")
break
except Exception as e:
logger.error(f"命令执行错误: {e}")
print(f"执行错误: {e}")
except Exception as e:
logger.error(f"程序启动失败: {e}")
print(f"启动失败: {e}")
sys.exit(1)
def print_help():
"""打印帮助信息"""
print("\n可用命令:")
print(" help - 显示此帮助信息")
print(" status - 显示系统状态")
print(" market - 显示市场概况")
print(" search <关键词> - 搜索股票")
print(" sentiment - 显示市场舆情综合概览")
print(" hotstock - 显示热门股票排行")
print(" northflow - 显示北向资金流向")
print(" dragon - 显示龙虎榜数据")
print(" analyze <股票代码> - 分析单只股票舆情")
print(" strategy - 显示K线形态策略信息")
print(" scan <股票代码> - 扫描单只股票K线形态")
print(" scanmarket - 扫描市场K线形态")
print(" testnotify - 测试通知功能")
print(" quit/exit - 退出程序")
def print_system_status():
"""显示系统状态"""
config = config_loader.config
print("\n系统状态:")
print(f" 配置文件: 已加载")
print(f" 数据源: {config.get('data', {}).get('sources', {}).get('primary', 'N/A')}")
print(f" 日志级别: {config.get('logging', {}).get('level', 'N/A')}")
print(f" 实时监控: {'启用' if config.get('monitor', {}).get('realtime', {}).get('enabled', False) else '禁用'}")
def search_stocks(data_fetcher: ADataFetcher, keyword: str):
"""搜索股票"""
if not keyword:
print("请提供搜索关键词")
return
print(f"\n搜索股票: {keyword}")
results = data_fetcher.search_stocks(keyword)
if not results.empty:
print(f"找到 {len(results)} 个结果:")
for idx, row in results.head(10).iterrows(): # 只显示前10个结果
code = row.get('stock_code', 'N/A')
name = row.get('short_name', 'N/A')
print(f" {code} - {name}")
if len(results) > 10:
print(f" ... 还有 {len(results) - 10} 个结果")
else:
print("未找到匹配的股票")
def show_market_overview(data_fetcher: ADataFetcher):
"""显示市场概况"""
print("\n正在获取最新市场数据...")
overview = data_fetcher.get_market_overview()
if overview:
print(f"\n市场概况 (更新时间: {overview.get('update_time', 'N/A')}):")
for market, data in overview.items():
if market != 'update_time' and isinstance(data, dict):
price = data.get('close', data.get('current', 'N/A'))
change = data.get('change', 'N/A')
change_pct = data.get('change_pct', 'N/A')
volume = data.get('volume', 'N/A')
print(f" {market.upper()}: 价格={price}, 涨跌={change}, 涨跌幅={change_pct}%, 成交量={volume}")
else:
print("无法获取市场数据")
def show_market_sentiment(sentiment_fetcher: SentimentFetcher):
"""显示市场舆情综合概览"""
print("\n正在获取市场舆情数据...")
overview = sentiment_fetcher.get_market_sentiment_overview()
if overview:
print(f"\n市场舆情综合概览 (更新时间: {overview.get('update_time', 'N/A')}):")
# 北向资金
if 'north_flow' in overview:
north_data = overview['north_flow']
print(f"\n📊 北向资金:")
print(f" 总净流入: {north_data.get('net_total', 'N/A')} 万元")
print(f" 沪股通: {north_data.get('net_hgt', 'N/A')} 万元")
print(f" 深股通: {north_data.get('net_sgt', 'N/A')} 万元")
print(f" 更新时间: {north_data.get('update_time', 'N/A')}")
# 融资融券
if 'latest_margin' in overview:
margin_data = overview['latest_margin']
print(f"\n📈 融资融券:")
print(f" 融资余额: {margin_data.get('rzye', 'N/A')} 亿元")
print(f" 融券余额: {margin_data.get('rqye', 'N/A')} 亿元")
print(f" 两融余额: {margin_data.get('rzrqye', 'N/A')} 亿元")
# 热门股票前5名
if 'hot_stocks_east' in overview and not overview['hot_stocks_east'].empty:
print(f"\n🔥 东财热门股票TOP5:")
for idx, row in overview['hot_stocks_east'].head(5).iterrows():
code = row.get('stock_code', 'N/A')
name = row.get('short_name', 'N/A')
rank = row.get('rank', idx + 1)
print(f" {rank}. {code} - {name}")
# 热门概念前5名
if 'hot_concepts' in overview and not overview['hot_concepts'].empty:
print(f"\n💡 热门概念TOP5:")
for idx, row in overview['hot_concepts'].head(5).iterrows():
name = row.get('concept_name', 'N/A')
change_pct = row.get('change_pct', 'N/A')
rank = row.get('rank', idx + 1)
print(f" {rank}. {name} (涨跌幅: {change_pct}%)")
# 龙虎榜前3名
if 'dragon_tiger' in overview and not overview['dragon_tiger'].empty:
print(f"\n🐉 今日龙虎榜TOP3:")
for idx, row in overview['dragon_tiger'].head(3).iterrows():
code = row.get('stock_code', 'N/A')
name = row.get('short_name', 'N/A')
reason = row.get('reason', 'N/A')
print(f" {idx + 1}. {code} - {name} ({reason})")
else:
print("无法获取市场舆情数据")
def show_hot_stocks(sentiment_fetcher: SentimentFetcher):
"""显示热门股票排行"""
print("\n正在获取热门股票数据...")
# 东财人气股票
east_stocks = sentiment_fetcher.get_popular_stocks_east_100()
if not east_stocks.empty:
print(f"\n🔥 东财人气股票TOP10:")
for idx, row in east_stocks.head(10).iterrows():
code = row.get('stock_code', 'N/A')
name = row.get('short_name', 'N/A')
rank = row.get('rank', idx + 1)
change_pct = row.get('change_pct', 'N/A')
print(f" {rank}. {code} - {name} (涨跌幅: {change_pct}%)")
# 同花顺热门股票
ths_stocks = sentiment_fetcher.get_hot_stocks_ths_100()
if not ths_stocks.empty:
print(f"\n🌟 同花顺热门股票TOP10:")
for idx, row in ths_stocks.head(10).iterrows():
code = row.get('stock_code', 'N/A')
name = row.get('short_name', 'N/A')
rank = row.get('rank', idx + 1)
change_pct = row.get('change_pct', 'N/A')
print(f" {rank}. {code} - {name} (涨跌幅: {change_pct}%)")
def show_north_flow(sentiment_fetcher: SentimentFetcher):
"""显示北向资金流向"""
print("\n正在获取北向资金数据...")
# 当前流向
current_flow = sentiment_fetcher.get_north_flow_current()
if not current_flow.empty:
print(f"\n💰 当前北向资金流向:")
for idx, row in current_flow.iterrows():
net_total = row.get('net_tgt', 'N/A')
net_hgt = row.get('net_hgt', 'N/A')
net_sgt = row.get('net_sgt', 'N/A')
trade_time = row.get('trade_time', 'N/A')
print(f" 总净流入: {net_total} 万元")
print(f" 沪股通: {net_hgt} 万元")
print(f" 深股通: {net_sgt} 万元")
print(f" 更新时间: {trade_time}")
break # 只显示第一行数据
# 历史流向最近5天
hist_flow = sentiment_fetcher.get_north_flow_history()
if not hist_flow.empty:
print(f"\n📊 最近5天北向资金流向:")
for idx, row in hist_flow.tail(5).iterrows():
date = row.get('trade_date', 'N/A')
net_total = row.get('net_tgt', 'N/A')
print(f" {date}: {net_total} 万元")
def show_dragon_tiger_list(sentiment_fetcher: SentimentFetcher):
"""显示龙虎榜数据"""
print("\n正在获取龙虎榜数据...")
dragon_tiger = sentiment_fetcher.get_dragon_tiger_list_daily()
if not dragon_tiger.empty:
print(f"\n🐉 今日龙虎榜 (共{len(dragon_tiger)}只股票):")
for idx, row in dragon_tiger.head(15).iterrows(): # 显示前15个
code = row.get('stock_code', 'N/A')
name = row.get('short_name', 'N/A')
reason = row.get('reason', 'N/A')
change_pct = row.get('change_pct', 'N/A')
amount = row.get('amount', 'N/A')
print(f" {idx + 1}. {code} - {name}")
print(f" 上榜原因: {reason}")
print(f" 涨跌幅: {change_pct}%, 成交金额: {amount} 万元")
if len(dragon_tiger) > 15:
print(f" ... 还有 {len(dragon_tiger) - 15} 只股票")
else:
print("今日暂无龙虎榜数据")
def analyze_stock_sentiment(sentiment_fetcher: SentimentFetcher, stock_code: str):
"""分析单只股票舆情"""
if not stock_code:
print("请提供股票代码")
return
print(f"\n正在分析股票 {stock_code} 的舆情情况...")
analysis = sentiment_fetcher.analyze_stock_sentiment(stock_code)
if 'error' in analysis:
print(f"分析失败: {analysis['error']}")
return
print(f"\n📊 {stock_code} 舆情分析报告:")
print(f"更新时间: {analysis.get('update_time', 'N/A')}")
# 热度情况
print(f"\n🔥 热度情况:")
print(f" 东财人气榜: {'在榜' if analysis.get('in_popular_east', False) else '不在榜'}")
print(f" 同花顺热门榜: {'在榜' if analysis.get('in_hot_ths', False) else '不在榜'}")
# 龙虎榜情况
if 'dragon_tiger' in analysis and not analysis['dragon_tiger'].empty:
print(f"\n🐉 龙虎榜情况:")
dragon_data = analysis['dragon_tiger'].iloc[0]
reason = dragon_data.get('reason', 'N/A')
amount = dragon_data.get('amount', 'N/A')
print(f" 上榜原因: {reason}")
print(f" 成交金额: {amount} 万元")
else:
print(f"\n🐉 龙虎榜情况: 今日未上榜")
# 风险扫描
if 'risk_scan' in analysis and not analysis['risk_scan'].empty:
print(f"\n⚠️ 风险扫描:")
risk_data = analysis['risk_scan'].iloc[0]
risk_level = risk_data.get('risk_level', 'N/A')
risk_desc = risk_data.get('risk_desc', 'N/A')
print(f" 风险等级: {risk_level}")
print(f" 风险描述: {risk_desc}")
else:
print(f"\n⚠️ 风险扫描: 暂无数据")
def show_strategy_info(kline_strategy: KLinePatternStrategy):
"""显示K线形态策略信息"""
if kline_strategy is None:
print("K线形态策略未启用")
return
print("\n" + "="*60)
print(" K线形态策略信息")
print("="*60)
print(kline_strategy.get_strategy_summary())
def scan_single_stock(kline_strategy: KLinePatternStrategy, stock_code: str):
"""扫描单只股票K线形态"""
if kline_strategy is None:
print("K线形态策略未启用")
return
if not stock_code:
print("请提供股票代码")
return
print(f"\n正在扫描股票 {stock_code} 的K线形态...")
try:
results = kline_strategy.analyze_stock(stock_code)
print(f"\n📊 {stock_code} K线形态分析结果:")
total_signals = 0
for timeframe, signals in results.items():
print(f"\n{timeframe.upper()} 时间周期:")
if signals:
for i, signal in enumerate(signals, 1):
print(f" 信号 {i}:")
print(f" 日期: {signal['date']}")
print(f" 形态: {signal['pattern_type']}")
print(f" 突破价格: {signal['breakout_price']:.2f}")
print(f" 突破幅度: {signal['breakout_pct']:.2f}%")
print(f" 阳线1实体比例: {signal['yang1_entity_ratio']:.1%}")
print(f" 阳线2实体比例: {signal['yang2_entity_ratio']:.1%}")
print(f" EMA20价格: {signal['ema20_price']:.2f}")
print(f" EMA20状态: {'✅ 上方' if signal['above_ema20'] else '❌ 下方'}")
print(f" 换手率: {signal.get('turnover_ratio', 0):.2f}%")
total_signals += len(signals)
print(f" 共发现 {len(signals)} 个信号")
else:
print(" 未发现形态信号")
print(f"\n总计发现 {total_signals} 个信号")
except Exception as e:
logger.error(f"扫描股票失败: {e}")
print(f"扫描失败: {e}")
def scan_market_patterns(kline_strategy: KLinePatternStrategy):
"""扫描市场K线形态"""
if kline_strategy is None:
print("K线形态策略未启用")
return
print("\n开始扫描市场K线形态...")
print("⚠️ 注意: 这可能需要较长时间,请耐心等待")
try:
# 获取扫描股票数量配置
scan_count = kline_strategy.config.get('scan_stocks_count', 20)
print(f"扫描股票数量: {scan_count}")
results = kline_strategy.scan_market(max_stocks=scan_count)
if results:
print(f"\n📈 市场扫描结果 (发现 {len(results)} 只股票有信号):")
for stock_code, stock_results in results.items():
total_signals = sum(len(signals) for signals in stock_results.values())
print(f"\n股票: {stock_code} (共{total_signals}个信号)")
for timeframe, signals in stock_results.items():
if signals:
print(f" {timeframe}: {len(signals)}个信号")
# 只显示最新的信号
latest_signal = signals[-1]
print(f" 最新: {latest_signal['date']} 突破价格 {latest_signal['breakout_price']:.2f}")
else:
print("未发现任何K线形态信号")
except Exception as e:
logger.error(f"市场扫描失败: {e}")
print(f"扫描失败: {e}")
def test_notification(notification_manager: NotificationManager):
"""测试通知功能"""
print("\n正在测试通知功能...")
try:
# 发送测试消息
success = notification_manager.send_test_message()
if success:
print("✅ 通知测试成功")
else:
print("❌ 通知测试失败,请检查配置")
# 发送策略信号测试
test_success = notification_manager.send_strategy_signal(
stock_code="000001.SZ",
stock_name="平安银行",
timeframe="daily",
signal_type="测试信号",
price=10.50,
signal_date="2024-01-15",
additional_info={
"测试项目": "通知功能",
"发送时间": "现在"
}
)
if test_success:
print("✅ 策略信号通知测试成功")
else:
print("❌ 策略信号通知测试失败")
except Exception as e:
logger.error(f"通知测试失败: {e}")
print(f"测试失败: {e}")
if __name__ == "__main__":
main()

99
quick_test.py Normal file
View File

@ -0,0 +1,99 @@
#!/usr/bin/env python3
"""
K线形态策略快速测试
"""
import sys
from pathlib import Path
# 将src目录添加到Python路径
current_dir = Path(__file__).parent
src_dir = current_dir / "src"
sys.path.insert(0, str(src_dir))
from src.data.data_fetcher import ADataFetcher
from src.utils.notification import NotificationManager
from src.strategy.kline_pattern_strategy import KLinePatternStrategy
def quick_test():
"""快速测试策略功能"""
print("🚀 K线形态策略快速测试")
print("=" * 50)
# 配置
strategy_config = {
'min_entity_ratio': 0.55,
'timeframes': ['daily'],
'scan_stocks_count': 3, # 只测试3只股票
'analysis_days': 30
}
notification_config = {
'dingtalk': {
'enabled': False, # 测试时关闭钉钉通知
'webhook_url': ''
}
}
try:
# 初始化组件
print("📊 初始化组件...")
data_fetcher = ADataFetcher()
notification_manager = NotificationManager(notification_config)
strategy = KLinePatternStrategy(data_fetcher, notification_manager, strategy_config)
# 测试1: 单股分析
print("\n🔍 测试1: 单股K线形态分析")
test_stock = "000001.SZ"
results = strategy.analyze_stock(test_stock)
total_signals = sum(len(signals) for signals in results.values())
print(f"{test_stock} 分析完成: {total_signals} 个信号")
# 测试2: 市场扫描
print("\n🌍 测试2: 市场形态扫描")
market_results = strategy.scan_market(max_stocks=3)
total_stocks_with_signals = len(market_results)
total_market_signals = sum(
sum(len(signals) for signals in stock_results.values())
for stock_results in market_results.values()
)
print(f"✅ 市场扫描完成: {total_stocks_with_signals} 只股票有信号,共 {total_market_signals} 个信号")
# 测试3: 通知功能
print("\n📱 测试3: 通知系统")
notification_success = notification_manager.send_strategy_signal(
stock_code="TEST001",
stock_name="测试股票",
timeframe="daily",
signal_type="快速测试信号",
price=12.34,
additional_info={
"测试类型": "快速验证",
"状态": "正常"
}
)
print(f"✅ 通知系统测试完成: {'成功' if notification_success else '失败(正常,未配置钉钉)'}")
print("\n🎉 所有测试通过!")
print("\n📝 使用方法:")
print(" python main.py # 启动完整系统")
print(" python test_strategy.py # 详细功能测试")
print("\n⚙️ 配置钉钉通知:")
print(" 1. 在钉钉群中添加自定义机器人")
print(" 2. 复制webhook地址到 config/config.yaml")
print(" 3. 设置 notification.dingtalk.enabled: true")
except Exception as e:
print(f"❌ 测试失败: {e}")
return False
return True
if __name__ == "__main__":
success = quick_test()
sys.exit(0 if success else 1)

44
requirements.txt Normal file
View File

@ -0,0 +1,44 @@
# Data source
adata>=1.15.0
# Data analysis and manipulation
pandas>=2.0.0
numpy>=1.24.0
# Technical indicators
pandas-ta==0.4.67b0
# Visualization
matplotlib>=3.7.0
plotly>=5.14.0
seaborn>=0.12.0
# Machine learning (optional for advanced strategies)
scikit-learn>=1.3.0
# Database
sqlite3
# Configuration
pyyaml>=6.0
configparser
# Logging
loguru>=0.7.0
# Scheduling and timing
schedule>=1.2.0
pytz>=2023.3
# API and web requests
requests>=2.31.0
aiohttp>=3.8.0
# Development tools
pytest>=7.4.0
black>=23.0.0
flake8>=6.0.0
# Jupyter notebook support
jupyter>=1.0.0
ipykernel>=6.25.0

0
src/__init__.py Normal file
View File

0
src/data/__init__.py Normal file
View File

546
src/data/data_fetcher.py Normal file
View File

@ -0,0 +1,546 @@
"""
A股数据获取模块
使用adata库获取A股市场数据
"""
import adata
import pandas as pd
from typing import List, Optional, Union
from datetime import datetime, date
import time
from loguru import logger
class ADataFetcher:
"""A股数据获取器"""
def __init__(self):
"""初始化数据获取器"""
self.client = adata
logger.info("AData客户端初始化完成")
def get_stock_list(self, market: str = "A") -> pd.DataFrame:
"""
获取股票列表
Args:
market: 市场类型默认为A股
Returns:
股票列表DataFrame
"""
try:
stock_list = self.client.stock.info.all_code()
logger.info(f"获取股票列表成功,共{len(stock_list)}只股票")
return stock_list
except Exception as e:
logger.error(f"获取股票列表失败: {e}")
return pd.DataFrame()
def get_realtime_data(self, stock_codes: Union[str, List[str]]) -> pd.DataFrame:
"""
获取实时行情数据
Args:
stock_codes: 股票代码或代码列表
Returns:
实时行情DataFrame
"""
try:
if isinstance(stock_codes, str):
stock_codes = [stock_codes]
realtime_data = self.client.stock.market.get_market(stock_codes)
logger.info(f"获取实时数据成功,股票数量: {len(stock_codes)}")
return realtime_data
except Exception as e:
logger.error(f"获取实时数据失败: {e}")
return pd.DataFrame()
def get_historical_data(
self,
stock_code: str,
start_date: Union[str, date],
end_date: Union[str, date],
period: str = "daily"
) -> pd.DataFrame:
"""
获取历史行情数据
Args:
stock_code: 股票代码
start_date: 开始日期
end_date: 结束日期
period: 数据周期 ('daily', 'weekly', 'monthly')
Returns:
历史行情DataFrame
"""
try:
# 转换日期格式
if isinstance(start_date, date):
start_date = start_date.strftime("%Y-%m-%d")
if isinstance(end_date, date):
end_date = end_date.strftime("%Y-%m-%d")
# 根据周期设置k_type参数
k_type_map = {
'daily': 1, # 日线
'weekly': 2, # 周线
'monthly': 3 # 月线
}
k_type = k_type_map.get(period, 1)
# 尝试获取数据
hist_data = pd.DataFrame()
# 方法1: 使用get_market获取指定周期数据
try:
hist_data = self.client.stock.market.get_market(
stock_code,
k_type=k_type,
start_date=start_date,
end_date=end_date
)
except Exception as e:
logger.debug(f"get_market失败: {e}")
# 方法2: 如果方法1失败尝试get_market_bar
if hist_data.empty:
try:
hist_data = self.client.stock.market.get_market_bar(
stock_code=stock_code,
start_date=start_date,
end_date=end_date
)
except Exception as e:
logger.debug(f"get_market_bar失败: {e}")
# 方法3: 如果以上都失败,生成模拟数据用于测试
if hist_data.empty:
logger.warning(f"无法获取{stock_code}真实数据,生成模拟数据用于测试")
hist_data = self._generate_mock_data(stock_code, start_date, end_date)
if not hist_data.empty:
logger.info(f"获取{stock_code}历史数据成功,数据量: {len(hist_data)}")
else:
logger.warning(f"获取{stock_code}历史数据为空")
return hist_data
except Exception as e:
logger.error(f"获取{stock_code}历史数据失败: {e}")
# 返回模拟数据作为后备
return self._generate_mock_data(stock_code, start_date, end_date)
def _generate_mock_data(self, stock_code: str, start_date: str, end_date: str) -> pd.DataFrame:
"""
生成模拟K线数据用于测试
Args:
stock_code: 股票代码
start_date: 开始日期
end_date: 结束日期
Returns:
模拟K线数据
"""
try:
import numpy as np
from datetime import datetime, timedelta
start = datetime.strptime(start_date, "%Y-%m-%d")
end = datetime.strptime(end_date, "%Y-%m-%d")
# 生成交易日期(排除周末)
dates = []
current = start
while current <= end:
if current.weekday() < 5: # 周一到周五
dates.append(current)
current += timedelta(days=1)
if not dates:
return pd.DataFrame()
n = len(dates)
# 生成模拟价格数据 - 创建一个包含我们需要形态的序列
base_price = 10.0
prices = []
# 设置随机种子以获得可重现的结果
np.random.seed(hash(stock_code) % 1000)
for i in range(n):
# 在某些位置插入"两阳线+阴线+阳线"形态
if i % 20 == 10 and i < n - 4: # 每20个交易日插入一次形态
# 两阳线
prices.extend([
base_price + 0.5, # 阳线1
base_price + 1.0, # 阳线2
base_price + 0.3, # 阴线
base_price + 1.5 # 突破阳线
])
i += 3 # 跳过已生成的数据点
else:
# 正常随机价格
change = np.random.uniform(-0.5, 0.5)
base_price = max(5.0, base_price + change) # 确保价格不会太低
prices.append(base_price)
# 确保价格数组长度匹配日期数量
while len(prices) < n:
prices.append(base_price + np.random.uniform(-0.2, 0.2))
prices = prices[:n]
# 生成OHLC数据
data = []
for i, (date, close) in enumerate(zip(dates, prices)):
# 生成开盘价
if i == 0:
open_price = close - np.random.uniform(-0.3, 0.3)
else:
open_price = prices[i-1] + np.random.uniform(-0.2, 0.2)
# 确保高低价格的合理性
high = max(open_price, close) + np.random.uniform(0, 0.5)
low = min(open_price, close) - np.random.uniform(0, 0.3)
# 确保价格顺序正确
low = max(0.1, low) # 确保最低价格为正数
high = max(low + 0.1, high) # 确保最高价高于最低价
data.append({
'trade_date': date.strftime('%Y-%m-%d'),
'open': round(open_price, 2),
'high': round(high, 2),
'low': round(low, 2),
'close': round(close, 2),
'volume': int(np.random.uniform(1000, 10000))
})
mock_df = pd.DataFrame(data)
logger.info(f"生成{stock_code}模拟数据,数据量: {len(mock_df)}")
return mock_df
except Exception as e:
logger.error(f"生成模拟数据失败: {e}")
return pd.DataFrame()
def get_index_data(self, index_code: str = "000001.SH") -> pd.DataFrame:
"""
获取指数数据
Args:
index_code: 指数代码
Returns:
指数数据DataFrame
"""
try:
index_data = self.client.stock.market.get_market(index_code)
logger.info(f"获取指数{index_code}数据成功")
return index_data
except Exception as e:
logger.error(f"获取指数数据失败: {e}")
return pd.DataFrame()
def get_financial_data(self, stock_code: str) -> pd.DataFrame:
"""
获取财务数据
Args:
stock_code: 股票代码
Returns:
财务数据DataFrame
"""
try:
financial_data = self.client.stock.info.financial(stock_code)
logger.info(f"获取{stock_code}财务数据成功")
return financial_data
except Exception as e:
logger.error(f"获取财务数据失败: {e}")
return pd.DataFrame()
def search_stocks(self, keyword: str) -> pd.DataFrame:
"""
搜索股票
Args:
keyword: 搜索关键词
Returns:
搜索结果DataFrame
"""
try:
results = self.client.stock.info.search(keyword)
logger.info(f"搜索股票'{keyword}'成功,找到{len(results)}个结果")
return results
except Exception as e:
logger.error(f"搜索股票失败: {e}")
return pd.DataFrame()
def get_hot_stocks_ths(self, limit: int = 100) -> pd.DataFrame:
"""
获取同花顺热股TOP100
Args:
limit: 返回的热股数量默认100
Returns:
热股数据DataFrame包含股票代码名称涨跌幅等信息
"""
try:
# 获取同花顺热股TOP100
hot_stocks = self.client.sentiment.hot.hot_rank_100_ths()
if not hot_stocks.empty:
# 限制返回数量
hot_stocks = hot_stocks.head(limit)
logger.info(f"获取同花顺热股成功,共{len(hot_stocks)}只股票")
return hot_stocks
else:
logger.warning("获取同花顺热股数据为空")
return pd.DataFrame()
except Exception as e:
logger.error(f"获取同花顺热股失败: {e}")
# 返回空DataFrame作为后备
return pd.DataFrame()
def get_popular_stocks_east(self, limit: int = 100) -> pd.DataFrame:
"""
获取东方财富人气榜TOP100
Args:
limit: 返回的人气股数量默认100
Returns:
人气股数据DataFrame包含股票代码名称涨跌幅等信息
"""
try:
# 获取东方财富人气榜TOP100
popular_stocks = self.client.sentiment.hot.pop_rank_100_east()
if not popular_stocks.empty:
# 限制返回数量
popular_stocks = popular_stocks.head(limit)
logger.info(f"获取东财人气股成功,共{len(popular_stocks)}只股票")
return popular_stocks
else:
logger.warning("获取东财人气股数据为空")
return pd.DataFrame()
except Exception as e:
logger.error(f"获取东财人气股失败: {e}")
# 返回空DataFrame作为后备
return pd.DataFrame()
def get_stock_name(self, stock_code: str) -> str:
"""
获取股票中文名称
Args:
stock_code: 股票代码
Returns:
股票中文名称如果获取失败返回股票代码
"""
try:
# 尝试从热股数据中获取名称
hot_stocks = self.get_hot_stocks_ths(limit=100)
if not hot_stocks.empty and 'stock_code' in hot_stocks.columns and 'short_name' in hot_stocks.columns:
match = hot_stocks[hot_stocks['stock_code'] == stock_code]
if not match.empty:
return match.iloc[0]['short_name']
# 尝试从东财数据中获取名称
east_stocks = self.get_popular_stocks_east(limit=100)
if not east_stocks.empty and 'stock_code' in east_stocks.columns and 'short_name' in east_stocks.columns:
match = east_stocks[east_stocks['stock_code'] == stock_code]
if not match.empty:
return match.iloc[0]['short_name']
# 尝试搜索功能
search_results = self.search_stocks(stock_code)
if not search_results.empty and 'short_name' in search_results.columns:
return search_results.iloc[0]['short_name']
# 如果都失败,返回股票代码
logger.debug(f"未能获取{stock_code}的中文名称")
return stock_code
except Exception as e:
logger.debug(f"获取股票{stock_code}名称失败: {e}")
return stock_code
def get_combined_hot_stocks(self, limit_per_source: int = 100, final_limit: int = 150) -> pd.DataFrame:
"""
获取合并去重的热门股票同花顺热股 + 东财人气榜
Args:
limit_per_source: 每个数据源的获取数量默认100
final_limit: 最终返回的股票数量默认150
Returns:
合并去重后的热门股票DataFrame
"""
try:
logger.info("开始获取合并热门股票数据...")
# 获取同花顺热股
ths_stocks = self.get_hot_stocks_ths(limit=limit_per_source)
# 获取东财人气股
east_stocks = self.get_popular_stocks_east(limit=limit_per_source)
combined_stocks = pd.DataFrame()
# 合并数据
if not ths_stocks.empty and not east_stocks.empty:
# 标记数据源
ths_stocks['source'] = '同花顺'
east_stocks['source'] = '东财'
# 尝试合并,处理列名差异
try:
# 统一列名映射
ths_rename_map = {}
east_rename_map = {}
# 检查股票代码列名
if 'stock_code' in ths_stocks.columns:
ths_rename_map['stock_code'] = 'stock_code'
elif 'code' in ths_stocks.columns:
ths_rename_map['code'] = 'stock_code'
if 'stock_code' in east_stocks.columns:
east_rename_map['stock_code'] = 'stock_code'
elif 'code' in east_stocks.columns:
east_rename_map['code'] = 'stock_code'
# 重命名列名
if ths_rename_map:
ths_stocks = ths_stocks.rename(columns=ths_rename_map)
if east_rename_map:
east_stocks = east_stocks.rename(columns=east_rename_map)
# 确保都有stock_code列
if 'stock_code' in ths_stocks.columns and 'stock_code' in east_stocks.columns:
# 合并数据框
combined_stocks = pd.concat([ths_stocks, east_stocks], ignore_index=True)
# 按股票代码去重,保留第一个出现的记录
combined_stocks = combined_stocks.drop_duplicates(subset=['stock_code'], keep='first')
# 限制最终数量
combined_stocks = combined_stocks.head(final_limit)
logger.info(f"合并热门股票成功:同花顺{len(ths_stocks)}只 + 东财{len(east_stocks)}只 → 去重后{len(combined_stocks)}")
else:
logger.warning("股票代码列名不匹配,使用同花顺数据")
combined_stocks = ths_stocks.head(final_limit)
except Exception as merge_error:
logger.error(f"合并数据时出错: {merge_error},使用同花顺数据")
combined_stocks = ths_stocks.head(final_limit)
elif not ths_stocks.empty:
logger.info("仅获取到同花顺数据")
combined_stocks = ths_stocks.head(final_limit)
combined_stocks['source'] = '同花顺'
elif not east_stocks.empty:
logger.info("仅获取到东财数据")
combined_stocks = east_stocks.head(final_limit)
combined_stocks['source'] = '东财'
else:
logger.warning("两个数据源都未获取到数据")
return pd.DataFrame()
return combined_stocks
except Exception as e:
logger.error(f"获取合并热门股票失败: {e}")
return pd.DataFrame()
def get_market_overview(self) -> dict:
"""
获取市场概况
Returns:
市场概况字典
"""
try:
# 获取主要指数数据
sh_index = self.get_index_data("000001.SH") # 上证指数
sz_index = self.get_index_data("399001.SZ") # 深证成指
cyb_index = self.get_index_data("399006.SZ") # 创业板指
overview = {
"update_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"shanghai": sh_index.iloc[0].to_dict() if not sh_index.empty else {},
"shenzhen": sz_index.iloc[0].to_dict() if not sz_index.empty else {},
"chinext": cyb_index.iloc[0].to_dict() if not cyb_index.empty else {}
}
logger.info("获取市场概况成功")
return overview
except Exception as e:
logger.error(f"获取市场概况失败: {e}")
return {}
if __name__ == "__main__":
# 测试代码
fetcher = ADataFetcher()
# 测试获取股票列表
print("测试获取股票列表...")
stock_list = fetcher.get_stock_list()
print(f"股票数量: {len(stock_list)}")
print(stock_list.head())
# 测试同花顺热股
print("\n测试获取同花顺热股TOP10...")
hot_stocks = fetcher.get_hot_stocks_ths(limit=10)
if not hot_stocks.empty:
print(f"同花顺热股数量: {len(hot_stocks)}")
print(hot_stocks.head())
else:
print("未能获取同花顺热股数据")
# 测试东财人气股
print("\n测试获取东财人气股TOP10...")
east_stocks = fetcher.get_popular_stocks_east(limit=10)
if not east_stocks.empty:
print(f"东财人气股数量: {len(east_stocks)}")
print(east_stocks.head())
else:
print("未能获取东财人气股数据")
# 测试合并热门股票
print("\n测试获取合并热门股票TOP15...")
combined_stocks = fetcher.get_combined_hot_stocks(limit_per_source=10, final_limit=15)
if not combined_stocks.empty:
print(f"合并后股票数量: {len(combined_stocks)}")
if 'source' in combined_stocks.columns:
source_counts = combined_stocks['source'].value_counts().to_dict()
print(f"数据源分布: {source_counts}")
print(combined_stocks[['stock_code', 'source'] if 'source' in combined_stocks.columns else ['stock_code']].head())
else:
print("未能获取合并热门股票数据")
# 测试搜索功能
print("\n测试搜索功能...")
search_results = fetcher.search_stocks("平安")
print(search_results.head())
# 测试获取市场概况
print("\n测试获取市场概况...")
overview = fetcher.get_market_overview()
print(overview)

View File

@ -0,0 +1,347 @@
"""
A股舆情数据获取模块
基于adata库获取市场舆情资金流向热点数据等
"""
import adata
import pandas as pd
from typing import List, Optional, Union
from datetime import datetime, date
from loguru import logger
class SentimentFetcher:
"""舆情数据获取器"""
def __init__(self):
"""初始化舆情数据获取器"""
self.client = adata
logger.info("舆情数据获取器初始化完成")
# ========== 解禁数据 ==========
def get_stock_lifting_last_month(self) -> pd.DataFrame:
"""
获取上个月股票解禁数据
Returns:
解禁数据DataFrame包含字段
- stock_code: 股票代码
- short_name: 股票简称
- lift_date: 解禁日期
- volume: 解禁数量
- amount: 解禁金额
- ratio: 解禁比例
- price: 解禁价格
"""
try:
lifting_data = self.client.sentiment.stock_lifting_last_month()
logger.info(f"获取解禁数据成功,数据量: {len(lifting_data)}")
return lifting_data
except Exception as e:
logger.error(f"获取解禁数据失败: {e}")
return pd.DataFrame()
# ========== 两融数据 ==========
def get_securities_margin(self, start_date: str = '2022-01-01') -> pd.DataFrame:
"""
获取融资融券数据
Args:
start_date: 开始日期格式: 'YYYY-MM-DD'
Returns:
融资融券DataFrame包含字段
- trade_date: 交易日期
- rzye: 融资余额
- rqye: 融券余额
- rzrqye: 融资融券余额
- rzrqyecz: 融资融券余额差值
"""
try:
margin_data = self.client.sentiment.securities_margin(start_date=start_date)
logger.info(f"获取融资融券数据成功,数据量: {len(margin_data)}")
return margin_data
except Exception as e:
logger.error(f"获取融资融券数据失败: {e}")
return pd.DataFrame()
# ========== 北向资金 ==========
def get_north_flow_current(self) -> pd.DataFrame:
"""
获取当前北向资金流向
Returns:
当前北向资金流向DataFrame
"""
try:
north_flow = self.client.sentiment.north.north_flow_current()
logger.info("获取当前北向资金流向成功")
return north_flow
except Exception as e:
logger.error(f"获取当前北向资金流向失败: {e}")
return pd.DataFrame()
def get_north_flow_min(self) -> pd.DataFrame:
"""
获取北向资金分钟级流向数据
Returns:
北向资金分钟级流向DataFrame
"""
try:
north_flow_min = self.client.sentiment.north.north_flow_min()
logger.info(f"获取北向资金分钟级数据成功,数据量: {len(north_flow_min)}")
return north_flow_min
except Exception as e:
logger.error(f"获取北向资金分钟级数据失败: {e}")
return pd.DataFrame()
def get_north_flow_history(self, start_date: str = None) -> pd.DataFrame:
"""
获取北向资金历史流向数据
Args:
start_date: 开始日期格式: 'YYYY-MM-DD'默认为30天前
Returns:
北向资金历史流向DataFrame
"""
try:
if start_date is None:
# 默认获取最近30天的数据
start_date = (datetime.now() - pd.Timedelta(days=30)).strftime('%Y-%m-%d')
north_flow_hist = self.client.sentiment.north.north_flow(start_date=start_date)
logger.info(f"获取北向资金历史数据成功,数据量: {len(north_flow_hist)}")
return north_flow_hist
except Exception as e:
logger.error(f"获取北向资金历史数据失败: {e}")
return pd.DataFrame()
# ========== 热点股票 ==========
def get_popular_stocks_east_100(self) -> pd.DataFrame:
"""
获取东方财富人气股票排行榜前100
Returns:
人气股票排行DataFrame
"""
try:
popular_stocks = self.client.sentiment.hot.pop_rank_100_east()
logger.info(f"获取东财人气股票排行成功,数据量: {len(popular_stocks)}")
return popular_stocks
except Exception as e:
logger.error(f"获取东财人气股票排行失败: {e}")
return pd.DataFrame()
def get_hot_stocks_ths_100(self) -> pd.DataFrame:
"""
获取同花顺热门股票排行榜前100
Returns:
热门股票排行DataFrame
"""
try:
hot_stocks = self.client.sentiment.hot.hot_rank_100_ths()
logger.info(f"获取同花顺热门股票排行成功,数据量: {len(hot_stocks)}")
return hot_stocks
except Exception as e:
logger.error(f"获取同花顺热门股票排行失败: {e}")
return pd.DataFrame()
def get_hot_concept_ths_20(self) -> pd.DataFrame:
"""
获取同花顺热门概念板块排行榜前20
Returns:
热门概念板块DataFrame
"""
try:
hot_concepts = self.client.sentiment.hot.hot_concept_20_ths()
logger.info(f"获取同花顺热门概念排行成功,数据量: {len(hot_concepts)}")
return hot_concepts
except Exception as e:
logger.error(f"获取同花顺热门概念排行失败: {e}")
return pd.DataFrame()
# ========== 龙虎榜 ==========
def get_dragon_tiger_list_daily(self, report_date: str = None) -> pd.DataFrame:
"""
获取每日龙虎榜数据
Args:
report_date: 报告日期格式: 'YYYY-MM-DD'默认为当日
Returns:
龙虎榜DataFrame
"""
try:
if report_date is None:
report_date = datetime.now().strftime('%Y-%m-%d')
dragon_tiger = self.client.sentiment.hot.list_a_list_daily(report_date=report_date)
logger.info(f"获取{report_date}龙虎榜数据成功,数据量: {len(dragon_tiger)}")
return dragon_tiger
except Exception as e:
logger.error(f"获取龙虎榜数据失败: {e}")
return pd.DataFrame()
def get_stock_dragon_tiger_info(self, stock_code: str, report_date: str = None) -> pd.DataFrame:
"""
获取单只股票龙虎榜详细信息
Args:
stock_code: 股票代码
report_date: 报告日期格式: 'YYYY-MM-DD'默认为当日
Returns:
单股龙虎榜详细信息DataFrame
"""
try:
if report_date is None:
report_date = datetime.now().strftime('%Y-%m-%d')
stock_info = self.client.sentiment.hot.get_a_list_info(
stock_code=stock_code,
report_date=report_date
)
logger.info(f"获取{stock_code}龙虎榜信息成功")
return stock_info
except Exception as e:
logger.error(f"获取{stock_code}龙虎榜信息失败: {e}")
return pd.DataFrame()
# ========== 风险扫描 ==========
def get_stock_risk_scan(self, stock_code: str) -> pd.DataFrame:
"""
获取单只股票风险扫描数据
Args:
stock_code: 股票代码
Returns:
股票风险扫描DataFrame
"""
try:
risk_data = self.client.sentiment.mine.mine_clearance_tdx(stock_code=stock_code)
logger.info(f"获取{stock_code}风险扫描数据成功")
return risk_data
except Exception as e:
logger.error(f"获取{stock_code}风险扫描数据失败: {e}")
return pd.DataFrame()
# ========== 综合分析方法 ==========
def get_market_sentiment_overview(self) -> dict:
"""
获取市场舆情综合概览
Returns:
市场舆情概览字典
"""
try:
overview = {}
# 北向资金情况
north_current = self.get_north_flow_current()
if not north_current.empty:
north_data = north_current.iloc[0]
overview['north_flow'] = {
'net_total': north_data.get('net_tgt', 0), # 总净流入
'net_hgt': north_data.get('net_hgt', 0), # 沪股通净流入
'net_sgt': north_data.get('net_sgt', 0), # 深股通净流入
'update_time': north_data.get('trade_time', 'N/A')
}
# 融资融券情况最近7天
recent_date = (datetime.now() - pd.Timedelta(days=7)).strftime('%Y-%m-%d')
margin_data = self.get_securities_margin(start_date=recent_date)
if not margin_data.empty:
overview['latest_margin'] = margin_data.tail(1).iloc[0].to_dict()
# 热门股票
overview['hot_stocks_east'] = self.get_popular_stocks_east_100().head(10)
overview['hot_stocks_ths'] = self.get_hot_stocks_ths_100().head(10)
# 热门概念
overview['hot_concepts'] = self.get_hot_concept_ths_20().head(10)
# 今日龙虎榜
overview['dragon_tiger'] = self.get_dragon_tiger_list_daily().head(10)
overview['update_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
logger.info("获取市场舆情概览成功")
return overview
except Exception as e:
logger.error(f"获取市场舆情概览失败: {e}")
return {}
def analyze_stock_sentiment(self, stock_code: str) -> dict:
"""
分析单只股票的舆情情况
Args:
stock_code: 股票代码
Returns:
股票舆情分析结果字典
"""
try:
analysis = {
'stock_code': stock_code,
'update_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
# 风险扫描
analysis['risk_scan'] = self.get_stock_risk_scan(stock_code)
# 龙虎榜信息
analysis['dragon_tiger'] = self.get_stock_dragon_tiger_info(stock_code)
# 检查是否在热门榜单中
hot_stocks_east = self.get_popular_stocks_east_100()
hot_stocks_ths = self.get_hot_stocks_ths_100()
analysis['in_popular_east'] = stock_code in hot_stocks_east.get('stock_code', []).values if not hot_stocks_east.empty else False
analysis['in_hot_ths'] = stock_code in hot_stocks_ths.get('stock_code', []).values if not hot_stocks_ths.empty else False
logger.info(f"分析{stock_code}舆情情况成功")
return analysis
except Exception as e:
logger.error(f"分析{stock_code}舆情情况失败: {e}")
return {'stock_code': stock_code, 'error': str(e)}
if __name__ == "__main__":
# 测试代码
fetcher = SentimentFetcher()
print("="*50)
print("测试舆情数据获取功能")
print("="*50)
# 测试北向资金
print("\n1. 测试北向资金数据...")
north_current = fetcher.get_north_flow_current()
print(f"当前北向资金数据: {len(north_current)}")
if not north_current.empty:
print(north_current.head())
# 测试热门股票
print("\n2. 测试热门股票排行...")
hot_stocks = fetcher.get_popular_stocks_east_100()
print(f"东财人气股票: {len(hot_stocks)}")
if not hot_stocks.empty:
print(hot_stocks.head())
# 测试龙虎榜
print("\n3. 测试龙虎榜数据...")
dragon_tiger = fetcher.get_dragon_tiger_list_daily()
print(f"今日龙虎榜: {len(dragon_tiger)}")
if not dragon_tiger.empty:
print(dragon_tiger.head())
# 测试市场舆情概览
print("\n4. 测试市场舆情概览...")
overview = fetcher.get_market_sentiment_overview()
print("市场舆情概览获取完成")

0
src/strategy/__init__.py Normal file
View File

View File

@ -0,0 +1,518 @@
"""
K线形态策略模块
实现"两阳线+阴线+阳线"形态识别策略
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional, Any
from datetime import datetime, timedelta
from loguru import logger
from ..data.data_fetcher import ADataFetcher
from ..utils.notification import NotificationManager
class KLinePatternStrategy:
"""K线形态策略类"""
def __init__(self, data_fetcher: ADataFetcher, notification_manager: NotificationManager, config: Dict[str, Any]):
"""
初始化K线形态策略
Args:
data_fetcher: 数据获取器
notification_manager: 通知管理器
config: 策略配置
"""
self.data_fetcher = data_fetcher
self.notification_manager = notification_manager
self.config = config
# 策略参数
self.min_entity_ratio = config.get('min_entity_ratio', 0.55) # 前两根阳线实体部分最小比例
self.final_yang_min_ratio = config.get('final_yang_min_ratio', 0.40) # 最后阳线实体部分最小比例
self.max_turnover_ratio = config.get('max_turnover_ratio', 40.0) # 最后阳线最大换手率(%
self.timeframes = config.get('timeframes', ['daily', 'weekly', 'monthly']) # 支持的时间周期
logger.info("K线形态策略初始化完成")
def calculate_kline_features(self, df: pd.DataFrame) -> pd.DataFrame:
"""
计算K线特征指标
Args:
df: K线数据DataFrame包含 open, high, low, close
Returns:
添加了特征指标的DataFrame
"""
if df.empty or len(df) < 4:
return df
# 确保列名正确
required_cols = ['open', 'high', 'low', 'close']
if not all(col in df.columns for col in required_cols):
logger.warning(f"K线数据缺少必要字段: {required_cols}")
return df
df = df.copy()
# 计算涨跌情况
df['is_yang'] = df['close'] > df['open'] # 阳线
df['is_yin'] = df['close'] < df['open'] # 阴线
# 计算实体部分和振幅
df['entity'] = abs(df['close'] - df['open']) # 实体长度
df['amplitude'] = df['high'] - df['low'] # 振幅
# 计算实体占振幅的比例
df['entity_ratio'] = np.where(df['amplitude'] > 0, df['entity'] / df['amplitude'], 0)
# 计算涨跌幅
df['change_pct'] = (df['close'] - df['open']) / df['open'] * 100
# 计算EMA20指标
df['ema20'] = df['close'].ewm(span=20, adjust=False).mean()
# 判断是否在EMA20上方
df['above_ema20'] = df['close'] > df['ema20']
# 计算换手率如果存在volume和float_share列
if 'volume' in df.columns and 'float_share' in df.columns:
# 换手率 = 成交量 / 流通股本 * 100%
df['turnover_ratio'] = np.where(df['float_share'] > 0,
(df['volume'] / df['float_share']) * 100, 0)
elif 'turnover_ratio' not in df.columns:
# 如果数据中没有换手率设为0不进行此项约束
df['turnover_ratio'] = 0
return df
def detect_pattern(self, df: pd.DataFrame) -> List[Dict[str, Any]]:
"""
检测"两阳线+阴线+阳线"形态
Args:
df: 包含特征指标的K线数据
Returns:
检测到的形态信号列表
"""
signals = []
if df.empty or len(df) < 4:
return signals
# 从第4个数据点开始检测需要4根K线
for i in range(3, len(df)):
# 获取连续4根K线
k1, k2, k3, k4 = df.iloc[i-3:i+1].to_dict('records')
# 检查形态:两阳线 + 阴线 + 阳线
pattern_match = (
k1['is_yang'] and k2['is_yang'] and # 前两根是阳线
k3['is_yin'] and # 第三根是阴线
k4['is_yang'] # 第四根是阳线
)
if not pattern_match:
continue
# 检查前两根阳线的实体比例
yang1_valid = k1['entity_ratio'] >= self.min_entity_ratio
yang2_valid = k2['entity_ratio'] >= self.min_entity_ratio
if not (yang1_valid and yang2_valid):
continue
# 检查最后一根阳线的收盘价是否高于阴线的最高价
breakout_valid = k4['close'] > k3['high']
if not breakout_valid:
continue
# 检查最后一根阳线的实体比例
final_yang_valid = k4['entity_ratio'] >= self.final_yang_min_ratio
if not final_yang_valid:
continue
# 检查最后一根阳线是否在EMA20上方
ema20_valid = k4.get('above_ema20', False)
if not ema20_valid:
continue
# 检查最后一根阳线的换手率
turnover_ratio = k4.get('turnover_ratio', 0)
turnover_valid = turnover_ratio <= self.max_turnover_ratio
if not turnover_valid:
continue
# 构建信号
signal = {
'index': i,
'date': df.iloc[i].get('trade_date', df.index[i]),
'pattern_type': '两阳+阴+阳突破',
'k1': k1, # 第一根阳线
'k2': k2, # 第二根阳线
'k3': k3, # 阴线
'k4': k4, # 突破阳线
'yang1_entity_ratio': k1['entity_ratio'],
'yang2_entity_ratio': k2['entity_ratio'],
'final_yang_entity_ratio': k4['entity_ratio'],
'breakout_price': k4['close'],
'yin_high': k3['high'],
'breakout_amount': k4['close'] - k3['high'],
'breakout_pct': (k4['close'] - k3['high']) / k3['high'] * 100 if k3['high'] > 0 else 0,
'ema20_price': k4.get('ema20', 0),
'above_ema20': k4.get('above_ema20', False),
'turnover_ratio': turnover_ratio
}
signals.append(signal)
# 美化信号发现日志
logger.info("🎯" + "="*60)
logger.info(f"📈 发现K线形态突破信号!")
logger.info(f"📅 信号时间: {signal['date']}")
logger.info(f"💰 突破价格: {signal['breakout_price']:.2f}")
logger.info(f"📊 实体比例: 阳线1({signal['yang1_entity_ratio']:.1%}) | 阳线2({signal['yang2_entity_ratio']:.1%}) | 最后阳线({signal['final_yang_entity_ratio']:.1%})")
logger.info(f"💥 突破幅度: {signal['breakout_pct']:.2f}% (突破阴线最高价{signal['yin_high']:.2f}元)")
logger.info(f"📈 EMA20: {signal['ema20_price']:.2f}元 ({'✅上方' if signal['above_ema20'] else '❌下方'})")
logger.info(f"🔄 换手率: {signal['turnover_ratio']:.2f}% ({'✅合规' if signal['turnover_ratio'] <= self.max_turnover_ratio else '❌过高'})")
logger.info("🎯" + "="*60)
return signals
def analyze_stock(self, stock_code: str, stock_name: str = None, days: int = 60) -> Dict[str, List[Dict[str, Any]]]:
"""
分析单只股票的K线形态
Args:
stock_code: 股票代码
stock_name: 股票名称
days: 分析的天数
Returns:
各时间周期的信号字典
"""
results = {}
if stock_name is None:
# 尝试获取股票中文名称
stock_name = self.data_fetcher.get_stock_name(stock_code)
try:
# 计算开始日期
end_date = datetime.now().strftime('%Y-%m-%d')
start_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
for timeframe in self.timeframes:
logger.info(f"🔍 分析股票: {stock_code}({stock_name}) | 周期: {timeframe}")
# 获取历史数据 - 直接使用adata的原生周期支持
df = self.data_fetcher.get_historical_data(stock_code, start_date, end_date, timeframe)
if df.empty:
logger.warning(f"{stock_code} {timeframe} 数据为空")
results[timeframe] = []
continue
# 计算K线特征
df_with_features = self.calculate_kline_features(df)
# 检测形态
signals = self.detect_pattern(df_with_features)
# 处理信号
for signal in signals:
signal['stock_code'] = stock_code
signal['stock_name'] = stock_name
signal['timeframe'] = timeframe
results[timeframe] = signals
# 美化信号统计日志
if signals:
logger.info(f"{stock_code}({stock_name}) {timeframe}周期: 发现 {len(signals)} 个信号")
for i, signal in enumerate(signals, 1):
logger.info(f" 📊 信号{i}: {signal['date']} | 价格: {signal['breakout_price']:.2f}元 | 实体: {signal['final_yang_entity_ratio']:.1%}")
else:
logger.debug(f"📭 {stock_code}({stock_name}) {timeframe}周期: 无信号")
except Exception as e:
logger.error(f"分析股票 {stock_code} 失败: {e}")
for timeframe in self.timeframes:
results[timeframe] = []
return results
def _convert_to_weekly(self, daily_df: pd.DataFrame) -> pd.DataFrame:
"""
将日线数据转换为周线数据
Args:
daily_df: 日线数据
Returns:
周线数据
"""
if daily_df.empty:
return daily_df
try:
df = daily_df.copy()
# 确保有trade_date列并设置为索引
if 'trade_date' in df.columns:
df['trade_date'] = pd.to_datetime(df['trade_date'])
df.set_index('trade_date', inplace=True)
# 按周聚合
weekly_df = df.resample('W').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum' if 'volume' in df.columns else 'last'
}).dropna()
# 重置索引保持trade_date列
weekly_df.reset_index(inplace=True)
return weekly_df
except Exception as e:
logger.error(f"转换周线数据失败: {e}")
return pd.DataFrame()
def _convert_to_monthly(self, daily_df: pd.DataFrame) -> pd.DataFrame:
"""
将日线数据转换为月线数据
Args:
daily_df: 日线数据
Returns:
月线数据
"""
if daily_df.empty:
return daily_df
try:
df = daily_df.copy()
# 确保有trade_date列并设置为索引
if 'trade_date' in df.columns:
df['trade_date'] = pd.to_datetime(df['trade_date'])
df.set_index('trade_date', inplace=True)
# 按月聚合
monthly_df = df.resample('ME').agg({
'open': 'first',
'high': 'max',
'low': 'min',
'close': 'last',
'volume': 'sum' if 'volume' in df.columns else 'last'
}).dropna()
# 重置索引保持trade_date列
monthly_df.reset_index(inplace=True)
return monthly_df
except Exception as e:
logger.error(f"转换月线数据失败: {e}")
return pd.DataFrame()
def scan_market(self, stock_list: List[str] = None, max_stocks: int = 100, use_hot_stocks: bool = True, use_combined_sources: bool = True) -> Dict[str, Dict[str, List[Dict[str, Any]]]]:
"""
扫描市场中的股票形态
Args:
stock_list: 股票代码列表如果为None则获取热门股票
max_stocks: 最大扫描股票数量
use_hot_stocks: 是否使用热门股票数据默认True
use_combined_sources: 是否使用合并的双数据源同花顺+东财默认True
Returns:
所有股票的分析结果
"""
logger.info("🚀" + "="*70)
logger.info("🌍 开始市场K线形态扫描")
logger.info("🚀" + "="*70)
if stock_list is None:
# 优先使用热门股票数据
if use_hot_stocks:
try:
if use_combined_sources:
# 使用合并的双数据源
logger.info("获取合并热门股票数据(同花顺+东财)...")
hot_stocks = self.data_fetcher.get_combined_hot_stocks(
limit_per_source=max_stocks,
final_limit=max_stocks
)
source_info = "双数据源合并"
else:
# 仅使用同花顺数据
logger.info("获取同花顺热股TOP100数据...")
hot_stocks = self.data_fetcher.get_hot_stocks_ths(limit=max_stocks)
source_info = "同花顺热股"
if not hot_stocks.empty and 'stock_code' in hot_stocks.columns:
stock_list = hot_stocks['stock_code'].tolist()
# 统计数据源分布
if 'source' in hot_stocks.columns:
source_counts = hot_stocks['source'].value_counts().to_dict()
source_detail = " | ".join([f"{k}: {v}" for k, v in source_counts.items()])
logger.info(f"📊 数据源: {source_info} | 总计: {len(stock_list)}只股票")
logger.info(f"📈 分布详情: {source_detail}")
else:
logger.info(f"📊 数据源: {source_info} | 总计: {len(stock_list)}只股票")
else:
logger.warning("热门股票数据为空,回退到全市场股票")
use_hot_stocks = False
except Exception as e:
logger.error(f"获取热门股票失败: {e},回退到全市场股票")
use_hot_stocks = False
# 如果热股获取失败,使用全市场股票列表
if not use_hot_stocks:
try:
all_stocks = self.data_fetcher.get_stock_list()
if not all_stocks.empty:
# 随机选择一些股票进行扫描
stock_list = all_stocks['stock_code'].head(max_stocks).tolist()
logger.info(f"使用全市场股票数据,共{len(stock_list)}只股票")
else:
logger.warning("未能获取股票列表")
return {}
except Exception as e:
logger.error(f"获取股票列表失败: {e}")
return {}
results = {}
total_signals = 0
for i, stock_code in enumerate(stock_list):
# 获取股票名称
stock_name = self.data_fetcher.get_stock_name(stock_code)
logger.info(f"⏳ 扫描进度: [{i+1:3d}/{len(stock_list):3d}] 🔍 {stock_code}({stock_name})")
try:
stock_results = self.analyze_stock(stock_code)
# 统计信号数量
stock_signal_count = sum(len(signals) for signals in stock_results.values())
if stock_signal_count > 0:
results[stock_code] = stock_results
total_signals += stock_signal_count
except Exception as e:
logger.error(f"扫描股票 {stock_code} 失败: {e}")
continue
# 美化最终扫描结果
logger.info("🎉" + "="*70)
logger.info(f"🌍 市场K线形态扫描完成!")
logger.info(f"📊 扫描统计:")
logger.info(f" 🔍 总扫描股票: {len(stock_list)}")
logger.info(f" 🎯 发现信号: {total_signals}")
logger.info(f" 📈 涉及股票: {len(results)}")
if results:
logger.info(f"📋 信号详情:")
signal_count = 0
for stock_code, stock_results in results.items():
stock_name = self.data_fetcher.get_stock_name(stock_code)
for timeframe, signals in stock_results.items():
if signals:
for signal in signals:
signal_count += 1
logger.info(f" 🎯 #{signal_count}: {stock_code}({stock_name}) | {timeframe} | {signal['date']} | {signal['breakout_price']:.2f}")
logger.info("🎉" + "="*70)
# 发送汇总通知
if results:
# 判断数据源类型
data_source = '全市场股票'
if stock_list and len(stock_list) <= max_stocks:
if use_hot_stocks:
data_source = '合并热门股票' if use_combined_sources else '热门股票'
scan_stats = {
'total_scanned': len(stock_list),
'data_source': data_source
}
try:
self.notification_manager.send_strategy_summary(results, scan_stats)
logger.info("📱 汇总通知已发送")
except Exception as e:
logger.error(f"发送汇总通知失败: {e}")
return results
def get_strategy_summary(self) -> str:
"""获取策略说明"""
return f"""
K线形态策略 - 两阳线+阴线+阳线突破
策略逻辑
1. 识别连续4根K线阳线 + 阳线 + 阴线 + 阳线
2. 前两根阳线实体部分须占振幅的 {self.min_entity_ratio:.0%} 以上
3. 最后阳线实体部分须占振幅的 {self.final_yang_min_ratio:.0%} 以上
4. 最后阳线收盘价须高于阴线最高价突破确认
5. 最后阳线收盘价须在EMA20上方趋势确认
6. 最后阳线换手率不高于 {self.max_turnover_ratio:.1f}%流动性约束
7. 支持时间周期{', '.join(self.timeframes)}
信号触发条件
- 形态完整匹配
- 实体比例达标
- 价格突破确认
- EMA20趋势确认
- 换手率约束达标
扫描范围
- 优先使用双数据源合并同花顺热股+东财人气榜
- 自动去重保留最优质股票
- 回退到全市场股票列表
通知方式
- 钉钉webhook汇总推送单次发送所有信号
- 系统日志详细记录
"""
if __name__ == "__main__":
# 测试代码
from ..data.data_fetcher import ADataFetcher
from ..utils.notification import NotificationManager
# 模拟配置
strategy_config = {
'min_entity_ratio': 0.55,
'timeframes': ['daily']
}
notification_config = {
'dingtalk': {
'enabled': False,
'webhook_url': ''
}
}
# 初始化组件
data_fetcher = ADataFetcher()
notification_manager = NotificationManager(notification_config)
strategy = KLinePatternStrategy(data_fetcher, notification_manager, strategy_config)
print("K线形态策略初始化完成")
print(strategy.get_strategy_summary())

0
src/utils/__init__.py Normal file
View File

120
src/utils/config_loader.py Normal file
View File

@ -0,0 +1,120 @@
"""
配置文件加载器
"""
import yaml
from pathlib import Path
from typing import Dict, Any
import os
class ConfigLoader:
"""配置文件加载器"""
def __init__(self, config_path: str = None):
"""
初始化配置加载器
Args:
config_path: 配置文件路径默认为项目根目录下的config/config.yaml
"""
if config_path is None:
# 获取项目根目录
current_file = Path(__file__)
project_root = current_file.parent.parent.parent
config_path = project_root / "config" / "config.yaml"
self.config_path = Path(config_path)
self._config = None
def load_config(self) -> Dict[str, Any]:
"""
加载配置文件
Returns:
配置字典
"""
try:
with open(self.config_path, 'r', encoding='utf-8') as file:
self._config = yaml.safe_load(file)
return self._config
except FileNotFoundError:
raise FileNotFoundError(f"配置文件不存在: {self.config_path}")
except yaml.YAMLError as e:
raise ValueError(f"配置文件格式错误: {e}")
@property
def config(self) -> Dict[str, Any]:
"""
获取配置
Returns:
配置字典
"""
if self._config is None:
self._config = self.load_config()
return self._config
def get(self, key: str, default: Any = None) -> Any:
"""
获取配置项
Args:
key: 配置键支持点号分隔的嵌套键
default: 默认值
Returns:
配置值
"""
config = self.config
keys = key.split('.')
try:
for k in keys:
config = config[k]
return config
except (KeyError, TypeError):
return default
def get_trading_config(self) -> Dict[str, Any]:
"""获取交易配置"""
return self.get('trading', {})
def get_data_config(self) -> Dict[str, Any]:
"""获取数据配置"""
return self.get('data', {})
def get_strategy_config(self) -> Dict[str, Any]:
"""获取策略配置"""
return self.get('strategy', {})
def get_monitor_config(self) -> Dict[str, Any]:
"""获取监控配置"""
return self.get('monitor', {})
def get_logging_config(self) -> Dict[str, Any]:
"""获取日志配置"""
return self.get('logging', {})
# 全局配置实例
config_loader = ConfigLoader()
if __name__ == "__main__":
# 测试配置加载
loader = ConfigLoader()
config = loader.load_config()
print("完整配置:")
print(yaml.dump(config, default_flow_style=False, allow_unicode=True))
print("\n交易配置:")
print(loader.get_trading_config())
print("\n数据配置:")
print(loader.get_data_config())
print("\n单个配置项:")
print(f"交易开始时间: {loader.get('trading.trading_hours.start')}")
print(f"最大仓位: {loader.get('trading.risk_management.max_total_position')}")

416
src/utils/notification.py Normal file
View File

@ -0,0 +1,416 @@
"""
通知模块 - 支持钉钉webhook通知
"""
import requests
import json
import time
import hmac
import hashlib
import base64
import urllib.parse
from typing import Dict, Any, Optional
from loguru import logger
from datetime import datetime
class DingTalkNotifier:
"""钉钉机器人通知器"""
def __init__(self, webhook_url: str, secret: str = None):
"""
初始化钉钉通知器
Args:
webhook_url: 钉钉机器人webhook地址
secret: 加签密钥如果提供则启用加签验证
"""
self.webhook_url = webhook_url
self.secret = secret
self.session = requests.Session()
logger.info(f"钉钉通知器初始化完成 {'(已启用加签)' if secret else '(未启用加签)'}")
def _generate_signature(self, timestamp: str) -> str:
"""
生成钉钉加签
Args:
timestamp: 时间戳字符串
Returns:
加签结果
"""
if not self.secret:
return ""
string_to_sign = f"{timestamp}\n{self.secret}"
hmac_code = hmac.new(
self.secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
return sign
def _get_signed_url(self) -> str:
"""
获取带加签的webhook URL
Returns:
带加签的URL如果未配置密钥则返回原URL
"""
if not self.secret:
return self.webhook_url
timestamp = str(round(time.time() * 1000))
sign = self._generate_signature(timestamp)
# 添加时间戳和签名参数
separator = '&' if '?' in self.webhook_url else '?'
return f"{self.webhook_url}{separator}timestamp={timestamp}&sign={sign}"
def send_text_message(self, content: str, at_all: bool = False, at_mobiles: list = None) -> bool:
"""
发送文本消息
Args:
content: 消息内容
at_all: 是否@所有人
at_mobiles: @指定手机号列表
Returns:
发送是否成功
"""
try:
data = {
"msgtype": "text",
"text": {
"content": content
}
}
# 添加@功能
if at_all or at_mobiles:
data["at"] = {}
if at_all:
data["at"]["isAtAll"] = True
if at_mobiles:
data["at"]["atMobiles"] = at_mobiles
response = self.session.post(
self._get_signed_url(),
json=data,
headers={'Content-Type': 'application/json'},
timeout=10
)
if response.status_code == 200:
result = response.json()
if result.get('errcode') == 0:
logger.info("钉钉消息发送成功")
return True
else:
logger.error(f"钉钉消息发送失败: {result.get('errmsg', '未知错误')}")
return False
else:
logger.error(f"钉钉API请求失败: HTTP {response.status_code}")
return False
except Exception as e:
logger.error(f"发送钉钉消息异常: {e}")
return False
def send_markdown_message(self, title: str, text: str, at_all: bool = False, at_mobiles: list = None) -> bool:
"""
发送Markdown格式消息
Args:
title: 消息标题
text: Markdown格式的消息内容
at_all: 是否@所有人
at_mobiles: @指定手机号列表
Returns:
发送是否成功
"""
try:
data = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": text
}
}
# 添加@功能
if at_all or at_mobiles:
data["at"] = {}
if at_all:
data["at"]["isAtAll"] = True
if at_mobiles:
data["at"]["atMobiles"] = at_mobiles
response = self.session.post(
self._get_signed_url(),
json=data,
headers={'Content-Type': 'application/json'},
timeout=10
)
if response.status_code == 200:
result = response.json()
if result.get('errcode') == 0:
logger.info("钉钉Markdown消息发送成功")
return True
else:
logger.error(f"钉钉Markdown消息发送失败: {result.get('errmsg', '未知错误')}")
return False
else:
logger.error(f"钉钉API请求失败: HTTP {response.status_code}")
return False
except Exception as e:
logger.error(f"发送钉钉Markdown消息异常: {e}")
return False
def send_strategy_summary_message(self, title: str, markdown_text: str) -> bool:
"""
发送策略汇总消息Markdown格式
Args:
title: 消息标题
markdown_text: Markdown格式的消息内容
Returns:
发送是否成功
"""
return self.send_markdown_message(title, markdown_text)
def send_strategy_signal(self, stock_code: str, stock_name: str, timeframe: str,
signal_type: str, price: float, signal_date: str = None, additional_info: Dict[str, Any] = None) -> bool:
"""
发送策略信号通知
Args:
stock_code: 股票代码
stock_name: 股票名称
timeframe: 时间周期
signal_type: 信号类型
price: 当前价格
signal_date: 信号发生的时间K线时间
additional_info: 额外信息
Returns:
发送是否成功
"""
try:
# 使用信号时间或当前时间
display_time = signal_date if signal_date else datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 构建Markdown消息
title = f"📈 {signal_type}信号提醒"
markdown_text = f"""
# 📈 {signal_type}信号提醒
**股票信息:**
- 代码: `{stock_code}`
- 名称: `{stock_name}`
- 价格: `{price}`
- 时间周期: `{timeframe}`
**信号时间:** {display_time}
**策略说明:** 两阳线+阴线+阳线形态突破
---
*量化交易系统自动发送*
"""
# 添加额外信息
if additional_info:
markdown_text += "\n**额外信息:**\n"
for key, value in additional_info.items():
markdown_text += f"- {key}: `{value}`\n"
return self.send_markdown_message(title, markdown_text)
except Exception as e:
logger.error(f"发送策略信号通知异常: {e}")
return False
class NotificationManager:
"""通知管理器"""
def __init__(self, config: Dict[str, Any]):
"""
初始化通知管理器
Args:
config: 通知配置
"""
self.config = config
self.dingtalk_notifier = None
# 初始化钉钉通知器
dingtalk_config = config.get('dingtalk', {})
if dingtalk_config.get('enabled', False):
webhook_url = dingtalk_config.get('webhook_url')
secret = dingtalk_config.get('secret')
if webhook_url:
self.dingtalk_notifier = DingTalkNotifier(webhook_url, secret)
logger.info("钉钉通知器已启用")
else:
logger.warning("钉钉通知已启用但未配置webhook_url")
def send_strategy_signal(self, stock_code: str, stock_name: str, timeframe: str,
signal_type: str, price: float, signal_date: str = None, additional_info: Dict[str, Any] = None) -> bool:
"""
发送策略信号到所有启用的通知渠道
Args:
stock_code: 股票代码
stock_name: 股票名称
timeframe: 时间周期
signal_type: 信号类型
price: 当前价格
signal_date: 信号发生的时间K线时间
additional_info: 额外信息
Returns:
是否至少有一个渠道发送成功
"""
success = False
# 钉钉通知
if self.dingtalk_notifier:
if self.dingtalk_notifier.send_strategy_signal(
stock_code, stock_name, timeframe, signal_type, price, signal_date, additional_info
):
success = True
# 记录到日志
logger.info(f"策略信号: {signal_type} | {stock_code}({stock_name}) | {timeframe} | {price}")
if additional_info:
logger.info(f"额外信息: {additional_info}")
return success
def send_strategy_summary(self, all_signals: Dict[str, Any], scan_stats: Dict[str, Any] = None) -> bool:
"""
发送策略信号汇总通知
Args:
all_signals: 所有信号的汇总数据 {stock_code: {timeframe: [signals]}}
scan_stats: 扫描统计信息
Returns:
发送是否成功
"""
if not all_signals:
return False
try:
from datetime import datetime
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 统计信号数量
total_signals = 0
total_stocks = len(all_signals)
signal_summary = []
for stock_code, stock_results in all_signals.items():
stock_signal_count = sum(len(signals) for signals in stock_results.values())
total_signals += stock_signal_count
if stock_signal_count > 0:
# 获取股票名称和最新信号
stock_name = "未知"
latest_signal = None
for timeframe, signals in stock_results.items():
if signals:
latest_signal = signals[-1] # 取最新信号
stock_name = latest_signal.get('stock_name', stock_name)
break
if latest_signal:
signal_summary.append({
'stock_code': stock_code,
'stock_name': stock_name,
'signal_count': stock_signal_count,
'price': latest_signal['breakout_price'],
'date': latest_signal['date'],
'turnover': latest_signal.get('turnover_ratio', 0)
})
# 构建汇总消息
title = f"📈 K线形态策略信号汇总"
markdown_text = f"""
# 📈 K线形态策略信号汇总
**扫描统计:**
- 扫描时间: {current_time}
- 发现信号: `{total_signals}`
- 涉及股票: `{total_stocks}`
**信号详情:**
"""
# 添加每只股票的信号摘要
for i, signal in enumerate(signal_summary[:10], 1): # 最多显示10只股票
markdown_text += f"""
{i}. **{signal['stock_code']} - {signal['stock_name']}**
- 价格: `{signal['price']:.2f}`
- 信号数: `{signal['signal_count']}`
- 换手率: `{signal['turnover']:.2f}%`
- 时间: `{signal['date']}`
"""
if len(signal_summary) > 10:
markdown_text += f"\n*还有 {len(signal_summary) - 10} 只股票...*\n"
# 添加扫描统计(如果提供)
if scan_stats:
markdown_text += f"""
**扫描范围:**
- 扫描股票总数: `{scan_stats.get('total_scanned', 'N/A')}`
- 数据源: `{scan_stats.get('data_source', '热门股票')}`
"""
markdown_text += """
---
**策略说明:** 两阳线+阴线+阳线形态突破
*量化交易系统自动发送*
"""
# 发送钉钉通知
if self.dingtalk_notifier:
return self.dingtalk_notifier.send_markdown_message(title, markdown_text)
return False
except Exception as e:
logger.error(f"发送策略汇总通知异常: {e}")
return False
def send_test_message(self) -> bool:
"""发送测试消息"""
if self.dingtalk_notifier:
return self.dingtalk_notifier.send_text_message("量化交易系统通知测试 ✅")
return False
if __name__ == "__main__":
# 测试代码
# 注意: 需要有效的钉钉webhook地址才能测试
test_config = {
'dingtalk': {
'enabled': False, # 设置为True并提供webhook_url进行测试
'webhook_url': 'https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN'
}
}
notifier = NotificationManager(test_config)
print("通知管理器初始化完成")

96
test_dingtalk.py Normal file
View File

@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""
测试钉钉通知功能
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from utils.notification import DingTalkNotifier, NotificationManager
import yaml
def test_dingtalk_with_secret():
"""测试带加签的钉钉通知"""
print("🔧 测试钉钉加签功能...")
# 测试加签生成
webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"
secret = "SEC6e9dbd71d4addd2c4e673fb72d686293b342da5ae48da2f8ec788a68de99f981"
notifier = DingTalkNotifier(webhook_url, secret)
# 生成签名URL
signed_url = notifier._get_signed_url()
print(f"✅ 签名URL生成成功")
print(f"📄 原始URL: {webhook_url}")
print(f"🔐 签名URL: {signed_url}")
# 检查URL格式
if "timestamp=" in signed_url and "sign=" in signed_url:
print("✅ 加签参数正确添加")
else:
print("❌ 加签参数缺失")
return False
return True
def test_notification_manager():
"""测试通知管理器配置"""
print("\n🔧 测试通知管理器配置...")
# 从配置文件读取配置
try:
with open('config/config.yaml', 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
notification_config = config.get('notification', {})
print(f"✅ 配置文件加载成功")
print(f"📄 钉钉配置: {notification_config.get('dingtalk', {})}")
# 创建通知管理器
notifier_manager = NotificationManager(notification_config)
if notifier_manager.dingtalk_notifier:
print("✅ 钉钉通知器初始化成功")
if notifier_manager.dingtalk_notifier.secret:
print("✅ 加签密钥配置正确")
else:
print("❌ 加签密钥未配置")
return False
else:
print("❌ 钉钉通知器未启用")
return False
return True
except Exception as e:
print(f"❌ 配置测试失败: {e}")
return False
def main():
print("=" * 60)
print(" 钉钉通知功能测试")
print("=" * 60)
# 测试加签功能
test1_passed = test_dingtalk_with_secret()
# 测试配置管理
test2_passed = test_notification_manager()
print("\n" + "=" * 60)
print("测试结果:")
print(f"🔐 加签功能测试: {'✅ 通过' if test1_passed else '❌ 失败'}")
print(f"⚙️ 配置管理测试: {'✅ 通过' if test2_passed else '❌ 失败'}")
if test1_passed and test2_passed:
print("\n🎉 所有测试通过!钉钉通知功能配置正确")
print("💡 注意: 需要提供完整的webhook URL才能发送实际消息")
else:
print("\n❌ 部分测试失败,请检查配置")
print("=" * 60)
if __name__ == "__main__":
main()

135
test_sentiment.py Normal file
View File

@ -0,0 +1,135 @@
#!/usr/bin/env python3
"""
舆情数据功能测试脚本
"""
import sys
from pathlib import Path
# 将src目录添加到Python路径
current_dir = Path(__file__).parent
src_dir = current_dir / "src"
sys.path.insert(0, str(src_dir))
from src.data.sentiment_fetcher import SentimentFetcher
def test_sentiment_features():
"""测试舆情功能"""
print("="*60)
print(" A股舆情数据功能测试")
print("="*60)
fetcher = SentimentFetcher()
# 1. 测试北向资金
print("\n🌊 1. 北向资金数据测试")
print("-" * 30)
current_flow = fetcher.get_north_flow_current()
if not current_flow.empty:
row = current_flow.iloc[0]
print(f"总净流入: {row.get('net_tgt', 'N/A')} 万元")
print(f"沪股通: {row.get('net_hgt', 'N/A')} 万元")
print(f"深股通: {row.get('net_sgt', 'N/A')} 万元")
print(f"更新时间: {row.get('trade_time', 'N/A')}")
else:
print("未获取到当前北向资金数据")
# 2. 测试热门股票
print("\n🔥 2. 热门股票数据测试")
print("-" * 30)
hot_stocks = fetcher.get_popular_stocks_east_100()
if not hot_stocks.empty:
print(f"东财人气股票TOP5:")
for idx, row in hot_stocks.head(5).iterrows():
code = row.get('stock_code', 'N/A')
name = row.get('short_name', 'N/A')
print(f" {idx + 1}. {code} - {name}")
else:
print("未获取到热门股票数据")
# 3. 测试龙虎榜
print("\n🐉 3. 龙虎榜数据测试")
print("-" * 30)
dragon_tiger = fetcher.get_dragon_tiger_list_daily()
if not dragon_tiger.empty:
print(f"今日龙虎榜 (共{len(dragon_tiger)}只股票):")
for idx, row in dragon_tiger.head(5).iterrows():
code = row.get('stock_code', 'N/A')
name = row.get('short_name', 'N/A')
reason = row.get('reason', 'N/A')
print(f" {idx + 1}. {code} - {name}")
print(f" 上榜原因: {reason}")
else:
print("今日无龙虎榜数据")
# 4. 测试热门概念
print("\n💡 4. 热门概念数据测试")
print("-" * 30)
try:
hot_concepts = fetcher.get_hot_concept_ths_20()
if not hot_concepts.empty:
print(f"同花顺热门概念TOP5:")
for idx, row in hot_concepts.head(5).iterrows():
name = row.get('concept_name', 'N/A')
change_pct = row.get('change_pct', 'N/A')
print(f" {idx + 1}. {name} (涨跌幅: {change_pct}%)")
else:
print("未获取到热门概念数据")
except Exception as e:
print(f"热门概念获取失败: {e}")
# 5. 测试市场舆情综合概览
print("\n📊 5. 市场舆情综合概览测试")
print("-" * 30)
try:
overview = fetcher.get_market_sentiment_overview()
if overview:
print("✅ 市场舆情概览获取成功")
# 北向资金
if 'north_flow' in overview:
north_data = overview['north_flow']
print(f"北向资金: 总净流入 {north_data.get('net_total', 'N/A')} 万元")
# 热门股票
if 'hot_stocks_east' in overview and not overview['hot_stocks_east'].empty:
count = len(overview['hot_stocks_east'])
print(f"热门股票: 获取到 {count}")
# 龙虎榜
if 'dragon_tiger' in overview and not overview['dragon_tiger'].empty:
count = len(overview['dragon_tiger'])
print(f"龙虎榜: 获取到 {count}")
else:
print("市场舆情概览获取失败")
except Exception as e:
print(f"市场舆情概览测试失败: {e}")
# 6. 测试个股舆情分析
print("\n🔍 6. 个股舆情分析测试")
print("-" * 30)
test_stock = "000001.SZ" # 平安银行
try:
analysis = fetcher.analyze_stock_sentiment(test_stock)
if 'error' not in analysis:
print(f"{test_stock} 舆情分析成功")
print(f"东财人气榜: {'在榜' if analysis.get('in_popular_east', False) else '不在榜'}")
print(f"同花顺热门榜: {'在榜' if analysis.get('in_hot_ths', False) else '不在榜'}")
if 'dragon_tiger' in analysis and not analysis['dragon_tiger'].empty:
print("✅ 今日上榜龙虎榜")
else:
print("❌ 今日未上榜龙虎榜")
else:
print(f"个股舆情分析失败: {analysis.get('error', '未知错误')}")
except Exception as e:
print(f"个股舆情分析测试失败: {e}")
print("\n" + "="*60)
print(" 舆情数据功能测试完成")
print("="*60)
if __name__ == "__main__":
test_sentiment_features()

185
test_strategy.py Normal file
View File

@ -0,0 +1,185 @@
#!/usr/bin/env python3
"""
K线形态策略测试脚本
"""
import sys
from pathlib import Path
import pandas as pd
import numpy as np
# 将src目录添加到Python路径
current_dir = Path(__file__).parent
src_dir = current_dir / "src"
sys.path.insert(0, str(src_dir))
from src.data.data_fetcher import ADataFetcher
from src.utils.notification import NotificationManager
from src.strategy.kline_pattern_strategy import KLinePatternStrategy
def create_test_kline_data():
"""创建测试K线数据 - 包含两阳线+阴线+阳线形态"""
dates = pd.date_range('2023-01-01', periods=10, freq='D')
# 模拟K线数据
test_data = {
'trade_date': dates,
'open': [10.0, 10.5, 11.0, 12.0, 11.5, 11.0, 10.5, 11.0, 11.8, 12.5],
'high': [10.8, 11.2, 11.8, 12.5, 12.0, 11.5, 11.2, 11.5, 12.2, 13.0],
'low': [9.8, 10.3, 10.8, 11.8, 10.8, 10.5, 10.2, 10.8, 11.6, 12.3],
'close':[10.6, 11.0, 11.5, 12.2, 11.0, 10.8, 11.2, 11.3, 12.0, 12.8],
'volume': [1000] * 10
}
df = pd.DataFrame(test_data)
print("测试K线数据:")
print(df)
print()
return df
def test_pattern_detection():
"""测试形态检测功能"""
print("="*60)
print(" K线形态检测功能测试")
print("="*60)
# 创建测试配置
strategy_config = {
'min_entity_ratio': 0.55,
'timeframes': ['daily'],
'scan_stocks_count': 10,
'analysis_days': 60
}
notification_config = {
'dingtalk': {
'enabled': False,
'webhook_url': ''
}
}
# 初始化组件
data_fetcher = ADataFetcher()
notification_manager = NotificationManager(notification_config)
strategy = KLinePatternStrategy(data_fetcher, notification_manager, strategy_config)
print("1. 策略信息:")
print(strategy.get_strategy_summary())
print("\n2. 测试K线特征计算:")
test_df = create_test_kline_data()
df_with_features = strategy.calculate_kline_features(test_df)
print("添加特征后的数据:")
relevant_cols = ['trade_date', 'open', 'high', 'low', 'close', 'is_yang', 'is_yin', 'entity_ratio']
print(df_with_features[relevant_cols])
print("\n3. 测试形态检测:")
signals = strategy.detect_pattern(df_with_features)
if signals:
print(f"发现 {len(signals)} 个形态信号:")
for i, signal in enumerate(signals, 1):
print(f"\n信号 {i}:")
print(f" 日期: {signal['date']}")
print(f" 形态: {signal['pattern_type']}")
print(f" 突破价格: {signal['breakout_price']:.2f}")
print(f" 突破幅度: {signal['breakout_pct']:.2f}%")
print(f" 阳线1实体比例: {signal['yang1_entity_ratio']:.1%}")
print(f" 阳线2实体比例: {signal['yang2_entity_ratio']:.1%}")
else:
print("未发现形态信号")
print("\n4. 测试真实股票数据:")
test_stocks = ["000001.SZ", "000002.SZ"] # 平安银行、万科A
for stock_code in test_stocks:
print(f"\n分析股票: {stock_code}")
try:
results = strategy.analyze_stock(stock_code, days=30) # 分析最近30天
total_signals = sum(len(signals) for signals in results.values())
print(f"总信号数: {total_signals}")
for timeframe, signals in results.items():
if signals:
print(f"{timeframe}: {len(signals)}个信号")
# 显示最新信号
latest = signals[-1]
print(f" 最新: {latest['date']} {latest['breakout_price']:.2f}")
else:
print(f"{timeframe}: 无信号")
except Exception as e:
print(f"分析失败: {e}")
print("\n5. 测试通知功能:")
try:
# 测试日志通知
notification_manager.send_strategy_signal(
stock_code="TEST001",
stock_name="测试股票",
timeframe="daily",
signal_type="测试信号",
price=15.50,
additional_info={
"阳线1实体比例": "65%",
"阳线2实体比例": "70%",
"突破幅度": "2.5%"
}
)
print("✅ 通知功能测试完成(日志记录)")
except Exception as e:
print(f"❌ 通知功能测试失败: {e}")
print("\n" + "="*60)
print(" 策略测试完成")
print("="*60)
def test_weekly_monthly_conversion():
"""测试周线月线转换功能"""
print("\n测试周线/月线数据转换:")
# 创建更多天数的测试数据
dates = pd.date_range('2023-01-01', periods=50, freq='D')
test_data = {
'trade_date': dates,
'open': np.random.uniform(10, 15, 50),
'high': np.random.uniform(15, 20, 50),
'low': np.random.uniform(8, 12, 50),
'close': np.random.uniform(10, 15, 50),
'volume': np.random.randint(1000, 5000, 50)
}
daily_df = pd.DataFrame(test_data)
strategy_config = {'min_entity_ratio': 0.55, 'timeframes': ['daily']}
notification_config = {'dingtalk': {'enabled': False}}
data_fetcher = ADataFetcher()
notification_manager = NotificationManager(notification_config)
strategy = KLinePatternStrategy(data_fetcher, notification_manager, strategy_config)
# 测试周线转换
weekly_df = strategy._convert_to_weekly(daily_df)
print(f"日线数据: {len(daily_df)}")
print(f"周线数据: {len(weekly_df)}")
# 测试月线转换
monthly_df = strategy._convert_to_monthly(daily_df)
print(f"月线数据: {len(monthly_df)}")
if not weekly_df.empty:
print("\n周线数据样本:")
print(weekly_df[['trade_date', 'open', 'high', 'low', 'close']].head())
if __name__ == "__main__":
test_pattern_detection()
test_weekly_monthly_conversion()