42 lines
1.2 KiB
Python
42 lines
1.2 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
|
|
from app.config import get_settings
|
|
from app.db import Database, now_iso
|
|
from app.dispatcher import Dispatcher
|
|
|
|
|
|
def run() -> None:
    """Run the delivery worker loop until the process is stopped.

    Startup: loads settings via ``get_settings()``, builds the ``Database``,
    calls ``migrate`` on it, creates the ``Dispatcher``, and prints a
    one-line banner with the polling interval, batch size, and concurrency.

    Each iteration then:

    1. Upserts the ``worker.last_seen_at`` row in ``app_state`` with the
       current ``now_iso()`` value (written to both ``value`` and
       ``updated_at``).
    2. Calls ``dispatcher.process_due_deliveries`` with the configured
       batch size and concurrency.
    3. Prints the processed count when it is truthy.
    4. Sleeps for ``worker_interval_seconds``.

    The loop has no exit condition and no exception handling: this function
    never returns normally, and any error raised by the database or the
    dispatcher propagates to the caller.
    """
    cfg = get_settings()
    database = Database(cfg)
    database.migrate(cfg)
    worker = Dispatcher(database, cfg)
    pause_seconds = cfg.worker_interval_seconds

    banner = (
        "Delivery worker running every "
        f"{pause_seconds}s, batch={cfg.delivery_batch_size}, concurrency={cfg.delivery_concurrency}"
    )
    print(banner)

    while True:
        # Heartbeat: record that the worker is alive before doing any work.
        # NOTE(review): presumably the connection context manager commits on
        # exit -- confirm against Database.connect().
        with database.connect() as connection:
            stamp = now_iso()
            connection.execute(
                """
                INSERT INTO app_state (key, value, updated_at)
                VALUES ('worker.last_seen_at', ?, ?)
                ON CONFLICT(key) DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at
                """,
                (stamp, stamp),
            )

        # Batch size and concurrency are read from the settings object on
        # every pass, exactly as they are looked up per iteration originally.
        handled = worker.process_due_deliveries(
            limit=cfg.delivery_batch_size,
            concurrency=cfg.delivery_concurrency,
        )
        # Stay quiet on idle passes; only report when something was processed.
        if handled:
            print(f"processed {handled} due deliveries")

        time.sleep(pause_seconds)
|
|
|
|
|
|
# Start the worker loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run()
|