astock-agent/backend/app/api/market.py
2026-04-28 15:06:35 +08:00

238 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""市场概览 API"""
from datetime import datetime
from fastapi import APIRouter, Depends
from app.data.tushare_client import tushare_client
from app.data import tencent_client
from app.data.cache import cache
from app.data.market_breadth_client import get_market_breadth
from app.analysis.market_temp import build_realtime_market_temperature, calculate_market_temperature
from app.engine.recommender import get_latest_recommendations
from app.config import settings, is_trading_hours, is_market_session, should_prefer_realtime_today, today_trade_date
from app.core.deps import get_current_admin
router = APIRouter(prefix="/api/market", tags=["market"])
@router.get("/temperature")
async def get_temperature():
"""获取市场温度。
交易日 09:15 后优先做轻量实时计算,不触发完整扫描或 LLM。
"""
result = await get_latest_recommendations()
mt = result.get("market_temp")
realtime_used = False
if should_prefer_realtime_today(mt.trade_date if mt else None):
baseline = mt or calculate_market_temperature()
mt, realtime_used = await build_realtime_market_temperature(baseline)
breadth = await get_market_breadth() if realtime_used else None
if mt:
return {
"trade_date": mt.trade_date,
"temperature": mt.temperature,
"up_count": mt.up_count,
"down_count": mt.down_count,
"limit_up_count": mt.limit_up_count,
"limit_down_count": mt.limit_down_count,
"max_streak": mt.max_streak,
"broken_rate": mt.broken_rate,
"index_above_ma20": getattr(mt, "index_above_ma20", False),
"is_trading": is_trading_hours(),
"data_mode": "realtime_today" if realtime_used else "daily_snapshot",
"limit_counts_reliable": breadth.limit_counts_reliable if breadth else False,
}
return {
"trade_date": "",
"temperature": 0,
"up_count": 0,
"down_count": 0,
"limit_up_count": 0,
"limit_down_count": 0,
"max_streak": 0,
"broken_rate": 0,
"index_above_ma20": False,
"is_trading": is_trading_hours(),
}
@router.get("/overview")
async def get_overview():
"""市场概况:上证、深证、创业板指数
盘中用腾讯实时行情,盘后用 Tushare 日线(有缓存)。
"""
latest_trade_date = tushare_client.get_latest_trade_date()
if should_prefer_realtime_today(latest_trade_date):
return await _overview_realtime()
return _overview_daily()
@router.get("/strategy-board")
async def get_strategy_board():
"""获取今日市场作战面板(只读,不触发 LLM"""
cache_key = "market:strategy_board:rules"
cached = cache.get(cache_key)
if cached is not None:
return cached
from app.llm.strategy_board import build_strategy_board
result = await build_strategy_board(include_llm=False)
cache.set(cache_key, result, settings.cache_ttl_realtime)
return result
@router.get("/strategy-iteration")
async def get_strategy_iteration(limit: int = 50):
"""获取策略复盘迭代建议(只读,不触发 LLM"""
cache_key = f"market:strategy_iteration:{limit}:rules"
cached = cache.get(cache_key)
if cached is not None:
return cached
from app.llm.strategy_iteration import build_strategy_iteration_report
result = await build_strategy_iteration_report(limit=limit, include_llm=False)
cache.set(cache_key, result, settings.cache_ttl_realtime)
return result
@router.get("/ops-status")
async def get_ops_status():
"""管理员任务中心状态与数据新鲜度(只读,不触发扫描或 LLM"""
from sqlalchemy import text
from app.db.database import get_db
from app.engine.recommender import _scan_running
async with get_db() as db:
rec_row = (await db.execute(
text(
"SELECT created_at FROM recommendations "
"ORDER BY created_at DESC LIMIT 1"
)
)).fetchone()
tracking_row = (await db.execute(
text(
"SELECT track_date, created_at FROM recommendation_tracking "
"ORDER BY track_date DESC, id DESC LIMIT 1"
)
)).fetchone()
market_row = (await db.execute(
text(
"SELECT trade_date, created_at FROM market_temperature "
"ORDER BY REPLACE(trade_date, '-', '') DESC, id DESC LIMIT 1"
)
)).fetchone()
sector_row = (await db.execute(
text(
"SELECT trade_date, created_at FROM sector_heat "
"ORDER BY REPLACE(trade_date, '-', '') DESC, id DESC LIMIT 1"
)
)).fetchone()
def _fmt_dt(value):
return str(value or "")
latest_market_date = str(market_row._mapping["trade_date"]) if market_row else ""
latest_sector_date = str(sector_row._mapping["trade_date"]) if sector_row else ""
latest_tracking_date = str(tracking_row._mapping["track_date"]) if tracking_row else ""
today = today_trade_date()
sector_lagging = bool(latest_sector_date and latest_sector_date.replace("-", "") < today)
market_lagging = bool(latest_market_date and latest_market_date.replace("-", "") < today)
return {
"scan_running": _scan_running,
"scan_mode": "realtime_today" if should_prefer_realtime_today(latest_market_date) else "post_market",
"is_trading": is_trading_hours(),
"data_freshness": {
"market_trade_date": latest_market_date,
"sector_trade_date": latest_sector_date,
"tracking_trade_date": latest_tracking_date,
"last_recommendation_created_at": _fmt_dt(rec_row._mapping["created_at"]) if rec_row else "",
"last_tracking_created_at": _fmt_dt(tracking_row._mapping["created_at"]) if tracking_row else "",
"last_market_created_at": _fmt_dt(market_row._mapping["created_at"]) if market_row else "",
"last_sector_created_at": _fmt_dt(sector_row._mapping["created_at"]) if sector_row else "",
"status": "stale" if sector_lagging or market_lagging else "fresh" if latest_market_date else "empty",
"message": (
f"板块快照仍停留在 {latest_sector_date},展示层将优先使用今日实时板块榜。"
if sector_lagging else
f"最新市场日期 {latest_market_date},最近跟踪 {latest_tracking_date or '暂无'}"
if latest_market_date else
"暂无市场缓存数据,请由管理员触发扫描。"
),
"generated_at": datetime.now().isoformat(),
},
"actions": [
{"key": "refresh", "label": "立即扫描", "admin_only": True},
{"key": "update_tracking", "label": "更新跟踪", "admin_only": True},
{"key": "generate_strategy_board", "label": "生成策略板", "admin_only": True},
{"key": "generate_strategy_iteration", "label": "生成策略复盘", "admin_only": True},
],
}
@router.post("/generate-strategy-board")
async def generate_strategy_board(_admin: dict = Depends(get_current_admin)):
"""管理员手动生成带 LLM 说明的策略看板"""
from app.llm.strategy_board import build_strategy_board
result = await build_strategy_board(include_llm=True)
cache.delete("market:strategy_board:rules")
return result
@router.post("/generate-strategy-iteration")
async def generate_strategy_iteration(limit: int = 50, _admin: dict = Depends(get_current_admin)):
"""管理员手动生成带 LLM 分析的策略复盘"""
from app.llm.strategy_iteration import build_strategy_iteration_report
result = await build_strategy_iteration_report(limit=limit, include_llm=True)
cache.delete(f"market:strategy_iteration:{limit}:rules")
return result
async def _overview_realtime():
"""盘中:腾讯实时指数行情"""
index_data = await tencent_client.get_index_realtime()
result = []
name_map = {
"000001.SH": "上证指数",
"399001.SZ": "深证成指",
"399006.SZ": "创业板指",
}
for code in ["000001.SH", "399001.SZ", "399006.SZ"]:
data = index_data.get(code)
if not data:
continue
result.append({
"name": name_map.get(code, data.get("name", code)),
"code": code,
"close": round(data["price"], 2),
"pct_chg": round(data["pct_chg"], 2),
"volume": round(data["volume"], 2),
"realtime": True,
})
return result
def _overview_daily():
"""盘后Tushare 日线数据"""
indices = {
"上证指数": "000001.SH",
"深证成指": "399001.SZ",
"创业板指": "399006.SZ",
}
result = []
for name, code in indices.items():
df = tushare_client.get_index_daily(code, days=5)
if df.empty:
continue
df = df.sort_values("trade_date")
latest = df.iloc[-1]
prev = df.iloc[-2] if len(df) > 1 else latest
pct = (latest["close"] - prev["close"]) / prev["close"] * 100
result.append({
"name": name,
"code": code,
"close": round(float(latest["close"]), 2),
"pct_chg": round(pct, 2),
"volume": round(float(latest["vol"]), 2),
"realtime": False,
})
return result