为什么我们放弃 RabbitMQ 并用 Postgres 队列取而代之?


我们最近从我们的系统中删除了 RabbitMQ,并将其替换为构建在我们的 Postgres 数据库之上并用 SQL 编写的队列。
它极大地提高了我们系统的可靠性和弹性。这是对这一变化、其背后的基本原理以及我们是如何做到的的记录。

注意:这不是对 RabbitMQ 或其他队列系统的控诉。很有可能(很可能,真的)我们的判断是错误的。


让我们简要地谈谈为什么我们首先需要一个队列。Prequel 是一个大规模数据管道:我们帮助 B2B SaaS 公司将数据同步到客户的数据库或从客户的数据库同步数据。换句话说,我们的业务包括在各种数据库和仓库之间运行数据传输。这些传输中的每一个都可以建模为一个作业,将其放入队列中并由工作人员处理。这些作业的一个特点是它们的处理时间可能相当长:数据回填需要几个小时的情况并非闻所未闻,而最快的作业至少需要几秒钟。为了让这一切顺利进行,我们每天都要入队和出队数以千计的工作。

当我们最初设计我们的系统时,我们选择 RabbitMQ 作为这项工作的正确队列解决方案。
我们的设置非常简单。我们将其配置为任何消息只能由一个消费者处理:这使我们能够避免让不同的工作人员运行相同的传输,这会浪费每个人的计算。我们还将其设置为让消费者“手动”确认消息。我们试图让所有东西都尽可能轻巧,并且在一段时间内效果很好。

几个月后,我们开始遇到问题和挑战。我们花了很多时间来调整我们的消费者重新连接行为,也就是如果其中一个工作人员以某种方式失去与队列的连接会发生什么。

我们有一些多线程错误,消费者从队列中超时会导致它重新连接两次,从而有效地以指数方式增加队列线程的数量。

直到一个决定性的晚上,我们接到了一位顾客的传呼。他们看到他们的一些转账被任意延迟了几个小时,有时甚至延迟到我们的系统将它们标记为过时并取消它们。

对此没有任何好的解释:我们的队列没有显示任何重大积压,我们的worker看起来很健康,而且大多数Job工作都在正常处理。
除了那些偶尔出现的、看似不确定的掉队者。

我们设法找到了问题的根源。事实证明,每个 RabbitMQ 消费者在获取当前消息时都在预取下一条消息(作业)。这可以防止其他消费者确认该消息,从而防止从队列中取出它。请在此处查看关于此的 RabbitMQ 文档。

现在,当大多数工作花费的时间大致均匀且界限分明时,这很好。

事情变得危险的地方是某些工作可能需要几个小时(请记住,我们在这里谈论的是数据传输)。在我们的案例中发生的事情是:一个worker会拿起一条消息,并预取下一条。它会处理当前的消息,一个多小时的传输,并有效地控制下一个消息,直到它完成它正在处理的那个。

更糟糕的是,没有办法(我们可以找到)禁用该行为。您可以将预取计数设置为 1,这意味着每个工作人员将最多预取 1 条消息。或者您可以将其设置为 0,这意味着它们将各自预取无限数量的消息。但似乎没有办法_实际上_将预取计数设置为零(即禁用预取)。

我们对正在发生的事情有很好的了解,但我们没有立即修复它的方法。

事实证明,我们实际上可以在我们的 Postgres 数据库中重新创建相同的队列功能,但对其进行调整以符合我们的确切要求。

这就是我们构建的内容:一个由单个且出奇简单的 Postgres 表支持的新队列。我们的发布者写入消息给它,我们的消费者(worker)从中阅读。

在消费者中添加一个 ORDER BY 子句来维护队列排序之类的东西
并且我们通过简单的读/写行级锁保证作业不会被超过一个worker读取。

新系统实际上有点简单得离谱。这是一件好事。到目前为止,它的表现也很完美。

它为我们的团队带来了几个重要的好处。一方面,应用程序状态不再分布在两个系统(RabbitMQ 存储和 Postgres)上。它现在集中在我们的应用程序数据库中。这使得灾难恢复变得更加容易,并通过移除活动部分来提高整个系统的弹性。