people-reading/backend/app/services/reading_service.py
2026-05-12 20:50:15 +08:00

63 lines
2.8 KiB
Python

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.reading import Reading
from app.models.uploaded_image import UploadedImage
from app.services.bazi_analyzer import BaziAnalyzer
from app.services.bazi_calculator import BaziCalculator
from app.services.face_analyzer import FaceAnalyzer
from app.services.image_service import ImageService
from app.services.palm_analyzer import PalmAnalyzer
class ReadingService:
async def generate(self, db: AsyncSession, reading_id: str) -> None:
result = await db.execute(select(Reading).where(Reading.id == reading_id))
reading = result.scalar_one()
reading.status = "processing"
await db.flush()
await db.commit()
try:
result = await db.execute(select(Reading).where(Reading.id == reading_id))
reading = result.scalar_one()
if reading.reading_type == "palm":
data = await self._generate_palm(db, reading)
elif reading.reading_type == "face":
data = await self._generate_face(db, reading)
elif reading.reading_type == "bazi":
data = await self._generate_bazi(reading)
else:
raise ValueError(f"Unsupported reading type: {reading.reading_type}")
reading.report_data = data
reading.status = "failed" if not data["quality_check"]["can_analyze"] else "completed"
reading.error_message = None if reading.status == "completed" else data["quality_check"]["reason"]
except Exception as exc:
reading.status = "failed"
reading.error_message = str(exc)
await db.commit()
async def _generate_palm(self, db: AsyncSession, reading: Reading) -> dict:
image = await self._get_image(db, reading)
image_bytes = ImageService().read_bytes(image.storage_key)
hand_side = reading.input_data.get("hand_side", "unknown")
return await PalmAnalyzer().analyze(image_bytes, image.content_type, hand_side)
async def _generate_face(self, db: AsyncSession, reading: Reading) -> dict:
image = await self._get_image(db, reading)
image_bytes = ImageService().read_bytes(image.storage_key)
return await FaceAnalyzer().analyze(image_bytes, image.content_type)
async def _generate_bazi(self, reading: Reading) -> dict:
chart = BaziCalculator().calculate(reading.input_data)
reading.input_data = {**reading.input_data, "chart": chart}
return await BaziAnalyzer().analyze(chart)
async def _get_image(self, db: AsyncSession, reading: Reading) -> UploadedImage:
result = await db.execute(select(UploadedImage).where(UploadedImage.id == reading.image_id))
image = result.scalar_one_or_none()
if image is None:
raise ValueError("Image not found")
return image