diff --git a/.env.example b/.env.example index 4f52b7c..597eaf7 100644 --- a/.env.example +++ b/.env.example @@ -17,6 +17,7 @@ MAX_DELIVERY_ATTEMPTS=3 RETRY_BACKOFF_SECONDS=60 FEISHU_TIMEOUT_SECONDS=10 DISPATCH_INLINE=false +DISPATCH_WAKEUP_ON_RECEIVE=true DELIVERY_BATCH_SIZE=100 DELIVERY_CONCURRENCY=5 -WORKER_INTERVAL_SECONDS=15 +WORKER_INTERVAL_SECONDS=2 diff --git a/app/config.py b/app/config.py index fb8907f..441181d 100644 --- a/app/config.py +++ b/app/config.py @@ -20,9 +20,10 @@ class Settings: feishu_timeout_seconds: int = 10 timezone: str = "" dispatch_inline: bool = False + dispatch_wakeup_on_receive: bool = True delivery_batch_size: int = 100 delivery_concurrency: int = 5 - worker_interval_seconds: int = 15 + worker_interval_seconds: int = 2 def get_settings() -> Settings: @@ -40,7 +41,9 @@ def get_settings() -> Settings: feishu_timeout_seconds=int(os.getenv("FEISHU_TIMEOUT_SECONDS", "10")), timezone=os.getenv("APP_TIMEZONE") or os.getenv("TZ", ""), dispatch_inline=os.getenv("DISPATCH_INLINE", "").lower() in {"1", "true", "yes", "on"}, + dispatch_wakeup_on_receive=os.getenv("DISPATCH_WAKEUP_ON_RECEIVE", "true").lower() + in {"1", "true", "yes", "on"}, delivery_batch_size=int(os.getenv("DELIVERY_BATCH_SIZE", "100")), delivery_concurrency=int(os.getenv("DELIVERY_CONCURRENCY", "5")), - worker_interval_seconds=int(os.getenv("WORKER_INTERVAL_SECONDS", "15")), + worker_interval_seconds=int(os.getenv("WORKER_INTERVAL_SECONDS", "2")), ) diff --git a/app/server.py b/app/server.py index 346e69a..b3aa452 100644 --- a/app/server.py +++ b/app/server.py @@ -4,6 +4,7 @@ import html import json import mimetypes import os +import threading import urllib.error import urllib.request from datetime import datetime, timezone @@ -29,6 +30,31 @@ class AppContext: self.db = Database(settings) self.db.migrate(settings) self.dispatcher = Dispatcher(self.db, settings) + self.dispatch_wakeup_lock = threading.Lock() + self.dispatch_wakeup_running = False + + def wake_dispatcher(self) -> None: + if self.settings.dispatch_inline or not self.settings.dispatch_wakeup_on_receive: + return + with self.dispatch_wakeup_lock: + if self.dispatch_wakeup_running: + return + self.dispatch_wakeup_running = True + thread = threading.Thread(target=self._run_dispatch_wakeup, name="dispatch-wakeup", daemon=True) + thread.start() + + def _run_dispatch_wakeup(self) -> None: + try: + while True: + processed = self.dispatcher.process_due_deliveries( + limit=self.settings.delivery_batch_size, + concurrency=self.settings.delivery_concurrency, + ) + if processed < self.settings.delivery_batch_size: + break + finally: + with self.dispatch_wakeup_lock: + self.dispatch_wakeup_running = False def json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict[str, Any] | list[Any]) -> None: @@ -488,6 +514,7 @@ class Handler(BaseHTTPRequestHandler): "interval_seconds": self.context.settings.worker_interval_seconds, }, "dispatch_inline": self.context.settings.dispatch_inline, + "dispatch_wakeup_on_receive": self.context.settings.dispatch_wakeup_on_receive, }, ) @@ -501,6 +528,8 @@ class Handler(BaseHTTPRequestHandler): try: payload = parse_json_body(self) result = self.context.dispatcher.receive_alert(payload) + if result.get("delivery_ids"): + self.context.wake_dispatcher() json_response(self, 202, result) except ValidationError as exc: json_response(self, 400, {"error": str(exc)}) diff --git a/docker-compose.yml b/docker-compose.yml index 09510ef..915da26 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,7 @@ services: RETRY_BACKOFF_SECONDS: ${RETRY_BACKOFF_SECONDS:-60} FEISHU_TIMEOUT_SECONDS: ${FEISHU_TIMEOUT_SECONDS:-10} DISPATCH_INLINE: ${DISPATCH_INLINE:-false} + DISPATCH_WAKEUP_ON_RECEIVE: ${DISPATCH_WAKEUP_ON_RECEIVE:-true} DELIVERY_BATCH_SIZE: ${DELIVERY_BATCH_SIZE:-100} DELIVERY_CONCURRENCY: ${DELIVERY_CONCURRENCY:-5} volumes: @@ -33,9 +34,10 @@ services: RETRY_BACKOFF_SECONDS: ${RETRY_BACKOFF_SECONDS:-60} FEISHU_TIMEOUT_SECONDS: ${FEISHU_TIMEOUT_SECONDS:-10} DISPATCH_INLINE: ${DISPATCH_INLINE:-false} + DISPATCH_WAKEUP_ON_RECEIVE: ${DISPATCH_WAKEUP_ON_RECEIVE:-true} DELIVERY_BATCH_SIZE: ${DELIVERY_BATCH_SIZE:-100} DELIVERY_CONCURRENCY: ${DELIVERY_CONCURRENCY:-5} - WORKER_INTERVAL_SECONDS: ${WORKER_INTERVAL_SECONDS:-15} + WORKER_INTERVAL_SECONDS: ${WORKER_INTERVAL_SECONDS:-2} volumes: - dispatcher-data:/data