astock-agent/backend/app/research/risk_agent.py
2026-06-10 08:36:25 +08:00

335 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Risk research agent.
This v1 uses deterministic risk signals already present in the scan result.
External reduction/unlock/regulatory data can be added behind this interface.
"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any
from app.config import settings
from app.data.tushare_client import tushare_client
logger = logging.getLogger(__name__)
def build_risk_alerts(recommendations: list[Any], market_view: dict, latest_scan: dict | None = None) -> list[dict]:
alerts: list[dict] = []
if market_view.get("regime") in {"risk_off", "defensive_watch"}:
alerts.append({
"ts_code": "",
"risk_type": "market_regime",
"severity": "reject" if market_view.get("regime") == "risk_off" else "warning",
"reject": market_view.get("regime") == "risk_off",
"reason": market_view.get("summary", "市场环境偏弱"),
"source": "market_agent",
})
elimination = (latest_scan or {}).get("elimination_reasons") or {}
if elimination:
top_reason = sorted(elimination.items(), key=lambda item: item[1], reverse=True)[0]
alerts.append({
"ts_code": "",
"risk_type": "filter_pressure",
"severity": "warning",
"reject": False,
"reason": f"最终过滤压力较高:{top_reason[0]} {top_reason[1]}",
"source": "scan_process_logs",
})
for rec in recommendations[:20]:
trace = getattr(rec, "decision_trace", {}) or {}
position_hint = (trace.get("position_adjustment") or {}).get("hint", "")
if position_hint in {"wait_pullback", "wait_confirm"}:
alerts.append({
"ts_code": getattr(rec, "ts_code", ""),
"risk_type": "position_risk",
"severity": "warning",
"reject": False,
"reason": "位置或买点需要等待确认,避免追高。",
"source": "risk_agent",
})
if settings.research_risk_enabled:
alerts.extend(_build_external_stock_risks(recommendations[: settings.research_risk_stock_limit]))
return _dedupe_alerts(alerts)[:40]
def _build_external_stock_risks(recommendations: list[Any]) -> list[dict]:
today = datetime.now()
today_s = today.strftime("%Y%m%d")
unlock_end = (today + timedelta(days=settings.risk_unlock_lookahead_days)).strftime("%Y%m%d")
holder_start = (today - timedelta(days=settings.risk_holder_trade_lookback_days)).strftime("%Y%m%d")
forecast_start = (today - timedelta(days=settings.risk_forecast_lookback_days)).strftime("%Y%m%d")
announcement_start = (today - timedelta(days=settings.risk_announcement_lookback_days)).strftime("%Y%m%d")
financial_start = (today - timedelta(days=settings.risk_financial_lookback_days)).strftime("%Y%m%d")
alerts: list[dict] = []
for rec in recommendations:
ts_code = getattr(rec, "ts_code", "")
name = getattr(rec, "name", "") or ts_code
if not ts_code:
continue
try:
alerts.extend(_unlock_risks(ts_code, name, today_s, unlock_end))
alerts.extend(_holder_trade_risks(ts_code, name, holder_start, today_s))
alerts.extend(_forecast_risks(ts_code, name, forecast_start, today_s))
alerts.extend(_pledge_risks(ts_code, name))
alerts.extend(_announcement_risks(ts_code, name, announcement_start, today_s))
alerts.extend(_audit_risks(ts_code, name, financial_start, today_s))
alerts.extend(_financial_statement_risks(ts_code, name, financial_start, today_s))
except Exception as exc:
logger.debug("外部风险检查失败 %s: %s", ts_code, exc)
alerts.append({
"ts_code": ts_code,
"risk_type": "risk_data_source",
"severity": "warning",
"reject": False,
"reason": f"{name} 外部风险数据读取失败,保留扫描内生风险判断。",
"source": "risk_agent",
})
return alerts
_ANNOUNCEMENT_RISK_RULES: tuple[tuple[str, tuple[str, ...], str, bool], ...] = (
("regulatory", ("监管函", "问询函", "关注函", "警示函", "责令改正"), "warning", False),
("penalty", ("处罚", "行政处罚", "纪律处分", "公开谴责", "通报批评"), "reject", True),
("investigation", ("立案", "调查通知书", "涉嫌违法", "涉嫌信息披露违法"), "reject", True),
("litigation", ("重大诉讼", "重大仲裁", "诉讼进展", "仲裁进展"), "warning", False),
("asset_freeze", ("冻结", "轮候冻结", "司法冻结", "质押违约"), "reject", True),
("accounting_risk", ("会计差错", "前期差错", "非标准审计", "保留意见", "无法表示意见", "否定意见"), "reject", True),
)
def _unlock_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
df = tushare_client.get_share_float(ts_code, start, end)
if df.empty:
return []
alerts = []
for _, row in df.head(3).iterrows():
ratio = _float(row.get("float_ratio"))
shares = _float(row.get("float_share"))
date = str(row.get("float_date") or "")
holder = str(row.get("holder_name") or "限售股东")
if ratio >= settings.risk_unlock_reject_ratio:
severity, reject = "reject", True
elif ratio >= 5 or shares >= 5000:
severity, reject = "warning", False
else:
continue
alerts.append({
"ts_code": ts_code,
"risk_type": "unlock",
"severity": severity,
"reject": reject,
"reason": f"{name} {date} 存在解禁,比例约{ratio:g}%{holder}),需规避供给冲击。",
"source": "tushare.share_float",
})
return alerts
def _holder_trade_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
df = tushare_client.get_holder_trade(ts_code, start, end)
if df.empty:
return []
alerts = []
for _, row in df.head(5).iterrows():
direction = str(row.get("in_de") or "")
ratio = abs(_float(row.get("change_ratio")))
volume = abs(_float(row.get("change_vol")))
ann_date = str(row.get("ann_date") or "")
holder = str(row.get("holder_name") or "股东")
is_reduce = "" in direction or direction.upper() in {"D", "DECREASE"}
if not is_reduce:
continue
if ratio >= 2 or volume >= 3000:
severity, reject = "reject", ratio >= 5
alerts.append({
"ts_code": ts_code,
"risk_type": "holder_reduce",
"severity": severity,
"reject": reject,
"reason": f"{name} {ann_date} 披露股东减持:{holder},变动比例约{ratio:g}%。",
"source": "tushare.stk_holdertrade",
})
return alerts
def _forecast_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
df = tushare_client.get_forecast(ts_code, start, end)
if df.empty:
return []
alerts = []
negative_keywords = ("预减", "亏损", "首亏", "续亏", "略减", "不确定")
for _, row in df.head(3).iterrows():
forecast_type = str(row.get("type") or "")
summary = str(row.get("summary") or row.get("change_reason") or "")
ann_date = str(row.get("ann_date") or "")
min_change = _float(row.get("p_change_min"))
max_change = _float(row.get("p_change_max"))
negative = any(word in forecast_type or word in summary for word in negative_keywords) or max_change < -20
if not negative:
continue
reject = forecast_type in {"首亏", "续亏"} or max_change < -50
alerts.append({
"ts_code": ts_code,
"risk_type": "earnings_forecast",
"severity": "reject" if reject else "warning",
"reject": reject,
"reason": f"{name} {ann_date} 业绩预告偏负面:{forecast_type},变动区间约{min_change:g}%~{max_change:g}%。",
"source": "tushare.forecast",
})
return alerts
def _pledge_risks(ts_code: str, name: str) -> list[dict]:
df = tushare_client.get_pledge_stat(ts_code)
if df.empty:
return []
row = df.iloc[-1]
ratio = _float(row.get("pledge_ratio"))
if ratio < 30:
return []
reject = ratio >= settings.risk_pledge_reject_ratio
return [{
"ts_code": ts_code,
"risk_type": "pledge",
"severity": "reject" if reject else "warning",
"reject": reject,
"reason": f"{name} 股权质押比例约{ratio:g}%,需关注平仓和融资风险。",
"source": "tushare.pledge_stat",
}]
def _announcement_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
df = tushare_client.get_announcements(ts_code, start, end)
if df.empty:
return []
alerts = []
for _, row in df.head(80).iterrows():
title = str(row.get("title") or "")
ann_date = str(row.get("ann_date") or "")
if not title:
continue
for risk_type, keywords, severity, reject in _ANNOUNCEMENT_RISK_RULES:
if any(keyword in title for keyword in keywords):
alerts.append({
"ts_code": ts_code,
"risk_type": risk_type,
"severity": severity,
"reject": reject,
"reason": f"{name} {ann_date} 公告命中风险:{title[:90]}",
"source": "tushare.anns_d",
})
break
return alerts[:6]
def _audit_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
df = tushare_client.get_fina_audit(ts_code, start, end)
if df.empty:
return []
alerts = []
for _, row in df.tail(3).iterrows():
result = str(row.get("audit_result") or "")
ann_date = str(row.get("ann_date") or "")
agency = str(row.get("audit_agency") or "审计机构")
severity, reject = _classify_audit_opinion(result)
if not severity:
continue
alerts.append({
"ts_code": ts_code,
"risk_type": "audit_opinion",
"severity": severity,
"reject": reject,
"reason": f"{name} {ann_date} 审计意见异常:{result}{agency})。",
"source": "tushare.fina_audit",
})
return alerts
def _classify_audit_opinion(result: str) -> tuple[str, bool]:
"""Return (severity, reject) for non-standard audit opinions.
Tushare often returns "标准无保留意见"; a plain substring check for
"保留意见" would incorrectly reject normal audit opinions.
"""
text = (result or "").strip()
if not text:
return "", False
normal_terms = ("标准无保留意见", "无保留意见")
warning_terms = ("带强调事项", "强调事项段", "非标准无保留")
reject_terms = ("无法表示", "否定意见", "保留意见", "非标")
if any(term in text for term in normal_terms) and not any(term in text for term in warning_terms):
return "", False
if any(term in text for term in ("无法表示", "否定意见", "非标")):
return "reject", True
if "保留意见" in text and not any(term in text for term in normal_terms):
return "reject", True
if any(term in text for term in warning_terms):
return "warning", False
return "", False
def _financial_statement_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
df = tushare_client.get_balance_sheet(ts_code, start, end)
if df.empty:
return []
row = df.iloc[-1]
total_assets = _float(row.get("total_assets"))
total_liab = _float(row.get("total_liab"))
goodwill = _float(row.get("goodwill"))
end_date = str(row.get("end_date") or "")
alerts = []
if total_assets > 0:
goodwill_ratio = goodwill / total_assets * 100
if goodwill_ratio >= settings.risk_goodwill_assets_warning_ratio:
reject = goodwill_ratio >= settings.risk_goodwill_assets_reject_ratio
alerts.append({
"ts_code": ts_code,
"risk_type": "goodwill",
"severity": "reject" if reject else "warning",
"reject": reject,
"reason": f"{name} {end_date} 商誉/总资产约{goodwill_ratio:.1f}%,需警惕减值风险。",
"source": "tushare.balancesheet",
})
debt_ratio = total_liab / total_assets * 100
if debt_ratio >= settings.risk_debt_assets_warning_ratio:
reject = debt_ratio >= settings.risk_debt_assets_reject_ratio
alerts.append({
"ts_code": ts_code,
"risk_type": "debt_pressure",
"severity": "reject" if reject else "warning",
"reject": reject,
"reason": f"{name} {end_date} 资产负债率约{debt_ratio:.1f}%,需关注偿债压力。",
"source": "tushare.balancesheet",
})
return alerts
def _float(value: Any) -> float:
try:
if value in (None, ""):
return 0.0
return float(value)
except Exception:
return 0.0
def _dedupe_alerts(alerts: list[dict]) -> list[dict]:
seen = set()
result = []
severity_rank = {"reject": 2, "warning": 1, "info": 0}
alerts = sorted(alerts, key=lambda item: (severity_rank.get(item.get("severity", ""), 0), bool(item.get("reject"))), reverse=True)
for item in alerts:
key = (item.get("ts_code"), item.get("risk_type"), item.get("reason"))
if key in seen:
continue
seen.add(key)
result.append(item)
return result