79 lines
3.4 KiB
Python
79 lines
3.4 KiB
Python
from datetime import datetime, timedelta
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from uuid import uuid4
|
|
|
|
from fastapi import HTTPException, UploadFile, status
|
|
from PIL import Image, ImageStat, UnidentifiedImageError
|
|
|
|
from app.core.config import settings
|
|
|
|
ALLOWED_CONTENT_TYPES = {"image/jpeg", "image/png", "image/webp"}
|
|
|
|
|
|
class ImageService:
|
|
def __init__(self) -> None:
|
|
self.upload_dir = Path(settings.upload_dir)
|
|
self.upload_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
async def validate_and_store(self, file: UploadFile, user_id: str) -> tuple[str, int, datetime, dict]:
|
|
content_type = file.content_type or ""
|
|
if content_type not in ALLOWED_CONTENT_TYPES:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Only jpg, png and webp images are supported")
|
|
|
|
data = await file.read()
|
|
max_bytes = settings.max_image_mb * 1024 * 1024
|
|
if len(data) > max_bytes:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Image must be smaller than {settings.max_image_mb}MB")
|
|
|
|
quality_check = self._inspect_image(data)
|
|
if not quality_check["is_acceptable"]:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=quality_check["reason"])
|
|
|
|
suffix = self._suffix_for_type(content_type)
|
|
storage_key = f"{user_id}/{uuid4()}{suffix}"
|
|
path = self.upload_dir / storage_key
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_bytes(data)
|
|
expires_at = datetime.utcnow() + timedelta(days=settings.image_retention_days)
|
|
return storage_key, len(data), expires_at, quality_check
|
|
|
|
def read_bytes(self, storage_key: str) -> bytes:
|
|
return (self.upload_dir / storage_key).read_bytes()
|
|
|
|
def delete(self, storage_key: str) -> None:
|
|
path = self.upload_dir / storage_key
|
|
if path.exists():
|
|
path.unlink()
|
|
|
|
def _inspect_image(self, data: bytes) -> dict:
|
|
try:
|
|
with Image.open(BytesIO(data)) as image:
|
|
image.verify()
|
|
with Image.open(BytesIO(data)) as image:
|
|
width, height = image.size
|
|
rgb = image.convert("RGB")
|
|
gray = image.convert("L")
|
|
brightness = ImageStat.Stat(gray).mean[0]
|
|
resized = gray.resize((64, 64))
|
|
variance = ImageStat.Stat(resized).var[0]
|
|
except UnidentifiedImageError as exc:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid image file") from exc
|
|
|
|
checks = {
|
|
"width": width,
|
|
"height": height,
|
|
"brightness": round(brightness, 2),
|
|
"sharpness_score": round(variance, 2),
|
|
}
|
|
if width < 640 or height < 640:
|
|
return {**checks, "is_acceptable": False, "reason": "照片分辨率太低,请上传更清晰的掌心照片"}
|
|
if brightness < 35:
|
|
return {**checks, "is_acceptable": False, "reason": "照片过暗,请在光线更充足的环境重拍"}
|
|
if variance < 80:
|
|
return {**checks, "is_acceptable": False, "reason": "照片可能过于模糊,请保持手掌和镜头稳定后重拍"}
|
|
return {**checks, "is_acceptable": True, "reason": "ok"}
|
|
|
|
def _suffix_for_type(self, content_type: str) -> str:
|
|
return {"image/jpeg": ".jpg", "image/png": ".png", "image/webp": ".webp"}[content_type]
|