crypto.ai/cryptoai/models/subscription_order.py
2025-06-11 19:45:53 +08:00

281 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from typing import Dict, Any, List, Optional
from datetime import datetime
from fastapi import HTTPException, status
from sqlalchemy import Column, Integer, String, DateTime, Float, Index
from sqlalchemy.orm import Session
from cryptoai.models.base import Base, logger
# 定义订阅订单数据模型
class SubscriptionOrder(Base):
"""订阅订单数据表模型"""
__tablename__ = 'subscription_orders'
id = Column(Integer, primary_key=True, autoincrement=True)
user_id = Column(Integer, nullable=False, comment='用户ID')
order_id = Column(String(100), nullable=False, unique=True, comment='订单ID')
payment_order_id = Column(String(100), nullable=False, comment='支付订单ID')
amount = Column(Float, nullable=False, comment='金额')
member_type = Column(Integer, nullable=False, comment='会员类型(1=VIP2=SVIP)')
currency = Column(String(10), nullable=False, comment='货币类型')
time_type = Column(Integer, nullable=False, comment='时间类型(1=包月2=包年)')
status = Column(Integer, nullable=False, default=1, comment='订单状态(1=待支付2=已完成)')
create_time = Column(DateTime, nullable=False, default=datetime.now, comment='创建时间')
# 索引和表属性
__table_args__ = (
Index('idx_user_id', 'user_id'),
Index('idx_order_id', 'order_id'),
Index('idx_payment_order_id', 'payment_order_id'),
Index('idx_member_type', 'member_type'),
Index('idx_status', 'status'),
Index('idx_create_time', 'create_time'),
{'mysql_charset': 'utf8mb4', 'mysql_collate': 'utf8mb4_unicode_ci'}
)
class SubscriptionOrderManager:
"""订阅订单管理类"""
def __init__(self, session: Session = None):
self.session = session
def create_order(self, user_id: int, order_id: str, payment_order_id: str,
amount: float, member_type: int, currency: str, time_type: int, status: int = 1) -> bool:
"""
创建订阅订单
Args:
user_id: 用户ID
order_id: 订单ID
payment_order_id: 支付订单ID
amount: 金额
member_type: 会员类型(1=VIP2=SVIP)
currency: 货币类型
time_type: 时间类型(1=包月2=包年)
status: 订单状态(1=待支付2=已完成)默认为1
Returns:
创建是否成功
"""
try:
# 验证参数
if member_type not in [1, 2]:
logger.warning(f"无效的会员类型: {member_type}")
return False
if time_type not in [1, 2]:
logger.warning(f"无效的时间类型: {time_type}")
return False
if status not in [1, 2]:
logger.warning(f"无效的订单状态: {status}")
return False
if amount <= 0:
logger.warning(f"无效的金额: {amount}")
return False
# 检查订单ID是否已存在
existing_order = self.session.query(SubscriptionOrder).filter(
SubscriptionOrder.order_id == order_id
).first()
if existing_order:
logger.warning(f"订单ID {order_id} 已存在")
return False
# 创建新订单
new_order = SubscriptionOrder(
user_id=user_id,
order_id=order_id,
payment_order_id=payment_order_id,
amount=amount,
member_type=member_type,
currency=currency,
time_type=time_type,
status=status,
create_time=datetime.now()
)
# 添加并提交
self.session.add(new_order)
self.session.commit()
logger.info(f"成功创建订阅订单: {order_id},用户: {user_id}")
return True
except Exception as e:
self.session.rollback()
logger.error(f"创建订阅订单失败: {e}")
return False
def get_order_by_id(self, order_id: str) -> Optional[Dict[str, Any]]:
"""
通过订单ID获取订单信息
Args:
order_id: 订单ID
Returns:
订单信息如果订单不存在则返回None
"""
try:
# 查询订单
order = self.session.query(SubscriptionOrder).filter(
SubscriptionOrder.order_id == order_id
).first()
if order:
# 转换为字典
return {
'id': order.id,
'user_id': order.user_id,
'order_id': order.order_id,
'payment_order_id': order.payment_order_id,
'amount': order.amount,
'member_type': order.member_type,
'currency': order.currency,
'time_type': order.time_type,
'status': order.status,
'create_time': order.create_time
}
else:
return None
except Exception as e:
logger.error(f"获取订单信息失败: {e}")
return None
def get_orders_by_user_id(self, user_id: int) -> List[Dict[str, Any]]:
"""
通过用户ID获取用户的所有订单
Args:
user_id: 用户ID
Returns:
订单列表
"""
try:
# 查询用户的所有订单
orders = self.session.query(SubscriptionOrder).filter(
SubscriptionOrder.user_id == user_id
).order_by(SubscriptionOrder.create_time.desc()).all()
# 转换为字典列表
order_list = []
for order in orders:
order_list.append({
'id': order.id,
'user_id': order.user_id,
'order_id': order.order_id,
'payment_order_id': order.payment_order_id,
'amount': order.amount,
'member_type': order.member_type,
'currency': order.currency,
'time_type': order.time_type,
'status': order.status,
'create_time': order.create_time
})
return order_list
except Exception as e:
logger.error(f"获取用户订单列表失败: {e}")
return []
def get_order_count(self) -> int:
"""
获取订单总数
"""
try:
# 查询订单数量
order_count = self.session.query(SubscriptionOrder).count()
return order_count
except Exception as e:
logger.error(f"获取订单数量失败: {e}")
return 0
def update_order_status(self, order_id: str, status: int) -> bool:
"""
更新订单状态
Args:
order_id: 订单ID
status: 新的订单状态(1=待支付2=已完成)
Returns:
更新是否成功
"""
try:
# 验证状态值
if status not in [1, 2]:
logger.warning(f"无效的订单状态: {status}")
return False
# 查询订单
order = self.session.query(SubscriptionOrder).filter(
SubscriptionOrder.order_id == order_id
).first()
if not order:
logger.warning(f"订单ID {order_id} 不存在")
return False
# 更新状态
order.status = status
self.session.commit()
status_name = "待支付" if status == 1 else "已完成"
logger.info(f"成功更新订单 {order_id} 状态为: {status_name}")
return True
except Exception as e:
self.session.rollback()
logger.error(f"更新订单状态失败: {e}")
return False
def get_orders_by_status(self, status: int) -> List[Dict[str, Any]]:
"""
根据状态获取订单列表
Args:
status: 订单状态(1=待支付2=已完成)
Returns:
订单列表
"""
try:
# 验证状态值
if status not in [1, 2]:
logger.warning(f"无效的订单状态: {status}")
return []
# 查询指定状态的订单
orders = self.session.query(SubscriptionOrder).filter(
SubscriptionOrder.status == status
).order_by(SubscriptionOrder.create_time.desc()).all()
# 转换为字典列表
order_list = []
for order in orders:
order_list.append({
'id': order.id,
'user_id': order.user_id,
'order_id': order.order_id,
'payment_order_id': order.payment_order_id,
'amount': order.amount,
'member_type': order.member_type,
'currency': order.currency,
'time_type': order.time_type,
'status': order.status,
'create_time': order.create_time
})
return order_list
except Exception as e:
logger.error(f"获取订单列表失败: {e}")
return []