#!/usr/bin/env python # -*- coding: utf-8 -*- from typing import Dict, Any, List, Optional from datetime import datetime from fastapi import HTTPException, status from sqlalchemy import Column, Integer, String, DateTime, Float, Index from sqlalchemy.orm import Session from cryptoai.models.base import Base, logger # 定义订阅订单数据模型 class SubscriptionOrder(Base): """订阅订单数据表模型""" __tablename__ = 'subscription_orders' id = Column(Integer, primary_key=True, autoincrement=True) user_id = Column(Integer, nullable=False, comment='用户ID') order_id = Column(String(100), nullable=False, unique=True, comment='订单ID') payment_order_id = Column(String(100), nullable=False, comment='支付订单ID') amount = Column(Float, nullable=False, comment='金额') member_type = Column(Integer, nullable=False, comment='会员类型(1=VIP,2=SVIP)') currency = Column(String(10), nullable=False, comment='货币类型') time_type = Column(Integer, nullable=False, comment='时间类型(1=包月,2=包年)') status = Column(Integer, nullable=False, default=1, comment='订单状态(1=待支付,2=已完成)') create_time = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间') # 索引和表属性 __table_args__ = ( Index('idx_user_id', 'user_id'), Index('idx_order_id', 'order_id'), Index('idx_payment_order_id', 'payment_order_id'), Index('idx_member_type', 'member_type'), Index('idx_status', 'status'), Index('idx_create_time', 'create_time'), {'mysql_charset': 'utf8mb4', 'mysql_collate': 'utf8mb4_unicode_ci'} ) class SubscriptionOrderManager: """订阅订单管理类""" def __init__(self, session: Session = None): self.session = session def create_order(self, user_id: int, order_id: str, payment_order_id: str, amount: float, member_type: int, currency: str, time_type: int, status: int = 1) -> bool: """ 创建订阅订单 Args: user_id: 用户ID order_id: 订单ID payment_order_id: 支付订单ID amount: 金额 member_type: 会员类型(1=VIP,2=SVIP) currency: 货币类型 time_type: 时间类型(1=包月,2=包年) status: 订单状态(1=待支付,2=已完成),默认为1 Returns: 创建是否成功 """ try: # 验证参数 if member_type not in [1, 2]: logger.warning(f"无效的会员类型: {member_type}") return False if time_type not in [1, 2]: logger.warning(f"无效的时间类型: {time_type}") return False if status not in [1, 2]: logger.warning(f"无效的订单状态: {status}") return False if amount <= 0: logger.warning(f"无效的金额: {amount}") return False # 检查订单ID是否已存在 existing_order = self.session.query(SubscriptionOrder).filter( SubscriptionOrder.order_id == order_id ).first() if existing_order: logger.warning(f"订单ID {order_id} 已存在") return False # 创建新订单 new_order = SubscriptionOrder( user_id=user_id, order_id=order_id, payment_order_id=payment_order_id, amount=amount, member_type=member_type, currency=currency, time_type=time_type, status=status, create_time=datetime.now() ) # 添加并提交 self.session.add(new_order) self.session.commit() logger.info(f"成功创建订阅订单: {order_id},用户: {user_id}") return True except Exception as e: self.session.rollback() logger.error(f"创建订阅订单失败: {e}") return False def get_order_by_id(self, order_id: str) -> Optional[Dict[str, Any]]: """ 通过订单ID获取订单信息 Args: order_id: 订单ID Returns: 订单信息,如果订单不存在则返回None """ try: # 查询订单 order = self.session.query(SubscriptionOrder).filter( SubscriptionOrder.order_id == order_id ).first() if order: # 转换为字典 return { 'id': order.id, 'user_id': order.user_id, 'order_id': order.order_id, 'payment_order_id': order.payment_order_id, 'amount': order.amount, 'member_type': order.member_type, 'currency': order.currency, 'time_type': order.time_type, 'status': order.status, 'create_time': order.create_time } else: return None except Exception as e: logger.error(f"获取订单信息失败: {e}") return None def get_orders_by_user_id(self, user_id: int) -> List[Dict[str, Any]]: """ 通过用户ID获取用户的所有订单 Args: user_id: 用户ID Returns: 订单列表 """ try: # 查询用户的所有订单 orders = self.session.query(SubscriptionOrder).filter( SubscriptionOrder.user_id == user_id ).order_by(SubscriptionOrder.create_time.desc()).all() # 转换为字典列表 order_list = [] for order in orders: order_list.append({ 'id': order.id, 'user_id': order.user_id, 'order_id': order.order_id, 'payment_order_id': order.payment_order_id, 'amount': order.amount, 'member_type': order.member_type, 'currency': order.currency, 'time_type': order.time_type, 'status': order.status, 'create_time': order.create_time }) return order_list except Exception as e: logger.error(f"获取用户订单列表失败: {e}") return [] def get_order_count(self) -> int: """ 获取订单总数 """ try: # 查询订单数量 order_count = self.session.query(SubscriptionOrder).count() return order_count except Exception as e: logger.error(f"获取订单数量失败: {e}") return 0 def update_order_status(self, order_id: str, status: int) -> bool: """ 更新订单状态 Args: order_id: 订单ID status: 新的订单状态(1=待支付,2=已完成) Returns: 更新是否成功 """ try: # 验证状态值 if status not in [1, 2]: logger.warning(f"无效的订单状态: {status}") return False # 查询订单 order = self.session.query(SubscriptionOrder).filter( SubscriptionOrder.order_id == order_id ).first() if not order: logger.warning(f"订单ID {order_id} 不存在") return False # 更新状态 order.status = status self.session.commit() status_name = "待支付" if status == 1 else "已完成" logger.info(f"成功更新订单 {order_id} 状态为: {status_name}") return True except Exception as e: self.session.rollback() logger.error(f"更新订单状态失败: {e}") return False def get_orders_by_status(self, status: int) -> List[Dict[str, Any]]: """ 根据状态获取订单列表 Args: status: 订单状态(1=待支付,2=已完成) Returns: 订单列表 """ try: # 验证状态值 if status not in [1, 2]: logger.warning(f"无效的订单状态: {status}") return [] # 查询指定状态的订单 orders = self.session.query(SubscriptionOrder).filter( SubscriptionOrder.status == status ).order_by(SubscriptionOrder.create_time.desc()).all() # 转换为字典列表 order_list = [] for order in orders: order_list.append({ 'id': order.id, 'user_id': order.user_id, 'order_id': order.order_id, 'payment_order_id': order.payment_order_id, 'amount': order.amount, 'member_type': order.member_type, 'currency': order.currency, 'time_type': order.time_type, 'status': order.status, 'create_time': order.create_time }) return order_list except Exception as e: logger.error(f"获取订单列表失败: {e}") return []