stock-ai-agent/backend/app/api/admin.py
2026-02-04 21:28:38 +08:00

163 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
后台管理API
"""
from fastapi import APIRouter, HTTPException, Depends, Request
from typing import Optional
from app.models.database import User
from app.services.db_service import db_service
from app.utils.logger import logger
router = APIRouter(prefix="/api/admin", tags=["后台管理"])
# 管理密码
ADMIN_PASSWORD = "223388"
def verify_admin_password(password: str):
"""验证管理密码"""
if password != ADMIN_PASSWORD:
raise HTTPException(status_code=403, detail="密码错误")
@router.post("/verify")
async def verify_password(password: str):
"""
验证管理密码
Args:
password: 管理密码
Returns:
验证结果
"""
try:
verify_admin_password(password)
return {"success": True, "message": "验证成功"}
except HTTPException:
return {"success": False, "message": "密码错误"}
@router.get("/users")
async def get_users(
password: str,
page: int = 1,
page_size: int = 20,
search: Optional[str] = None
):
"""
获取用户列表
Args:
password: 管理密码
page: 页码
page_size: 每页数量
search: 搜索关键词(手机号)
Returns:
用户列表
"""
try:
verify_admin_password(password)
db = db_service.get_session()
try:
# 构建查询
query = db.query(User)
# 搜索过滤
if search:
query = query.filter(User.phone.like(f"%{search}%"))
# 总数
total = query.count()
# 分页
users = query.order_by(User.created_at.desc()).offset(
(page - 1) * page_size
).limit(page_size).all()
# 格式化返回数据
user_list = []
for user in users:
user_list.append({
"id": user.id,
"phone": user.phone,
"created_at": user.created_at.isoformat() if user.created_at else None,
"last_login_at": user.last_login_at.isoformat() if user.last_login_at else None,
"is_active": user.is_active
})
return {
"success": True,
"data": {
"users": user_list,
"total": total,
"page": page,
"page_size": page_size,
"total_pages": (total + page_size - 1) // page_size
}
}
finally:
db.close()
except HTTPException as e:
raise e
except Exception as e:
logger.error(f"获取用户列表失败: {e}")
raise HTTPException(status_code=500, detail="获取用户列表失败")
@router.get("/stats")
async def get_stats(password: str):
"""
获取统计数据
Args:
password: 管理密码
Returns:
统计数据
"""
try:
verify_admin_password(password)
db = db_service.get_session()
try:
from app.models.database import Conversation, Message
from datetime import datetime, timedelta
# 用户总数
total_users = db.query(User).count()
# 活跃用户7天内登录
seven_days_ago = datetime.utcnow() - timedelta(days=7)
active_users = db.query(User).filter(
User.last_login_at >= seven_days_ago
).count()
# 对话总数
total_conversations = db.query(Conversation).count()
# 消息总数
total_messages = db.query(Message).count()
return {
"success": True,
"data": {
"total_users": total_users,
"active_users": active_users,
"total_conversations": total_conversations,
"total_messages": total_messages
}
}
finally:
db.close()
except HTTPException as e:
raise e
except Exception as e:
logger.error(f"获取统计数据失败: {e}")
raise HTTPException(status_code=500, detail="获取统计数据失败")