"""Async SQLAlchemy setup: engine, session factory, declarative base, and
the per-request session dependency."""

import asyncio
from typing import AsyncGenerator
from urllib.parse import quote

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from app.core.config import settings

# Percent-encode the credentials so that reserved characters in the user name
# or password (e.g. "@", ":", "/", "%") cannot corrupt the URL structure.
# `safe=""` makes sure "/" is escaped as well.
_DB_USER = quote(str(settings.DB_USER), safe="")
_DB_PASSWORD = quote(str(settings.DB_PASSWORD), safe="")

# Async database URL - uses the aiomysql async driver.
SQLALCHEMY_DATABASE_URL = (
    f"mysql+aiomysql://{_DB_USER}:{_DB_PASSWORD}"
    f"@{settings.DB_HOST}:{settings.DB_PORT}/{settings.DB_NAME}"
)

# Async database engine.
engine = create_async_engine(
    SQLALCHEMY_DATABASE_URL,
    echo=settings.DB_ECHO,
    pool_pre_ping=True,
)

# Async session factory.
AsyncSessionLocal = sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)

# Base class for declarative models.
Base = declarative_base()


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for use as an async dependency.

    A new session is created from ``AsyncSessionLocal`` and handed to the
    caller. The ``async with`` block closes the session when the caller is
    done, including when an exception propagates, so no explicit ``close()``
    call is needed here.

    Yields:
        The ``AsyncSession`` bound to ``engine``.
    """
    async with AsyncSessionLocal() as session:
        yield session