aidress/app/services/qcloud_service.py
2025-03-21 17:06:54 +08:00

259 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import logging
import time
import uuid
import base64
import hashlib
from typing import Optional, List, Dict, Any, BinaryIO, Union
from urllib.parse import urljoin
from qcloud_cos import CosConfig, CosS3Client
from qcloud_cos.cos_exception import CosServiceError, CosClientError
import sts.sts
from app.utils.config import get_settings
logger = logging.getLogger(__name__)
class QCloudCOSService:
"""腾讯云对象存储COS服务类"""
def __init__(self):
"""初始化腾讯云COS服务"""
settings = get_settings()
self.secret_id = settings.qcloud_secret_id
self.secret_key = settings.qcloud_secret_key
self.region = settings.qcloud_cos_region
self.bucket = settings.qcloud_cos_bucket
self.domain = settings.qcloud_cos_domain
# 创建COS客户端
config = CosConfig(
Region=self.region,
SecretId=self.secret_id,
SecretKey=self.secret_key
)
self.client = CosS3Client(config)
async def upload_file(
self,
file_content: Union[bytes, BinaryIO],
file_name: Optional[str] = None,
directory: str = "uploads",
content_type: Optional[str] = None
) -> Dict[str, str]:
"""
上传文件到腾讯云COS
Args:
file_content: 文件内容bytes或文件对象
file_name: 文件名,如果不提供则生成随机文件名
directory: 存储目录
content_type: 文件MIME类型
Returns:
Dict[str, str]: 包含文件URL等信息的字典
"""
try:
# 生成文件名(如果未提供)
if not file_name:
# 使用UUID生成随机文件名
random_filename = str(uuid.uuid4())
if isinstance(file_content, bytes):
# 对于字节流,我们无法确定文件扩展名,默认用.bin
file_name = f"{random_filename}.bin"
else:
# 假设file_content是文件对象尝试从其name属性获取扩展名
if hasattr(file_content, 'name'):
original_name = os.path.basename(file_content.name)
ext = os.path.splitext(original_name)[1]
file_name = f"{random_filename}{ext}"
else:
file_name = f"{random_filename}.bin"
# 构建对象键文件在COS中的完整路径
key = f"{directory}/{file_name}"
# 上传文件
response = self.client.put_object(
Bucket=self.bucket,
Body=file_content,
Key=key,
ContentType=content_type
)
# 构建文件URL
file_url = urljoin(self.domain, key)
return {
"url": file_url,
"key": key,
"file_name": file_name,
"content_type": content_type,
"etag": response["ETag"] if "ETag" in response else None
}
except (CosServiceError, CosClientError) as e:
logger.error(f"腾讯云COS上传文件失败: {str(e)}")
raise Exception(f"文件上传失败: {str(e)}")
async def delete_file(self, key: str) -> bool:
"""
从腾讯云COS删除文件
Args:
key: 文件的对象键COS路径
Returns:
bool: 删除是否成功
"""
try:
self.client.delete_object(
Bucket=self.bucket,
Key=key
)
return True
except (CosServiceError, CosClientError) as e:
logger.error(f"腾讯云COS删除文件失败: {str(e)}")
return False
async def get_file_url(self, key: str, expires: int = 3600) -> str:
"""
获取COS文件的临时访问URL
Args:
key: 文件的对象键COS路径
expires: URL的有效期
Returns:
str: 临时访问URL
"""
try:
url = self.client.get_presigned_url(
Method='GET',
Bucket=self.bucket,
Key=key,
Expired=expires
)
return url
except (CosServiceError, CosClientError) as e:
logger.error(f"获取腾讯云COS文件URL失败: {str(e)}")
raise Exception(f"获取文件URL失败: {str(e)}")
async def generate_cos_sts_token(
self,
allow_actions: Optional[List[str]] = None,
allow_prefix: str = "*",
duration_seconds: int = 1800
) -> Dict[str, Any]:
"""
生成COS的临时安全凭证STS用于前端直传
Args:
allow_actions: 允许的COS操作列表
allow_prefix: 允许操作的对象前缀
duration_seconds: 凭证有效期(秒)
Returns:
Dict[str, Any]: STS凭证信息
"""
try:
if allow_actions is None:
# 默认只允许上传操作
allow_actions = [
'name/cos:PutObject',
'name/cos:PostObject',
'name/cos:InitiateMultipartUpload',
'name/cos:ListMultipartUploads',
'name/cos:ListParts',
'name/cos:UploadPart',
'name/cos:CompleteMultipartUpload'
]
# 配置STS
config = {
'url': 'https://sts.tencentcloudapi.com/',
'domain': 'sts.tencentcloudapi.com',
'duration_seconds': duration_seconds,
'secret_id': self.secret_id,
'secret_key': self.secret_key,
'region': self.region,
'policy': {
'version': '2.0',
'statement': [
{
'action': allow_actions,
'effect': 'allow',
'resource': [
f'qcs::cos:{self.region}:uid/:{self.bucket}/{allow_prefix}'
]
}
]
}
}
sts_client = sts.sts.Sts(config)
response = sts_client.get_credential()
return response
except Exception as e:
logger.error(f"生成腾讯云COS STS凭证失败: {str(e)}")
raise Exception(f"生成COS临时凭证失败: {str(e)}")
async def list_files(
self,
directory: str = "",
limit: int = 100,
marker: str = ""
) -> Dict[str, Any]:
"""
列出COS中的文件
Args:
directory: 目录前缀
limit: 返回的最大文件数
marker: 分页标记
Returns:
Dict[str, Any]: 文件列表信息
"""
try:
# 处理目录前缀
prefix = directory
if prefix and not prefix.endswith('/'):
prefix += '/'
# 调用COS API列出对象
response = self.client.list_objects(
Bucket=self.bucket,
Prefix=prefix,
Marker=marker,
MaxKeys=limit
)
# 处理响应
files = []
if 'Contents' in response:
for item in response['Contents']:
key = item.get('Key', '')
# 过滤出文件(忽略目录)
if not key.endswith('/'):
file_url = urljoin(self.domain, key)
files.append({
'key': key,
'url': file_url,
'size': item.get('Size', 0),
'last_modified': item.get('LastModified', ''),
'etag': item.get('ETag', '').strip('"')
})
return {
'files': files,
'is_truncated': response.get('IsTruncated', False),
'next_marker': response.get('NextMarker', ''),
'common_prefixes': response.get('CommonPrefixes', [])
}
except (CosServiceError, CosClientError) as e:
logger.error(f"列出腾讯云COS文件失败: {str(e)}")
raise Exception(f"列出文件失败: {str(e)}")