#!/usr/bin/env python # -*- coding: utf-8 -*- from typing import Dict, Any, List, Optional from datetime import datetime from sqlalchemy import Column, Integer, String, DateTime, Index from sqlalchemy.orm import relationship from cryptoai.models.base import Base, logger # 定义用户数据模型 class User(Base): """用户数据表模型""" __tablename__ = 'users' id = Column(Integer, primary_key=True, autoincrement=True) mail = Column(String(100), nullable=False, unique=True, comment='邮箱') nickname = Column(String(50), nullable=False, comment='昵称') password = Column(String(100), nullable=False, comment='密码') level = Column(Integer, nullable=False, default=0, comment='用户级别(0=普通用户,1=VIP,2=SVIP)') points = Column(Integer, nullable=False, default=0, comment='用户积分') create_time = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间') # 关系 questions = relationship("UserQuestion", back_populates="user") analysis_histories = relationship("AnalysisHistory", back_populates="user") # 索引和表属性 __table_args__ = ( Index('idx_mail', 'mail'), Index('idx_level', 'level'), Index('idx_create_time', 'create_time'), {'mysql_charset': 'utf8mb4', 'mysql_collate': 'utf8mb4_unicode_ci'} ) class UserManager: """用户管理类""" def __init__(self, db_session): self.session = db_session def register_user(self, mail: str, nickname: str, password: str, level: int = 0, points: int = 0) -> bool: """ 注册新用户 Args: mail: 邮箱 nickname: 昵称 password: 密码 level: 用户级别,默认为0(普通用户) points: 初始积分,默认为0 Returns: 注册是否成功 """ try: # 检查邮箱是否已存在 existing_user = self.session.query(User).filter(User.mail == mail).first() if existing_user: logger.warning(f"邮箱 {mail} 已被注册") return False # 创建新用户 new_user = User( mail=mail, nickname=nickname, password=password, # 实际应用中应该对密码进行哈希处理 level=level, points=points, create_time=datetime.now() ) # 添加并提交 self.session.add(new_user) self.session.commit() logger.info(f"成功注册用户: {mail},初始积分: {points}") return True except Exception as e: self.session.rollback() logger.error(f"注册用户失败: {e}") return False def get_user_count(self) -> int: """ 获取用户数量 """ try: # 查询用户数量 user_count = self.session.query(User).count() return user_count except Exception as e: logger.error(f"获取用户数量失败: {e}") return 0 def get_user_by_id(self, user_id: int) -> Optional[Dict[str, Any]]: """ 通过ID获取用户信息 Args: user_id: 用户ID Returns: 用户信息,如果用户不存在则返回None """ try: # 查询用户 user = self.session.query(User).filter(User.id == user_id).first() if user: # 转换为字典 return { 'id': user.id, 'mail': user.mail, 'nickname': user.nickname, 'level': user.level, 'points': user.points, 'create_time': user.create_time } else: return None except Exception as e: logger.error(f"获取用户信息失败: {e}") return None def login(self, mail: str, password: str) -> Optional[Dict[str, Any]]: """ 登录 """ user = self.session.query(User).filter(User.mail == mail).first() if not user: return None if user.password != password: return None return {'id': user.id, 'mail': user.mail, 'nickname': user.nickname, 'level': user.level, 'points': user.points, 'create_time': user.create_time} def get_user_by_mail(self, mail: str) -> Optional[Dict[str, Any]]: """ 通过邮箱获取用户信息 Args: mail: 邮箱 Returns: 用户信息,如果用户不存在则返回None """ try: # 查询用户 user = self.session.query(User).filter(User.mail == mail).first() if user: # 转换为字典 return { 'id': user.id, 'mail': user.mail, 'nickname': user.nickname, 'level': user.level, 'points': user.points, 'create_time': user.create_time } else: return None except Exception as e: logger.error(f"获取用户信息失败: {e}") return None def update_user_level(self, user_id: int, level: int) -> bool: """ 更新用户级别 Args: user_id: 用户ID level: 新的用户级别 Returns: 更新是否成功 """ try: # 查询用户 user = self.session.query(User).filter(User.id == user_id).first() if not user: logger.warning(f"用户ID {user_id} 不存在") return False # 更新级别 user.level = level self.session.commit() logger.info(f"成功更新用户 {user.mail} 的级别为 {level}") return True except Exception as e: self.session.rollback() logger.error(f"更新用户级别失败: {e}") return False def add_user_points(self, user_id: int, points: int) -> bool: """ 为用户增加积分 Args: user_id: 用户ID points: 增加的积分数量(正数) Returns: 操作是否成功 """ if points <= 0: logger.warning(f"增加的积分必须是正数: {points}") return False try: # 查询用户 user = self.session.query(User).filter(User.id == user_id).first() if not user: logger.warning(f"用户ID {user_id} 不存在") return False # 增加积分 user.points += points self.session.commit() logger.info(f"成功为用户 {user.mail} 增加 {points} 积分,当前积分: {user.points}") return True except Exception as e: self.session.rollback() logger.error(f"增加用户积分失败: {e}") return False def consume_user_points(self, user_id: int, points: int) -> bool: """ 用户消费积分 Args: user_id: 用户ID points: 消费的积分数量(正数) Returns: 操作是否成功 """ if points <= 0: logger.warning(f"消费的积分必须是正数: {points}") return False try: # 查询用户 user = self.session.query(User).filter(User.id == user_id).first() if not user: logger.warning(f"用户ID {user_id} 不存在") return False # 检查积分是否足够 if user.points < points: logger.warning(f"用户 {user.mail} 积分不足,当前积分: {user.points},需要消费: {points}") return False # 消费积分 user.points -= points self.session.commit() logger.info(f"成功从用户 {user.mail} 消费 {points} 积分,剩余积分: {user.points}") return True except Exception as e: self.session.rollback() logger.error(f"消费用户积分失败: {e}") return False