Python中使用Postgres用于队列的实现代码

事件表:

id   | status   | updated_at
------------------------------------------
UUID | SMALLINT | TIMESTAMP WITH TIME ZONE

有一个 INDEX(status、updated_at)。实际上,有很多表,每个队列一个。

轮询代码:

for _ in shutdown_handler.loop():  # see appendix below
    event_meta = get_event_to_process(
        where_status_eq=TO_PROCESS,
        set_status_to=PROCESSING,
    )
    if event_meta is None:
        time.sleep(x)  # be gentle on the DB
        continue

    try:
        # Perform task!
        set_status(event_meta, PROCESSED)
    except:
        set_status(event_meta, ERRORED, ...)

get_event_to_process(...)执行的 SQL 行列如下:

WITH ids AS MATERIALIZED (
    SELECT id FROM event_queue
    WHERE status = {where_status_eq}
    ORDER BY updated_at
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE event_queue
SET status = {set_status_to}
WHERE id = ANY(SELECT id FROM ids)
RETURNING id

请注意,

  • 使用 MATERIALISED 强制 CTE 在 UPDATE 之前急于求值(另外:希望此查询确实不存在竞争条件)。
  • set_status(...)只是对某一行的 status 和 updated_at 执行更新。

您可能想让 cron 作业删除队列中超过一段时间的数据。

下面是一个用于轮询循环的好帮手,它可以帮助处理关机问题,并在一小时无活动后自动计时。

import os, signal, threading

INTERRUPT_TIMEOUT = 60 * 60  # 1 hour
work_done: threading.Event


def kill_after_timeout() -> None:
    global work_done
    work_done = threading.Event()
    if work_done.wait(INTERRUPT_TIMEOUT):
        return
    os.kill(os.getpid(), signal.SIGKILL)


class ShutdownHandler:
    def __init__(self, max_loops: int | None = None) -> None:
        self.exit_flag = False
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

    def signal_handler(self, signal: int, frame: FrameType | None) -> None:
        self.exit_flag = True

    def loop(self) -> Iterator[None]:
        global work_done
        while True:
            if self.exit_flag():
                work_done.set()
                return
            # 让线程成为守护进程意味着如果父线程死亡,它也会死亡。
            threading.Thread(target=kill_after_timeout, daemon=True).start()
            yield None
            work_done.set()  # tell the timeout thread we did some work