astock-agent/backend/app/engine/watchlist.py
2026-04-22 11:02:19 +08:00

240 lines
8.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""用户自选股分析服务"""
from __future__ import annotations
import json
import logging
import re
from sqlalchemy import text
from app.analysis.signals import generate_signals
from app.data import tencent_client
from app.db.database import get_db
from app.db import tables
from app.llm.client import chat_completion
logger = logging.getLogger(__name__)
async def analyze_watchlist_for_all_users(mode: str = "scheduled") -> int:
    """Analyze every active watchlist entry of every user.

    Loads all rows of ``user_watchlists`` whose ``is_active`` flag is 1 (or
    NULL), ordered by user and id, and runs ``analyze_watchlist_item`` on each.

    A failure on one entry is logged and skipped so that it cannot prevent
    the remaining entries from being analyzed.

    Args:
        mode: Label forwarded to ``analyze_watchlist_item`` as the analysis mode.

    Returns:
        The number of entries that were analyzed successfully.
    """
    query = text(
        "SELECT w.id, w.user_id, w.ts_code, w.name, w.note, w.watch_group, w.cost_price "
        "FROM user_watchlists w "
        "WHERE COALESCE(w.is_active, 1) = 1 "
        "ORDER BY w.user_id, w.id"
    )
    async with get_db() as db:
        rows = (await db.execute(query)).fetchall()

    # The rows are fully fetched, so the per-item work below does not need
    # the listing session to stay open.
    count = 0
    for row in rows:
        item = row._mapping
        try:
            await analyze_watchlist_item(
                watchlist_id=item["id"],
                user_id=item["user_id"],
                ts_code=item["ts_code"],
                name=item["name"],
                note=item.get("note") or "",
                watch_group=item.get("watch_group") or "observe",
                cost_price=item.get("cost_price"),
                mode=mode,
            )
        except Exception:
            # Isolate failures per entry: log with traceback and move on.
            logger.exception(
                "自选股分析失败: watchlist_id=%s ts_code=%s",
                item["id"],
                item["ts_code"],
            )
            continue
        count += 1
    return count
async def analyze_watchlist_item(
    watchlist_id: int,
    user_id: int,
    ts_code: str,
    name: str,
    note: str = "",
    watch_group: str = "observe",
    cost_price: float | None = None,
    mode: str = "manual",
) -> dict:
    """Analyze a single watchlist stock and store the result.

    Gathers the latest recommendation (and its latest tracking row, if a
    recommendation exists), a realtime quote and technical signals, builds a
    text summary, asks the LLM for advice, and inserts the structured result
    into ``watchlist_analyses_table``.

    Quote and signal retrieval are best-effort: any exception is logged and
    the corresponding input is treated as missing (``None``).

    Returns:
        The structured result dict produced by ``_extract_structured_result``.
    """
    recommendation = await _load_latest_recommendation(ts_code)
    latest_tracking = None
    if recommendation:
        latest_tracking = await _load_latest_tracking(recommendation["id"])

    quote = None
    try:
        quote = await tencent_client.get_realtime_quote(ts_code)
    except Exception:
        logger.exception("获取自选股实时行情失败: %s", ts_code)

    signals = None
    try:
        signals = generate_signals(ts_code)
    except Exception:
        logger.exception("生成自选股信号失败: %s", ts_code)

    summary = _build_summary(
        ts_code,
        name,
        recommendation,
        latest_tracking,
        quote,
        signals,
        note,
        watch_group,
        cost_price,
    )
    llm_result = await _generate_watchlist_advice(summary)
    structured = _extract_structured_result(llm_result, recommendation, latest_tracking)

    # Columns copied verbatim from the structured result.
    copied_fields = (
        "conclusion",
        "advice",
        "trigger_condition",
        "risk_note",
        "summary",
        "full_analysis",
        "score_reference",
    )
    record = {
        "user_id": user_id,
        "watchlist_id": watchlist_id,
        "ts_code": ts_code,
        "name": name,
        **{field: structured[field] for field in copied_fields},
        "analysis_mode": mode,
    }
    insert_stmt = tables.watchlist_analyses_table.insert().values(**record)

    async with get_db() as db:
        await db.execute(insert_stmt)
        await db.commit()
    return structured
async def _load_latest_recommendation(ts_code: str) -> dict | None:
    """Return the newest ``recommendations`` row for ``ts_code`` as a dict.

    "Newest" is determined by ``created_at`` descending, then ``id``
    descending. Returns ``None`` when no row matches.
    """
    query = text(
        "SELECT * FROM recommendations "
        "WHERE ts_code = :code "
        "ORDER BY created_at DESC, id DESC LIMIT 1"
    )
    params = {"code": ts_code}
    async with get_db() as db:
        result = await db.execute(query, params)
        row = result.fetchone()
    if not row:
        return None
    return dict(row._mapping)
async def _load_latest_tracking(recommendation_id: int) -> dict | None:
    """Return the newest ``recommendation_tracking`` row for a recommendation.

    "Newest" is determined by ``track_date`` descending, then ``id``
    descending. Returns ``None`` when no row matches.
    """
    query = text(
        "SELECT * FROM recommendation_tracking "
        "WHERE recommendation_id = :rid "
        "ORDER BY track_date DESC, id DESC LIMIT 1"
    )
    params = {"rid": recommendation_id}
    async with get_db() as db:
        result = await db.execute(query, params)
        row = result.fetchone()
    if not row:
        return None
    return dict(row._mapping)
def _build_summary(
ts_code: str,
name: str,
recommendation: dict | None,
latest_tracking: dict | None,
quote,
signals,
note: str,
watch_group: str,
cost_price: float | None,
) -> str:
quote_str = ""
if quote:
quote_str = f"当前价 {quote.price},涨跌幅 {quote.pct_chg}%,换手率 {quote.turnover_rate}%,量比 {quote.volume_ratio}"
recommendation_str = "暂无推荐归档。"
if recommendation:
recommendation_str = (
f"推荐归档:结论 {recommendation.get('action_plan') or '观察'}"
f"触发条件 {recommendation.get('trigger_condition') or '暂无'}"
f"失效条件 {recommendation.get('invalidation_condition') or '暂无'}"
f"风险提示 {recommendation.get('risk_note') or '暂无'}"
)
tracking_str = ""
if latest_tracking:
tracking_str = (
f"最近跟踪:收益 {latest_tracking.get('pct_from_entry') or 0}%"
f"最大浮盈 {latest_tracking.get('max_return_pct') or 0}%"
f"最大回撤 {latest_tracking.get('max_drawdown_pct') or 0}%"
f"备注 {latest_tracking.get('review_note') or '暂无'}"
)
signal_str = "技术快照暂无。"
if signals:
signal_str = (
f"技术快照:趋势强度 {signals.trend_score},辅助信号 {signals.signal_count}/7"
f"位置安全 {signals.position_score}近5日涨幅 {signals.rally_pct_5d}% 近10日涨幅 {signals.rally_pct_10d}%。"
)
group_str = f"用户分组:{watch_group}"
cost_str = f"持仓成本 {cost_price}" if cost_price and cost_price > 0 else "暂无持仓成本。"
note_str = f"用户备注:{note}" if note else "用户未填写备注。"
return f"{ts_code} {name}{group_str} {cost_str} {quote_str} {recommendation_str} {tracking_str} {signal_str} {note_str}"
async def _generate_watchlist_advice(summary: str) -> str:
    """Ask the LLM for advice on one watchlist stock.

    Sends a system prompt that requests a JSON string with the fields
    ``conclusion``, ``advice``, ``trigger_condition``, ``risk_note`` and
    ``summary``, followed by a user message containing ``summary``.

    Returns:
        The reply's ``content``, or an empty string when the reply is falsy
        or has no non-empty ``content`` attribute.
    """
    system_prompt = (
        "你是A股投研作战台的用户自选股助手。"
        "你需要针对单只用户自选股给出简洁、可执行的建议。"
        "输出必须是 JSON 字符串,包含字段 conclusion、advice、trigger_condition、risk_note、summary。"
        "conclusion 只能是 可操作 / 重点关注 / 观察 / 回避。"
        "summary 必须是一句中文短句。advice 需要明确用户下一步该看什么、等什么或做什么。"
    )
    user_prompt = f"请基于以下信息输出 JSON{summary}"
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    reply = await chat_completion(messages)
    if not reply:
        return ""
    if not getattr(reply, "content", None):
        return ""
    return reply.content
def _extract_structured_result(content: str, recommendation: dict | None, latest_tracking: dict | None) -> dict:
default = {
"conclusion": recommendation.get("action_plan") if recommendation else "观察",
"advice": recommendation.get("trigger_condition") if recommendation else "继续观察量价配合、板块强弱和回踩承接。",
"trigger_condition": recommendation.get("trigger_condition") if recommendation else "",
"risk_note": recommendation.get("risk_note") if recommendation else (latest_tracking.get("review_note") if latest_tracking else ""),
"summary": latest_tracking.get("review_note") if latest_tracking else "当前信息不足以升级为明确操作,先保留观察。",
"full_analysis": content or "",
"score_reference": float(recommendation.get("score") or 0) if recommendation else 0,
}
if not content:
return default
try:
parsed = json.loads(_extract_json_string(content))
return {
"conclusion": parsed.get("conclusion") or default["conclusion"],
"advice": parsed.get("advice") or default["advice"],
"trigger_condition": parsed.get("trigger_condition") or default["trigger_condition"],
"risk_note": parsed.get("risk_note") or default["risk_note"],
"summary": parsed.get("summary") or default["summary"],
"full_analysis": content,
"score_reference": default["score_reference"],
}
except Exception:
logger.warning("自选股分析 JSON 解析失败,回退默认结构")
default["full_analysis"] = content
return default
def _extract_json_string(content: str) -> str:
    """Extract the JSON object text from an LLM reply.

    The reply may wrap the JSON in a Markdown code fence or surround it with
    prose. Each ``{`` is tried in order as the start of a JSON object and the
    first one that decodes successfully is returned, so trailing text (even
    text that itself contains braces) does not corrupt the result.

    If nothing decodes, the previous heuristics are applied so the return
    value is unchanged for such input: the body of a leading code fence, else
    the span from the first ``{`` to the last ``}``, else the stripped text.

    Args:
        content: Raw reply text.

    Returns:
        The best candidate JSON string; not guaranteed to be valid JSON.
    """
    cleaned = content.strip()

    decoder = json.JSONDecoder()
    start = cleaned.find("{")
    while start != -1:
        try:
            _, end = decoder.raw_decode(cleaned, start)
        except ValueError:
            # Not a JSON object at this brace; try the next one.
            start = cleaned.find("{", start + 1)
        else:
            return cleaned[start:end]

    # Fallback: nothing decoded, keep the legacy best-effort extraction.
    if cleaned.startswith("```"):
        fenced = re.search(r"```(?:json)?\s*(\{.*\})\s*```", cleaned, re.DOTALL)
        if fenced:
            return fenced.group(1)
    first = cleaned.find("{")
    last = cleaned.rfind("}")
    if first != -1 and last != -1 and last > first:
        return cleaned[first : last + 1]
    return cleaned