335 lines
14 KiB
Python
335 lines
14 KiB
Python
"""Risk research agent.
|
||
|
||
This v1 uses deterministic risk signals already present in the scan result.
|
||
External reduction/unlock/regulatory data can be added behind this interface.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
from datetime import datetime, timedelta
|
||
from typing import Any
|
||
|
||
from app.config import settings
|
||
from app.data.tushare_client import tushare_client
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def build_risk_alerts(recommendations: list[Any], market_view: dict, latest_scan: dict | None = None) -> list[dict]:
|
||
alerts: list[dict] = []
|
||
if market_view.get("regime") in {"risk_off", "defensive_watch"}:
|
||
alerts.append({
|
||
"ts_code": "",
|
||
"risk_type": "market_regime",
|
||
"severity": "reject" if market_view.get("regime") == "risk_off" else "warning",
|
||
"reject": market_view.get("regime") == "risk_off",
|
||
"reason": market_view.get("summary", "市场环境偏弱"),
|
||
"source": "market_agent",
|
||
})
|
||
|
||
elimination = (latest_scan or {}).get("elimination_reasons") or {}
|
||
if elimination:
|
||
top_reason = sorted(elimination.items(), key=lambda item: item[1], reverse=True)[0]
|
||
alerts.append({
|
||
"ts_code": "",
|
||
"risk_type": "filter_pressure",
|
||
"severity": "warning",
|
||
"reject": False,
|
||
"reason": f"最终过滤压力较高:{top_reason[0]} {top_reason[1]} 只",
|
||
"source": "scan_process_logs",
|
||
})
|
||
|
||
for rec in recommendations[:20]:
|
||
trace = getattr(rec, "decision_trace", {}) or {}
|
||
position_hint = (trace.get("position_adjustment") or {}).get("hint", "")
|
||
if position_hint in {"wait_pullback", "wait_confirm"}:
|
||
alerts.append({
|
||
"ts_code": getattr(rec, "ts_code", ""),
|
||
"risk_type": "position_risk",
|
||
"severity": "warning",
|
||
"reject": False,
|
||
"reason": "位置或买点需要等待确认,避免追高。",
|
||
"source": "risk_agent",
|
||
})
|
||
|
||
if settings.research_risk_enabled:
|
||
alerts.extend(_build_external_stock_risks(recommendations[: settings.research_risk_stock_limit]))
|
||
|
||
return _dedupe_alerts(alerts)[:40]
|
||
|
||
|
||
def _build_external_stock_risks(recommendations: list[Any]) -> list[dict]:
|
||
today = datetime.now()
|
||
today_s = today.strftime("%Y%m%d")
|
||
unlock_end = (today + timedelta(days=settings.risk_unlock_lookahead_days)).strftime("%Y%m%d")
|
||
holder_start = (today - timedelta(days=settings.risk_holder_trade_lookback_days)).strftime("%Y%m%d")
|
||
forecast_start = (today - timedelta(days=settings.risk_forecast_lookback_days)).strftime("%Y%m%d")
|
||
announcement_start = (today - timedelta(days=settings.risk_announcement_lookback_days)).strftime("%Y%m%d")
|
||
financial_start = (today - timedelta(days=settings.risk_financial_lookback_days)).strftime("%Y%m%d")
|
||
alerts: list[dict] = []
|
||
|
||
for rec in recommendations:
|
||
ts_code = getattr(rec, "ts_code", "")
|
||
name = getattr(rec, "name", "") or ts_code
|
||
if not ts_code:
|
||
continue
|
||
try:
|
||
alerts.extend(_unlock_risks(ts_code, name, today_s, unlock_end))
|
||
alerts.extend(_holder_trade_risks(ts_code, name, holder_start, today_s))
|
||
alerts.extend(_forecast_risks(ts_code, name, forecast_start, today_s))
|
||
alerts.extend(_pledge_risks(ts_code, name))
|
||
alerts.extend(_announcement_risks(ts_code, name, announcement_start, today_s))
|
||
alerts.extend(_audit_risks(ts_code, name, financial_start, today_s))
|
||
alerts.extend(_financial_statement_risks(ts_code, name, financial_start, today_s))
|
||
except Exception as exc:
|
||
logger.debug("外部风险检查失败 %s: %s", ts_code, exc)
|
||
alerts.append({
|
||
"ts_code": ts_code,
|
||
"risk_type": "risk_data_source",
|
||
"severity": "warning",
|
||
"reject": False,
|
||
"reason": f"{name} 外部风险数据读取失败,保留扫描内生风险判断。",
|
||
"source": "risk_agent",
|
||
})
|
||
return alerts
|
||
|
||
|
||
_ANNOUNCEMENT_RISK_RULES: tuple[tuple[str, tuple[str, ...], str, bool], ...] = (
|
||
("regulatory", ("监管函", "问询函", "关注函", "警示函", "责令改正"), "warning", False),
|
||
("penalty", ("处罚", "行政处罚", "纪律处分", "公开谴责", "通报批评"), "reject", True),
|
||
("investigation", ("立案", "调查通知书", "涉嫌违法", "涉嫌信息披露违法"), "reject", True),
|
||
("litigation", ("重大诉讼", "重大仲裁", "诉讼进展", "仲裁进展"), "warning", False),
|
||
("asset_freeze", ("冻结", "轮候冻结", "司法冻结", "质押违约"), "reject", True),
|
||
("accounting_risk", ("会计差错", "前期差错", "非标准审计", "保留意见", "无法表示意见", "否定意见"), "reject", True),
|
||
)
|
||
|
||
|
||
def _unlock_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
|
||
df = tushare_client.get_share_float(ts_code, start, end)
|
||
if df.empty:
|
||
return []
|
||
alerts = []
|
||
for _, row in df.head(3).iterrows():
|
||
ratio = _float(row.get("float_ratio"))
|
||
shares = _float(row.get("float_share"))
|
||
date = str(row.get("float_date") or "")
|
||
holder = str(row.get("holder_name") or "限售股东")
|
||
if ratio >= settings.risk_unlock_reject_ratio:
|
||
severity, reject = "reject", True
|
||
elif ratio >= 5 or shares >= 5000:
|
||
severity, reject = "warning", False
|
||
else:
|
||
continue
|
||
alerts.append({
|
||
"ts_code": ts_code,
|
||
"risk_type": "unlock",
|
||
"severity": severity,
|
||
"reject": reject,
|
||
"reason": f"{name} {date} 存在解禁,比例约{ratio:g}%({holder}),需规避供给冲击。",
|
||
"source": "tushare.share_float",
|
||
})
|
||
return alerts
|
||
|
||
|
||
def _holder_trade_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
|
||
df = tushare_client.get_holder_trade(ts_code, start, end)
|
||
if df.empty:
|
||
return []
|
||
alerts = []
|
||
for _, row in df.head(5).iterrows():
|
||
direction = str(row.get("in_de") or "")
|
||
ratio = abs(_float(row.get("change_ratio")))
|
||
volume = abs(_float(row.get("change_vol")))
|
||
ann_date = str(row.get("ann_date") or "")
|
||
holder = str(row.get("holder_name") or "股东")
|
||
is_reduce = "减" in direction or direction.upper() in {"D", "DECREASE"}
|
||
if not is_reduce:
|
||
continue
|
||
if ratio >= 2 or volume >= 3000:
|
||
severity, reject = "reject", ratio >= 5
|
||
alerts.append({
|
||
"ts_code": ts_code,
|
||
"risk_type": "holder_reduce",
|
||
"severity": severity,
|
||
"reject": reject,
|
||
"reason": f"{name} {ann_date} 披露股东减持:{holder},变动比例约{ratio:g}%。",
|
||
"source": "tushare.stk_holdertrade",
|
||
})
|
||
return alerts
|
||
|
||
|
||
def _forecast_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
|
||
df = tushare_client.get_forecast(ts_code, start, end)
|
||
if df.empty:
|
||
return []
|
||
alerts = []
|
||
negative_keywords = ("预减", "亏损", "首亏", "续亏", "略减", "不确定")
|
||
for _, row in df.head(3).iterrows():
|
||
forecast_type = str(row.get("type") or "")
|
||
summary = str(row.get("summary") or row.get("change_reason") or "")
|
||
ann_date = str(row.get("ann_date") or "")
|
||
min_change = _float(row.get("p_change_min"))
|
||
max_change = _float(row.get("p_change_max"))
|
||
negative = any(word in forecast_type or word in summary for word in negative_keywords) or max_change < -20
|
||
if not negative:
|
||
continue
|
||
reject = forecast_type in {"首亏", "续亏"} or max_change < -50
|
||
alerts.append({
|
||
"ts_code": ts_code,
|
||
"risk_type": "earnings_forecast",
|
||
"severity": "reject" if reject else "warning",
|
||
"reject": reject,
|
||
"reason": f"{name} {ann_date} 业绩预告偏负面:{forecast_type},变动区间约{min_change:g}%~{max_change:g}%。",
|
||
"source": "tushare.forecast",
|
||
})
|
||
return alerts
|
||
|
||
|
||
def _pledge_risks(ts_code: str, name: str) -> list[dict]:
|
||
df = tushare_client.get_pledge_stat(ts_code)
|
||
if df.empty:
|
||
return []
|
||
row = df.iloc[-1]
|
||
ratio = _float(row.get("pledge_ratio"))
|
||
if ratio < 30:
|
||
return []
|
||
reject = ratio >= settings.risk_pledge_reject_ratio
|
||
return [{
|
||
"ts_code": ts_code,
|
||
"risk_type": "pledge",
|
||
"severity": "reject" if reject else "warning",
|
||
"reject": reject,
|
||
"reason": f"{name} 股权质押比例约{ratio:g}%,需关注平仓和融资风险。",
|
||
"source": "tushare.pledge_stat",
|
||
}]
|
||
|
||
|
||
def _announcement_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
|
||
df = tushare_client.get_announcements(ts_code, start, end)
|
||
if df.empty:
|
||
return []
|
||
alerts = []
|
||
for _, row in df.head(80).iterrows():
|
||
title = str(row.get("title") or "")
|
||
ann_date = str(row.get("ann_date") or "")
|
||
if not title:
|
||
continue
|
||
for risk_type, keywords, severity, reject in _ANNOUNCEMENT_RISK_RULES:
|
||
if any(keyword in title for keyword in keywords):
|
||
alerts.append({
|
||
"ts_code": ts_code,
|
||
"risk_type": risk_type,
|
||
"severity": severity,
|
||
"reject": reject,
|
||
"reason": f"{name} {ann_date} 公告命中风险:{title[:90]}",
|
||
"source": "tushare.anns_d",
|
||
})
|
||
break
|
||
return alerts[:6]
|
||
|
||
|
||
def _audit_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
|
||
df = tushare_client.get_fina_audit(ts_code, start, end)
|
||
if df.empty:
|
||
return []
|
||
alerts = []
|
||
for _, row in df.tail(3).iterrows():
|
||
result = str(row.get("audit_result") or "")
|
||
ann_date = str(row.get("ann_date") or "")
|
||
agency = str(row.get("audit_agency") or "审计机构")
|
||
severity, reject = _classify_audit_opinion(result)
|
||
if not severity:
|
||
continue
|
||
alerts.append({
|
||
"ts_code": ts_code,
|
||
"risk_type": "audit_opinion",
|
||
"severity": severity,
|
||
"reject": reject,
|
||
"reason": f"{name} {ann_date} 审计意见异常:{result}({agency})。",
|
||
"source": "tushare.fina_audit",
|
||
})
|
||
return alerts
|
||
|
||
|
||
def _classify_audit_opinion(result: str) -> tuple[str, bool]:
|
||
"""Return (severity, reject) for non-standard audit opinions.
|
||
|
||
Tushare often returns "标准无保留意见"; a plain substring check for
|
||
"保留意见" would incorrectly reject normal audit opinions.
|
||
"""
|
||
text = (result or "").strip()
|
||
if not text:
|
||
return "", False
|
||
normal_terms = ("标准无保留意见", "无保留意见")
|
||
warning_terms = ("带强调事项", "强调事项段", "非标准无保留")
|
||
reject_terms = ("无法表示", "否定意见", "保留意见", "非标")
|
||
if any(term in text for term in normal_terms) and not any(term in text for term in warning_terms):
|
||
return "", False
|
||
if any(term in text for term in ("无法表示", "否定意见", "非标")):
|
||
return "reject", True
|
||
if "保留意见" in text and not any(term in text for term in normal_terms):
|
||
return "reject", True
|
||
if any(term in text for term in warning_terms):
|
||
return "warning", False
|
||
return "", False
|
||
|
||
|
||
def _financial_statement_risks(ts_code: str, name: str, start: str, end: str) -> list[dict]:
|
||
df = tushare_client.get_balance_sheet(ts_code, start, end)
|
||
if df.empty:
|
||
return []
|
||
row = df.iloc[-1]
|
||
total_assets = _float(row.get("total_assets"))
|
||
total_liab = _float(row.get("total_liab"))
|
||
goodwill = _float(row.get("goodwill"))
|
||
end_date = str(row.get("end_date") or "")
|
||
alerts = []
|
||
if total_assets > 0:
|
||
goodwill_ratio = goodwill / total_assets * 100
|
||
if goodwill_ratio >= settings.risk_goodwill_assets_warning_ratio:
|
||
reject = goodwill_ratio >= settings.risk_goodwill_assets_reject_ratio
|
||
alerts.append({
|
||
"ts_code": ts_code,
|
||
"risk_type": "goodwill",
|
||
"severity": "reject" if reject else "warning",
|
||
"reject": reject,
|
||
"reason": f"{name} {end_date} 商誉/总资产约{goodwill_ratio:.1f}%,需警惕减值风险。",
|
||
"source": "tushare.balancesheet",
|
||
})
|
||
debt_ratio = total_liab / total_assets * 100
|
||
if debt_ratio >= settings.risk_debt_assets_warning_ratio:
|
||
reject = debt_ratio >= settings.risk_debt_assets_reject_ratio
|
||
alerts.append({
|
||
"ts_code": ts_code,
|
||
"risk_type": "debt_pressure",
|
||
"severity": "reject" if reject else "warning",
|
||
"reject": reject,
|
||
"reason": f"{name} {end_date} 资产负债率约{debt_ratio:.1f}%,需关注偿债压力。",
|
||
"source": "tushare.balancesheet",
|
||
})
|
||
return alerts
|
||
|
||
|
||
def _float(value: Any) -> float:
|
||
try:
|
||
if value in (None, ""):
|
||
return 0.0
|
||
return float(value)
|
||
except Exception:
|
||
return 0.0
|
||
|
||
|
||
def _dedupe_alerts(alerts: list[dict]) -> list[dict]:
|
||
seen = set()
|
||
result = []
|
||
severity_rank = {"reject": 2, "warning": 1, "info": 0}
|
||
alerts = sorted(alerts, key=lambda item: (severity_rank.get(item.get("severity", ""), 0), bool(item.get("reject"))), reverse=True)
|
||
for item in alerts:
|
||
key = (item.get("ts_code"), item.get("risk_type"), item.get("reason"))
|
||
if key in seen:
|
||
continue
|
||
seen.add(key)
|
||
result.append(item)
|
||
return result
|