PgQueuer:一次一个队列,构建更顺畅的工作流程


PgQueuer 是一个极简、高性能的 Python 作业队列库,充分利用了 PostgreSQL 的强大功能。PgQueuer 专为简单和高效而设计,使用 PostgreSQL 的 LISTEN/NOTIFY 轻松管理作业队列。

特征

  • 简单集成:使用 PostgreSQL 轻松与现有 Python 应用程序集成。
  • 高效的并发处理:利用 PostgreSQLFOR UPDATE SKIP LOCKED进行可靠且并发的作业处理。
  • 实时通知:利用LISTEN并NOTIFY实时更新工作状态的变化。

安装
要安装 PgQueuer,只需使用 pip 安装以下命令:

pip install PgQueuer

示例用法
以下是在典型场景中使用 PgQueuer 处理传入数据消息的方法:

import asyncio

import asyncpg
from PgQueuer.models import Job
from PgQueuer.qm import QueueManager


async def main() -> None:
    pool = await asyncpg.create_pool(min_size=2)
    qm = QueueManager(pool)

    N = 1_000
    # Enqueue messages.
    for n in range(N):
        await qm.queries.enqueue("fetch", f"this is from me: {n}".encode())

    @qm.entrypoint(
"fetch")
    async def process_message(job: Job) -> None:
        print(f
"Processed message: {job}")

    await qm.run()


if __name__ ==
"__main__":
    asyncio.run(main())


数据库配置
PgQueuer 提供了一个命令行界面,可轻松管理安装和卸载。确保您已配置环境变量或使用适当的标志来指定数据库凭据。

安装 PgQueuer 数据库组件:
python -m PgQueuer install 

卸载 PgQueuer 数据库组件:
python -m PgQueuer uninstall 

CLI 支持多个标志来自定义连接设置。用于--help查看所有可用选项。

启动仪表板的示例命令:
python -m PgQueuer dashboard --interval 10 --tail 25 --table-format grid

监听
python -m PgQueuer listen
listen命令允许您在指定通道上实时监控 PostgreSQL NOTIFY 消息。此功能对于调试和观察应用程序处理的原始事件流量特别有用。

基准测试摘要
PgQueuer 经过了基本的基准测试,以评估其在不同作业量和并发级别下的性能。
主要观察:

  • 可扩展性:并发性越高,性能就越高,证明了该库能够有效地管理更大的工作负载。
  • 一致性:PgQueuer 在不同的作业数量之间保持一致的吞吐量,确保可靠的性能。
  • 最佳性能:观察到的最高吞吐量为并发级别为 5 时每秒约 5,200 个作业。