""" 后台管理API """ from fastapi import APIRouter, HTTPException, Depends, Request from typing import Optional from app.models.database import User from app.services.db_service import db_service from app.utils.logger import logger router = APIRouter(prefix="/api/admin", tags=["后台管理"]) # 管理密码 ADMIN_PASSWORD = "223388" def verify_admin_password(password: str): """验证管理密码""" if password != ADMIN_PASSWORD: raise HTTPException(status_code=403, detail="密码错误") @router.post("/verify") async def verify_password(password: str): """ 验证管理密码 Args: password: 管理密码 Returns: 验证结果 """ try: verify_admin_password(password) return {"success": True, "message": "验证成功"} except HTTPException: return {"success": False, "message": "密码错误"} @router.get("/users") async def get_users( password: str, page: int = 1, page_size: int = 20, search: Optional[str] = None ): """ 获取用户列表 Args: password: 管理密码 page: 页码 page_size: 每页数量 search: 搜索关键词(手机号) Returns: 用户列表 """ try: verify_admin_password(password) db = db_service.get_session() try: # 构建查询 query = db.query(User) # 搜索过滤 if search: query = query.filter(User.phone.like(f"%{search}%")) # 总数 total = query.count() # 分页 users = query.order_by(User.created_at.desc()).offset( (page - 1) * page_size ).limit(page_size).all() # 格式化返回数据 user_list = [] for user in users: user_list.append({ "id": user.id, "phone": user.phone, "created_at": user.created_at.isoformat() if user.created_at else None, "last_login_at": user.last_login_at.isoformat() if user.last_login_at else None, "is_active": user.is_active }) return { "success": True, "data": { "users": user_list, "total": total, "page": page, "page_size": page_size, "total_pages": (total + page_size - 1) // page_size } } finally: db.close() except HTTPException as e: raise e except Exception as e: logger.error(f"获取用户列表失败: {e}") raise HTTPException(status_code=500, detail="获取用户列表失败") @router.get("/stats") async def get_stats(password: str): """ 获取统计数据 Args: password: 管理密码 Returns: 统计数据 """ try: verify_admin_password(password) db = db_service.get_session() try: from app.models.database import Conversation, Message from datetime import datetime, timedelta # 用户总数 total_users = db.query(User).count() # 活跃用户(7天内登录) seven_days_ago = datetime.utcnow() - timedelta(days=7) active_users = db.query(User).filter( User.last_login_at >= seven_days_ago ).count() # 对话总数 total_conversations = db.query(Conversation).count() # 消息总数 total_messages = db.query(Message).count() return { "success": True, "data": { "total_users": total_users, "active_users": active_users, "total_conversations": total_conversations, "total_messages": total_messages } } finally: db.close() except HTTPException as e: raise e except Exception as e: logger.error(f"获取统计数据失败: {e}") raise HTTPException(status_code=500, detail="获取统计数据失败")