alphax/app/web/routes_pages.py
2026-05-18 10:47:07 +08:00

190 lines
8.3 KiB
Python

from pathlib import Path
from fastapi import APIRouter, Cookie, HTTPException, Request
from fastapi.responses import HTMLResponse
from app.db import auth_db
from app.web.shared import require_admin, require_page_user
def build_router(templates, repo_root: Path, stock_report_template: str):
router = APIRouter()
def render_page(template_name: str, request: Request, active_nav: str = "", **kwargs):
try:
user = auth_db.get_user_by_session_token(request.cookies.get("altcoin_session", ""))
if user:
auth_db.log_user_activity(
user["id"],
"page_view",
template_name.replace(".html", ""),
ip=request.client.host if request.client else "",
)
except Exception:
pass
nav = active_nav or template_name.replace(".html", "").replace("-", "_")
return templates.TemplateResponse(request=request, name=template_name, context={"show_nav": True, "active_nav": nav, **kwargs})
@router.get("/", response_class=HTMLResponse)
async def index():
with open(repo_root / "static" / "index.html", "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
@router.get("/auth", response_class=HTMLResponse)
async def auth_page():
with open(repo_root / "static" / "auth.html", "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
@router.get("/watchlist", response_class=HTMLResponse)
async def watchlist_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("watchlist.html", request, active_nav="watchlist")
@router.get("/pipeline", response_class=HTMLResponse)
async def pipeline_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("pipeline.html", request, active_nav="pipeline")
@router.get("/chat", response_class=HTMLResponse)
async def chat_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("chat.html", request, active_nav="chat")
@router.get("/llm-insights", response_class=HTMLResponse)
async def llm_insights_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("llm_insights.html", request, active_nav="llm_insights")
@router.get("/cron", response_class=HTMLResponse)
async def cron_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
try:
require_admin(request.cookies.get("altcoin_session", ""))
except HTTPException as exc:
return HTMLResponse(content=f"<meta charset=utf-8><h2>需要管理员权限</h2><p>{exc.detail}</p><a href=/app>返回看板</a>", status_code=exc.status_code)
return render_page("cron.html", request, active_nav="cron")
@router.get("/config", response_class=HTMLResponse)
async def config_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
try:
require_admin(request.cookies.get("altcoin_session", ""))
except HTTPException as exc:
return HTMLResponse(content=f"<meta charset=utf-8><h2>需要管理员权限</h2><p>{exc.detail}</p><a href=/app>返回看板</a>", status_code=exc.status_code)
return render_page("config.html", request, active_nav="config")
@router.get("/system-logs", response_class=HTMLResponse)
async def system_logs_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
try:
require_admin(request.cookies.get("altcoin_session", ""))
except HTTPException as exc:
return HTMLResponse(content=f"<meta charset=utf-8><h2>需要管理员权限</h2><p>{exc.detail}</p><a href=/app>返回看板</a>", status_code=exc.status_code)
return render_page("system_logs.html", request, active_nav="system_logs")
@router.get("/chat-logs", response_class=HTMLResponse)
async def chat_logs_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
try:
require_admin(request.cookies.get("altcoin_session", ""))
except HTTPException as exc:
return HTMLResponse(content=f"<meta charset=utf-8><h2>需要管理员权限</h2><p>{exc.detail}</p><a href=/app>返回看板</a>", status_code=exc.status_code)
return render_page("chat_logs.html", request, active_nav="chat_logs")
@router.get("/paper-trading", response_class=HTMLResponse)
async def paper_trading_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
try:
require_admin(request.cookies.get("altcoin_session", ""))
except HTTPException as exc:
return HTMLResponse(content=f"<meta charset=utf-8><h2>需要管理员权限</h2><p>{exc.detail}</p><a href=/app>返回看板</a>", status_code=exc.status_code)
return render_page("paper_trading.html", request, active_nav="paper_trading")
@router.get("/strategy", response_class=HTMLResponse)
async def strategy_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("strategy.html", request, active_nav="strategy")
@router.get("/subscription", response_class=HTMLResponse)
async def subscription_page(request: Request):
user, redirect = require_page_user(request, require_subscription=False)
if redirect:
return redirect
return render_page("subscription.html", request, active_nav="subscription")
@router.get("/referral", response_class=HTMLResponse)
async def referral_page(request: Request, altcoin_session: str = Cookie(default="")):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("referral.html", request, active_nav="referral")
@router.get("/app", response_class=HTMLResponse)
async def app_page(altcoin_session: str = Cookie(default=""), request: Request = None):
user, redirect = require_page_user(request)
if redirect:
return redirect
try:
auth_db.log_user_activity(user["id"], "page_view", "app", ip=request.client.host if request and request.client else "")
except Exception:
pass
resp = templates.TemplateResponse(request=request, name="app.html", context={"show_nav": True, "active_nav": "app"})
resp.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
resp.headers["Pragma"] = "no-cache"
resp.headers["Expires"] = "0"
return resp
@router.get("/market", response_class=HTMLResponse)
async def market_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("market.html", request, active_nav="market")
@router.get("/sentiment", response_class=HTMLResponse)
async def sentiment_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("sentiment.html", request, active_nav="sentiment")
@router.get("/onchain", response_class=HTMLResponse)
async def onchain_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("onchain.html", request, active_nav="onchain")
@router.get("/iteration", response_class=HTMLResponse)
async def iteration_page(request: Request):
user, redirect = require_page_user(request)
if redirect:
return redirect
return render_page("iteration.html", request, active_nav="iteration")
@router.get("/stock-report", response_class=HTMLResponse)
async def stock_report_page():
return HTMLResponse(content=stock_report_template)
return router