240 lines
8.8 KiB
Python
240 lines
8.8 KiB
Python
"""用户自选股分析服务"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import re
|
||
|
||
from sqlalchemy import text
|
||
|
||
from app.analysis.signals import generate_signals
|
||
from app.data import tencent_client
|
||
from app.db.database import get_db
|
||
from app.db import tables
|
||
from app.llm.client import chat_completion
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
async def analyze_watchlist_for_all_users(mode: str = "scheduled") -> int:
|
||
"""批量分析所有启用中的用户自选股。"""
|
||
async with get_db() as db:
|
||
rows = (await db.execute(
|
||
text(
|
||
"SELECT w.id, w.user_id, w.ts_code, w.name, w.note, w.watch_group, w.cost_price "
|
||
"FROM user_watchlists w "
|
||
"WHERE COALESCE(w.is_active, 1) = 1 "
|
||
"ORDER BY w.user_id, w.id"
|
||
)
|
||
)).fetchall()
|
||
|
||
count = 0
|
||
for row in rows:
|
||
item = row._mapping
|
||
await analyze_watchlist_item(
|
||
watchlist_id=item["id"],
|
||
user_id=item["user_id"],
|
||
ts_code=item["ts_code"],
|
||
name=item["name"],
|
||
note=item.get("note") or "",
|
||
watch_group=item.get("watch_group") or "observe",
|
||
cost_price=item.get("cost_price"),
|
||
mode=mode,
|
||
)
|
||
count += 1
|
||
return count
|
||
|
||
|
||
async def analyze_watchlist_item(
|
||
watchlist_id: int,
|
||
user_id: int,
|
||
ts_code: str,
|
||
name: str,
|
||
note: str = "",
|
||
watch_group: str = "observe",
|
||
cost_price: float | None = None,
|
||
mode: str = "manual",
|
||
) -> dict:
|
||
"""分析单只自选股并保存结果。"""
|
||
recommendation = await _load_latest_recommendation(ts_code)
|
||
latest_tracking = await _load_latest_tracking(recommendation["id"]) if recommendation else None
|
||
|
||
try:
|
||
quote = await tencent_client.get_realtime_quote(ts_code)
|
||
except Exception:
|
||
logger.exception("获取自选股实时行情失败: %s", ts_code)
|
||
quote = None
|
||
|
||
try:
|
||
signals = generate_signals(ts_code)
|
||
except Exception:
|
||
logger.exception("生成自选股信号失败: %s", ts_code)
|
||
signals = None
|
||
|
||
summary = _build_summary(ts_code, name, recommendation, latest_tracking, quote, signals, note, watch_group, cost_price)
|
||
|
||
llm_result = await _generate_watchlist_advice(summary)
|
||
structured = _extract_structured_result(llm_result, recommendation, latest_tracking)
|
||
|
||
async with get_db() as db:
|
||
await db.execute(
|
||
tables.watchlist_analyses_table.insert().values(
|
||
user_id=user_id,
|
||
watchlist_id=watchlist_id,
|
||
ts_code=ts_code,
|
||
name=name,
|
||
conclusion=structured["conclusion"],
|
||
advice=structured["advice"],
|
||
trigger_condition=structured["trigger_condition"],
|
||
risk_note=structured["risk_note"],
|
||
summary=structured["summary"],
|
||
full_analysis=structured["full_analysis"],
|
||
score_reference=structured["score_reference"],
|
||
analysis_mode=mode,
|
||
)
|
||
)
|
||
await db.commit()
|
||
|
||
return structured
|
||
|
||
|
||
async def _load_latest_recommendation(ts_code: str) -> dict | None:
|
||
async with get_db() as db:
|
||
row = (await db.execute(
|
||
text(
|
||
"SELECT * FROM recommendations "
|
||
"WHERE ts_code = :code "
|
||
"ORDER BY created_at DESC, id DESC LIMIT 1"
|
||
),
|
||
{"code": ts_code},
|
||
)).fetchone()
|
||
return dict(row._mapping) if row else None
|
||
|
||
|
||
async def _load_latest_tracking(recommendation_id: int) -> dict | None:
|
||
async with get_db() as db:
|
||
row = (await db.execute(
|
||
text(
|
||
"SELECT * FROM recommendation_tracking "
|
||
"WHERE recommendation_id = :rid "
|
||
"ORDER BY track_date DESC, id DESC LIMIT 1"
|
||
),
|
||
{"rid": recommendation_id},
|
||
)).fetchone()
|
||
return dict(row._mapping) if row else None
|
||
|
||
|
||
def _build_summary(
|
||
ts_code: str,
|
||
name: str,
|
||
recommendation: dict | None,
|
||
latest_tracking: dict | None,
|
||
quote,
|
||
signals,
|
||
note: str,
|
||
watch_group: str,
|
||
cost_price: float | None,
|
||
) -> str:
|
||
quote_str = ""
|
||
if quote:
|
||
quote_str = f"当前价 {quote.price},涨跌幅 {quote.pct_chg}%,换手率 {quote.turnover_rate}%,量比 {quote.volume_ratio}。"
|
||
|
||
recommendation_str = "暂无推荐归档。"
|
||
if recommendation:
|
||
recommendation_str = (
|
||
f"推荐归档:结论 {recommendation.get('action_plan') or '观察'},"
|
||
f"触发条件 {recommendation.get('trigger_condition') or '暂无'},"
|
||
f"失效条件 {recommendation.get('invalidation_condition') or '暂无'},"
|
||
f"风险提示 {recommendation.get('risk_note') or '暂无'}。"
|
||
)
|
||
|
||
tracking_str = ""
|
||
if latest_tracking:
|
||
tracking_str = (
|
||
f"最近跟踪:收益 {latest_tracking.get('pct_from_entry') or 0}%,"
|
||
f"最大浮盈 {latest_tracking.get('max_return_pct') or 0}%,"
|
||
f"最大回撤 {latest_tracking.get('max_drawdown_pct') or 0}%,"
|
||
f"备注 {latest_tracking.get('review_note') or '暂无'}。"
|
||
)
|
||
|
||
signal_str = "技术快照暂无。"
|
||
if signals:
|
||
signal_str = (
|
||
f"技术快照:趋势强度 {signals.trend_score},辅助信号 {signals.signal_count}/7,"
|
||
f"位置安全 {signals.position_score},近5日涨幅 {signals.rally_pct_5d}% ,近10日涨幅 {signals.rally_pct_10d}%。"
|
||
)
|
||
|
||
group_str = f"用户分组:{watch_group}。"
|
||
cost_str = f"持仓成本 {cost_price}。" if cost_price and cost_price > 0 else "暂无持仓成本。"
|
||
note_str = f"用户备注:{note}" if note else "用户未填写备注。"
|
||
return f"{ts_code} {name}。{group_str} {cost_str} {quote_str} {recommendation_str} {tracking_str} {signal_str} {note_str}"
|
||
|
||
|
||
async def _generate_watchlist_advice(summary: str) -> str:
|
||
message = await chat_completion([
|
||
{
|
||
"role": "system",
|
||
"content": (
|
||
"你是A股投研作战台的用户自选股助手。"
|
||
"你需要针对单只用户自选股给出简洁、可执行的建议。"
|
||
"输出必须是 JSON 字符串,包含字段 conclusion、advice、trigger_condition、risk_note、summary。"
|
||
"conclusion 只能是 可操作 / 重点关注 / 观察 / 回避。"
|
||
"summary 必须是一句中文短句。advice 需要明确用户下一步该看什么、等什么或做什么。"
|
||
),
|
||
},
|
||
{
|
||
"role": "user",
|
||
"content": f"请基于以下信息输出 JSON:{summary}",
|
||
},
|
||
])
|
||
|
||
if not message or not getattr(message, "content", None):
|
||
return ""
|
||
return message.content
|
||
|
||
|
||
def _extract_structured_result(content: str, recommendation: dict | None, latest_tracking: dict | None) -> dict:
|
||
default = {
|
||
"conclusion": recommendation.get("action_plan") if recommendation else "观察",
|
||
"advice": recommendation.get("trigger_condition") if recommendation else "继续观察量价配合、板块强弱和回踩承接。",
|
||
"trigger_condition": recommendation.get("trigger_condition") if recommendation else "",
|
||
"risk_note": recommendation.get("risk_note") if recommendation else (latest_tracking.get("review_note") if latest_tracking else ""),
|
||
"summary": latest_tracking.get("review_note") if latest_tracking else "当前信息不足以升级为明确操作,先保留观察。",
|
||
"full_analysis": content or "",
|
||
"score_reference": float(recommendation.get("score") or 0) if recommendation else 0,
|
||
}
|
||
|
||
if not content:
|
||
return default
|
||
|
||
try:
|
||
parsed = json.loads(_extract_json_string(content))
|
||
return {
|
||
"conclusion": parsed.get("conclusion") or default["conclusion"],
|
||
"advice": parsed.get("advice") or default["advice"],
|
||
"trigger_condition": parsed.get("trigger_condition") or default["trigger_condition"],
|
||
"risk_note": parsed.get("risk_note") or default["risk_note"],
|
||
"summary": parsed.get("summary") or default["summary"],
|
||
"full_analysis": content,
|
||
"score_reference": default["score_reference"],
|
||
}
|
||
except Exception:
|
||
logger.warning("自选股分析 JSON 解析失败,回退默认结构")
|
||
default["full_analysis"] = content
|
||
return default
|
||
|
||
|
||
def _extract_json_string(content: str) -> str:
|
||
cleaned = content.strip()
|
||
if cleaned.startswith("```"):
|
||
fenced = re.search(r"```(?:json)?\s*(\{.*\})\s*```", cleaned, re.DOTALL)
|
||
if fenced:
|
||
return fenced.group(1)
|
||
|
||
start = cleaned.find("{")
|
||
end = cleaned.rfind("}")
|
||
if start != -1 and end != -1 and end > start:
|
||
return cleaned[start : end + 1]
|
||
return cleaned
|