astock-agent/backend/app/analysis/signals.py
2026-04-30 10:05:41 +08:00

311 lines
9.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""买卖信号生成
基于量价与趋势判断辅助信号。RSI 等滞后指标只做风险备注,不主导推荐。
"""
import logging
import pandas as pd
import numpy as np
from app.data.tushare_client import tushare_client
from app.analysis.technical import add_all_indicators
from app.data.models import TechnicalSignal
from app.config import settings
logger = logging.getLogger(__name__)
def _check_ma_bullish(df: pd.DataFrame) -> bool:
"""均线多头排列: MA5>MA10>MA20>MA60 且 MA20 向上"""
if len(df) < 60:
return False
last = df.iloc[-1]
prev = df.iloc[-3] if len(df) >= 3 else last
return (
last["ma5"] > last["ma10"] > last["ma20"] > last["ma60"]
and last["ma20"] > prev["ma20"] # MA20 向上
)
def _check_volume_breakout(df: pd.DataFrame) -> bool:
"""放量突破: 当日量 > MA5量的1.5倍 且 价格突破近20日高点"""
if len(df) < 20:
return False
last = df.iloc[-1]
recent_high = df["high"].iloc[-20:-1].max()
return (
last["vol"] > last["vol_ma5"] * 1.5
and last["close"] > recent_high
)
def _check_macd_golden(df: pd.DataFrame) -> bool:
"""MACD 金叉: DIF 上穿 DEA且在零轴附近或上方"""
if len(df) < 3:
return False
last = df.iloc[-1]
prev = df.iloc[-2]
return (
prev["dif"] <= prev["dea"]
and last["dif"] > last["dea"]
and last["dif"] > -0.5 # 零轴附近或上方
)
def _check_rsi_healthy(df: pd.DataFrame) -> bool:
"""RSI 节奏区间: RSI(14) 在 40-70仅作为辅助节奏提示"""
if df.empty:
return False
rsi = df.iloc[-1].get("rsi14", 50)
return 40 <= rsi <= 70
def _check_pullback_support(df: pd.DataFrame) -> bool:
"""缩量回踩支撑: 回调至 MA10/MA20 附近,回调量 < 上涨量的60%"""
if len(df) < 20:
return False
last = df.iloc[-1]
close = last["close"]
ma10 = last["ma10"]
ma20 = last["ma20"]
# 价格在 MA10 或 MA20 附近±3%
near_ma = (
abs(close - ma10) / ma10 < 0.03
or abs(close - ma20) / ma20 < 0.03
)
if not near_ma:
return False
# 近3日为回调跌或小涨量缩
recent_3 = df.tail(3)
prev_5 = df.iloc[-8:-3] if len(df) >= 8 else df.head(5)
avg_recent_vol = recent_3["vol"].mean()
avg_prev_vol = prev_5["vol"].mean() if len(prev_5) > 0 else avg_recent_vol
return avg_recent_vol < avg_prev_vol * 0.6
def _check_big_yang(df: pd.DataFrame) -> bool:
"""底部放量长阳: 近10日内出现涨幅>5%的放量阳线"""
if len(df) < 10:
return False
recent = df.tail(10)
for _, row in recent.iterrows():
pct = row.get("pct_chg", 0) or 0
vol = row["vol"]
vol_ma = row["vol_ma5"]
if pct > 5 and vol > vol_ma * 1.3:
return True
return False
def _check_boll_support(df: pd.DataFrame) -> bool:
"""布林带下轨支撑: 价格触及下轨后反弹 且 带宽收窄后扩张"""
if len(df) < 5:
return False
# 近5日内有触及下轨
recent = df.tail(5)
touched = False
for _, row in recent.iterrows():
if row["low"] <= row["boll_lower"] * 1.01:
touched = True
break
if not touched:
return False
# 当前价格在下轨上方(反弹)
last = df.iloc[-1]
if last["close"] <= last["boll_lower"]:
return False
# 带宽从收窄变扩张
bw = df["boll_bw"].tail(10)
if len(bw) < 5:
return False
min_bw_idx = bw.idxmin()
last_idx = bw.index[-1]
return min_bw_idx != last_idx and bw.iloc[-1] > bw.iloc[-3]
def _calc_position_score(df: pd.DataFrame) -> tuple[float, float, float, float]:
"""计算位置安全得分,防追高
返回: (rally_pct_5d, rally_pct_10d, distance_from_high, position_score)
position_score: 0-100越高越安全底部/低位),越低越危险(高位/连涨)
"""
if len(df) < 10:
return 0, 0, 0, 50
last_close = float(df.iloc[-1]["close"])
# 近5日累计涨幅
close_5d_ago = float(df.iloc[-6]["close"]) if len(df) >= 6 else float(df.iloc[0]["close"])
rally_pct_5d = (last_close - close_5d_ago) / close_5d_ago * 100
# 近10日累计涨幅
close_10d_ago = float(df.iloc[-11]["close"]) if len(df) >= 11 else float(df.iloc[0]["close"])
rally_pct_10d = (last_close - close_10d_ago) / close_10d_ago * 100
# 距离60日高点
lookback = min(60, len(df))
high_60d = float(df["high"].tail(lookback).max())
distance_from_high = (last_close - high_60d) / high_60d * 100
# 位置安全评分
score = 100.0
# 1) 近5日涨幅惩罚 (权重40%)
if rally_pct_5d > 20:
score -= 40 # 5日涨超20%,极端追高
elif rally_pct_5d > 15:
score -= 32
elif rally_pct_5d > 10:
score -= 24
elif rally_pct_5d > 5:
score -= 12
elif rally_pct_5d > 0:
score -= 4 # 温和上涨,小扣分
else:
score -= 0 # 回调中,不扣分
# 2) 近10日涨幅惩罚 (权重30%)
if rally_pct_10d > 30:
score -= 30
elif rally_pct_10d > 20:
score -= 22
elif rally_pct_10d > 15:
score -= 16
elif rally_pct_10d > 8:
score -= 8
elif rally_pct_10d > 0:
score -= 2
# 3) 距离60日高点的位置 (权重30%)
# distance_from_high <= 0 表示低于高点(已回调),越负越安全
# distance_from_high == 0 表示就在高点,最危险
if distance_from_high >= 0:
score -= 25 # 创新高或刚好在高点
elif distance_from_high > -3:
score -= 18 # 接近高点
elif distance_from_high > -8:
score -= 10 # 轻微回调
elif distance_from_high > -15:
score -= 3 # 适度回调,较安全
else:
score -= 0 # 大幅回调,位置低
# 补充奖励:缩量回踩 MA20 附近 (最多+10)
if len(df) >= 20:
ma20 = float(df.iloc[-1].get("ma20", last_close))
if ma20 > 0 and abs(last_close - ma20) / ma20 < 0.03:
# 在 MA20 附近
recent_vol = float(df["vol"].tail(3).mean())
prev_vol = float(df["vol"].iloc[-8:-3].mean()) if len(df) >= 8 else recent_vol
if recent_vol < prev_vol * 0.7:
score += 10 # 缩量回踩 MA20好位置
return (
round(rally_pct_5d, 2),
round(rally_pct_10d, 2),
round(distance_from_high, 2),
round(max(0, min(100, score)), 1),
)
def _calc_support_resist(df: pd.DataFrame) -> tuple[float | None, float | None]:
"""计算支撑位和压力位"""
if len(df) < 20:
return None, None
last = df.iloc[-1]
# 支撑位MA20 和近20日最低价取较高者
ma20 = last["ma20"]
recent_low = df["low"].tail(20).min()
support = max(ma20, recent_low)
# 压力位近20日最高价
resist = df["high"].tail(20).max()
return round(support, 2), round(resist, 2)
def generate_signals(ts_code: str, name: str = "") -> TechnicalSignal:
"""对单只股票生成技术面买卖信号"""
df = tushare_client.get_stock_daily(ts_code, days=120)
if df.empty or len(df) < 20:
return TechnicalSignal(ts_code=ts_code, name=name)
# 按日期升序
df = df.sort_values("trade_date").reset_index(drop=True)
df = add_all_indicators(df)
# 检查各项信号
ma_bullish = _check_ma_bullish(df)
volume_breakout = _check_volume_breakout(df)
macd_golden = _check_macd_golden(df)
rsi_healthy = _check_rsi_healthy(df)
pullback_support = _check_pullback_support(df)
big_yang = _check_big_yang(df)
boll_support = _check_boll_support(df)
# 计算分数
score = 0
signal_count = 0
signal_map = [
(ma_bullish, 15),
(volume_breakout, 20),
(macd_golden, 15),
(rsi_healthy, 3),
(pullback_support, 15),
(big_yang, 15),
(boll_support, 10),
]
for is_true, points in signal_map:
if is_true:
score += points
signal_count += 1
# 趋势评分(与推荐体系一致)
from app.engine.screener import _score_trend
trend_score = round(_score_trend(df), 1)
# 支撑压力位
support, resist = _calc_support_resist(df)
last_close = float(df.iloc[-1]["close"])
stop_loss = round(last_close * (1 - settings.stop_loss_pct / 100), 2)
# 位置安全评估
rally_5d, rally_10d, dist_high, pos_score = _calc_position_score(df)
result = TechnicalSignal(
ts_code=ts_code,
name=name,
ma_bullish=ma_bullish,
volume_breakout=volume_breakout,
macd_golden=macd_golden,
rsi_healthy=rsi_healthy,
pullback_support=pullback_support,
big_yang=big_yang,
boll_support=boll_support,
score=score,
trend_score=trend_score,
signal_count=signal_count,
rally_pct_5d=rally_5d,
rally_pct_10d=rally_10d,
distance_from_high=dist_high,
position_score=pos_score,
support_price=support,
resist_price=resist,
stop_loss_price=stop_loss,
)
logger.debug(f"技术信号 {name}({ts_code}): score={score} signals={signal_count} "
f"MA={ma_bullish} VOL={volume_breakout} MACD={macd_golden} "
f"RSI={rsi_healthy} PB={pullback_support} YANG={big_yang} BOLL={boll_support} "
f"位置安全={pos_score} 5d涨幅={rally_5d}% 10d涨幅={rally_10d}%")
return result