49 lines
1.3 KiB
Python
49 lines
1.3 KiB
Python
import uuid
|
|
from datetime import datetime
|
|
from io import BytesIO
|
|
|
|
from qcloud_cos import CosConfig, CosS3Client
|
|
|
|
from app.config import settings
|
|
|
|
config = CosConfig(
|
|
Region=settings.cos_region,
|
|
SecretId=settings.cos_secret_id,
|
|
SecretKey=settings.cos_secret_key,
|
|
)
|
|
client = CosS3Client(config)
|
|
|
|
ALLOWED_IMAGE_TYPES = {
|
|
"image/jpeg": ".jpg",
|
|
"image/png": ".png",
|
|
"image/gif": ".gif",
|
|
"image/webp": ".webp",
|
|
}
|
|
|
|
|
|
def upload_bytes(key: str, data: bytes, content_type: str) -> str:
|
|
"""Upload raw bytes to COS, return the public URL."""
|
|
client.put_object(
|
|
Bucket=settings.cos_bucket,
|
|
Key=key,
|
|
Body=BytesIO(data),
|
|
ContentType=content_type,
|
|
)
|
|
return f"{settings.cos_base_url}/{key}"
|
|
|
|
|
|
def upload_image(prefix: str, filename: str, data: bytes, content_type: str) -> str:
|
|
"""
|
|
Upload an image to COS under the given prefix.
|
|
Returns the public URL.
|
|
"""
|
|
if content_type not in ALLOWED_IMAGE_TYPES:
|
|
raise ValueError(f"Unsupported image type: {content_type}")
|
|
|
|
ext = ALLOWED_IMAGE_TYPES[content_type]
|
|
date_path = datetime.now().strftime("%Y/%m/%d")
|
|
unique_name = uuid.uuid4().hex[:12]
|
|
key = f"{prefix}/{date_path}/{unique_name}{ext}"
|
|
|
|
return upload_bytes(key, data, content_type)
|