stock-ai-agent/backend/app/stock_agent/stock_agent.py
2026-02-19 21:20:20 +08:00

357 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
美股交易智能体 - 主控制器LLM 驱动版)
只进行市场分析和通知,不执行模拟交易
"""
import asyncio
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
import pandas as pd
from app.utils.logger import logger
from app.config import get_settings
from app.services.yfinance_service import get_yfinance_service
from app.services.feishu_service import get_feishu_service
from app.services.telegram_service import get_telegram_service
from app.crypto_agent.llm_signal_analyzer import LLMSignalAnalyzer
class StockAgent:
"""美股交易信号智能体LLM 驱动,仅分析通知)"""
def __init__(self):
"""初始化智能体"""
self.settings = get_settings()
self.yfinance = get_yfinance_service()
self.feishu = get_feishu_service()
self.telegram = get_telegram_service()
self.llm_analyzer = LLMSignalAnalyzer()
# 状态管理
self.last_signals: Dict[str, Dict[str, Any]] = {}
self.signal_cooldown: Dict[str, datetime] = {}
# 配置
self.symbols = self.settings.stock_symbols.split(',')
# 运行状态
self.running = False
self._event_loop = None
self._task = None
logger.info(f"美股智能体初始化完成,监控股票: {self.symbols}")
async def start(self):
"""启动智能体"""
if self.running:
logger.warning("美股智能体已在运行中")
return
self.running = True
self._event_loop = asyncio.get_event_loop()
logger.info("美股智能体已启动")
# 启动分析任务
self._task = asyncio.create_task(self._analysis_loop())
async def stop(self):
"""停止智能体"""
self.running = False
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("美股智能体已停止")
async def _analysis_loop(self):
"""分析循环 - 只在美股交易时间内运行"""
while self.running:
try:
# 检查是否在美股交易时间
if not self._is_market_hours():
# 不在交易时间,等待 10 分钟后再次检查
logger.debug("非美股交易时间,等待中...")
await asyncio.sleep(600)
continue
# 在交易时间内,分析所有股票
for symbol in self.symbols:
if not self.running:
break
await self.analyze_symbol(symbol)
# 等待 1 小时后进行下次分析
logger.info("本次分析完成,等待 1 小时后进行下次分析...")
await asyncio.sleep(3600)
except Exception as e:
logger.error(f"分析循环出错: {e}")
await asyncio.sleep(60) # 出错后等待 1 分钟再重试
def _is_market_hours(self) -> bool:
"""
判断当前是否在美股交易时间
美股交易时间: 周一至周五 9:30-16:00 (EST)
北京时间:
- 冬令时 (11月-3月): 22:30-05:00 (次日)
- 夏令时 (3月-11月): 21:30-04:00 (次日)
Returns:
是否在交易时间
"""
from datetime import datetime
# 获取当前时间
now = datetime.now()
# 检查是否为周末
if now.weekday() >= 5: # 5=周六, 6=周日
return False
# 获取当前小时和分钟
hour = now.hour
minute = now.minute
current_time = hour * 100 + minute # 转换为数字,如 2130 表示 21:30
# 判断夏令时/冬令时简单判断3-11月为夏令时
is_summer = 3 <= now.month <= 11
if is_summer:
# 夏令时: 21:30-04:00 (次日)
# 即 2130-2359 或 0000-0400
if current_time >= 2130 or current_time < 400:
return True
else:
# 冬令时: 22:30-05:00 (次日)
# 即 2230-2359 或 0000-0500
if current_time >= 2230 or current_time < 500:
return True
return False
async def analyze_symbol(self, symbol: str):
"""
分析单个股票
Args:
symbol: 股票代码
"""
try:
# 1. 获取多时间周期数据
data = self.yfinance.get_multi_timeframe_data(symbol)
# 2. 验证数据完整性
if not self._validate_data(data):
logger.warning(f"{symbol} 数据不完整,跳过本次分析")
return
# 3. 获取当前价格
ticker = self.yfinance.get_ticker(symbol)
if not ticker:
logger.warning(f"无法获取 {symbol} 当前价格")
return
current_price = ticker['lastPrice']
logger.info(f"\n{'='*60}")
logger.info(f"📊 分析 {symbol} @ ${current_price:,.2f}")
logger.info(f"{'='*60}")
# 4. LLM 分析
logger.info(f"\n🤖 【LLM 分析中...】")
result = await self.llm_analyzer.analyze(
symbol, data,
symbols=self.symbols,
position_info=None # 美股不跟踪持仓
)
# 输出分析摘要
summary = result.get('analysis_summary', '')
logger.info(f" 市场状态: {summary}")
# 输出新闻情绪
news_sentiment = result.get('news_sentiment', '')
news_impact = result.get('news_impact', '')
if news_sentiment:
sentiment_icon = {'positive': '📈', 'negative': '📉', 'neutral': ''}.get(news_sentiment, '')
logger.info(f" 新闻情绪: {sentiment_icon} {news_sentiment}")
if news_impact:
logger.info(f" 消息影响: {news_impact}")
# 输出关键价位
levels = result.get('key_levels', {})
if levels.get('support') or levels.get('resistance'):
support_str = ', '.join([f"${s:,.2f}" for s in levels.get('support', [])[:2]])
resistance_str = ', '.join([f"${r:,.2f}" for r in levels.get('resistance', [])[:2]])
logger.info(f" 支撑位: {support_str or '-'}")
logger.info(f" 阻力位: {resistance_str or '-'}")
# 5. 处理信号
signals = result.get('signals', [])
if not signals:
logger.info(f"\n⏸️ 结论: 无交易信号,继续观望")
return
# 输出所有信号
logger.info(f"\n🎯 【发现 {len(signals)} 个信号】")
for sig in signals:
signal_type = sig.get('type', 'unknown')
type_map = {'short_term': '短线', 'medium_term': '中线', 'long_term': '长线'}
type_text = type_map.get(signal_type, signal_type)
action = sig.get('action', 'wait')
action_map = {'buy': '🟢 做多', 'sell': '🔴 做空'}
action_text = action_map.get(action, action)
grade = sig.get('grade', 'D')
confidence = sig.get('confidence', 0)
grade_icon = {'A': '⭐⭐⭐', 'B': '⭐⭐', 'C': '', 'D': ''}.get(grade, '')
logger.info(f"\n {type_text} {action_text} [{grade}{grade_icon}] {confidence}%")
# 6. 过滤并通知最佳信号
best_signal = self._get_best_signal(signals)
if not best_signal:
logger.info(f"\n⏸️ 信号质量不高,不发送通知")
return
# 检查置信度阈值
threshold = self.settings.stock_llm_threshold * 100
if best_signal.get('confidence', 0) < threshold:
logger.info(f"\n⏸️ 置信度不足 ({best_signal.get('confidence', 0)}% < {threshold}%)")
return
# 检查冷却时间
if not self._should_send_signal(symbol, best_signal):
logger.info(f"\n⏸️ 信号冷却中,不发送通知")
return
# 发送通知
await self._send_signal_notification(symbol, best_signal, current_price)
# 更新状态
self.last_signals[symbol] = best_signal
self.signal_cooldown[symbol] = datetime.now()
except Exception as e:
logger.error(f"❌ 分析 {symbol} 出错: {e}")
import traceback
logger.error(traceback.format_exc())
def _get_best_signal(self, signals: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""获取最佳信号"""
# 过滤掉 D 级信号
valid_signals = [s for s in signals if s.get('grade', 'D') != 'D']
if not valid_signals:
return None
# 按等级和置信度排序
grade_order = {'A': 0, 'B': 1, 'C': 2}
valid_signals.sort(key=lambda x: (
grade_order.get(x.get('grade', 'C'), 3),
-x.get('confidence', 0)
))
return valid_signals[0]
def _should_send_signal(self, symbol: str, signal: Dict[str, Any]) -> bool:
"""判断是否应该发送信号"""
action = signal.get('action', 'wait')
if action == 'wait':
return False
# 检查冷却时间60分钟内不重复发送相同方向的信号
if symbol in self.signal_cooldown:
cooldown_end = self.signal_cooldown[symbol] + timedelta(minutes=60)
if datetime.now() < cooldown_end:
if symbol in self.last_signals:
if self.last_signals[symbol].get('action') == action:
logger.debug(f"{symbol} 信号冷却中,跳过")
return False
return True
async def _send_signal_notification(
self,
symbol: str,
signal: Dict[str, Any],
current_price: float
):
"""发送信号通知"""
try:
# 使用正确的方法格式化信号
card = self.llm_analyzer.format_feishu_card(signal, symbol)
title = card['title']
content = card['content']
# 发送到飞书
await self.feishu.send_markdown(title, content)
# 发送到 Telegram
await self.telegram.send_message(self.llm_analyzer.format_signal_message(signal, symbol))
logger.info(f"✅ 信号通知已发送: {title}")
except Exception as e:
logger.error(f"发送通知失败: {e}")
def _validate_data(self, data: Dict[str, pd.DataFrame]) -> bool:
"""验证数据完整性"""
required_intervals = ['1d', '1h']
for interval in required_intervals:
if interval not in data or data[interval].empty:
return False
if len(data[interval]) < 20:
return False
return True
async def analyze_once(self, symbol: str) -> Dict[str, Any]:
"""单次分析(用于测试或手动触发)"""
data = self.yfinance.get_multi_timeframe_data(symbol)
if not self._validate_data(data):
return {'error': '数据不完整'}
result = await self.llm_analyzer.analyze(
symbol, data,
symbols=self.symbols,
position_info=None
)
return result
def get_status(self) -> Dict[str, Any]:
"""获取智能体状态"""
return {
'running': self.running,
'symbols': self.symbols,
'mode': 'LLM 驱动(仅分析通知)',
'last_signals': {
symbol: {
'type': sig.get('type'),
'action': sig.get('action'),
'confidence': sig.get('confidence'),
'grade': sig.get('grade')
}
for symbol, sig in self.last_signals.items()
}
}
# 全局单例
_stock_agent: Optional[StockAgent] = None
def get_stock_agent() -> StockAgent:
"""获取美股智能体单例"""
global _stock_agent
if _stock_agent is None:
_stock_agent = StockAgent()
return _stock_agent