astock-agent/backend/app/db/error_logger.py
2026-05-28 23:32:52 +08:00

97 lines
2.6 KiB
Python

"""错误日志持久化"""
import asyncio
import logging
import traceback
from datetime import datetime
from app.db.database import get_db
from app.db import tables
from app.notifications.feishu import send_feishu_alert
async def log_error(
source: str,
message: str,
detail: str = "",
level: str = "error",
context: dict | None = None,
notify: bool = True,
):
"""将错误写入数据库,并按策略发送告警。"""
try:
async with get_db() as db:
stmt = tables.error_logs_table.insert().values(
source=source,
level=level,
message=message,
detail=detail,
created_at=datetime.now(),
)
await db.execute(stmt)
await db.commit()
except Exception:
pass # 写日志失败不应影响主业务
if notify and level.lower() in {"error", "critical"}:
try:
await send_feishu_alert(
source=source,
message=message,
detail=detail,
level=level,
context=context,
)
except Exception:
pass
def log_error_background(
source: str,
message: str,
detail: str = "",
level: str = "error",
context: dict | None = None,
notify: bool = True,
):
"""在存在事件循环时后台投递错误记录。"""
try:
loop = asyncio.get_running_loop()
loop.create_task(
log_error(
source=source,
message=message,
detail=detail,
level=level,
context=context,
notify=notify,
)
)
except RuntimeError:
pass
class PersistentErrorLogHandler(logging.Handler):
"""Persist application ERROR/CRITICAL log records into error_logs."""
def emit(self, record: logging.LogRecord) -> None:
if record.levelno < logging.ERROR:
return
if getattr(record, "skip_error_persist", False):
return
if record.name.startswith("app.db.error_logger"):
return
try:
detail = ""
if record.exc_info:
detail = "".join(traceback.format_exception(*record.exc_info))
log_error_background(
source=record.name,
message=record.getMessage(),
detail=detail,
level=record.levelname.lower(),
notify=False,
)
except Exception:
pass