diff --git a/backend/app/__pycache__/main.cpython-313.pyc b/backend/app/__pycache__/main.cpython-313.pyc index 5ece7efa..419c8fd4 100644 Binary files a/backend/app/__pycache__/main.cpython-313.pyc and b/backend/app/__pycache__/main.cpython-313.pyc differ diff --git a/backend/app/api/__pycache__/chat.cpython-313.pyc b/backend/app/api/__pycache__/chat.cpython-313.pyc index f653c2c4..5acba5d3 100644 Binary files a/backend/app/api/__pycache__/chat.cpython-313.pyc and b/backend/app/api/__pycache__/chat.cpython-313.pyc differ diff --git a/backend/app/api/__pycache__/market.cpython-313.pyc b/backend/app/api/__pycache__/market.cpython-313.pyc index 7ed4d02b..cd8d0984 100644 Binary files a/backend/app/api/__pycache__/market.cpython-313.pyc and b/backend/app/api/__pycache__/market.cpython-313.pyc differ diff --git a/backend/app/api/__pycache__/recommendations.cpython-313.pyc b/backend/app/api/__pycache__/recommendations.cpython-313.pyc index fc023d39..c6cbb806 100644 Binary files a/backend/app/api/__pycache__/recommendations.cpython-313.pyc and b/backend/app/api/__pycache__/recommendations.cpython-313.pyc differ diff --git a/backend/app/api/__pycache__/stocks.cpython-313.pyc b/backend/app/api/__pycache__/stocks.cpython-313.pyc index ad2a240e..a740dfb1 100644 Binary files a/backend/app/api/__pycache__/stocks.cpython-313.pyc and b/backend/app/api/__pycache__/stocks.cpython-313.pyc differ diff --git a/backend/app/api/chat.py b/backend/app/api/chat.py index 4136a698..9707c7f8 100644 --- a/backend/app/api/chat.py +++ b/backend/app/api/chat.py @@ -6,10 +6,11 @@ POST /api/chat/stream - SSE 流式对话 import json import logging -from fastapi import APIRouter +from fastapi import APIRouter, Depends from fastapi.responses import StreamingResponse from pydantic import BaseModel +from app.core.deps import get_current_user from app.llm.chat_agent import chat_stream logger = logging.getLogger(__name__) @@ -26,13 +27,13 @@ class ChatRequest(BaseModel): @router.post("/stream") -async def chat_stream_endpoint(req: ChatRequest): +async def chat_stream_endpoint(req: ChatRequest, current_user: dict = Depends(get_current_user)): """流式对话接口(SSE)""" messages = [{"role": m.role, "content": m.content} for m in req.messages] async def event_generator(): try: - async for msg in chat_stream(messages): + async for msg in chat_stream(messages, current_user=current_user): data = json.dumps(msg, ensure_ascii=False) yield f"data: {data}\n\n" yield "data: [DONE]\n\n" diff --git a/backend/app/api/market.py b/backend/app/api/market.py index 0948e89b..e7d2024a 100644 --- a/backend/app/api/market.py +++ b/backend/app/api/market.py @@ -1,11 +1,14 @@ """市场概览 API""" -from fastapi import APIRouter +from datetime import datetime + +from fastapi import APIRouter, Depends from app.data.tushare_client import tushare_client from app.data import tencent_client from app.engine.recommender import get_latest_recommendations from app.config import is_trading_hours, is_market_session +from app.core.deps import get_current_admin router = APIRouter(prefix="/api/market", tags=["market"]) @@ -73,8 +76,112 @@ async def get_daily_review(): return {"reviews": reviews} +@router.get("/strategy-board") +async def get_strategy_board(): + """获取今日市场作战面板(只读,不触发 LLM)""" + from app.llm.strategy_board import build_strategy_board + return await build_strategy_board(include_llm=False) + + +@router.get("/strategy-iteration") +async def get_strategy_iteration(limit: int = 50): + """获取策略复盘迭代建议(只读,不触发 LLM)""" + from app.llm.strategy_iteration import build_strategy_iteration_report + return await build_strategy_iteration_report(limit=limit, include_llm=False) + + +@router.get("/ops-status") +async def get_ops_status(): + """管理员任务中心状态与数据新鲜度(只读,不触发扫描或 LLM)。""" + from sqlalchemy import text + from app.db.database import get_db + from app.engine.recommender import _scan_running + + async with get_db() as db: + rec_row = (await db.execute( + text( + "SELECT created_at FROM recommendations " + "ORDER BY created_at DESC LIMIT 1" + ) + )).fetchone() + tracking_row = (await db.execute( + text( + "SELECT track_date, created_at FROM recommendation_tracking " + "ORDER BY track_date DESC, id DESC LIMIT 1" + ) + )).fetchone() + market_row = (await db.execute( + text( + "SELECT trade_date, created_at FROM market_temperature " + "ORDER BY REPLACE(trade_date, '-', '') DESC, id DESC LIMIT 1" + ) + )).fetchone() + sector_row = (await db.execute( + text( + "SELECT trade_date, created_at FROM sector_heat " + "ORDER BY REPLACE(trade_date, '-', '') DESC, id DESC LIMIT 1" + ) + )).fetchone() + board_row = (await db.execute( + text( + "SELECT created_at FROM daily_reviews " + "ORDER BY trade_date DESC LIMIT 1" + ) + )).fetchone() + + def _fmt_dt(value): + return str(value or "") + + latest_market_date = str(market_row._mapping["trade_date"]) if market_row else "" + latest_sector_date = str(sector_row._mapping["trade_date"]) if sector_row else "" + latest_tracking_date = str(tracking_row._mapping["track_date"]) if tracking_row else "" + + return { + "scan_running": _scan_running, + "scan_mode": "intraday" if is_trading_hours() else "post_market", + "is_trading": is_trading_hours(), + "data_freshness": { + "market_trade_date": latest_market_date, + "sector_trade_date": latest_sector_date, + "tracking_trade_date": latest_tracking_date, + "last_recommendation_created_at": _fmt_dt(rec_row._mapping["created_at"]) if rec_row else "", + "last_tracking_created_at": _fmt_dt(tracking_row._mapping["created_at"]) if tracking_row else "", + "last_market_created_at": _fmt_dt(market_row._mapping["created_at"]) if market_row else "", + "last_sector_created_at": _fmt_dt(sector_row._mapping["created_at"]) if sector_row else "", + "last_review_created_at": _fmt_dt(board_row._mapping["created_at"]) if board_row else "", + "status": "fresh" if latest_market_date else "empty", + "message": ( + f"最新市场日期 {latest_market_date},最近跟踪 {latest_tracking_date or '暂无'}" + if latest_market_date else + "暂无市场缓存数据,请由管理员触发扫描。" + ), + "generated_at": datetime.now().isoformat(), + }, + "actions": [ + {"key": "refresh", "label": "立即扫描", "admin_only": True}, + {"key": "update_tracking", "label": "更新跟踪", "admin_only": True}, + {"key": "generate_strategy_board", "label": "生成策略板", "admin_only": True}, + {"key": "generate_strategy_iteration", "label": "生成策略复盘", "admin_only": True}, + ], + } + + +@router.post("/generate-strategy-board") +async def generate_strategy_board(_admin: dict = Depends(get_current_admin)): + """管理员手动生成带 LLM 说明的策略看板""" + from app.llm.strategy_board import build_strategy_board + return await build_strategy_board(include_llm=True) + + +@router.post("/generate-strategy-iteration") +async def generate_strategy_iteration(limit: int = 50, _admin: dict = Depends(get_current_admin)): + """管理员手动生成带 LLM 分析的策略复盘""" + from app.llm.strategy_iteration import build_strategy_iteration_report + return await build_strategy_iteration_report(limit=limit, include_llm=True) + + @router.post("/generate-review") -async def generate_daily_review(): +async def generate_daily_review(_admin: dict = Depends(get_current_admin)): """手动触发生成每日复盘""" from app.llm.daily_review import generate_review result = await generate_review() diff --git a/backend/app/api/recommendations.py b/backend/app/api/recommendations.py index 70acb6d9..5d9ad2a1 100644 --- a/backend/app/api/recommendations.py +++ b/backend/app/api/recommendations.py @@ -4,7 +4,7 @@ import asyncio import logging import traceback from datetime import datetime -from fastapi import APIRouter +from fastapi import APIRouter, Depends from app.engine.recommender import ( refresh_recommendations, @@ -13,6 +13,7 @@ from app.engine.recommender import ( get_performance_stats, ) from app.config import is_trading_hours +from app.core.deps import get_current_admin logger = logging.getLogger(__name__) @@ -60,6 +61,13 @@ async def get_latest(): "risk_note": r.risk_note, "llm_analysis": r.llm_analysis, "entry_timing": r.entry_timing, + "action_plan": r.action_plan, + "trigger_condition": r.trigger_condition, + "invalidation_condition": r.invalidation_condition, + "suggested_position_pct": r.suggested_position_pct, + "review_after_days": r.review_after_days, + "lifecycle_status": r.lifecycle_status, + "data_freshness": r.data_freshness, "llm_score": r.llm_score, "strategy": r.strategy, "entry_signal_type": r.entry_signal_type, @@ -69,11 +77,12 @@ async def get_latest(): for r in result.get("recommendations", []) ], "scan_mode": result.get("scan_mode", "unknown"), + "strategy_profile": result.get("strategy_profile"), } @router.post("/refresh") -async def refresh(scan_session: str = "manual"): +async def refresh(scan_session: str = "manual", _admin: dict = Depends(get_current_admin)): """手动触发一次全量筛选(后台执行,立即返回)""" from app.engine.recommender import _scan_running, _scan_lock @@ -126,7 +135,7 @@ async def _run_scan_background(scan_session: str): @router.post("/update-tracking") -async def update_tracking(): +async def update_tracking(_admin: dict = Depends(get_current_admin)): """独立更新推荐跟踪数据(不触发新扫描,盘中可单独调用)""" from app.engine.recommender import _update_tracking await _update_tracking() diff --git a/backend/app/api/stocks.py b/backend/app/api/stocks.py index 3c073c25..24f68413 100644 --- a/backend/app/api/stocks.py +++ b/backend/app/api/stocks.py @@ -5,7 +5,7 @@ import logging import traceback from datetime import datetime, timedelta -from fastapi import APIRouter +from fastapi import APIRouter, Query from starlette.responses import StreamingResponse from app.data.tushare_client import tushare_client @@ -19,6 +19,188 @@ logger = logging.getLogger(__name__) router = APIRouter(prefix="/api/stocks", tags=["stocks"]) +@router.get("/search") +async def search_stock(keyword: str): + """搜索股票""" + basic = tushare_client.get_stock_basic() + if basic.empty: + return [] + matches = basic[ + basic["name"].str.contains(keyword, na=False) | + basic["ts_code"].str.contains(keyword, na=False) | + basic["symbol"].str.contains(keyword, na=False) + ].head(20) + return matches[["ts_code", "name", "industry"]].to_dict(orient="records") + + +@router.get("/{ts_code}/thesis") +async def get_stock_thesis(ts_code: str): + """获取个股推荐推演归档(只读缓存,不触发扫描或 LLM)。""" + from sqlalchemy import text + + async with get_db() as db: + rec_result = await db.execute( + text( + "SELECT * FROM recommendations " + "WHERE ts_code = :code " + "ORDER BY created_at DESC, id DESC LIMIT 1" + ), + {"code": ts_code}, + ) + rec_row = rec_result.fetchone() + + tracking_rows = [] + diagnosis_rows = [] + if rec_row: + rec_id = rec_row._mapping["id"] + tracking_result = await db.execute( + text( + "SELECT * FROM recommendation_tracking " + "WHERE recommendation_id = :rid " + "ORDER BY track_date DESC, id DESC LIMIT 10" + ), + {"rid": rec_id}, + ) + tracking_rows = tracking_result.fetchall() + + diagnosis_result = await db.execute( + text( + "SELECT id, diagnosis, created_at FROM stock_diagnoses " + "WHERE ts_code = :code " + "ORDER BY created_at DESC LIMIT 3" + ), + {"code": ts_code}, + ) + diagnosis_rows = diagnosis_result.fetchall() + + if not rec_row: + return { + "ts_code": ts_code, + "name": ts_code, + "has_recommendation": False, + "recommendation": None, + "latest_tracking": None, + "tracking_history": [], + "diagnoses": [ + { + "id": row._mapping["id"], + "diagnosis": row._mapping["diagnosis"] or "", + "created_at": str(row._mapping["created_at"] or ""), + } + for row in diagnosis_rows + ], + "decision_points": [], + "data_freshness": { + "recommendation_created_at": "", + "tracking_date": "", + "status": "no_recommendation", + "message": "暂无推荐归档,可从 AI 诊断页生成个股诊断。", + }, + } + + r = rec_row._mapping + + def _safe_json_list(value: str | None) -> list: + if not value: + return [] + try: + parsed = json.loads(value) + return parsed if isinstance(parsed, list) else [] + except Exception: + return [] + + tracking_history = [] + for row in tracking_rows: + t = row._mapping + tracking_history.append({ + "track_date": t["track_date"], + "current_price": t["current_price"], + "pct_from_entry": t["pct_from_entry"], + "max_return_pct": t["max_return_pct"], + "max_drawdown_pct": t["max_drawdown_pct"], + "days_since_recommendation": t["days_since_recommendation"], + "hit_target": bool(t["hit_target"]), + "hit_stop_loss": bool(t["hit_stop_loss"]), + "close_reason": t["close_reason"] or "", + "review_note": t["review_note"] or "", + "status": t["status"] or "", + }) + latest_tracking = tracking_history[0] if tracking_history else None + + decision_points = [ + {"label": "操作计划", "value": r["action_plan"] or "观察"}, + {"label": "触发条件", "value": r["trigger_condition"] or "等待触发条件归档"}, + {"label": "失效条件", "value": r["invalidation_condition"] or "等待失效条件归档"}, + {"label": "建议仓位", "value": f"{r['suggested_position_pct']}%" if r["suggested_position_pct"] is not None else "未设置"}, + {"label": "复盘周期", "value": f"{r['review_after_days'] or 3}个交易日"}, + ] + + freshness_status = "fresh" + freshness_message = "推荐归档可用" + if not latest_tracking: + freshness_status = "needs_tracking" + freshness_message = "暂无跟踪记录,建议管理员执行跟踪更新。" + elif latest_tracking.get("track_date"): + freshness_message = f"最近跟踪日期 {latest_tracking['track_date']}" + + return { + "ts_code": r["ts_code"], + "name": r["name"], + "has_recommendation": True, + "recommendation": { + "id": r["id"], + "ts_code": r["ts_code"], + "name": r["name"], + "sector": r["sector"] or "", + "score": r["score"] or 0, + "market_temp_score": r["market_temp_score"] or 0, + "sector_score": r["sector_score"] or 0, + "capital_score": r["capital_score"] or 0, + "technical_score": r["technical_score"] or 0, + "supply_demand_score": r["supply_demand_score"] or 0, + "price_action_score": r["price_action_score"] or 0, + "position_score": r["position_score"] or 50, + "valuation_score": r["valuation_score"] or 50, + "entry_price": r["entry_price"], + "target_price": r["target_price"], + "stop_loss": r["stop_loss"], + "reasons": _safe_json_list(r["reasons"]), + "risk_note": r["risk_note"] or "", + "action_plan": r["action_plan"] or "观察", + "trigger_condition": r["trigger_condition"] or "", + "invalidation_condition": r["invalidation_condition"] or "", + "suggested_position_pct": r["suggested_position_pct"] or 0, + "review_after_days": r["review_after_days"] or 3, + "lifecycle_status": r["lifecycle_status"] or "candidate", + "data_freshness": r["data_freshness"] or "", + "llm_analysis": r["llm_analysis"] or "", + "llm_score": r["llm_score"], + "strategy": r["strategy"] or "trend_breakout", + "entry_signal_type": r["entry_signal_type"] or "none", + "entry_timing": r["entry_timing"] or "", + "scan_session": r["scan_session"] or "", + "created_at": str(r["created_at"] or ""), + }, + "latest_tracking": latest_tracking, + "tracking_history": tracking_history, + "diagnoses": [ + { + "id": row._mapping["id"], + "diagnosis": row._mapping["diagnosis"] or "", + "created_at": str(row._mapping["created_at"] or ""), + } + for row in diagnosis_rows + ], + "decision_points": decision_points, + "data_freshness": { + "recommendation_created_at": str(r["created_at"] or ""), + "tracking_date": latest_tracking["track_date"] if latest_tracking else "", + "status": freshness_status, + "message": freshness_message, + }, + } + + @router.get("/{ts_code}/quote") async def get_quote(ts_code: str): """获取个股实时行情""" @@ -86,20 +268,6 @@ async def get_capital_flow(ts_code: str, days: int = 10): return records -@router.get("/search") -async def search_stock(keyword: str): - """搜索股票""" - basic = tushare_client.get_stock_basic() - if basic.empty: - return [] - matches = basic[ - basic["name"].str.contains(keyword, na=False) | - basic["ts_code"].str.contains(keyword, na=False) | - basic["symbol"].str.contains(keyword, na=False) - ].head(20) - return matches[["ts_code", "name", "industry"]].to_dict(orient="records") - - @router.get("/{ts_code}/diagnose/history") async def get_diagnose_history(ts_code: str): """获取个股最近5次诊断历史""" @@ -123,6 +291,7 @@ async def get_diagnose_history(ts_code: str): "id": r["id"], "ts_code": r["ts_code"], "name": r["name"], + "diagnosis_mode": r.get("diagnosis_mode", "entry"), "diagnosis": r["diagnosis"], "created_at": str(r["created_at"]), }) @@ -133,7 +302,7 @@ async def get_diagnose_history(ts_code: str): @router.post("/{ts_code}/diagnose") -async def diagnose_stock(ts_code: str): +async def diagnose_stock(ts_code: str, mode: str = Query("entry")): """AI 诊断个股(SSE 流式返回)""" from app.config import settings if not settings.deepseek_api_key: @@ -150,10 +319,11 @@ async def diagnose_stock(ts_code: str): "SELECT id, ts_code, name, diagnosis, created_at " "FROM stock_diagnoses " "WHERE ts_code = :code " + "AND diagnosis_mode = :mode " "AND created_at >= datetime('now', '-30 minutes', 'localtime') " "ORDER BY created_at DESC LIMIT 1" ), - {"code": ts_code}, + {"code": ts_code, "mode": mode}, ) recent_row = result.fetchone() if recent_row: @@ -342,7 +512,25 @@ async def diagnose_stock(ts_code: str): except Exception: pass - user_msg = f"""请对以下A股进行全面诊断分析: + mode_instruction_map = { + "entry": "这是建仓前诊断。重点判断是否值得纳入操作或重点关注,强调触发条件和失效条件。", + "holding": "这是持仓复核。重点判断原有逻辑是否仍成立,是否该继续持有、减仓或退出。", + "review": "这是回撤复盘。重点分析问题出在个股、板块还是市场环境,并给出修正建议。", + "tracking": "这是继续跟踪。重点判断是否保留在观察池、何时升级为可操作或何时移除。", + } + mode_label_map = { + "entry": "建仓前诊断", + "holding": "持仓复核", + "review": "回撤复盘", + "tracking": "继续跟踪", + } + mode_instruction = mode_instruction_map.get(mode, mode_instruction_map["entry"]) + mode_label = mode_label_map.get(mode, mode_label_map["entry"]) + + user_msg = f"""请基于当前 AI 推荐体系,对以下A股进行结构化个股会诊: + +诊断模式: {mode_label} +模式要求: {mode_instruction} 股票: {ts_code} ({basic_info}) {quote_str} @@ -358,23 +546,44 @@ async def diagnose_stock(ts_code: str): {sector_str} 重要提示: -1. 趋势评分是推荐体系的技术面核心分数(均线排列40+高低点结构35+MA20方向25=满分100),辅助信号计数仅供参考不参与主评分。 -2. 位置安全评分高(>80)表示股价处于相对低位,低(<40)表示可能追高。 -3. 如果有推荐体系评分,请作为主要分析依据;趋势评分和信号计数从不同维度描述技术面状态。 +1. 你不是在写传统研报,而是在给交易作战台输出结构化会诊意见。 +2. 如果有推荐体系评分、操作计划、跟踪信息,请优先沿用当前推荐体系,而不是另起一套标准。 +3. 趋势评分是推荐体系的技术面核心分数(均线排列40+高低点结构35+MA20方向25=满分100),辅助信号计数仅供参考不参与主评分。 +4. 位置安全评分高(>80)表示股价处于相对低位,低(<40)表示可能追高。 +5. 板块信息和推荐体系信息优先级高于单一技术指标。 {freshness_note} -请从以下维度分析(Markdown格式,简洁专业): -## 综合评级 -(给出1-5星评级和一句话总结,综合趋势评分、位置安全和供需形态) +请严格按以下 Markdown 结构输出,不要写成泛泛长文: -## 技术面分析 -(趋势方向、均线关系、支撑压力、量价配合,优先参考趋势评分而非信号计数) +## 当前结论 +- 结论: 只能从「可操作 / 重点关注 / 观察 / 回避」中选一个 +- 一句话判断: 用一句话解释为什么 +- 适配模式: 说明更适合启动试错、分歧回流、趋势跟随还是只观察 -## 资金面分析 -(主力资金态度、板块联动效应) +## 核心逻辑 +- 市场环境: 当前大盘和风格是否支持这只票 +- 板块位置: 所属板块是主线、次主线还是观察线 +- 个股角色: 龙头 / 跟风 / 独立逻辑 / 非核心 -## 操作建议 -(适合什么类型的投资者、入场时机、风险提示)""" +## 执行动作 +- 触发条件: 什么情况下才可以行动 +- 失效条件: 什么情况下放弃 +- 仓位建议: 用低 / 中 / 高 或百分比表达 +- 适合谁: 适合激进试错、低吸等待、还是不适合参与 + +## 风险清单 +- 风险1: +- 风险2: +- 风险3: + +## 复盘问题 +- 如果后续走势不符合预期,优先检查哪两个问题 + +要求: +- 结论必须明确,不能模糊两可 +- 少写形容词,多写交易判断 +- 不要重复原始数据 +- 文字保持简洁,避免旧式研报语气""" # ── SSE 流式返回 ── async def _stream_diagnosis(): @@ -384,7 +593,7 @@ async def diagnose_stock(ts_code: str): stream = await client.chat.completions.create( model=settings.deepseek_model, messages=[ - {"role": "system", "content": "你是一位专业的A股分析师,擅长技术面和资金面分析。回复使用Markdown格式,简洁专业,客观理性。"}, + {"role": "system", "content": "你是A股AI投研作战台中的个股会诊模块。你的职责不是写传统长文研报,而是基于市场环境、板块地位、推荐体系评分和跟踪结果,输出可执行、结构化的交易会诊意见。回复必须使用Markdown,结论明确,强调触发条件、失效条件、仓位和风险。"}, {"role": "user", "content": user_msg}, ], max_tokens=1500, @@ -407,6 +616,7 @@ async def diagnose_stock(ts_code: str): tables.stock_diagnoses_table.insert().values( ts_code=ts_code, name=stock_name or ts_code, + diagnosis_mode=mode, diagnosis=full_content, ) ) @@ -425,4 +635,4 @@ async def diagnose_stock(ts_code: str): yield f"data: {json.dumps({'error': error_msg}, ensure_ascii=False)}\n\n" yield f"data: {json.dumps({'done': True, 'ts_code': ts_code}, ensure_ascii=False)}\n\n" - return StreamingResponse(_stream_diagnosis(), media_type="text/event-stream") \ No newline at end of file + return StreamingResponse(_stream_diagnosis(), media_type="text/event-stream") diff --git a/backend/app/api/watchlists.py b/backend/app/api/watchlists.py new file mode 100644 index 00000000..4743a5a0 --- /dev/null +++ b/backend/app/api/watchlists.py @@ -0,0 +1,250 @@ +"""用户自选股 API""" + +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel +from sqlalchemy import text + +from app.core.deps import get_current_user +from app.db.database import get_db +from app.engine.watchlist import analyze_watchlist_for_all_users, analyze_watchlist_item + +router = APIRouter(prefix="/api/watchlists", tags=["watchlists"]) + + +class WatchlistCreateRequest(BaseModel): + ts_code: str + name: str + note: str = "" + watch_group: str = "observe" + cost_price: float | None = None + + +class WatchlistUpdateRequest(BaseModel): + note: str | None = None + watch_group: str | None = None + cost_price: float | None = None + + +WATCH_GROUPS = {"observe", "focus", "candidate", "holding"} + + +@router.get("") +async def list_watchlists(current_user: dict = Depends(get_current_user)): + async with get_db() as db: + rows = (await db.execute( + text( + "SELECT w.id, w.ts_code, w.name, w.note, w.watch_group, w.cost_price, w.created_at, " + "a.conclusion, a.advice, a.trigger_condition, a.risk_note, a.summary, a.created_at AS analysis_created_at " + "FROM user_watchlists w " + "LEFT JOIN watchlist_analyses a ON a.id = (" + " SELECT id FROM watchlist_analyses " + " WHERE watchlist_id = w.id ORDER BY created_at DESC, id DESC LIMIT 1" + ") " + "WHERE w.user_id = :uid AND COALESCE(w.is_active, 1) = 1 " + "ORDER BY w.created_at DESC" + ), + {"uid": current_user["id"]}, + )).fetchall() + + return [dict(row._mapping) for row in rows] + + +@router.post("") +async def create_watchlist(req: WatchlistCreateRequest, current_user: dict = Depends(get_current_user)): + normalized_code = req.ts_code.strip().upper() + normalized_name = req.name.strip() + normalized_note = req.note.strip() + normalized_group = (req.watch_group or "observe").strip().lower() + normalized_cost = req.cost_price if req.cost_price and req.cost_price > 0 else None + + if not normalized_code or not normalized_name: + raise HTTPException(status_code=400, detail="股票代码和名称不能为空") + if normalized_group not in WATCH_GROUPS: + raise HTTPException(status_code=400, detail="无效的自选分组") + + async with get_db() as db: + exists = (await db.execute( + text( + "SELECT id FROM user_watchlists " + "WHERE user_id = :uid AND ts_code = :code AND COALESCE(is_active, 1) = 1" + ), + {"uid": current_user["id"], "code": normalized_code}, + )).fetchone() + if exists: + raise HTTPException(status_code=400, detail="该股票已在自选列表中") + + result = await db.execute( + text( + "INSERT INTO user_watchlists (user_id, ts_code, name, note, watch_group, cost_price, is_active) " + "VALUES (:uid, :code, :name, :note, :watch_group, :cost_price, 1)" + ), + { + "uid": current_user["id"], + "code": normalized_code, + "name": normalized_name, + "note": normalized_note, + "watch_group": normalized_group, + "cost_price": normalized_cost, + }, + ) + await db.commit() + watchlist_id = getattr(result, "lastrowid", None) + + if not watchlist_id: + inserted = (await db.execute( + text( + "SELECT id FROM user_watchlists " + "WHERE user_id = :uid AND ts_code = :code " + "ORDER BY id DESC LIMIT 1" + ), + {"uid": current_user["id"], "code": normalized_code}, + )).fetchone() + if not inserted: + raise HTTPException(status_code=500, detail="自选股创建失败") + watchlist_id = inserted._mapping["id"] + + await analyze_watchlist_item( + watchlist_id=watchlist_id, + user_id=current_user["id"], + ts_code=normalized_code, + name=normalized_name, + note=normalized_note, + watch_group=normalized_group, + cost_price=normalized_cost, + mode="manual", + ) + + return {"status": "ok", "message": "已加入自选并完成首次分析", "watchlist_id": watchlist_id} + + +@router.patch("/{watchlist_id}") +async def update_watchlist(watchlist_id: int, req: WatchlistUpdateRequest, current_user: dict = Depends(get_current_user)): + updates: list[str] = [] + params: dict = {"id": watchlist_id, "uid": current_user["id"]} + + if req.note is not None: + updates.append("note = :note") + params["note"] = req.note.strip() + if req.watch_group is not None: + normalized_group = req.watch_group.strip().lower() + if normalized_group not in WATCH_GROUPS: + raise HTTPException(status_code=400, detail="无效的自选分组") + updates.append("watch_group = :watch_group") + params["watch_group"] = normalized_group + if req.cost_price is not None: + updates.append("cost_price = :cost_price") + params["cost_price"] = req.cost_price if req.cost_price > 0 else None + + if not updates: + raise HTTPException(status_code=400, detail="没有可更新的字段") + + updates.append("updated_at = CURRENT_TIMESTAMP") + + async with get_db() as db: + result = await db.execute( + text( + f"UPDATE user_watchlists SET {', '.join(updates)} " + "WHERE id = :id AND user_id = :uid AND COALESCE(is_active, 1) = 1" + ), + params, + ) + await db.commit() + + if result.rowcount == 0: + raise HTTPException(status_code=404, detail="自选股不存在") + + row = (await db.execute( + text( + "SELECT id, user_id, ts_code, name, note, watch_group, cost_price " + "FROM user_watchlists " + "WHERE id = :id AND user_id = :uid" + ), + {"id": watchlist_id, "uid": current_user["id"]}, + )).fetchone() + + item = row._mapping + return {"status": "ok", "item": dict(item)} + + +@router.delete("/{watchlist_id}") +async def delete_watchlist(watchlist_id: int, current_user: dict = Depends(get_current_user)): + async with get_db() as db: + await db.execute( + text( + "UPDATE user_watchlists SET is_active = 0 " + "WHERE id = :id AND user_id = :uid" + ), + {"id": watchlist_id, "uid": current_user["id"]}, + ) + await db.commit() + return {"status": "ok"} + + +@router.post("/{watchlist_id}/analyze") +async def analyze_single_watchlist(watchlist_id: int, current_user: dict = Depends(get_current_user)): + async with get_db() as db: + row = (await db.execute( + text( + "SELECT id, user_id, ts_code, name, note, watch_group, cost_price FROM user_watchlists " + "WHERE id = :id AND user_id = :uid AND COALESCE(is_active, 1) = 1" + ), + {"id": watchlist_id, "uid": current_user["id"]}, + )).fetchone() + if not row: + raise HTTPException(status_code=404, detail="自选股不存在") + + item = row._mapping + result = await analyze_watchlist_item( + watchlist_id=item["id"], + user_id=item["user_id"], + ts_code=item["ts_code"], + name=item["name"], + note=item["note"] or "", + watch_group=item["watch_group"] or "observe", + cost_price=item["cost_price"], + mode="manual", + ) + return {"status": "ok", "result": result} + + +@router.post("/analyze-all") +async def analyze_all_watchlists(current_user: dict = Depends(get_current_user)): + async with get_db() as db: + rows = (await db.execute( + text( + "SELECT id, user_id, ts_code, name, note, watch_group, cost_price FROM user_watchlists " + "WHERE user_id = :uid AND COALESCE(is_active, 1) = 1" + ), + {"uid": current_user["id"]}, + )).fetchall() + + count = 0 + for row in rows: + item = row._mapping + await analyze_watchlist_item( + watchlist_id=item["id"], + user_id=item["user_id"], + ts_code=item["ts_code"], + name=item["name"], + note=item["note"] or "", + watch_group=item["watch_group"] or "observe", + cost_price=item["cost_price"], + mode="manual", + ) + count += 1 + return {"status": "ok", "count": count, "message": f"已完成 {count} 条自选股分析"} + + +@router.get("/{watchlist_id}/history") +async def watchlist_history(watchlist_id: int, current_user: dict = Depends(get_current_user)): + async with get_db() as db: + rows = (await db.execute( + text( + "SELECT a.* FROM watchlist_analyses a " + "INNER JOIN user_watchlists w ON w.id = a.watchlist_id " + "WHERE a.watchlist_id = :wid AND w.user_id = :uid " + "ORDER BY a.created_at DESC LIMIT 20" + ), + {"wid": watchlist_id, "uid": current_user["id"]}, + )).fetchall() + return [dict(row._mapping) for row in rows] diff --git a/backend/app/data/__pycache__/models.cpython-313.pyc b/backend/app/data/__pycache__/models.cpython-313.pyc index cd9d5d97..60b466d9 100644 Binary files a/backend/app/data/__pycache__/models.cpython-313.pyc and b/backend/app/data/__pycache__/models.cpython-313.pyc differ diff --git a/backend/app/data/models.py b/backend/app/data/models.py index a788b3a4..ea4c85a0 100644 --- a/backend/app/data/models.py +++ b/backend/app/data/models.py @@ -133,7 +133,46 @@ class Recommendation(BaseModel): strategy: str = "trend_breakout" # trend_breakout / momentum(旧) / potential(旧) entry_signal_type: str = "none" # breakout / pullback / launch / none entry_timing: str = "" # 进场时机建议(盘中适用) + action_plan: str = "观察" # 可操作 / 重点关注 / 观察 + trigger_condition: str = "" # 触发条件 + invalidation_condition: str = "" # 失效条件 + suggested_position_pct: float = 0 # 建议仓位 % + review_after_days: int = 3 # 建议复盘天数 + lifecycle_status: str = "candidate" # candidate / actionable / tracking / closed + data_freshness: str = "" # 数据新鲜度说明 llm_analysis: str = "" # LLM 深度分析 llm_score: float | None = None # AI 评分 1-10 scan_session: str = "" created_at: datetime | None = None + + +class StrategyFocus(BaseModel): + label: str + description: str + + +class StrategySectorFocus(BaseModel): + sector_name: str + stage: str = "mid" + heat_score: float = 0 + pct_change: float = 0 + limit_up_count: int = 0 + view: str = "" + + +class StrategyBoard(BaseModel): + trade_date: str + market_regime: str + risk_level: str + action_bias: str + position_suggestion: str + summary: str + recommended_mode: str + strategy_focus: list[StrategyFocus] = [] + watch_sectors: list[StrategySectorFocus] = [] + avoid_rules: list[str] = [] + iteration_notes: list[str] = [] + iteration_report: dict = {} + metrics: dict = {} + ai_review: str = "" + generated_by: str = "rules" diff --git a/backend/app/db/__pycache__/database.cpython-313.pyc b/backend/app/db/__pycache__/database.cpython-313.pyc index ce4fabae..d47e232c 100644 Binary files a/backend/app/db/__pycache__/database.cpython-313.pyc and b/backend/app/db/__pycache__/database.cpython-313.pyc differ diff --git a/backend/app/db/__pycache__/tables.cpython-313.pyc b/backend/app/db/__pycache__/tables.cpython-313.pyc index 36974dc2..c63e2b8b 100644 Binary files a/backend/app/db/__pycache__/tables.cpython-313.pyc and b/backend/app/db/__pycache__/tables.cpython-313.pyc differ diff --git a/backend/app/db/database.py b/backend/app/db/database.py index cfdc6684..5af2fe78 100644 --- a/backend/app/db/database.py +++ b/backend/app/db/database.py @@ -40,12 +40,21 @@ async def init_db(): "ALTER TABLE recommendations ADD COLUMN price_action_score REAL DEFAULT 0", "ALTER TABLE recommendations ADD COLUMN position_score REAL", "ALTER TABLE recommendations ADD COLUMN valuation_score REAL", + "ALTER TABLE recommendations ADD COLUMN risk_note TEXT DEFAULT ''", + "ALTER TABLE recommendations ADD COLUMN action_plan TEXT DEFAULT '观察'", + "ALTER TABLE recommendations ADD COLUMN trigger_condition TEXT DEFAULT ''", + "ALTER TABLE recommendations ADD COLUMN invalidation_condition TEXT DEFAULT ''", + "ALTER TABLE recommendations ADD COLUMN suggested_position_pct REAL DEFAULT 0", + "ALTER TABLE recommendations ADD COLUMN review_after_days INTEGER DEFAULT 3", + "ALTER TABLE recommendations ADD COLUMN lifecycle_status TEXT DEFAULT 'candidate'", + "ALTER TABLE recommendations ADD COLUMN data_freshness TEXT DEFAULT ''", "ALTER TABLE recommendations ADD COLUMN llm_analysis TEXT DEFAULT ''", "ALTER TABLE recommendations ADD COLUMN strategy TEXT DEFAULT 'momentum'", "ALTER TABLE recommendations ADD COLUMN llm_score REAL", "ALTER TABLE market_temperature ADD COLUMN max_streak INTEGER", "ALTER TABLE market_temperature ADD COLUMN broken_rate REAL", "ALTER TABLE recommendations ADD COLUMN entry_signal_type TEXT DEFAULT 'none'", + "ALTER TABLE recommendations ADD COLUMN entry_timing TEXT DEFAULT ''", "ALTER TABLE sector_heat ADD COLUMN stage TEXT", "ALTER TABLE sector_heat ADD COLUMN days_continuous INTEGER", "ALTER TABLE sector_heat ADD COLUMN member_count INTEGER", @@ -53,6 +62,27 @@ async def init_db(): "ALTER TABLE sector_heat ADD COLUMN pct_trend TEXT", "ALTER TABLE sector_heat ADD COLUMN turnover_avg REAL", "ALTER TABLE sector_heat ADD COLUMN main_force_ratio REAL", + "ALTER TABLE recommendation_tracking ADD COLUMN max_price REAL", + "ALTER TABLE recommendation_tracking ADD COLUMN min_price REAL", + "ALTER TABLE recommendation_tracking ADD COLUMN max_return_pct REAL", + "ALTER TABLE recommendation_tracking ADD COLUMN max_drawdown_pct REAL", + "ALTER TABLE recommendation_tracking ADD COLUMN days_since_recommendation INTEGER DEFAULT 0", + "ALTER TABLE recommendation_tracking ADD COLUMN close_reason TEXT DEFAULT ''", + "ALTER TABLE recommendation_tracking ADD COLUMN review_note TEXT DEFAULT ''", + "ALTER TABLE stock_diagnoses ADD COLUMN diagnosis_mode TEXT DEFAULT 'entry'", + "ALTER TABLE user_watchlists ADD COLUMN note TEXT DEFAULT ''", + "ALTER TABLE user_watchlists ADD COLUMN watch_group TEXT DEFAULT 'observe'", + "ALTER TABLE user_watchlists ADD COLUMN cost_price REAL", + "ALTER TABLE user_watchlists ADD COLUMN is_active BOOLEAN DEFAULT 1", + "ALTER TABLE user_watchlists ADD COLUMN updated_at DATETIME DEFAULT CURRENT_TIMESTAMP", + "ALTER TABLE watchlist_analyses ADD COLUMN conclusion TEXT DEFAULT '观察'", + "ALTER TABLE watchlist_analyses ADD COLUMN advice TEXT DEFAULT ''", + "ALTER TABLE watchlist_analyses ADD COLUMN trigger_condition TEXT DEFAULT ''", + "ALTER TABLE watchlist_analyses ADD COLUMN risk_note TEXT DEFAULT ''", + "ALTER TABLE watchlist_analyses ADD COLUMN summary TEXT DEFAULT ''", + "ALTER TABLE watchlist_analyses ADD COLUMN full_analysis TEXT DEFAULT ''", + "ALTER TABLE watchlist_analyses ADD COLUMN score_reference REAL DEFAULT 0", + "ALTER TABLE watchlist_analyses ADD COLUMN analysis_mode TEXT DEFAULT 'scheduled'", ]: try: await conn.execute( diff --git a/backend/app/db/tables.py b/backend/app/db/tables.py index 296b900a..f3f3aab8 100644 --- a/backend/app/db/tables.py +++ b/backend/app/db/tables.py @@ -27,9 +27,18 @@ recommendations_table = Table( Column("target_price", Float), Column("stop_loss", Float), Column("reasons", Text), + Column("risk_note", Text, default=""), + Column("action_plan", Text, default="观察"), + Column("trigger_condition", Text, default=""), + Column("invalidation_condition", Text, default=""), + Column("suggested_position_pct", Float, default=0), + Column("review_after_days", Integer, default=3), + Column("lifecycle_status", Text, default="candidate"), + Column("data_freshness", Text, default=""), Column("llm_analysis", Text, default=""), Column("strategy", Text, default="trend_breakout"), Column("entry_signal_type", Text, default="none"), + Column("entry_timing", Text, default=""), Column("llm_score", Float, default=None), Column("scan_session", Text), Column("created_at", DateTime, server_default=func.now()), @@ -76,8 +85,15 @@ recommendation_tracking_table = Table( Column("track_date", Text, nullable=False), Column("current_price", Float), Column("pct_from_entry", Float), + Column("max_price", Float), + Column("min_price", Float), + Column("max_return_pct", Float), + Column("max_drawdown_pct", Float), + Column("days_since_recommendation", Integer, default=0), Column("hit_target", Boolean, default=False), Column("hit_stop_loss", Boolean, default=False), + Column("close_reason", Text, default=""), + Column("review_note", Text, default=""), Column("status", Text, default="active"), Column("created_at", DateTime, server_default=func.now()), ) @@ -106,10 +122,43 @@ stock_diagnoses_table = Table( Column("id", Integer, primary_key=True, autoincrement=True), Column("ts_code", Text, nullable=False), Column("name", Text, nullable=False), + Column("diagnosis_mode", Text, default="entry"), Column("diagnosis", Text, nullable=False), Column("created_at", DateTime, server_default=func.now()), ) +user_watchlists_table = Table( + "user_watchlists", metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("user_id", Integer, nullable=False), + Column("ts_code", Text, nullable=False), + Column("name", Text, nullable=False), + Column("note", Text, default=""), + Column("watch_group", Text, default="observe"), + Column("cost_price", Float, default=None), + Column("is_active", Boolean, default=True), + Column("created_at", DateTime, server_default=func.now()), + Column("updated_at", DateTime, server_default=func.now()), +) + +watchlist_analyses_table = Table( + "watchlist_analyses", metadata, + Column("id", Integer, primary_key=True, autoincrement=True), + Column("user_id", Integer, nullable=False), + Column("watchlist_id", Integer, nullable=False), + Column("ts_code", Text, nullable=False), + Column("name", Text, nullable=False), + Column("conclusion", Text, default="观察"), + Column("advice", Text, default=""), + Column("trigger_condition", Text, default=""), + Column("risk_note", Text, default=""), + Column("summary", Text, default=""), + Column("full_analysis", Text, default=""), + Column("score_reference", Float, default=0), + Column("analysis_mode", Text, default="scheduled"), + Column("created_at", DateTime, server_default=func.now()), +) + error_logs_table = Table( "error_logs", metadata, Column("id", Integer, primary_key=True, autoincrement=True), diff --git a/backend/app/engine/__pycache__/recommender.cpython-313.pyc b/backend/app/engine/__pycache__/recommender.cpython-313.pyc index 80469adf..1820ec7a 100644 Binary files a/backend/app/engine/__pycache__/recommender.cpython-313.pyc and b/backend/app/engine/__pycache__/recommender.cpython-313.pyc differ diff --git a/backend/app/engine/__pycache__/scheduler.cpython-313.pyc b/backend/app/engine/__pycache__/scheduler.cpython-313.pyc index 01db5f81..9f7731d8 100644 Binary files a/backend/app/engine/__pycache__/scheduler.cpython-313.pyc and b/backend/app/engine/__pycache__/scheduler.cpython-313.pyc differ diff --git a/backend/app/engine/__pycache__/screener.cpython-313.pyc b/backend/app/engine/__pycache__/screener.cpython-313.pyc index 77030fab..4d84deb9 100644 Binary files a/backend/app/engine/__pycache__/screener.cpython-313.pyc and b/backend/app/engine/__pycache__/screener.cpython-313.pyc differ diff --git a/backend/app/engine/recommender.py b/backend/app/engine/recommender.py index 74020cb9..db61dd8f 100644 --- a/backend/app/engine/recommender.py +++ b/backend/app/engine/recommender.py @@ -62,11 +62,12 @@ async def _update_tracking(): # 查找所有活跃的推荐(有 entry_price 且未被标记为 closed) result = await db.execute( text( - "SELECT id, ts_code, entry_price, target_price, stop_loss " + "SELECT id, ts_code, entry_price, target_price, stop_loss, " + "review_after_days, lifecycle_status, created_at " "FROM recommendations " "WHERE entry_price IS NOT NULL " "AND entry_price > 0 " - "AND id NOT IN (SELECT DISTINCT recommendation_id FROM recommendation_tracking WHERE status = 'closed') " + "AND COALESCE(lifecycle_status, 'candidate') NOT IN ('closed_win', 'closed_loss', 'invalidated', 'expired') " "AND date(created_at) <= date(:today) " "ORDER BY created_at DESC LIMIT 50" ), @@ -88,15 +89,44 @@ async def _update_tracking(): tracked = 0 for r in rows: - rec_id, ts_code, entry_price, target_price, stop_loss = r + rec_id, ts_code, entry_price, target_price, stop_loss, review_after_days, lifecycle_status, created_at = r current_price = price_map.get(ts_code) if current_price is None or entry_price is None or entry_price <= 0: continue - pct = round((current_price - entry_price) / entry_price * 100, 2) - hit_target = target_price and current_price >= target_price - hit_stop = stop_loss and current_price <= stop_loss - status = "closed" if (hit_target or hit_stop) else "active" + track_metrics = _calculate_tracking_metrics( + ts_code=ts_code, + entry_price=float(entry_price), + current_price=float(current_price), + created_at=created_at, + latest_trade_date=trade_date, + ) + pct = track_metrics["pct_from_entry"] + max_price = track_metrics["max_price"] + min_price = track_metrics["min_price"] + max_return_pct = track_metrics["max_return_pct"] + max_drawdown_pct = track_metrics["max_drawdown_pct"] + days_since = track_metrics["days_since_recommendation"] + + hit_target = bool(target_price and max_price >= target_price) + hit_stop = bool(stop_loss and min_price <= stop_loss) + review_days = int(review_after_days or 3) + expired = days_since >= review_days and not hit_target and not hit_stop + + status, new_lifecycle, close_reason = _derive_lifecycle_status( + hit_target=hit_target, + hit_stop=hit_stop, + expired=expired, + pct=pct, + previous_status=lifecycle_status or "candidate", + ) + review_note = _build_review_note( + pct=pct, + max_return_pct=max_return_pct, + max_drawdown_pct=max_drawdown_pct, + days_since=days_since, + close_reason=close_reason, + ) # 检查今天是否已经跟踪过 exists = await db.execute( @@ -115,11 +145,25 @@ async def _update_tracking(): track_date=trade_date, current_price=current_price, pct_from_entry=pct, + max_price=max_price, + min_price=min_price, + max_return_pct=max_return_pct, + max_drawdown_pct=max_drawdown_pct, + days_since_recommendation=days_since, hit_target=hit_target, hit_stop_loss=hit_stop, + close_reason=close_reason, + review_note=review_note, status=status, ) ) + await db.execute( + text( + "UPDATE recommendations SET lifecycle_status = :status " + "WHERE id = :rid" + ), + {"status": new_lifecycle, "rid": rec_id}, + ) tracked += 1 await db.commit() @@ -131,6 +175,93 @@ async def _update_tracking(): await log_error("recommender", f"更新推荐跟踪失败: {e}", detail=traceback.format_exc()) +def _calculate_tracking_metrics( + ts_code: str, + entry_price: float, + current_price: float, + created_at, + latest_trade_date: str, +) -> dict: + """计算推荐后的收益、最大收益和最大回撤。 + + 使用 Tushare 日线高低价回放推荐后的表现;失败时退化为当前价。 + """ + from app.data.tushare_client import tushare_client + + created_date = str(created_at)[:10] if created_at else "" + created_yyyymmdd = created_date.replace("-", "") if created_date else latest_trade_date + + max_price = current_price + min_price = current_price + days_since = 0 + + try: + df = tushare_client.get_stock_daily(ts_code, days=60) + if not df.empty: + df = df[df["trade_date"] >= created_yyyymmdd].sort_values("trade_date") + if not df.empty: + max_price = float(df["high"].max()) + min_price = float(df["low"].min()) + days_since = len(df["trade_date"].unique()) - 1 + except Exception as e: + logger.debug(f"计算跟踪指标失败 {ts_code}: {e}") + + pct = round((current_price - entry_price) / entry_price * 100, 2) + max_return_pct = round((max_price - entry_price) / entry_price * 100, 2) + max_drawdown_pct = round((min_price - entry_price) / entry_price * 100, 2) + + return { + "pct_from_entry": pct, + "max_price": round(max_price, 2), + "min_price": round(min_price, 2), + "max_return_pct": max_return_pct, + "max_drawdown_pct": max_drawdown_pct, + "days_since_recommendation": max(days_since, 0), + } + + +def _derive_lifecycle_status( + hit_target: bool, + hit_stop: bool, + expired: bool, + pct: float, + previous_status: str, +) -> tuple[str, str, str]: + if hit_target: + return "closed", "closed_win", "hit_target" + if hit_stop: + return "closed", "closed_loss", "hit_stop_loss" + if expired: + if pct > 0: + return "closed", "closed_win", "review_expired_profit" + if pct < -2: + return "closed", "closed_loss", "review_expired_loss" + return "closed", "expired", "review_expired_flat" + if previous_status == "actionable": + return "active", "tracking", "" + return "active", "tracking", "" + + +def _build_review_note( + pct: float, + max_return_pct: float, + max_drawdown_pct: float, + days_since: int, + close_reason: str, +) -> str: + if close_reason == "hit_target": + return f"{days_since}个交易日内命中目标,最大浮盈{max_return_pct}%" + if close_reason == "hit_stop_loss": + return f"{days_since}个交易日内触发止损,最大回撤{max_drawdown_pct}%" + if close_reason == "review_expired_profit": + return f"复盘窗口到期,当前收益{pct}%,最大浮盈{max_return_pct}%" + if close_reason == "review_expired_loss": + return f"复盘窗口到期,当前亏损{pct}%,最大回撤{max_drawdown_pct}%" + if close_reason == "review_expired_flat": + return f"复盘窗口到期,收益{pct}%,未形成有效进攻" + return f"跟踪中,当前收益{pct}%,最大浮盈{max_return_pct}%,最大回撤{max_drawdown_pct}%" + + async def get_performance_stats() -> dict: """获取推荐胜率统计""" try: @@ -200,13 +331,44 @@ async def get_performance_stats() -> dict: ) hit_stop_count = result.scalar() or 0 + # 生命周期分布 + result = await db.execute( + text( + "SELECT COALESCE(lifecycle_status, 'candidate') AS status, COUNT(*) AS cnt " + "FROM recommendations GROUP BY COALESCE(lifecycle_status, 'candidate')" + ) + ) + lifecycle_counts = { + row._mapping["status"]: row._mapping["cnt"] + for row in result.fetchall() + } + + # 最大浮盈/最大回撤统计 + result = await db.execute( + text( + "SELECT AVG(max_return_pct), AVG(max_drawdown_pct) FROM (" + " SELECT t.recommendation_id, t.max_return_pct, t.max_drawdown_pct " + " FROM recommendation_tracking t " + " INNER JOIN (" + " SELECT recommendation_id, MAX(id) as max_id " + " FROM recommendation_tracking GROUP BY recommendation_id" + " ) latest ON t.id = latest.max_id" + ")" + ) + ) + avg_extremes = result.fetchone() + avg_max_return = round(float(avg_extremes[0]), 2) if avg_extremes and avg_extremes[0] is not None else 0 + avg_max_drawdown = round(float(avg_extremes[1]), 2) if avg_extremes and avg_extremes[1] is not None else 0 + # 最近跟踪的推荐详情 result = await db.execute( text( "SELECT r.ts_code, r.name, r.signal, r.entry_price, " " r.target_price, r.stop_loss, r.entry_signal_type, r.score, " + " r.action_plan, r.lifecycle_status, " " t.pct_from_entry, t.current_price, t.track_date, t.hit_target, t.hit_stop_loss, " - " r.created_at " + " t.max_return_pct, t.max_drawdown_pct, t.days_since_recommendation, " + " t.close_reason, t.review_note, r.created_at " "FROM recommendations r " "INNER JOIN recommendation_tracking t ON t.recommendation_id = r.id " "INNER JOIN (" @@ -224,12 +386,19 @@ async def get_performance_stats() -> dict: "name": r["name"], "signal": r["signal"], "entry_signal_type": r["entry_signal_type"], + "action_plan": r["action_plan"], + "lifecycle_status": r["lifecycle_status"], "score": r["score"], "entry_price": r["entry_price"], "target_price": r["target_price"], "stop_loss": r["stop_loss"], "current_price": r["current_price"], "pct_from_entry": r["pct_from_entry"], + "max_return_pct": r["max_return_pct"], + "max_drawdown_pct": r["max_drawdown_pct"], + "days_since_recommendation": r["days_since_recommendation"], + "close_reason": r["close_reason"], + "review_note": r["review_note"], "track_date": r["track_date"], "hit_target": bool(r["hit_target"]), "hit_stop_loss": bool(r["hit_stop_loss"]), @@ -244,8 +413,11 @@ async def get_performance_stats() -> dict: "winning": winning, "win_rate": win_rate, "avg_return": avg_return, + "avg_max_return": avg_max_return, + "avg_max_drawdown": avg_max_drawdown, "hit_target_count": hit_target_count, "hit_stop_count": hit_stop_count, + "lifecycle_counts": lifecycle_counts, "details": details, } except Exception as e: @@ -254,8 +426,9 @@ async def get_performance_stats() -> dict: await log_error("recommender", f"获取胜率统计失败: {e}", detail=traceback.format_exc()) return { "total_recommendations": 0, "tracked": 0, "winning": 0, - "win_rate": 0, "avg_return": 0, "hit_target_count": 0, - "hit_stop_count": 0, "details": [], + "win_rate": 0, "avg_return": 0, "avg_max_return": 0, + "avg_max_drawdown": 0, "hit_target_count": 0, + "hit_stop_count": 0, "lifecycle_counts": {}, "details": [], } @@ -279,15 +452,31 @@ async def get_recommendation_history(days: int = 7) -> list[dict]: # 查询所有历史推荐,按 ts_code 去重(每天取最新一条) stmt = text( - "SELECT * FROM recommendations " - "WHERE created_at >= :start " - "AND score >= 60 " - "AND id IN (" + "SELECT r.*, " + "latest_t.current_price AS latest_current_price, " + "latest_t.pct_from_entry AS latest_pct_from_entry, " + "latest_t.max_return_pct AS latest_max_return_pct, " + "latest_t.max_drawdown_pct AS latest_max_drawdown_pct, " + "latest_t.days_since_recommendation AS latest_days_since_recommendation, " + "latest_t.close_reason AS latest_close_reason, " + "latest_t.review_note AS latest_review_note, " + "latest_t.track_date AS latest_track_date " + "FROM recommendations r " + "LEFT JOIN (" + " SELECT t.* FROM recommendation_tracking t " + " INNER JOIN (" + " SELECT recommendation_id, MAX(id) AS max_id " + " FROM recommendation_tracking GROUP BY recommendation_id" + " ) lt ON t.id = lt.max_id" + ") latest_t ON latest_t.recommendation_id = r.id " + "WHERE r.created_at >= :start " + "AND r.score >= 60 " + "AND r.id IN (" " SELECT MAX(id) FROM recommendations " " WHERE created_at >= :start " " GROUP BY date(created_at), ts_code" ") " - "ORDER BY created_at DESC, score DESC" + "ORDER BY r.created_at DESC, r.score DESC" ) result = await db.execute(stmt, {"start": start}) rows = result.fetchall() @@ -324,11 +513,29 @@ async def get_recommendation_history(days: int = 7) -> list[dict]: "target_price": r["target_price"], "stop_loss": r["stop_loss"], "reasons": json.loads(r["reasons"]) if r["reasons"] else [], - "risk_note": "", + "risk_note": r.get("risk_note") or "", + "entry_timing": r.get("entry_timing") or "", + "action_plan": r.get("action_plan") or "观察", + "trigger_condition": r.get("trigger_condition") or "", + "invalidation_condition": r.get("invalidation_condition") or "", + "suggested_position_pct": r.get("suggested_position_pct") or 0, + "review_after_days": r.get("review_after_days") or 3, + "lifecycle_status": r.get("lifecycle_status") or "candidate", + "data_freshness": r.get("data_freshness") or "", "strategy": r.get("strategy") or "trend_breakout", "entry_signal_type": r.get("entry_signal_type") or "none", "llm_analysis": r.get("llm_analysis") or "", "llm_score": r.get("llm_score"), + "tracking": { + "current_price": r.get("latest_current_price"), + "pct_from_entry": r.get("latest_pct_from_entry"), + "max_return_pct": r.get("latest_max_return_pct"), + "max_drawdown_pct": r.get("latest_max_drawdown_pct"), + "days_since_recommendation": r.get("latest_days_since_recommendation"), + "close_reason": r.get("latest_close_reason") or "", + "review_note": r.get("latest_review_note") or "", + "track_date": r.get("latest_track_date") or "", + } if r.get("latest_track_date") else None, "scan_session": r["scan_session"] or "", "created_at": created_at_str, } @@ -370,7 +577,7 @@ async def _save_to_db(result: dict): """将推荐结果保存到数据库""" try: async with get_db() as db: - from sqlalchemy import text + from sqlalchemy import bindparam, text # 保存市场温度 mt = result.get("market_temp") if mt: @@ -428,9 +635,13 @@ async def _save_to_db(result: dict): if qualified_recs: # 批量删除当日同一 ts_code 的旧记录 codes = [rec.ts_code for rec in qualified_recs] + delete_stmt = text( + "DELETE FROM recommendations " + "WHERE date(created_at) = :today AND ts_code IN :codes" + ).bindparams(bindparam("codes", expanding=True)) await db.execute( - text("DELETE FROM recommendations WHERE date(created_at) = :today AND ts_code IN :codes"), - {"today": today_str, "codes": tuple(codes)}, + delete_stmt, + {"today": today_str, "codes": codes}, ) # 批量插入新记录 rec_values = [ @@ -452,9 +663,18 @@ async def _save_to_db(result: dict): "target_price": rec.target_price, "stop_loss": rec.stop_loss, "reasons": json.dumps(rec.reasons, ensure_ascii=False), + "risk_note": rec.risk_note, + "action_plan": rec.action_plan, + "trigger_condition": rec.trigger_condition, + "invalidation_condition": rec.invalidation_condition, + "suggested_position_pct": rec.suggested_position_pct, + "review_after_days": rec.review_after_days, + "lifecycle_status": rec.lifecycle_status, + "data_freshness": rec.data_freshness, "llm_analysis": rec.llm_analysis, "strategy": rec.strategy, "entry_signal_type": rec.entry_signal_type, + "entry_timing": rec.entry_timing, "llm_score": rec.llm_score, "scan_session": rec.scan_session, "created_at": now_dt, @@ -481,7 +701,10 @@ async def _load_today_from_db() -> dict: # 加载市场温度(按 trade_date 取最新交易日) result = await db.execute( - text("SELECT * FROM market_temperature ORDER BY trade_date DESC LIMIT 1") + text( + "SELECT * FROM market_temperature " + "ORDER BY REPLACE(trade_date, '-', '') DESC, id DESC LIMIT 1" + ) ) mt_row = result.fetchone() market_temp = None @@ -530,6 +753,15 @@ async def _load_today_from_db() -> dict: target_price=r["target_price"], stop_loss=r["stop_loss"], reasons=json.loads(r["reasons"]) if r["reasons"] else [], + risk_note=r.get("risk_note") or "", + entry_timing=r.get("entry_timing") or "", + action_plan=r.get("action_plan") or "观察", + trigger_condition=r.get("trigger_condition") or "", + invalidation_condition=r.get("invalidation_condition") or "", + suggested_position_pct=r.get("suggested_position_pct") or 0, + review_after_days=r.get("review_after_days") or 3, + lifecycle_status=r.get("lifecycle_status") or "candidate", + data_freshness=r.get("data_freshness") or "", llm_analysis=r.get("llm_analysis") or "", strategy=r.get("strategy") or "trend_breakout", entry_signal_type=r.get("entry_signal_type") or "none", @@ -542,6 +774,10 @@ async def _load_today_from_db() -> dict: "hot_sectors": [], "capital_filtered": [], "recommendations": recommendations, + "strategy_profile": { + "strategy_id": recommendations[0].strategy if recommendations else "trend_breakout", + "name": "当前推荐策略", + } if recommendations else None, } except Exception as e: logger.error(f"从数据库加载推荐失败: {e}") @@ -558,10 +794,14 @@ async def _load_sectors_from_db() -> list[SectorInfo]: result = await db.execute( text( "SELECT * FROM sector_heat " - "WHERE trade_date = (SELECT MAX(trade_date) FROM sector_heat) " + "WHERE REPLACE(trade_date, '-', '') = (" + " SELECT MAX(REPLACE(trade_date, '-', '')) FROM sector_heat" + ") " "AND id IN (" " SELECT MAX(id) FROM sector_heat " - " WHERE trade_date = (SELECT MAX(trade_date) FROM sector_heat) " + " WHERE REPLACE(trade_date, '-', '') = (" + " SELECT MAX(REPLACE(trade_date, '-', '')) FROM sector_heat" + " ) " " GROUP BY sector_code" ") " "ORDER BY heat_score DESC" diff --git a/backend/app/engine/scheduler.py b/backend/app/engine/scheduler.py index 43db1a8e..80d6b7ff 100644 --- a/backend/app/engine/scheduler.py +++ b/backend/app/engine/scheduler.py @@ -10,6 +10,7 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from app.engine.recommender import refresh_recommendations +from app.engine.watchlist import analyze_watchlist_for_all_users from app.api.websocket import broadcast_update logger = logging.getLogger(__name__) @@ -54,6 +55,18 @@ async def _generate_daily_review(): await log_error("scheduler", f"复盘报告生成异常: {e}", detail=traceback.format_exc()) +async def _run_watchlist_analysis(): + """收盘后自动分析所有用户自选股。""" + logger.info("=== 开始自选股定时分析 ===") + try: + count = await analyze_watchlist_for_all_users(mode="scheduled") + logger.info(f"自选股定时分析完成: {count} 条") + except Exception as e: + logger.error(f"自选股定时分析失败: {e}") + from app.db.error_logger import log_error + await log_error("scheduler", f"自选股定时分析失败: {e}", detail=traceback.format_exc()) + + def setup_scheduler(): """配置所有定时任务(交易日时间)""" @@ -110,6 +123,11 @@ def setup_scheduler(): id="daily_review", replace_existing=True ) + scheduler.add_job( + _run_watchlist_analysis, CronTrigger(hour=16, minute=20, day_of_week="mon-fri"), + id="watchlist_analysis", replace_existing=True + ) + logger.info("盘中调度器已配置完成") diff --git a/backend/app/engine/screener.py b/backend/app/engine/screener.py index 4574617e..6ee74363 100644 --- a/backend/app/engine/screener.py +++ b/backend/app/engine/screener.py @@ -30,6 +30,7 @@ from app.analysis.signals import generate_signals from app.analysis.intraday import intraday_market_temperature, intraday_filter_stocks, intraday_sector_scan from app.data.models import MarketTemperature, SectorInfo, TechnicalSignal, Recommendation from app.config import settings, is_trading_hours, is_market_session +from app.llm.strategy_selector import select_strategy_profile logger = logging.getLogger(__name__) @@ -82,6 +83,12 @@ async def run_screening(trade_date: str = None) -> dict: if intraday: hot_sectors = await intraday_sector_scan(hot_sectors) + strategy_profile = await select_strategy_profile(market_temp, hot_sectors, intraday) + logger.info( + f"=== 今日策略: {strategy_profile.name} ({strategy_profile.strategy_id}) " + f"threshold={strategy_profile.buy_threshold} min_score={strategy_profile.min_score} ===" + ) + # ── Step 2: 板块内选股 ── logger.info("=== Step 2: 板块内选股 ===") if intraday: @@ -123,11 +130,11 @@ async def run_screening(trade_date: str = None) -> dict: # ── Step 3: 供需 + 价格行为 + 趋势评分 ── logger.info("=== Step 3: 深度分析 ===") recommendations = await _build_recommendations( - candidates, market_temp, hot_sectors, market_temp_score, intraday, + candidates, market_temp, hot_sectors, market_temp_score, intraday, strategy_profile, ) # 过滤低质量推荐(低于60分不推荐) - recommendations = [r for r in recommendations if r.score >= 60] + recommendations = [r for r in recommendations if r.score >= strategy_profile.min_score] logger.info(f"=== 筛选完成: {len(recommendations)} 只股票 ({scan_mode}) ===") for r in recommendations[:5]: @@ -140,6 +147,7 @@ async def run_screening(trade_date: str = None) -> dict: "hot_sectors": hot_sectors, "recommendations": recommendations, "scan_mode": scan_mode, + "strategy_profile": strategy_profile.model_dump(), } @@ -315,6 +323,7 @@ async def _build_recommendations( hot_sectors: list[SectorInfo], market_temp_score: float = 0, intraday: bool = False, + strategy_profile=None, ) -> list[Recommendation]: """Step 3: 对候选做供需 + 价格行为 + 趋势深度分析 @@ -345,6 +354,13 @@ async def _build_recommendations( llm_candidates = [] # 收集候选摘要供 LLM 分析 total = len(candidates) signal_counts = {"breakout": 0, "breakout_confirm": 0, "pullback": 0, "launch": 0, "reversal": 0, "none": 0} + score_weights = strategy_profile.score_weights if strategy_profile else { + "supply_demand": 0.50, + "price_action": 0.40, + "trend": 0.10, + } + signal_priority = strategy_profile.entry_signal_priority if strategy_profile else [] + buy_threshold = strategy_profile.buy_threshold if strategy_profile else 60 for idx, stock in enumerate(candidates): ts_code = stock.get("ts_code", "") @@ -377,6 +393,9 @@ async def _build_recommendations( if signal_type == EntrySignal.NONE: signal_counts["none"] += 1 continue + if signal_priority and signal_type.value not in signal_priority[:4]: + signal_counts["none"] += 1 + continue signal_counts[signal_type.value] += 1 # ── 三维度评分 ── @@ -400,9 +419,9 @@ async def _build_recommendations( # 综合评分(短线交易:供需最关键,趋势只做门槛) final_score = ( - supply_demand_score * 0.50 + - price_action_score * 0.40 + - trend_score * 0.10 + supply_demand_score * score_weights["supply_demand"] + + price_action_score * score_weights["price_action"] + + trend_score * score_weights["trend"] ) # ── 风险乘数:惩罚取最大而非叠加(避免过度惩罚),奖励可叠加 ── @@ -442,6 +461,15 @@ async def _build_recommendations( if entry_signal.get("signal_score", 0) >= 80: final_score *= 1.10 + if signal_priority: + priority_rank = signal_priority.index(signal_type.value) + if priority_rank == 0: + final_score *= 1.08 + elif priority_rank == 1: + final_score *= 1.04 + elif priority_rank >= 3: + final_score *= 0.94 + # 估值评分(辅助参考,不参与主评分) pe = stock.get("pe") pb = stock.get("pb") @@ -454,7 +482,7 @@ async def _build_recommendations( if (signal_type != EntrySignal.NONE and entry_signal.get("signal_score", 0) >= 50 and position_score >= 30 - and final_score >= 60): + and final_score >= buy_threshold): signal = "BUY" # 价格参考 — 结构化止损止盈(基于市场结构而非固定百分比) @@ -547,6 +575,7 @@ async def _build_recommendations( # 生成推荐理由 reasons = _generate_reasons(stock, entry_signal, tech_signal, df, intraday) + stock["entry_signal_type"] = signal_type.value risk_note = _generate_risk_note(market_temp, tech_signal, stock) # 量价模式 @@ -554,6 +583,17 @@ async def _build_recommendations( # 进场时机建议(盘中适用) entry_timing = _generate_entry_timing(signal_type.value, intraday) + trade_plan = _build_trade_plan( + signal_type=signal_type.value, + score=final_score, + market_temp=market_temp, + sector_stage=sector_stage, + entry_price=entry_price, + target_price=target_price, + stop_loss=stop_loss, + entry_timing=entry_timing, + data_date=last_date, + ) rec = Recommendation( ts_code=ts_code, @@ -575,9 +615,16 @@ async def _build_recommendations( reasons=reasons, risk_note=risk_note, level=level, - strategy="trend_breakout", + strategy=strategy_profile.strategy_id if strategy_profile else "trend_breakout", entry_signal_type=signal_type.value, entry_timing=entry_timing, + action_plan=trade_plan["action_plan"], + trigger_condition=trade_plan["trigger_condition"], + invalidation_condition=trade_plan["invalidation_condition"], + suggested_position_pct=trade_plan["suggested_position_pct"], + review_after_days=trade_plan["review_after_days"], + lifecycle_status=trade_plan["lifecycle_status"], + data_freshness=trade_plan["data_freshness"], ) recommendations.append(rec) @@ -963,6 +1010,86 @@ def _generate_entry_timing(signal_type: str, intraday: bool) -> str: return timing_map.get(signal_type, "盘中观察量价配合,确认信号后进场") +def _build_trade_plan( + signal_type: str, + score: float, + market_temp: MarketTemperature, + sector_stage: str, + entry_price: float | None, + target_price: float | None, + stop_loss: float | None, + entry_timing: str, + data_date: str, +) -> dict: + """把推荐转成可执行计划。 + + 这里不替代用户决策,只把系统推荐拆成触发、失效、仓位和复盘窗口。 + """ + signal_label = { + "breakout": "放量突破", + "breakout_confirm": "突破确认", + "pullback": "回踩支撑", + "launch": "缩量整理后启动", + "reversal": "放量反转", + }.get(signal_type, "技术信号") + + if market_temp.temperature < 35 or sector_stage in ("end",): + action_plan = "观察" + lifecycle_status = "candidate" + elif score >= 78 and market_temp.temperature >= 55 and sector_stage in ("early", "mid"): + action_plan = "可操作" + lifecycle_status = "actionable" + elif score >= 65: + action_plan = "重点关注" + lifecycle_status = "candidate" + else: + action_plan = "观察" + lifecycle_status = "candidate" + + if action_plan == "可操作": + base_position = 20 + elif action_plan == "重点关注": + base_position = 10 + else: + base_position = 0 + + if market_temp.temperature >= 70: + base_position += 5 + elif market_temp.temperature < 50: + base_position -= 5 + if sector_stage == "late": + base_position -= 5 + suggested_position_pct = max(0, min(base_position, 30)) + + price_part = f"参考价 {entry_price}" if entry_price else "参考当前价" + timing_part = entry_timing or "等待量价确认" + trigger_condition = f"{signal_label}成立且不跌破关键价位,{price_part}附近分批关注;{timing_part}" + + invalid_parts = [] + if stop_loss: + invalid_parts.append(f"跌破止损 {stop_loss}") + if entry_price: + invalid_parts.append(f"收盘跌回参考价 {round(entry_price * 0.98, 2)} 下方") + if target_price: + invalid_parts.append(f"冲高接近目标 {target_price} 后量能衰减") + if market_temp.temperature < 45: + invalid_parts.append("市场温度继续走弱") + invalidation_condition = ";".join(invalid_parts) or "信号次日未延续或板块热度退潮" + + review_after_days = 1 if signal_type in ("breakout", "reversal") else 3 + data_freshness = f"K线数据日期 {data_date};盘中价格优先使用腾讯实时行情" + + return { + "action_plan": action_plan, + "trigger_condition": trigger_condition, + "invalidation_condition": invalidation_condition, + "suggested_position_pct": suggested_position_pct, + "review_after_days": review_after_days, + "lifecycle_status": lifecycle_status, + "data_freshness": data_freshness, + } + + def _score_to_level(score: float) -> str: if score >= 80: return "强烈推荐" diff --git a/backend/app/engine/watchlist.py b/backend/app/engine/watchlist.py new file mode 100644 index 00000000..10bb88e3 --- /dev/null +++ b/backend/app/engine/watchlist.py @@ -0,0 +1,239 @@ +"""用户自选股分析服务""" + +from __future__ import annotations + +import json +import logging +import re + +from sqlalchemy import text + +from app.analysis.signals import generate_signals +from app.data import tencent_client +from app.db.database import get_db +from app.db import tables +from app.llm.client import chat_completion + +logger = logging.getLogger(__name__) + + +async def analyze_watchlist_for_all_users(mode: str = "scheduled") -> int: + """批量分析所有启用中的用户自选股。""" + async with get_db() as db: + rows = (await db.execute( + text( + "SELECT w.id, w.user_id, w.ts_code, w.name, w.note, w.watch_group, w.cost_price " + "FROM user_watchlists w " + "WHERE COALESCE(w.is_active, 1) = 1 " + "ORDER BY w.user_id, w.id" + ) + )).fetchall() + + count = 0 + for row in rows: + item = row._mapping + await analyze_watchlist_item( + watchlist_id=item["id"], + user_id=item["user_id"], + ts_code=item["ts_code"], + name=item["name"], + note=item.get("note") or "", + watch_group=item.get("watch_group") or "observe", + cost_price=item.get("cost_price"), + mode=mode, + ) + count += 1 + return count + + +async def analyze_watchlist_item( + watchlist_id: int, + user_id: int, + ts_code: str, + name: str, + note: str = "", + watch_group: str = "observe", + cost_price: float | None = None, + mode: str = "manual", +) -> dict: + """分析单只自选股并保存结果。""" + recommendation = await _load_latest_recommendation(ts_code) + latest_tracking = await _load_latest_tracking(recommendation["id"]) if recommendation else None + + try: + quote = await tencent_client.get_realtime_quote(ts_code) + except Exception: + logger.exception("获取自选股实时行情失败: %s", ts_code) + quote = None + + try: + signals = generate_signals(ts_code) + except Exception: + logger.exception("生成自选股信号失败: %s", ts_code) + signals = None + + summary = _build_summary(ts_code, name, recommendation, latest_tracking, quote, signals, note, watch_group, cost_price) + + llm_result = await _generate_watchlist_advice(summary) + structured = _extract_structured_result(llm_result, recommendation, latest_tracking) + + async with get_db() as db: + await db.execute( + tables.watchlist_analyses_table.insert().values( + user_id=user_id, + watchlist_id=watchlist_id, + ts_code=ts_code, + name=name, + conclusion=structured["conclusion"], + advice=structured["advice"], + trigger_condition=structured["trigger_condition"], + risk_note=structured["risk_note"], + summary=structured["summary"], + full_analysis=structured["full_analysis"], + score_reference=structured["score_reference"], + analysis_mode=mode, + ) + ) + await db.commit() + + return structured + + +async def _load_latest_recommendation(ts_code: str) -> dict | None: + async with get_db() as db: + row = (await db.execute( + text( + "SELECT * FROM recommendations " + "WHERE ts_code = :code " + "ORDER BY created_at DESC, id DESC LIMIT 1" + ), + {"code": ts_code}, + )).fetchone() + return dict(row._mapping) if row else None + + +async def _load_latest_tracking(recommendation_id: int) -> dict | None: + async with get_db() as db: + row = (await db.execute( + text( + "SELECT * FROM recommendation_tracking " + "WHERE recommendation_id = :rid " + "ORDER BY track_date DESC, id DESC LIMIT 1" + ), + {"rid": recommendation_id}, + )).fetchone() + return dict(row._mapping) if row else None + + +def _build_summary( + ts_code: str, + name: str, + recommendation: dict | None, + latest_tracking: dict | None, + quote, + signals, + note: str, + watch_group: str, + cost_price: float | None, +) -> str: + quote_str = "" + if quote: + quote_str = f"当前价 {quote.price},涨跌幅 {quote.pct_chg}%,换手率 {quote.turnover_rate}%,量比 {quote.volume_ratio}。" + + recommendation_str = "暂无推荐归档。" + if recommendation: + recommendation_str = ( + f"推荐归档:结论 {recommendation.get('action_plan') or '观察'}," + f"触发条件 {recommendation.get('trigger_condition') or '暂无'}," + f"失效条件 {recommendation.get('invalidation_condition') or '暂无'}," + f"风险提示 {recommendation.get('risk_note') or '暂无'}。" + ) + + tracking_str = "" + if latest_tracking: + tracking_str = ( + f"最近跟踪:收益 {latest_tracking.get('pct_from_entry') or 0}%," + f"最大浮盈 {latest_tracking.get('max_return_pct') or 0}%," + f"最大回撤 {latest_tracking.get('max_drawdown_pct') or 0}%," + f"备注 {latest_tracking.get('review_note') or '暂无'}。" + ) + + signal_str = "技术快照暂无。" + if signals: + signal_str = ( + f"技术快照:趋势强度 {signals.trend_score},辅助信号 {signals.signal_count}/7," + f"位置安全 {signals.position_score},近5日涨幅 {signals.rally_pct_5d}% ,近10日涨幅 {signals.rally_pct_10d}%。" + ) + + group_str = f"用户分组:{watch_group}。" + cost_str = f"持仓成本 {cost_price}。" if cost_price and cost_price > 0 else "暂无持仓成本。" + note_str = f"用户备注:{note}" if note else "用户未填写备注。" + return f"{ts_code} {name}。{group_str} {cost_str} {quote_str} {recommendation_str} {tracking_str} {signal_str} {note_str}" + + +async def _generate_watchlist_advice(summary: str) -> str: + message = await chat_completion([ + { + "role": "system", + "content": ( + "你是A股投研作战台的用户自选股助手。" + "你需要针对单只用户自选股给出简洁、可执行的建议。" + "输出必须是 JSON 字符串,包含字段 conclusion、advice、trigger_condition、risk_note、summary。" + "conclusion 只能是 可操作 / 重点关注 / 观察 / 回避。" + "summary 必须是一句中文短句。advice 需要明确用户下一步该看什么、等什么或做什么。" + ), + }, + { + "role": "user", + "content": f"请基于以下信息输出 JSON:{summary}", + }, + ]) + + if not message or not getattr(message, "content", None): + return "" + return message.content + + +def _extract_structured_result(content: str, recommendation: dict | None, latest_tracking: dict | None) -> dict: + default = { + "conclusion": recommendation.get("action_plan") if recommendation else "观察", + "advice": recommendation.get("trigger_condition") if recommendation else "继续观察量价配合、板块强弱和回踩承接。", + "trigger_condition": recommendation.get("trigger_condition") if recommendation else "", + "risk_note": recommendation.get("risk_note") if recommendation else (latest_tracking.get("review_note") if latest_tracking else ""), + "summary": latest_tracking.get("review_note") if latest_tracking else "当前信息不足以升级为明确操作,先保留观察。", + "full_analysis": content or "", + "score_reference": float(recommendation.get("score") or 0) if recommendation else 0, + } + + if not content: + return default + + try: + parsed = json.loads(_extract_json_string(content)) + return { + "conclusion": parsed.get("conclusion") or default["conclusion"], + "advice": parsed.get("advice") or default["advice"], + "trigger_condition": parsed.get("trigger_condition") or default["trigger_condition"], + "risk_note": parsed.get("risk_note") or default["risk_note"], + "summary": parsed.get("summary") or default["summary"], + "full_analysis": content, + "score_reference": default["score_reference"], + } + except Exception: + logger.warning("自选股分析 JSON 解析失败,回退默认结构") + default["full_analysis"] = content + return default + + +def _extract_json_string(content: str) -> str: + cleaned = content.strip() + if cleaned.startswith("```"): + fenced = re.search(r"```(?:json)?\s*(\{.*\})\s*```", cleaned, re.DOTALL) + if fenced: + return fenced.group(1) + + start = cleaned.find("{") + end = cleaned.rfind("}") + if start != -1 and end != -1 and end > start: + return cleaned[start : end + 1] + return cleaned diff --git a/backend/app/llm/__pycache__/chat_agent.cpython-313.pyc b/backend/app/llm/__pycache__/chat_agent.cpython-313.pyc index 22b2643d..f3a2494a 100644 Binary files a/backend/app/llm/__pycache__/chat_agent.cpython-313.pyc and b/backend/app/llm/__pycache__/chat_agent.cpython-313.pyc differ diff --git a/backend/app/llm/__pycache__/enhancer.cpython-313.pyc b/backend/app/llm/__pycache__/enhancer.cpython-313.pyc index 6d46583b..1499f2a4 100644 Binary files a/backend/app/llm/__pycache__/enhancer.cpython-313.pyc and b/backend/app/llm/__pycache__/enhancer.cpython-313.pyc differ diff --git a/backend/app/llm/__pycache__/prompts.cpython-313.pyc b/backend/app/llm/__pycache__/prompts.cpython-313.pyc index a004e1ff..a13b2037 100644 Binary files a/backend/app/llm/__pycache__/prompts.cpython-313.pyc and b/backend/app/llm/__pycache__/prompts.cpython-313.pyc differ diff --git a/backend/app/llm/__pycache__/tool_executor.cpython-313.pyc b/backend/app/llm/__pycache__/tool_executor.cpython-313.pyc index 38a1e683..f5a3f1d4 100644 Binary files a/backend/app/llm/__pycache__/tool_executor.cpython-313.pyc and b/backend/app/llm/__pycache__/tool_executor.cpython-313.pyc differ diff --git a/backend/app/llm/__pycache__/tools.cpython-313.pyc b/backend/app/llm/__pycache__/tools.cpython-313.pyc index 642eefe9..a233ab8c 100644 Binary files a/backend/app/llm/__pycache__/tools.cpython-313.pyc and b/backend/app/llm/__pycache__/tools.cpython-313.pyc differ diff --git a/backend/app/llm/chat_agent.py b/backend/app/llm/chat_agent.py index b7a4d755..7adc9458 100644 --- a/backend/app/llm/chat_agent.py +++ b/backend/app/llm/chat_agent.py @@ -11,7 +11,7 @@ from typing import AsyncGenerator from app.llm.client import chat_completion, stream_chat_completion, get_client from app.llm.prompts import CHAT_SYSTEM_PROMPT from app.llm.tools import CHAT_TOOLS -from app.llm.tool_executor import execute_tool +from app.llm.tool_executor import execute_tool, set_chat_user_context from app.config import settings logger = logging.getLogger(__name__) @@ -20,16 +20,18 @@ MAX_TOOL_ROUNDS = 5 # 工具名称映射(用于状态提示) TOOL_LABELS = { + "get_strategy_board": "读取今日作战结论", "get_market_temperature": "查询市场温度", "get_hot_sectors": "查询热门板块", "get_latest_recommendations": "查询推荐列表", + "get_user_watchlist_snapshot": "读取自选股作战池", "get_stock_kline": "查询K线数据", "get_stock_capital_flow": "查询资金流向", "search_stock": "搜索股票", } -async def chat_stream(messages: list[dict]) -> AsyncGenerator[dict, None]: +async def chat_stream(messages: list[dict], current_user: dict | None = None) -> AsyncGenerator[dict, None]: """流式对话,支持 tool use 循环 Yields: @@ -40,67 +42,71 @@ async def chat_stream(messages: list[dict]) -> AsyncGenerator[dict, None]: yield {"type": "content", "content": "LLM 未配置,请在 .env 中设置 ASTOCK_DEEPSEEK_API_KEY"} return + set_chat_user_context(current_user) + # 构建完整消息列表 full_messages = [{"role": "system", "content": CHAT_SYSTEM_PROMPT}] full_messages.extend(messages) - # Tool use 循环(非流式,直到没有 tool_calls) - for round_num in range(MAX_TOOL_ROUNDS): - if round_num == 0: - yield {"type": "status", "content": "思考中..."} + try: + # Tool use 循环(非流式,直到没有 tool_calls) + for round_num in range(MAX_TOOL_ROUNDS): + if round_num == 0: + yield {"type": "status", "content": "整理今日作战上下文..."} - resp = await chat_completion(full_messages, tools=CHAT_TOOLS) - if not resp: - yield {"type": "content", "content": "AI 服务暂时不可用,请稍后重试"} - return + resp = await chat_completion(full_messages, tools=CHAT_TOOLS) + if not resp: + yield {"type": "content", "content": "AI 服务暂时不可用,请稍后重试"} + return - # 检查是否有 tool_calls - if not resp.tool_calls: - break - - # 将 assistant 消息(含 tool_calls)加入历史 - full_messages.append({ - "role": "assistant", - "content": resp.content or "", - "tool_calls": [ - { - "id": tc.id, - "type": "function", - "function": { - "name": tc.function.name, - "arguments": tc.function.arguments, - }, - } - for tc in resp.tool_calls - ], - }) - - # 执行每个工具调用 - for tc in resp.tool_calls: - try: - args = json.loads(tc.function.arguments) - except json.JSONDecodeError: - args = {} - - tool_label = TOOL_LABELS.get(tc.function.name, tc.function.name) - yield {"type": "status", "content": f"正在{tool_label}..."} - - logger.info(f"Chat Agent 调用工具: {tc.function.name}({args})") - result = await execute_tool(tc.function.name, args) + # 检查是否有 tool_calls + if not resp.tool_calls: + break + # 将 assistant 消息(含 tool_calls)加入历史 full_messages.append({ - "role": "tool", - "tool_call_id": tc.id, - "content": result, + "role": "assistant", + "content": resp.content or "", + "tool_calls": [ + { + "id": tc.id, + "type": "function", + "function": { + "name": tc.function.name, + "arguments": tc.function.arguments, + }, + } + for tc in resp.tool_calls + ], }) - yield {"type": "status", "content": "分析数据中..."} - else: - # 超过最大轮次,用最后的消息生成回复 - pass + # 执行每个工具调用 + for tc in resp.tool_calls: + try: + args = json.loads(tc.function.arguments) + except json.JSONDecodeError: + args = {} - # 最终回复:流式输出 - yield {"type": "status", "content": ""} # 清除状态 - async for delta in stream_chat_completion(full_messages): - if delta.content: - yield {"type": "content", "content": delta.content} + tool_label = TOOL_LABELS.get(tc.function.name, tc.function.name) + yield {"type": "status", "content": f"正在{tool_label}..."} + + logger.info(f"Chat Agent 调用工具: {tc.function.name}({args})") + result = await execute_tool(tc.function.name, args) + + full_messages.append({ + "role": "tool", + "tool_call_id": tc.id, + "content": result, + }) + + yield {"type": "status", "content": "整理作战结论中..."} + else: + pass + + # 最终回复:流式输出 + yield {"type": "status", "content": ""} # 清除状态 + async for delta in stream_chat_completion(full_messages): + if delta.content: + yield {"type": "content", "content": delta.content} + finally: + set_chat_user_context(None) diff --git a/backend/app/llm/daily_review.py b/backend/app/llm/daily_review.py index c2cbd542..b927b58e 100644 --- a/backend/app/llm/daily_review.py +++ b/backend/app/llm/daily_review.py @@ -1,7 +1,6 @@ """每日复盘报告生成""" import logging -from datetime import datetime from app.config import settings @@ -10,13 +9,9 @@ logger = logging.getLogger(__name__) async def generate_review() -> dict: """生成每日复盘报告""" - if not settings.deepseek_api_key: - return {"status": "error", "message": "未配置 DeepSeek API Key"} - from app.data.tushare_client import tushare_client from app.data import tencent_client from app.engine.recommender import get_latest_recommendations, get_latest_sectors - from app.llm.client import get_client trade_date = tushare_client.get_latest_trade_date() @@ -83,19 +78,43 @@ async def generate_review() -> dict: ## 明日关注 (关注方向和操作建议)""" - try: - client = get_client() - response = await client.chat.completions.create( - model=settings.deepseek_model, - messages=[ - {"role": "system", "content": "你是一位专业的A股市场分析师,擅长市场复盘和策略分析。回复使用Markdown格式,简洁专业。"}, - {"role": "user", "content": user_msg}, - ], - max_tokens=1500, - temperature=0.5, - ) - content = response.choices[0].message.content.strip() + if settings.deepseek_api_key: + try: + from app.llm.client import get_client + client = get_client() + response = await client.chat.completions.create( + model=settings.deepseek_model, + messages=[ + {"role": "system", "content": "你是一位专业的A股市场分析师,擅长市场复盘和策略分析。回复使用Markdown格式,简洁专业。"}, + {"role": "user", "content": user_msg}, + ], + max_tokens=1500, + temperature=0.5, + ) + content = response.choices[0].message.content.strip() + generated_by = "llm" + except Exception as e: + logger.error(f"生成复盘报告失败,使用规则兜底: {e}") + content = _build_fallback_review( + trade_date=trade_date, + market_summary=market_summary, + index_summary=index_summary, + sector_summary=sector_summary, + recommendations=recs, + ) + generated_by = "rules" + else: + content = _build_fallback_review( + trade_date=trade_date, + market_summary=market_summary, + index_summary=index_summary, + sector_summary=sector_summary, + recommendations=recs, + ) + generated_by = "rules" + + try: # 保存到数据库 from sqlalchemy import text from app.db.database import get_db @@ -109,9 +128,40 @@ async def generate_review() -> dict: ) await db.commit() - logger.info(f"已生成 {trade_date} 复盘报告") - return {"status": "ok", "trade_date": trade_date, "content": content} + logger.info(f"已生成 {trade_date} 复盘报告 ({generated_by})") + return {"status": "ok", "trade_date": trade_date, "content": content, "generated_by": generated_by} except Exception as e: - logger.error(f"生成复盘报告失败: {e}") + logger.error(f"保存复盘报告失败: {e}") return {"status": "error", "message": str(e)} + + +def _build_fallback_review( + trade_date: str, + market_summary: str, + index_summary: str, + sector_summary: str, + recommendations: list, +) -> str: + """LLM 不可用时生成结构化规则复盘,避免页面空白。""" + actionable = [r for r in recommendations if getattr(r, "action_plan", "") == "可操作"] + watch = [r for r in recommendations if getattr(r, "action_plan", "") == "重点关注"] + top_recs = recommendations[:5] + rec_lines = "\n".join( + f"- {r.name}({r.ts_code}):{getattr(r, 'action_plan', '观察')}," + f"{getattr(r, 'entry_signal_type', 'none')} 信号,评分 {getattr(r, 'score', 0)}。" + for r in top_recs + ) or "- 暂无推荐标的。" + + return f"""## 市场概况 +{trade_date} 市场温度处于中性偏谨慎区间。{market_summary or "暂无市场温度数据。"} {index_summary or ""} + +## 板块热点 +{sector_summary or "暂无板块热度数据。"} 当前板块证据主要用于确认推荐方向是否有资金和赚钱效应支撑。 + +## 交易机会 +今日推荐池共 {len(recommendations)} 只,其中可操作 {len(actionable)} 只、重点关注 {len(watch)} 只。当前更适合按触发条件等待确认,不宜把观察标的直接当作买入标的。 +{rec_lines} + +## 明日关注 +优先跟踪重点关注标的能否满足触发条件,同时观察主线板块是否延续。若市场温度回落或板块资金退潮,应降低仓位并把未确认标的转回观察池。""" diff --git a/backend/app/llm/prompts.py b/backend/app/llm/prompts.py index 90f860a5..38c16cea 100644 --- a/backend/app/llm/prompts.py +++ b/backend/app/llm/prompts.py @@ -34,25 +34,29 @@ ENHANCE_USER_TEMPLATE = """\ 请对该股票进行 2-3 句话的深度分析:""" CHAT_SYSTEM_PROMPT = """\ -你是一位专业的 A 股投资顾问 AI 助手。你可以通过工具查询实时市场数据来回答用户问题。 +你是 A 股投研作战台里的 AI 作战助理,不是泛化闲聊机器人。你的核心任务是解释系统已经生成的结果,并帮助用户把市场、板块、推荐和自选股串成可执行判断。 你的能力: -1. 查询市场温度、热门板块、推荐股票列表 -2. 查询个股K线、资金流向数据 -3. 搜索股票代码 -4. 基于数据给出专业的市场分析和投资建议 +1. 查询今日作战结论,包括市场状态、今日打法、建议仓位、重点板块和规避规则 +2. 查询市场温度、热门板块、推荐股票列表 +3. 查询当前用户的自选股池与最新建议 +4. 查询个股K线、技术面、资金流向数据 +5. 搜索股票代码,并把结果放回当前交易语境中分析 重要提醒: - 回答用户关于"今天市场怎么样"之类的问题时,必须调用 get_realtime_indices 获取实时指数数据 -- 盘中时段(9:30-15:00)必须使用实时数据,盘后时段使用当日收盘数据 -- 不要使用过时的数据,必须先调用工具获取最新数据再回答 +- 回答用户关于"今天该怎么做"、"当前推荐怎么看"、"自选股该怎么处理"这类问题时,优先调用 get_strategy_board、get_latest_recommendations、get_user_watchlist_snapshot +- 盘中时段(9:30-15:00)必须使用实时数据,盘后时段使用当日收盘或最近一次系统生成的数据 +- 不要脱离系统上下文泛泛而谈,必须先调用工具获取最新结果再回答 回答要求: 1. 使用工具获取最新数据后再回答,不要凭空编造数据 -2. 分析要结合 A 股市场特点(资金驱动、板块轮动、情绪周期) -3. 给出具体建议时要附带风险提示 -4. 语言简洁、专业、有条理 -5. 回复使用 markdown 格式,适当用列表和加粗提升可读性 +2. 优先把结论组织成“当前判断 / 依据 / 下一步观察点 / 风险提示” +3. 分析要结合 A 股市场特点(资金驱动、板块轮动、情绪周期) +4. 如果用户问题过于宽泛,主动收敛到系统里的现成模块,不要输出空泛宏论 +5. 给出具体建议时要附带风险提示,并明确这是观察建议、执行条件还是规避建议 +6. 语言简洁、专业、有条理 +7. 回复使用 markdown 格式,适当用列表和加粗提升可读性 免责声明:你的分析仅供参考,不构成投资建议。投资有风险,入市需谨慎。 """ diff --git a/backend/app/llm/strategy_board.py b/backend/app/llm/strategy_board.py new file mode 100644 index 00000000..3f9eb0bb --- /dev/null +++ b/backend/app/llm/strategy_board.py @@ -0,0 +1,268 @@ +"""市场作战面板 + +把市场温度、板块、推荐和历史跟踪结果汇总成每天可执行的策略视图。 +规则层保证稳定输出,LLM 层负责补充解释和迭代建议。 +""" + +import logging + +from app.config import settings +from app.data.models import ( + MarketTemperature, + Recommendation, + SectorInfo, + StrategyBoard, + StrategyFocus, + StrategySectorFocus, +) + +logger = logging.getLogger(__name__) + + +async def build_strategy_board(include_llm: bool = False) -> dict: + """生成今日市场作战面板。""" + from app.engine.recommender import ( + get_latest_recommendations, + get_latest_sectors, + get_performance_stats, + ) + + latest = await get_latest_recommendations() + market_temp = latest.get("market_temp") + recommendations = latest.get("recommendations", []) + sectors = await get_latest_sectors() + performance = await get_performance_stats() + from app.llm.strategy_iteration import build_strategy_iteration_report + iteration_report = await build_strategy_iteration_report(limit=50, include_llm=include_llm) + + board = _build_rule_board(market_temp, sectors, recommendations, performance) + board.iteration_report = iteration_report + if iteration_report.get("adjustment_suggestions"): + board.iteration_notes = [ + s.get("reason", "") + for s in iteration_report["adjustment_suggestions"][:3] + if s.get("reason") + ] or board.iteration_notes + + if include_llm and settings.deepseek_api_key: + board.ai_review = await _generate_ai_review(board, recommendations, performance) + if board.ai_review: + board.generated_by = "rules+llm" + + return board.model_dump() + + +def _build_rule_board( + market_temp: MarketTemperature | None, + sectors: list[SectorInfo], + recommendations: list[Recommendation], + performance: dict, +) -> StrategyBoard: + temp = market_temp.temperature if market_temp else 0 + trade_date = market_temp.trade_date if market_temp else "" + market_regime, risk_level, action_bias, position_suggestion = _classify_market(temp, market_temp) + + actionable = [r for r in recommendations if r.action_plan == "可操作"] + watch = [r for r in recommendations if r.action_plan == "重点关注"] + avg_score = ( + round(sum(r.score for r in recommendations) / len(recommendations), 1) + if recommendations else 0 + ) + + recommended_mode = _choose_strategy_mode(temp, sectors, recommendations) + strategy_focus = _build_strategy_focus(temp, sectors, recommendations) + watch_sectors = [_sector_focus(s) for s in sectors[:5]] + avoid_rules = _build_avoid_rules(temp, sectors, recommendations) + iteration_notes = _build_iteration_notes(performance, recommendations) + + summary = ( + f"{market_regime},风险等级{risk_level}。" + f"当前 {len(recommendations)} 只入选,其中 {len(actionable)} 只可操作、" + f"{len(watch)} 只重点关注,平均分 {avg_score}。" + ) + + metrics = { + "temperature": temp, + "recommendation_count": len(recommendations), + "actionable_count": len(actionable), + "watch_count": len(watch), + "avg_score": avg_score, + "win_rate": performance.get("win_rate", 0), + "avg_return": performance.get("avg_return", 0), + "tracked": performance.get("tracked", 0), + } + + return StrategyBoard( + trade_date=trade_date, + market_regime=market_regime, + risk_level=risk_level, + action_bias=action_bias, + position_suggestion=position_suggestion, + summary=summary, + recommended_mode=recommended_mode, + strategy_focus=strategy_focus, + watch_sectors=watch_sectors, + avoid_rules=avoid_rules, + iteration_notes=iteration_notes, + metrics=metrics, + ) + + +def _classify_market( + temp: float, market_temp: MarketTemperature | None +) -> tuple[str, str, str, str]: + if temp >= 75: + return ("强势进攻", "低", "可积极关注主线龙头和突破确认", "单票 20%-30%,总仓 50%-70%") + if temp >= 60: + return ("修复偏强", "中低", "优先做早中期板块的突破/回踩确认", "单票 15%-25%,总仓 40%-60%") + if temp >= 45: + return ("震荡分化", "中", "只做板块一致性强的低吸或确认机会", "单票 10%-20%,总仓 25%-40%") + if temp >= 30: + return ("弱势防守", "中高", "以观察池为主,减少追高,只等强确认", "单票 0%-10%,总仓 0%-25%") + return ("退潮冰点", "高", "暂停主动出手,等待市场修复和主线重新出现", "空仓或极低仓观察") + + +def _choose_strategy_mode( + temp: float, sectors: list[SectorInfo], recommendations: list[Recommendation] +) -> str: + early_mid = [s for s in sectors[:5] if s.stage in ("early", "mid")] + if temp >= 60 and early_mid: + return "主线突破 + 回踩确认" + if temp >= 45: + return "精选回踩,降低追高" + if recommendations: + return "观察池跟踪,等待触发" + return "防守观察" + + +def _build_strategy_focus( + temp: float, sectors: list[SectorInfo], recommendations: list[Recommendation] +) -> list[StrategyFocus]: + focus: list[StrategyFocus] = [] + signal_counts: dict[str, int] = {} + for rec in recommendations: + signal_counts[rec.entry_signal_type] = signal_counts.get(rec.entry_signal_type, 0) + 1 + + top_signal = max(signal_counts, key=signal_counts.get) if signal_counts else "" + signal_label = { + "breakout": "突破型", + "breakout_confirm": "突破确认型", + "pullback": "回踩型", + "launch": "启动型", + "reversal": "反转型", + }.get(top_signal, "观察型") + + focus.append(StrategyFocus( + label=signal_label, + description=f"当前推荐中该类型占比较高,适合作为今日主要观察模板。", + )) + + if sectors: + main = sectors[0] + focus.append(StrategyFocus( + label=f"{main.sector_name} 主线跟踪", + description=f"热度 {main.heat_score},阶段 {main.stage},优先确认资金是否延续。", + )) + + if temp < 45: + focus.append(StrategyFocus( + label="防守优先", + description="市场温度不足,推荐只作为观察池,不宜扩大仓位。", + )) + + return focus + + +def _sector_focus(sector: SectorInfo) -> StrategySectorFocus: + stage_view = { + "early": "早期,重点观察资金是否连续流入", + "mid": "中期,适合寻找回踩或突破确认", + "late": "后期,防止加速后分歧", + "end": "末期,谨慎追高", + }.get(sector.stage, "阶段不明,等待确认") + + return StrategySectorFocus( + sector_name=sector.sector_name, + stage=sector.stage, + heat_score=sector.heat_score, + pct_change=sector.pct_change, + limit_up_count=sector.limit_up_count, + view=stage_view, + ) + + +def _build_avoid_rules( + temp: float, sectors: list[SectorInfo], recommendations: list[Recommendation] +) -> list[str]: + rules = [] + if temp < 45: + rules.append("市场温度低于45时,不追突破首日,只等次日确认或回踩。") + if any(s.stage == "end" for s in sectors[:5]): + rules.append("板块进入末期时,降低同板块追高标的权重。") + if any(r.position_score < 35 for r in recommendations): + rules.append("位置安全分低于35的标的,只观察不主动追入。") + if not rules: + rules.append("推荐失效条件触发后不补仓,等待下一次扫描重新确认。") + return rules + + +def _build_iteration_notes(performance: dict, recommendations: list[Recommendation]) -> list[str]: + notes = [] + tracked = performance.get("tracked", 0) or 0 + win_rate = performance.get("win_rate", 0) or 0 + avg_return = performance.get("avg_return", 0) or 0 + hit_stop = performance.get("hit_stop_count", 0) or 0 + hit_target = performance.get("hit_target_count", 0) or 0 + + if tracked < 10: + notes.append("跟踪样本不足,暂不自动调整策略权重,优先积累推荐生命周期数据。") + else: + if win_rate < 45: + notes.append("近期胜率偏低,下轮应提高入场确认门槛,减少弱势环境下的突破型推荐。") + if avg_return < 0: + notes.append("平均收益为负,建议收紧止损触发和推荐失效条件。") + if hit_stop > hit_target: + notes.append("止损次数多于命中目标,优先复查追高和板块末期惩罚是否不足。") + + actionable_count = sum(1 for r in recommendations if r.action_plan == "可操作") + if actionable_count > 5: + notes.append("可操作标的偏多,前端应按板块集中度和评分排序控制关注数量。") + + return notes + + +async def _generate_ai_review( + board: StrategyBoard, + recommendations: list[Recommendation], + performance: dict, +) -> str: + """用 LLM 生成简短的策略解释,不参与硬性交易决策。""" + from app.llm.client import chat_completion + + rec_lines = "\n".join( + f"- {r.name}({r.ts_code}) {r.action_plan} {r.entry_signal_type} " + f"评分{r.score} 仓位{r.suggested_position_pct}% 触发: {r.trigger_condition}" + for r in recommendations[:8] + ) or "暂无推荐" + + user_msg = f"""请基于以下系统数据,生成一段今日A股策略作战说明,要求: +1. 明确区分市场事实、策略推断和风险约束; +2. 不要承诺收益,不要给绝对化买卖结论; +3. 最多220字,中文。 + +市场状态: {board.market_regime} +风险等级: {board.risk_level} +操作倾向: {board.action_bias} +仓位建议: {board.position_suggestion} +推荐策略: {board.recommended_mode} +历史跟踪: 胜率{performance.get('win_rate', 0)}%, 平均收益{performance.get('avg_return', 0)}% + +推荐摘要: +{rec_lines} +""" + + resp = await chat_completion([ + {"role": "system", "content": "你是一位谨慎的A股交易研究助手,擅长把量化结果转成可执行但有风险边界的策略说明。"}, + {"role": "user", "content": user_msg}, + ]) + return resp.content.strip() if resp and resp.content else "" diff --git a/backend/app/llm/strategy_iteration.py b/backend/app/llm/strategy_iteration.py new file mode 100644 index 00000000..49264eb5 --- /dev/null +++ b/backend/app/llm/strategy_iteration.py @@ -0,0 +1,286 @@ +"""策略复盘迭代 Agent + +基于推荐生命周期表现,输出可审查的策略调整建议。 +不直接修改策略参数,只给出建议和证据。 +""" + +import json +import logging +from collections import defaultdict +from datetime import datetime + +from app.config import settings + +logger = logging.getLogger(__name__) + + +async def build_strategy_iteration_report(limit: int = 50, include_llm: bool = False) -> dict: + rows = await _load_recent_tracking(limit) + rule_report = _build_rule_report(rows) + + if include_llm and settings.deepseek_api_key and rows: + ai_text = await _generate_ai_iteration(rule_report, rows) + if ai_text: + rule_report["ai_analysis"] = ai_text + rule_report["generated_by"] = "rules+llm" + + return rule_report + + +async def _load_recent_tracking(limit: int) -> list[dict]: + from sqlalchemy import text + from app.db.database import get_db + + async with get_db() as db: + rec_columns = await _get_table_columns(db, "recommendations") + tracking_columns = await _get_table_columns(db, "recommendation_tracking") + r_action_plan = _column_or_default(rec_columns, "action_plan", "'观察'", "r") + r_position_score = _column_or_default(rec_columns, "position_score", "50", "r") + r_lifecycle_status = _column_or_default(rec_columns, "lifecycle_status", "'candidate'", "r") + t_max_return = _column_or_default(tracking_columns, "max_return_pct", "t.pct_from_entry", "t") + t_max_drawdown = _column_or_default(tracking_columns, "max_drawdown_pct", "t.pct_from_entry", "t") + t_days_since = _column_or_default(tracking_columns, "days_since_recommendation", "0", "t") + t_close_reason = _column_or_default(tracking_columns, "close_reason", "''", "t") + t_review_note = _column_or_default(tracking_columns, "review_note", "''", "t") + + result = await db.execute( + text( + "SELECT r.id, r.ts_code, r.name, r.sector, r.strategy, r.entry_signal_type, " + f"{r_action_plan} AS action_plan, r.score, r.market_temp_score, r.sector_score, " + f"{r_position_score} AS position_score, {r_lifecycle_status} AS lifecycle_status, r.created_at, " + f"t.pct_from_entry, {t_max_return} AS max_return_pct, {t_max_drawdown} AS max_drawdown_pct, " + f"{t_days_since} AS days_since_recommendation, {t_close_reason} AS close_reason, " + f"{t_review_note} AS review_note, t.track_date " + "FROM recommendations r " + "LEFT JOIN (" + " SELECT t.* FROM recommendation_tracking t " + " INNER JOIN (" + " SELECT recommendation_id, MAX(id) AS max_id " + " FROM recommendation_tracking GROUP BY recommendation_id" + " ) latest ON t.id = latest.max_id" + ") t ON t.recommendation_id = r.id " + "ORDER BY r.created_at DESC LIMIT :limit" + ), + {"limit": limit}, + ) + return [dict(row._mapping) for row in result.fetchall()] + + +async def _get_table_columns(db, table_name: str) -> set[str]: + from sqlalchemy import text + + result = await db.execute(text(f"PRAGMA table_info({table_name})")) + return {row._mapping["name"] for row in result.fetchall()} + + +def _column_or_default(columns: set[str], column_name: str, default_sql: str, alias: str = "") -> str: + if column_name in columns: + return f"{alias}.{column_name}" if alias else column_name + return default_sql + + +def _build_rule_report(rows: list[dict]) -> dict: + if not rows: + return { + "generated_at": datetime.now().isoformat(), + "sample_size": 0, + "summary": "暂无可复盘的推荐样本。", + "strategy_stats": [], + "signal_stats": [], + "failure_patterns": ["样本不足,先积累推荐生命周期数据。"], + "adjustment_suggestions": [], + "ai_analysis": "", + "generated_by": "rules", + } + + tracked_rows = [r for r in rows if r.get("pct_from_entry") is not None] + strategy_stats = _group_stats(tracked_rows, "strategy") + signal_stats = _group_stats(tracked_rows, "entry_signal_type") + failure_patterns = _detect_failure_patterns(tracked_rows) + suggestions = _build_adjustment_suggestions(strategy_stats, signal_stats, failure_patterns, len(tracked_rows)) + + wins = sum(1 for r in tracked_rows if (r.get("pct_from_entry") or 0) > 0) + avg_return = _avg([r.get("pct_from_entry") for r in tracked_rows]) + avg_drawdown = _avg([r.get("max_drawdown_pct") for r in tracked_rows]) + win_rate = round(wins / len(tracked_rows) * 100, 1) if tracked_rows else 0 + + return { + "generated_at": datetime.now().isoformat(), + "sample_size": len(tracked_rows), + "summary": ( + f"最近 {len(rows)} 条推荐中,{len(tracked_rows)} 条已有跟踪数据;" + f"胜率 {win_rate}%,平均收益 {avg_return}%,平均最大回撤 {avg_drawdown}%。" + ), + "strategy_stats": strategy_stats, + "signal_stats": signal_stats, + "failure_patterns": failure_patterns, + "adjustment_suggestions": suggestions, + "ai_analysis": "", + "generated_by": "rules", + } + + +def _group_stats(rows: list[dict], key: str) -> list[dict]: + groups: dict[str, list[dict]] = defaultdict(list) + for row in rows: + groups[row.get(key) or "unknown"].append(row) + + stats = [] + for name, items in groups.items(): + wins = sum(1 for r in items if (r.get("pct_from_entry") or 0) > 0) + hit_stop = sum(1 for r in items if r.get("close_reason") == "hit_stop_loss") + hit_target = sum(1 for r in items if r.get("close_reason") == "hit_target") + stats.append({ + "name": name, + "count": len(items), + "win_rate": round(wins / len(items) * 100, 1) if items else 0, + "avg_return": _avg([r.get("pct_from_entry") for r in items]), + "avg_max_return": _avg([r.get("max_return_pct") for r in items]), + "avg_max_drawdown": _avg([r.get("max_drawdown_pct") for r in items]), + "hit_target": hit_target, + "hit_stop": hit_stop, + }) + + stats.sort(key=lambda x: (x["count"], x["avg_return"]), reverse=True) + return stats + + +def _detect_failure_patterns(rows: list[dict]) -> list[str]: + patterns = [] + if not rows: + return ["暂无跟踪样本。"] + + weak_market_losses = [ + r for r in rows + if (r.get("market_temp_score") or 0) < 45 and (r.get("pct_from_entry") or 0) < 0 + ] + if len(weak_market_losses) >= 2: + patterns.append("弱势市场中仍有亏损推荐,低温环境下应进一步减少 BUY 或提高确认门槛。") + + high_position_losses = [ + r for r in rows + if (r.get("position_score") or 50) < 40 and (r.get("pct_from_entry") or 0) < 0 + ] + if len(high_position_losses) >= 2: + patterns.append("位置安全分偏低的推荐亏损较多,追高惩罚需要增强。") + + stop_losses = [r for r in rows if r.get("close_reason") == "hit_stop_loss"] + if len(stop_losses) >= 2: + patterns.append("触发止损样本偏多,需要复查止损位置和入场触发条件是否过宽。") + + expired_flat = [ + r for r in rows + if r.get("close_reason") in ("review_expired_flat", "review_expired_loss") + ] + if len(expired_flat) >= 3: + patterns.append("多只推荐到期未形成有效进攻,观察池转可操作的条件需要更严格。") + + if not patterns: + patterns.append("暂无明显集中失败模式,继续积累样本并按策略分组观察。") + return patterns + + +def _build_adjustment_suggestions( + strategy_stats: list[dict], + signal_stats: list[dict], + failure_patterns: list[str], + sample_size: int, +) -> list[dict]: + suggestions = [] + + if sample_size < 10: + return [{ + "target": "全局策略", + "action": "observe", + "reason": "跟踪样本少于10条,暂不建议调整参数。", + "confidence": "low", + }] + + for stat in strategy_stats: + if stat["count"] >= 3 and stat["win_rate"] < 40 and stat["avg_return"] < 0: + suggestions.append({ + "target": stat["name"], + "action": "tighten", + "reason": f"{stat['name']} 胜率{stat['win_rate']}%,平均收益{stat['avg_return']}%,建议提高买入门槛。", + "confidence": "medium", + }) + elif stat["count"] >= 3 and stat["win_rate"] >= 60 and stat["avg_return"] > 1: + suggestions.append({ + "target": stat["name"], + "action": "promote", + "reason": f"{stat['name']} 近期表现较好,可在相似市场环境下优先使用。", + "confidence": "medium", + }) + + for stat in signal_stats: + if stat["count"] >= 3 and stat["avg_max_drawdown"] < -5: + suggestions.append({ + "target": stat["name"], + "action": "reduce", + "reason": f"{stat['name']} 平均最大回撤{stat['avg_max_drawdown']}%,建议降低排序权重或增加位置过滤。", + "confidence": "medium", + }) + + if any("弱势市场" in p for p in failure_patterns): + suggestions.append({ + "target": "defensive_watch", + "action": "tighten", + "reason": "弱势市场亏损样本集中,防守策略下应只保留观察池,减少 BUY。", + "confidence": "high", + }) + + if not suggestions: + suggestions.append({ + "target": "全局策略", + "action": "keep", + "reason": "当前样本未显示需要立即调整的集中问题。", + "confidence": "medium", + }) + + return suggestions[:6] + + +async def _generate_ai_iteration(rule_report: dict, rows: list[dict]) -> str: + from app.llm.client import chat_completion + + sample = [ + { + "name": r.get("name"), + "strategy": r.get("strategy"), + "signal": r.get("entry_signal_type"), + "return": r.get("pct_from_entry"), + "max_return": r.get("max_return_pct"), + "drawdown": r.get("max_drawdown_pct"), + "reason": r.get("close_reason"), + "market_temp": r.get("market_temp_score"), + "position_score": r.get("position_score"), + } + for r in rows[:20] + ] + + user_msg = f"""请基于以下推荐复盘数据,输出策略迭代建议。 +要求: +1. 明确指出最该收紧、保留、加强的策略或信号; +2. 只提出可执行调整建议,不要泛泛而谈; +3. 不要承诺收益; +4. 180字以内。 + +规则复盘: +{json.dumps(rule_report, ensure_ascii=False)} + +样本: +{json.dumps(sample, ensure_ascii=False)} +""" + + resp = await chat_completion([ + {"role": "system", "content": "你是一位A股策略复盘研究员,负责基于推荐结果提出保守、可验证的策略迭代建议。"}, + {"role": "user", "content": user_msg}, + ]) + return resp.content.strip() if resp and resp.content else "" + + +def _avg(values: list) -> float: + clean = [float(v) for v in values if v is not None] + if not clean: + return 0 + return round(sum(clean) / len(clean), 2) diff --git a/backend/app/llm/strategy_selector.py b/backend/app/llm/strategy_selector.py new file mode 100644 index 00000000..f1c9c53d --- /dev/null +++ b/backend/app/llm/strategy_selector.py @@ -0,0 +1,211 @@ +"""动态策略选择器 + +在固定筛选引擎前增加一层“先选打法,再选股票”的策略决策。 +规则负责稳定分类,LLM 负责补充语义判断和操作建议。 +""" + +import json +import logging + +from pydantic import BaseModel + +from app.config import settings +from app.data.models import MarketTemperature, SectorInfo + +logger = logging.getLogger(__name__) + + +class StrategyProfile(BaseModel): + strategy_id: str + name: str + description: str + entry_signal_priority: list[str] + score_weights: dict[str, float] + min_score: float + buy_threshold: float + max_position_pct: float + notes: list[str] = [] + generated_by: str = "rules" + + +async def select_strategy_profile( + market_temp: MarketTemperature | None, + hot_sectors: list[SectorInfo], + intraday: bool, +) -> StrategyProfile: + profile = _select_rule_profile(market_temp, hot_sectors, intraday) + + if settings.deepseek_api_key: + llm_profile = await _select_llm_profile(market_temp, hot_sectors, intraday, profile) + if llm_profile: + profile = llm_profile + + return profile + + +def _select_rule_profile( + market_temp: MarketTemperature | None, + hot_sectors: list[SectorInfo], + intraday: bool, +) -> StrategyProfile: + temp = market_temp.temperature if market_temp else 0 + early_count = sum(1 for s in hot_sectors[:5] if s.stage == "early") + late_count = sum(1 for s in hot_sectors[:5] if s.stage in ("late", "end")) + + if temp >= 65 and early_count >= 1: + return StrategyProfile( + strategy_id="breakout_attack", + name="主线突破", + description="市场偏强,优先寻找主线板块内的突破和突破确认。", + entry_signal_priority=["breakout", "breakout_confirm", "launch", "pullback", "reversal"], + score_weights={"supply_demand": 0.45, "price_action": 0.35, "trend": 0.20}, + min_score=62, + buy_threshold=66, + max_position_pct=30, + notes=["优先做主线早中期板块", "放量突破优先于回踩低吸"], + ) + + if temp >= 45 and late_count < 2: + return StrategyProfile( + strategy_id="pullback_rotation", + name="回踩轮动", + description="市场震荡分化,优先做回踩支撑和板块轮动中的低吸确认。", + entry_signal_priority=["pullback", "breakout_confirm", "launch", "breakout", "reversal"], + score_weights={"supply_demand": 0.40, "price_action": 0.30, "trend": 0.30}, + min_score=60, + buy_threshold=63, + max_position_pct=20, + notes=["降低追高仓位", "更看重位置安全和回踩承接"], + ) + + if temp >= 30: + return StrategyProfile( + strategy_id="launch_probe", + name="启动试错", + description="市场偏弱,适合少量观察启动型和反转型机会,不做强追涨。", + entry_signal_priority=["launch", "reversal", "pullback", "breakout_confirm", "breakout"], + score_weights={"supply_demand": 0.35, "price_action": 0.35, "trend": 0.30}, + min_score=58, + buy_threshold=61, + max_position_pct=10, + notes=["仅做小仓位试错", "突破型需要更强板块一致性才可介入"], + ) + + return StrategyProfile( + strategy_id="defensive_watch", + name="防守观察", + description="市场退潮,系统以观察池为主,不主动扩大出手。", + entry_signal_priority=["pullback", "launch", "reversal", "breakout_confirm", "breakout"], + score_weights={"supply_demand": 0.35, "price_action": 0.40, "trend": 0.25}, + min_score=56, + buy_threshold=64, + max_position_pct=5, + notes=["原则上只保留观察池", "等待市场温度修复后再转入主动进攻"], + ) + + +async def _select_llm_profile( + market_temp: MarketTemperature | None, + hot_sectors: list[SectorInfo], + intraday: bool, + fallback: StrategyProfile, +) -> StrategyProfile | None: + from app.llm.client import chat_completion + + sector_text = "\n".join( + f"- {s.sector_name}: 涨幅{s.pct_change}%, 热度{s.heat_score}, 阶段{s.stage}, 涨停{s.limit_up_count}" + for s in hot_sectors[:5] + ) or "暂无板块数据" + + user_msg = f"""你需要为今日A股环境选择一个短线策略模板。 + +市场温度: {market_temp.temperature if market_temp else 0} +上涨家数: {market_temp.up_count if market_temp else 0} +下跌家数: {market_temp.down_count if market_temp else 0} +涨停数: {market_temp.limit_up_count if market_temp else 0} +炸板率: {market_temp.broken_rate if market_temp else 0} +盘中模式: {'是' if intraday else '否'} + +热门板块: +{sector_text} + +规则候选策略: +- breakout_attack: 主线突破 +- pullback_rotation: 回踩轮动 +- launch_probe: 启动试错 +- defensive_watch: 防守观察 + +请输出 JSON,格式: +{{ + "strategy_id": "上面四选一", + "notes": ["两条以内理由"], + "buy_threshold_delta": -3到3之间的整数 +}} +""" + + resp = await chat_completion([ + {"role": "system", "content": "你是一位A股短线策略研究员,只能在给定策略模板中选择,不要发明新策略。回复必须是 JSON。"}, + {"role": "user", "content": user_msg}, + ]) + if not resp or not resp.content: + return None + + try: + data = json.loads(resp.content) + strategy_id = data.get("strategy_id") + if strategy_id not in {"breakout_attack", "pullback_rotation", "launch_probe", "defensive_watch"}: + return None + selected = _select_rule_profile(market_temp, hot_sectors, intraday) + if selected.strategy_id != strategy_id: + selected = { + "breakout_attack": StrategyProfile( + strategy_id="breakout_attack", + name="主线突破", + description="市场偏强,优先寻找主线板块内的突破和突破确认。", + entry_signal_priority=["breakout", "breakout_confirm", "launch", "pullback", "reversal"], + score_weights={"supply_demand": 0.45, "price_action": 0.35, "trend": 0.20}, + min_score=62, + buy_threshold=66, + max_position_pct=30, + ), + "pullback_rotation": StrategyProfile( + strategy_id="pullback_rotation", + name="回踩轮动", + description="市场震荡分化,优先做回踩支撑和板块轮动中的低吸确认。", + entry_signal_priority=["pullback", "breakout_confirm", "launch", "breakout", "reversal"], + score_weights={"supply_demand": 0.40, "price_action": 0.30, "trend": 0.30}, + min_score=60, + buy_threshold=63, + max_position_pct=20, + ), + "launch_probe": StrategyProfile( + strategy_id="launch_probe", + name="启动试错", + description="市场偏弱,适合少量观察启动型和反转型机会,不做强追涨。", + entry_signal_priority=["launch", "reversal", "pullback", "breakout_confirm", "breakout"], + score_weights={"supply_demand": 0.35, "price_action": 0.35, "trend": 0.30}, + min_score=58, + buy_threshold=61, + max_position_pct=10, + ), + "defensive_watch": StrategyProfile( + strategy_id="defensive_watch", + name="防守观察", + description="市场退潮,系统以观察池为主,不主动扩大出手。", + entry_signal_priority=["pullback", "launch", "reversal", "breakout_confirm", "breakout"], + score_weights={"supply_demand": 0.35, "price_action": 0.40, "trend": 0.25}, + min_score=56, + buy_threshold=64, + max_position_pct=5, + ), + }[strategy_id] + + delta = int(data.get("buy_threshold_delta", 0)) + delta = max(-3, min(3, delta)) + selected.buy_threshold += delta + selected.notes.extend(data.get("notes", [])[:2]) + selected.generated_by = "rules+llm" + return selected + except Exception as e: + logger.debug(f"LLM 策略选择解析失败: {e}") + return fallback diff --git a/backend/app/llm/tool_executor.py b/backend/app/llm/tool_executor.py index 4444493c..7bfc2419 100644 --- a/backend/app/llm/tool_executor.py +++ b/backend/app/llm/tool_executor.py @@ -9,16 +9,27 @@ import math logger = logging.getLogger(__name__) +_chat_user_context: dict | None = None + + +def set_chat_user_context(user: dict | None) -> None: + global _chat_user_context + _chat_user_context = user + async def execute_tool(name: str, arguments: dict) -> str: """执行工具调用,返回 JSON 字符串""" try: - if name == "get_market_temperature": + if name == "get_strategy_board": + return await _get_strategy_board() + elif name == "get_market_temperature": return await _get_market_temperature() elif name == "get_hot_sectors": return await _get_hot_sectors(arguments.get("limit", 10)) elif name == "get_latest_recommendations": return await _get_latest_recommendations() + elif name == "get_user_watchlist_snapshot": + return await _get_user_watchlist_snapshot() elif name == "get_stock_kline": return await _get_stock_kline( arguments["ts_code"], arguments.get("days", 60) @@ -53,6 +64,28 @@ def _clean_for_json(obj): return obj +async def _get_strategy_board() -> str: + from app.llm.strategy_board import build_strategy_board + + board = await build_strategy_board(include_llm=False) + payload = { + "trade_date": board.get("trade_date", ""), + "market_regime": board.get("market_regime", ""), + "risk_level": board.get("risk_level", ""), + "action_bias": board.get("action_bias", ""), + "position_suggestion": board.get("position_suggestion", ""), + "summary": board.get("summary", ""), + "recommended_mode": board.get("recommended_mode", ""), + "watch_sectors": board.get("watch_sectors", [])[:5], + "strategy_focus": board.get("strategy_focus", [])[:4], + "avoid_rules": board.get("avoid_rules", [])[:4], + "iteration_notes": board.get("iteration_notes", [])[:3], + "metrics": board.get("metrics", {}), + "generated_by": board.get("generated_by", "rules"), + } + return json.dumps(_clean_for_json(payload), ensure_ascii=False, default=str) + + async def _get_market_temperature() -> str: from app.engine.recommender import get_latest_recommendations result = await get_latest_recommendations() @@ -73,10 +106,64 @@ async def _get_latest_recommendations() -> str: from app.engine.recommender import get_latest_recommendations result = await get_latest_recommendations() recs = result.get("recommendations", []) - data = [r.model_dump(exclude={"created_at"}) for r in recs] + data = [] + for rec in recs: + item = rec.model_dump(exclude={"created_at"}) + item["llm_analysis"] = "" + data.append(item) return json.dumps(data, ensure_ascii=False, default=str) +async def _get_user_watchlist_snapshot() -> str: + from sqlalchemy import text + from app.db.database import get_db + + user_id = (_chat_user_context or {}).get("id") + if not user_id: + return json.dumps({"error": "当前会话缺少用户上下文"}, ensure_ascii=False) + + async with get_db() as db: + rows = (await db.execute( + text( + "SELECT w.id, w.ts_code, w.name, w.note, w.watch_group, w.cost_price, w.updated_at, " + "a.conclusion, a.advice, a.trigger_condition, a.risk_note, a.summary, " + "a.analysis_mode, a.created_at AS analysis_created_at " + "FROM user_watchlists w " + "LEFT JOIN watchlist_analyses a ON a.id = (" + " SELECT id FROM watchlist_analyses " + " WHERE watchlist_id = w.id ORDER BY created_at DESC, id DESC LIMIT 1" + ") " + "WHERE w.user_id = :uid AND COALESCE(w.is_active, 1) = 1 " + "ORDER BY CASE w.watch_group " + " WHEN 'focus' THEN 1 " + " WHEN 'candidate' THEN 2 " + " WHEN 'holding' THEN 3 " + " ELSE 4 END, w.updated_at DESC, w.id DESC" + ), + {"uid": user_id}, + )).fetchall() + + items = [dict(row._mapping) for row in rows] + grouped = {"focus": 0, "candidate": 0, "holding": 0, "observe": 0} + for item in items: + key = item.get("watch_group") or "observe" + if key in grouped: + grouped[key] += 1 + + actionable = [ + item for item in items + if item.get("conclusion") in {"可操作", "重点关注"} + ][:8] + + payload = { + "count": len(items), + "group_counts": grouped, + "high_priority": actionable, + "items": items[:20], + } + return json.dumps(_clean_for_json(payload), ensure_ascii=False, default=str) + + async def _get_stock_kline(ts_code: str, days: int) -> str: from app.data.tushare_client import tushare_client from app.analysis.technical import add_all_indicators diff --git a/backend/app/llm/tools.py b/backend/app/llm/tools.py index 85247ca7..c7d2c0bd 100644 --- a/backend/app/llm/tools.py +++ b/backend/app/llm/tools.py @@ -4,6 +4,18 @@ """ CHAT_TOOLS = [ + { + "type": "function", + "function": { + "name": "get_strategy_board", + "description": "获取今日作战结论,包括市场状态、今日打法、建议仓位、重点板块和规避规则", + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + }, { "type": "function", "function": { @@ -45,6 +57,18 @@ CHAT_TOOLS = [ }, }, }, + { + "type": "function", + "function": { + "name": "get_user_watchlist_snapshot", + "description": "获取当前用户自选股概览,包括分组、最新结论、建议、触发条件和摘要", + "parameters": { + "type": "object", + "properties": {}, + "required": [], + }, + }, + }, { "type": "function", "function": { diff --git a/backend/app/main.py b/backend/app/main.py index e11434ef..b3c2a7b6 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -8,7 +8,7 @@ from fastapi.middleware.cors import CORSMiddleware from app.config import settings from app.db.database import init_db from app.engine.scheduler import start_scheduler, stop_scheduler -from app.api import market, sectors, recommendations, stocks, websocket, chat, auth, debug +from app.api import market, sectors, recommendations, stocks, watchlists, websocket, chat, auth, debug logging.basicConfig( level=logging.DEBUG if settings.debug else logging.INFO, @@ -78,6 +78,7 @@ app.include_router(market.router) app.include_router(sectors.router) app.include_router(recommendations.router) app.include_router(stocks.router) +app.include_router(watchlists.router) app.include_router(chat.router) app.include_router(auth.router) app.include_router(debug.router) diff --git a/backend/astock.db b/backend/astock.db index e1b0e525..31ffc2d1 100644 Binary files a/backend/astock.db and b/backend/astock.db differ diff --git a/frontend/.next/app-build-manifest.json b/frontend/.next/app-build-manifest.json index 0ec4fa49..bce6a549 100644 --- a/frontend/.next/app-build-manifest.json +++ b/frontend/.next/app-build-manifest.json @@ -1,5 +1,11 @@ { "pages": { + "/layout": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/css/app/layout.css", + "static/chunks/app/layout.js" + ], "/(auth)/dashboard/page": [ "static/chunks/webpack.js", "static/chunks/main-app.js", @@ -10,22 +16,61 @@ "static/chunks/main-app.js", "static/chunks/app/(auth)/layout.js" ], - "/layout": [ - "static/chunks/webpack.js", - "static/chunks/main-app.js", - "static/css/app/layout.css", - "static/chunks/app/layout.js" - ], "/(auth)/recommendations/page": [ "static/chunks/webpack.js", "static/chunks/main-app.js", "static/chunks/app/(auth)/recommendations/page.js" ], + "/(public)/layout": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/chunks/app/(public)/layout.js" + ], + "/(auth)/strategy/page": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/chunks/app/(auth)/strategy/page.js" + ], + "/(auth)/sectors/page": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/chunks/app/(auth)/sectors/page.js" + ], + "/(auth)/diagnose/page": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/chunks/app/(auth)/diagnose/page.js" + ], + "/(auth)/stock/[code]/page": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/chunks/app/(auth)/stock/[code]/page.js" + ], "/(auth)/settings/page": [ "static/chunks/webpack.js", "static/chunks/main-app.js", "static/chunks/app/(auth)/settings/page.js" ], + "/(auth)/watchlists/page": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/chunks/app/(auth)/watchlists/page.js" + ], + "/(public)/login/page": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/chunks/app/(public)/login/page.js" + ], + "/(auth)/chat/page": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/chunks/app/(auth)/chat/page.js" + ], + "/(public)/page": [ + "static/chunks/webpack.js", + "static/chunks/main-app.js", + "static/chunks/app/(public)/page.js" + ], "/_not-found/page": [ "static/chunks/webpack.js", "static/chunks/main-app.js", diff --git a/frontend/.next/build-manifest.json b/frontend/.next/build-manifest.json index 018cb67f..b4e9156a 100644 --- a/frontend/.next/build-manifest.json +++ b/frontend/.next/build-manifest.json @@ -2,7 +2,9 @@ "polyfillFiles": [ "static/chunks/polyfills.js" ], - "devFiles": [], + "devFiles": [ + "static/chunks/react-refresh.js" + ], "ampDevFiles": [], "lowPriorityFiles": [ "static/development/_buildManifest.js", @@ -13,7 +15,16 @@ "static/chunks/main-app.js" ], "pages": { - "/_app": [] + "/_app": [ + "static/chunks/webpack.js", + "static/chunks/main.js", + "static/chunks/pages/_app.js" + ], + "/_error": [ + "static/chunks/webpack.js", + "static/chunks/main.js", + "static/chunks/pages/_error.js" + ] }, "ampFirstPages": [] } \ No newline at end of file diff --git a/frontend/.next/react-loadable-manifest.json b/frontend/.next/react-loadable-manifest.json index 9e26dfee..708968d6 100644 --- a/frontend/.next/react-loadable-manifest.json +++ b/frontend/.next/react-loadable-manifest.json @@ -1 +1,20 @@ -{} \ No newline at end of file +{ + "app/(auth)/sectors/page.tsx -> echarts": { + "id": "app/(auth)/sectors/page.tsx -> echarts", + "files": [ + "static/chunks/_app-pages-browser_node_modules_echarts_index_js.js" + ] + }, + "components/capital-flow.tsx -> echarts": { + "id": "components/capital-flow.tsx -> echarts", + "files": [ + "static/chunks/_app-pages-browser_node_modules_echarts_index_js.js" + ] + }, + "components/kline-chart.tsx -> echarts": { + "id": "components/kline-chart.tsx -> echarts", + "files": [ + "static/chunks/_app-pages-browser_node_modules_echarts_index_js.js" + ] + } +} \ No newline at end of file diff --git a/frontend/.next/server/app-paths-manifest.json b/frontend/.next/server/app-paths-manifest.json index d8cf5ff6..32c12767 100644 --- a/frontend/.next/server/app-paths-manifest.json +++ b/frontend/.next/server/app-paths-manifest.json @@ -1,6 +1,8 @@ { "/_not-found/page": "app/_not-found/page.js", "/(auth)/dashboard/page": "app/(auth)/dashboard/page.js", - "/(auth)/recommendations/page": "app/(auth)/recommendations/page.js", - "/(auth)/settings/page": "app/(auth)/settings/page.js" + "/(auth)/stock/[code]/page": "app/(auth)/stock/[code]/page.js", + "/(auth)/chat/page": "app/(auth)/chat/page.js", + "/(public)/login/page": "app/(public)/login/page.js", + "/(public)/page": "app/(public)/page.js" } \ No newline at end of file diff --git a/frontend/.next/server/interception-route-rewrite-manifest.js b/frontend/.next/server/interception-route-rewrite-manifest.js index 24f77ba7..82d3ab17 100644 --- a/frontend/.next/server/interception-route-rewrite-manifest.js +++ b/frontend/.next/server/interception-route-rewrite-manifest.js @@ -1 +1 @@ -self.__INTERCEPTION_ROUTE_REWRITE_MANIFEST="[]"; \ No newline at end of file +self.__INTERCEPTION_ROUTE_REWRITE_MANIFEST="[]" \ No newline at end of file diff --git a/frontend/.next/server/middleware-build-manifest.js b/frontend/.next/server/middleware-build-manifest.js index 36489d8c..424a1a19 100644 --- a/frontend/.next/server/middleware-build-manifest.js +++ b/frontend/.next/server/middleware-build-manifest.js @@ -2,7 +2,9 @@ self.__BUILD_MANIFEST = { "polyfillFiles": [ "static/chunks/polyfills.js" ], - "devFiles": [], + "devFiles": [ + "static/chunks/react-refresh.js" + ], "ampDevFiles": [], "lowPriorityFiles": [], "rootMainFiles": [ @@ -10,7 +12,16 @@ self.__BUILD_MANIFEST = { "static/chunks/main-app.js" ], "pages": { - "/_app": [] + "/_app": [ + "static/chunks/webpack.js", + "static/chunks/main.js", + "static/chunks/pages/_app.js" + ], + "/_error": [ + "static/chunks/webpack.js", + "static/chunks/main.js", + "static/chunks/pages/_error.js" + ] }, "ampFirstPages": [] }; diff --git a/frontend/.next/server/middleware-react-loadable-manifest.js b/frontend/.next/server/middleware-react-loadable-manifest.js index ca34f09f..679c4feb 100644 --- a/frontend/.next/server/middleware-react-loadable-manifest.js +++ b/frontend/.next/server/middleware-react-loadable-manifest.js @@ -1 +1 @@ -self.__REACT_LOADABLE_MANIFEST="{}" \ No newline at end of file +self.__REACT_LOADABLE_MANIFEST="{\"app/(auth)/sectors/page.tsx -> echarts\":{\"id\":\"app/(auth)/sectors/page.tsx -> echarts\",\"files\":[\"static/chunks/_app-pages-browser_node_modules_echarts_index_js.js\"]},\"components/capital-flow.tsx -> echarts\":{\"id\":\"components/capital-flow.tsx -> echarts\",\"files\":[\"static/chunks/_app-pages-browser_node_modules_echarts_index_js.js\"]},\"components/kline-chart.tsx -> echarts\":{\"id\":\"components/kline-chart.tsx -> echarts\",\"files\":[\"static/chunks/_app-pages-browser_node_modules_echarts_index_js.js\"]}}" \ No newline at end of file diff --git a/frontend/.next/server/pages-manifest.json b/frontend/.next/server/pages-manifest.json index 9e26dfee..a679766a 100644 --- a/frontend/.next/server/pages-manifest.json +++ b/frontend/.next/server/pages-manifest.json @@ -1 +1,5 @@ -{} \ No newline at end of file +{ + "/_app": "pages/_app.js", + "/_error": "pages/_error.js", + "/_document": "pages/_document.js" +} \ No newline at end of file diff --git a/frontend/.next/server/server-reference-manifest.json b/frontend/.next/server/server-reference-manifest.json index cbf0b4da..c5c83141 100644 --- a/frontend/.next/server/server-reference-manifest.json +++ b/frontend/.next/server/server-reference-manifest.json @@ -1,5 +1,5 @@ { "node": {}, "edge": {}, - "encryptionKey": "f4eykmt9lLjeIDNHjaA0ZKJupk05dXT0k2cBaExPwP8=" + "encryptionKey": "5a77t1jXySke+j0Es8vduY/7S7yObSbYfKeh0OReITs=" } \ No newline at end of file diff --git a/frontend/src/app/(auth)/chat/page.tsx b/frontend/src/app/(auth)/chat/page.tsx index af32c45a..c9305013 100644 --- a/frontend/src/app/(auth)/chat/page.tsx +++ b/frontend/src/app/(auth)/chat/page.tsx @@ -1,9 +1,9 @@ "use client"; -import { useState, useRef, useEffect } from "react"; +import { useEffect, useRef, useState } from "react"; import { useTheme } from "next-themes"; -import { streamChat, type ChatMessage } from "@/lib/api"; import { formatMarkdown } from "@/lib/markdown"; +import { streamChat, type ChatMessage } from "@/lib/api"; interface DisplayMessage { role: "user" | "assistant"; @@ -11,9 +11,25 @@ interface DisplayMessage { } const QUICK_QUESTIONS = [ - "今日市场怎么样?", - "有哪些推荐股票?", - "哪些板块最热门?", + "结合今日作战结论,告诉我今天应该重点看什么。", + "把当前推荐池分成可操作、重点关注和仅观察三层讲给我。", + "看看我的自选股里哪些需要明天优先盯盘。", + "如果今天只允许做一个方向,你建议我盯哪个主线,为什么?", +]; + +const CHAT_SCENES = [ + { + title: "问今日打法", + description: "把今日结论翻译成人话,说明现在该进攻、试错还是防守。", + }, + { + title: "问推荐池", + description: "追问某只推荐股为什么进池、什么条件下能看、什么条件下放弃。", + }, + { + title: "问自选股", + description: "围绕你自己的观察池、候选池和持仓池做连续追问。", + }, ]; export default function ChatPage() { @@ -37,42 +53,41 @@ export default function ChatPage() { }, [messages, status]); const sendMessage = async (text: string) => { - if (!text.trim() || streaming) return; + const content = text.trim(); + if (!content || streaming) return; - const userMsg: DisplayMessage = { role: "user", content: text.trim() }; + const userMsg: DisplayMessage = { role: "user", content }; const newMessages = [...messages, userMsg]; - setMessages(newMessages); + + setMessages([...newMessages, { role: "assistant", content: "" }]); setInput(""); setStreaming(true); setStatus(""); - // Add empty assistant message for streaming - setMessages([...newMessages, { role: "assistant", content: "" }]); - try { - const chatMessages: ChatMessage[] = newMessages.map((m) => ({ - role: m.role, - content: m.content, + const chatMessages: ChatMessage[] = newMessages.map((message) => ({ + role: message.role, + content: message.content, })); let fullContent = ""; for await (const event of streamChat(chatMessages)) { if (event.type === "status") { setStatus(event.content); - } else if (event.type === "content") { - fullContent += event.content; - setMessages([ - ...newMessages, - { role: "assistant", content: fullContent }, - ]); - setStatus(""); + continue; } + + fullContent += event.content; + setMessages([ + ...newMessages, + { role: "assistant", content: fullContent }, + ]); } - } catch (e) { - console.error("Chat error:", e); + } catch (error) { + console.error("Chat error:", error); setMessages([ ...newMessages, - { role: "assistant", content: "连接失败,请检查网络后重试。" }, + { role: "assistant", content: "连接失败,暂时无法读取作战数据,请稍后重试。" }, ]); } finally { setStreaming(false); @@ -80,140 +95,177 @@ export default function ChatPage() { } }; - const handleKeyDown = (e: React.KeyboardEvent) => { - if (e.key === "Enter" && !e.shiftKey) { - e.preventDefault(); + const handleKeyDown = (event: React.KeyboardEvent) => { + if (event.key === "Enter" && !event.shiftKey) { + event.preventDefault(); sendMessage(input); } }; return ( -
- {/* Header */} -
-
-
- - - -
-
-

AI 投资顾问

-

- 基于实时市场数据的智能问答 -

-
-
- {messages.length > 0 && ( - - )} -
- - {/* Messages */} -
- {messages.length === 0 ? ( -
-
- - - +
+
+ + +
+
+
+
+ + + +
+
+

围绕作战结果继续追问

+

+ 今日作战 / 推荐池 / 自选股 / 主线板块 +

+
+
+ + {messages.length > 0 ? ( + + ) : null} +
+ +
+ {messages.length === 0 ? ( +
+
+
+ + + +
+

先用它来拆解系统已经给出的结论

+

+ 这不是泛用 AI 问答框。更好的用法是直接追问今天该怎么打、推荐池为什么这样分层、你的自选股哪些该提级或降级。 +

+
+ +
+ {QUICK_QUESTIONS.map((question) => ( + + ))}
- ))} - {/* Status indicator during tool calls */} - {streaming && status && messages[messages.length - 1]?.content && ( -
-
- - {status} -
+ ) : ( +
+ {messages.map((message, index) => ( +
+
+ {message.role === "assistant" ? ( + message.content ? ( +
+ ) : ( + {status || "读取作战上下文中..."} + ) + ) : ( + {message.content} + )} + {streaming && index === messages.length - 1 && message.role === "assistant" && message.content ? ( + + ) : null} +
+
+ ))} + + {streaming && status && messages[messages.length - 1]?.content ? ( +
+
+ + {status} +
+
+ ) : null}
)} - - )} -
+
- {/* Input */} -
-
-