crypto.ai/cryptoai/models/user.py
2025-05-24 15:33:38 +08:00

302 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import Dict, Any, List, Optional
from datetime import datetime
from fastapi import HTTPException, status
from sqlalchemy import Column, Integer, String, DateTime, Index
from sqlalchemy.orm import relationship
from cryptoai.models.base import Base, logger
# 定义用户数据模型
class User(Base):
"""用户数据表模型"""
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
mail = Column(String(100), nullable=False, unique=True, comment='邮箱')
nickname = Column(String(50), nullable=False, comment='昵称')
password = Column(String(100), nullable=False, comment='密码')
level = Column(Integer, nullable=False, default=0, comment='用户级别(0=普通用户,1=VIP,2=SVIP)')
points = Column(Integer, nullable=False, default=0, comment='用户积分')
create_time = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间')
# 关系
questions = relationship("UserQuestion", back_populates="user")
analysis_histories = relationship("AnalysisHistory", back_populates="user")
# 索引和表属性
__table_args__ = (
Index('idx_mail', 'mail'),
Index('idx_level', 'level'),
Index('idx_create_time', 'create_time'),
{'mysql_charset': 'utf8mb4', 'mysql_collate': 'utf8mb4_unicode_ci'}
)
class UserManager:
"""用户管理类"""
def __init__(self, db_session):
self.session = db_session
def register_user(self, mail: str, nickname: str, password: str, level: int = 0, points: int = 0) -> bool:
"""
注册新用户
Args:
mail: 邮箱
nickname: 昵称
password: 密码
level: 用户级别默认为0普通用户
points: 初始积分默认为0
Returns:
注册是否成功
"""
try:
# 检查邮箱是否已存在
existing_user = self.session.query(User).filter(User.mail == mail).first()
if existing_user:
logger.warning(f"邮箱 {mail} 已被注册")
return False
# 创建新用户
new_user = User(
mail=mail,
nickname=nickname,
password=password, # 实际应用中应该对密码进行哈希处理
level=level,
points=points,
create_time=datetime.now()
)
# 添加并提交
self.session.add(new_user)
self.session.commit()
logger.info(f"成功注册用户: {mail},初始积分: {points}")
return True
except Exception as e:
self.session.rollback()
logger.error(f"注册用户失败: {e}")
return False
def get_user_count(self) -> int:
"""
获取用户数量
"""
try:
# 查询用户数量
user_count = self.session.query(User).count()
return user_count
except Exception as e:
logger.error(f"获取用户数量失败: {e}")
return 0
def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]:
"""
通过ID获取用户信息
Args:
user_id: 用户ID
Returns:
用户信息如果用户不存在则返回None
"""
try:
# 查询用户
user = self.session.query(User).filter(User.id == user_id).first()
if user:
# 转换为字典
return {
'id': user.id,
'mail': user.mail,
'nickname': user.nickname,
'level': user.level,
'points': user.points,
'create_time': user.create_time
}
else:
return None
except Exception as e:
logger.error(f"获取用户信息失败: {e}")
return None
def get_user_by_mail_and_password(self, mail: str, password: str) -> Optional[Dict[str, Any]]:
"""
登录
"""
user = self.session.query(User).filter(User.mail == mail).first()
if not user:
return None
if user.password != password:
return None
return {'id': user.id, 'mail': user.mail, 'nickname': user.nickname, 'level': user.level, 'points': user.points, 'create_time': user.create_time}
def get_user_by_mail(self, mail: str) -> Optional[Dict[str, Any]]:
"""
通过邮箱获取用户信息
Args:
mail: 邮箱
Returns:
用户信息如果用户不存在则返回None
"""
try:
# 查询用户
user = self.session.query(User).filter(User.mail == mail).first()
if user:
# 转换为字典
return {
'id': user.id,
'mail': user.mail,
'nickname': user.nickname,
'level': user.level,
'points': user.points,
'create_time': user.create_time
}
else:
return None
except Exception as e:
logger.error(f"获取用户信息失败: {e}")
return None
def update_password(self, mail: str, password: str) -> bool:
"""
更新用户密码
"""
try:
user = self.session.query(User).filter(User.mail == mail).first()
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="用户不存在"
)
user.password = password
self.session.commit()
return True
except Exception as e:
logger.error(f"更新用户密码失败: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"更新用户密码失败: {e}"
)
def update_user_level(self, user_id: int, level: int) -> bool:
"""
更新用户级别
Args:
user_id: 用户ID
level: 新的用户级别
Returns:
更新是否成功
"""
try:
# 查询用户
user = self.session.query(User).filter(User.id == user_id).first()
if not user:
logger.warning(f"用户ID {user_id} 不存在")
return False
# 更新级别
user.level = level
self.session.commit()
logger.info(f"成功更新用户 {user.mail} 的级别为 {level}")
return True
except Exception as e:
self.session.rollback()
logger.error(f"更新用户级别失败: {e}")
return False
def add_user_points(self, user_id: int, points: int) -> bool:
"""
为用户增加积分
Args:
user_id: 用户ID
points: 增加的积分数量(正数)
Returns:
操作是否成功
"""
if points <= 0:
logger.warning(f"增加的积分必须是正数: {points}")
return False
try:
# 查询用户
user = self.session.query(User).filter(User.id == user_id).first()
if not user:
logger.warning(f"用户ID {user_id} 不存在")
return False
# 增加积分
user.points += points
self.session.commit()
logger.info(f"成功为用户 {user.mail} 增加 {points} 积分,当前积分: {user.points}")
return True
except Exception as e:
self.session.rollback()
logger.error(f"增加用户积分失败: {e}")
return False
def consume_user_points(self, user_id: int, points: int) -> bool:
"""
用户消费积分
Args:
user_id: 用户ID
points: 消费的积分数量(正数)
Returns:
操作是否成功
"""
if points <= 0:
logger.warning(f"消费的积分必须是正数: {points}")
return False
try:
# 查询用户
user = self.session.query(User).filter(User.id == user_id).first()
if not user:
logger.warning(f"用户ID {user_id} 不存在")
return False
# 检查积分是否足够
if user.points < points:
logger.warning(f"用户 {user.mail} 积分不足,当前积分: {user.points},需要消费: {points}")
return False
# 消费积分
user.points -= points
self.session.commit()
logger.info(f"成功从用户 {user.mail} 消费 {points} 积分,剩余积分: {user.points}")
return True
except Exception as e:
self.session.rollback()
logger.error(f"消费用户积分失败: {e}")
return False