"""错误日志持久化""" import asyncio import logging import traceback from datetime import datetime from app.db.database import get_db from app.db import tables from app.notifications.feishu import send_feishu_alert async def log_error( source: str, message: str, detail: str = "", level: str = "error", context: dict | None = None, notify: bool = True, ): """将错误写入数据库,并按策略发送告警。""" try: async with get_db() as db: stmt = tables.error_logs_table.insert().values( source=source, level=level, message=message, detail=detail, created_at=datetime.now(), ) await db.execute(stmt) await db.commit() except Exception: pass # 写日志失败不应影响主业务 if notify and level.lower() in {"error", "critical"}: try: await send_feishu_alert( source=source, message=message, detail=detail, level=level, context=context, ) except Exception: pass def log_error_background( source: str, message: str, detail: str = "", level: str = "error", context: dict | None = None, notify: bool = True, ): """在存在事件循环时后台投递错误记录。""" try: loop = asyncio.get_running_loop() loop.create_task( log_error( source=source, message=message, detail=detail, level=level, context=context, notify=notify, ) ) except RuntimeError: pass class PersistentErrorLogHandler(logging.Handler): """Persist application ERROR/CRITICAL log records into error_logs.""" def emit(self, record: logging.LogRecord) -> None: if record.levelno < logging.ERROR: return if getattr(record, "skip_error_persist", False): return if record.name.startswith("app.db.error_logger"): return try: detail = "" if record.exc_info: detail = "".join(traceback.format_exception(*record.exc_info)) log_error_background( source=record.name, message=record.getMessage(), detail=detail, level=record.levelname.lower(), notify=False, ) except Exception: pass