The events table:
    id   | status   | updated_at
    -----+----------+-------------------------
    UUID | SMALLINT | TIMESTAMP WITH TIME ZONE
There is an INDEX on (status, updated_at). In practice there are many such tables, one per queue.
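Because every queue gets a table of the same shape, the DDL can be generated from the queue name. The following is only a sketch under assumptions: the helper `queue_ddl`, the `<name>_queue` naming scheme, and the PRIMARY KEY / NOT NULL / DEFAULT clauses are hypothetical additions; the post itself only gives the column names, their types, and the index columns.

```python
import re


def queue_ddl(queue_name: str) -> list[str]:
    """Return CREATE statements for one queue table and its index.

    Hypothetical helper; naming scheme and constraints are assumptions.
    """
    # Identifiers cannot be sent as bound parameters, so validate strictly.
    if not re.fullmatch(r"[a-z_][a-z0-9_]*", queue_name):
        raise ValueError(f"invalid queue name: {queue_name!r}")
    table = f"{queue_name}_queue"
    return [
        # Column names and types follow the table above; the constraints
        # and the default are assumptions made for illustration.
        f"CREATE TABLE IF NOT EXISTS {table} ("
        "id UUID PRIMARY KEY, "
        "status SMALLINT NOT NULL, "
        "updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now())",
        # The composite index that the polling query relies on.
        f"CREATE INDEX IF NOT EXISTS {table}_status_updated_at_idx "
        f"ON {table} (status, updated_at)",
    ]


for statement in queue_ddl("event"):
    print(statement)
```

Putting status first in the index matches the polling query, which filters on status and then orders by updated_at.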
The polling code:
    for _ in shutdown_handler.loop():  # see appendix below
        # Atomically claim the oldest TO_PROCESS event, if there is one.
        event_meta = get_event_to_process(
            where_status_eq=TO_PROCESS,
            set_status_to=PROCESSING,
        )
        if event_meta is None:
            time.sleep(x)  # be gentle on the DB
            continue

        try:
            # Perform task!
            set_status(event_meta, PROCESSED)
        except Exception:
            # Record the failure instead of crashing the worker.
            set_status(event_meta, ERRORED, ...)
The SQL that get_event_to_process(...) runs looks like this:
    WITH ids AS MATERIALIZED (
        SELECT id FROM event_queue
        WHERE status = {where_status_eq}
        ORDER BY updated_at
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    )
    UPDATE event_queue
    SET status = {set_status_to}
    WHERE id = ANY(SELECT id FROM ids)
    RETURNING id
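Note that:
- MATERIALIZED forces the CTE to be evaluated eagerly, before the UPDATE runs (aside: hopefully this query really is free of race conditions).
- set_status(...) simply updates the status and updated_at of a given row.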
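To make the second note concrete, here is a rough sketch of the statement that set_status(...) could issue. Everything beyond the four state names is an assumption: the numeric values in `Status`, the helper name `build_set_status`, its signature, and the `%(name)s` placeholder style (the "pyformat" style used by psycopg) are all hypothetical.

```python
from enum import IntEnum
from typing import Any


class Status(IntEnum):
    # The post only names these states; the numeric values are assumptions.
    TO_PROCESS = 0
    PROCESSING = 1
    PROCESSED = 2
    ERRORED = 3


def build_set_status(
    table: str, event_id: str, status: Status
) -> tuple[str, dict[str, Any]]:
    """Build the UPDATE for one row (hypothetical helper)."""
    # `table` is interpolated, so it must come from trusted code only.
    sql = (
        f"UPDATE {table} "
        "SET status = %(status)s, updated_at = now() "
        "WHERE id = %(id)s"
    )
    return sql, {"status": int(status), "id": event_id}


sql, params = build_set_status("event_queue", "some-uuid", Status.PROCESSED)
print(sql)
print(params)
```

Refreshing updated_at on every transition is what lets the polling query's ORDER BY updated_at, and any later cleanup by age, work off a single column.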
You may want a cron job that deletes rows older than some cutoff from the queue.
Below is a handy helper for the polling loop. It handles shutdown signals and kills the process after an hour without activity.
    import os
    import signal
    import threading
    from collections.abc import Iterator
    from types import FrameType

    INTERRUPT_TIMEOUT = 60 * 60  # 1 hour


    def kill_after_timeout(work_done: threading.Event) -> None:
        # Return quietly if work was reported in time; otherwise kill the process.
        if work_done.wait(INTERRUPT_TIMEOUT):
            return
        os.kill(os.getpid(), signal.SIGKILL)


    class ShutdownHandler:
        def __init__(self, max_loops: int | None = None) -> None:
            self.exit_flag = False
            signal.signal(signal.SIGINT, self.signal_handler)
            signal.signal(signal.SIGTERM, self.signal_handler)

        def signal_handler(self, signum: int, frame: FrameType | None) -> None:
            self.exit_flag = True

        def loop(self) -> Iterator[None]:
            while not self.exit_flag:
                # One event per iteration, handed to the timeout thread directly,
                # so set() below always targets the event that thread waits on.
                work_done = threading.Event()
                # A daemon thread does not keep the process alive once the
                # main thread exits.
                threading.Thread(
                    target=kill_after_timeout, args=(work_done,), daemon=True
                ).start()
                yield None
                work_done.set()  # tell the timeout thread we did some work
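The timeout mechanism rests on `threading.Event.wait(timeout)`, which returns True if the event was set in time and False if the timeout elapsed. The small demonstration below is a sketch, not the helper itself: the `watchdog` function and its `on_timeout` callback are hypothetical stand-ins, and the callback replaces the real `os.kill` so the example is harmless to run.

```python
import threading
from typing import Callable


def watchdog(
    work_done: threading.Event, timeout: float, on_timeout: Callable[[], None]
) -> None:
    # wait() returns True if the event was set, False if the timeout elapsed.
    if not work_done.wait(timeout):
        on_timeout()


fired: list[str] = []

# Case 1: work is reported in time, so the callback never runs.
done = threading.Event()
t = threading.Thread(
    target=watchdog, args=(done, 0.5, lambda: fired.append("late")), daemon=True
)
t.start()
done.set()
t.join()

# Case 2: nothing is reported, so the callback runs once the timeout elapses.
idle = threading.Event()
t = threading.Thread(
    target=watchdog, args=(idle, 0.05, lambda: fired.append("idle")), daemon=True
)
t.start()
t.join()

print(fired)
```

Only the idle case triggers the callback, which is the behaviour the helper relies on to kill a stuck or idle worker.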