Slack是如何实现分布式任务处理的扩展?

                   
banq 17-12-15

这里介绍Slack公司是如何使用Kafka和Redis作为分布式任务队列(类似国内当当网的elastic-job),以毫秒级可靠地处理数十亿个任务。

Slack是一家提供协作工具的SaaS公司,提供聊天群组 + 大规模工具集成 + 文件整合 + 统一搜索四大功能,帮团队打造了一条流畅的消息总线,弥补了团队协作中的沟通断层,致力于形成团队的知识库,类似国内钉钉之类企业QQ之类定位。

下面是他们的分布式任务处理架构的经验分享:

Slack是使用任务Job队列(任务队列/工作队列)系统处理一些特殊业务逻辑,这些业务逻辑在Web请求的上下文中运行起来太费时。该系统是我们架构的关键组成部分,应用于每个Slack消息、推送通知、URL展开、日历提醒和计费计算。在我们最忙碌的日子里,系统以每秒33,000的高峰处理超过14亿份任务job。任务Job执行时间范围从几毫秒到(在某些情况下)几分钟。

任务Job的队列实施可追溯到Slack早期时间,以后不断增长,逐渐被广泛用于整个公司。随着时间的推移,我们遭遇到CPU、内存和网络资源的容量限制,我们开始扩展缩放系统,但原来的架构基本保持不变。

但是,大约一年前,Slack由于任务队列问题而经历了明显的生产中断。数据库层中的资源争用导致任务作业执行速度放缓,导致Redis达到其配置的最大内存限制。在这一点上,由于Redis没有空闲的内存,我们不能再排队加入新的任务,这意味着所有依赖于任务队列的Slack操作都失败了。更糟糕的是,我们的系统实际上需要一点空闲的Redis内存才能使任务Job从队列中取出,所以即使解决了底层的数据库争用,作业队列仍然被锁定,需要大量的手动干预才能恢复。

这一事件导致整个任务架构进行重新评估。接下来介绍的是关于我们如何在此核心系统设计上做出重大改变的经验故事,对周围依赖系统的破坏最小,没有进行“停止整个系统”的转换或单向迁移,而且提供了未来改进的余地。


初始任务队列系统架构

在去年的这个时候,任务队列架构可以按照如下方式进行勾画,对于创建或使用Redis任务队列的人来说,这将大致熟悉(banq注:类似国内当当网的elastic-job):

www ---> redis ----> worker


一个Job任务的运行过程:

1.将任务放入Redis的队列时时,Web应用程序首先根据任务类型和参数创建一个标识符。

2.入队处理程序根据该标识符的哈希和给定作业的逻辑队列来选择相应配置的Redis主机之一。

3.使用存储在Redis主机上的pub/sub数据结构,处理程序会执行有限的重复数据删除 - 如果队列中已存在具有相同ID的作业,则请求将被丢弃,否则作业将被添加到队列中。

4.worker机器对Redis集群进行了轮询,寻找新的工作。当一个worker在一个监视的队列中找到一个Job任务时,它将任务从待处理队列移动到一个正在进行的任务列表中,并产生一个异步任务来处理它。

5.一旦任务完成,worker从正在进行的任务清单中删除任务。如果任务失败,则worker将其移动到特定的队列中,重试之前配置的次数,直到成功或移动到永久失败的任务(手动检查和修复)列表中。

架构问题
之前发生的中断教训使我们得出结论:扩展现有系统是站不住脚的,需要更多的基础设计工作。

我们确定的一些约束是:
1.Redis没有什么操作上的空间,特别是在内存方面。如果我们排队的速度超过了队列取出速度一段时间以后,我们将耗尽内存,无法再取出任务(因为取出任务也需要有足够的内存将任务移到处理列表中)。

2.Redis连接形成了一个完整的偶图 - 每个任务队列客户端必须连接到每个Redis实例(因此具有正确和最新的信息)。
worker无法独立于Redis进行扩展 - 添加一个新worker导致额外的投票和加载Redis。此属性导致了一个复杂的反馈情况,试图增加执行能力可能会压倒已经超载的Redis实例,从而放慢或停止整个进度。

3.之前关于使用Redis数据结构的决策意味着任务出列努力是与队列长度成比例的。随着队列变长,他们变得更难以清空 - 另一个不幸的反馈循环。

4.提供给应用程序和平台工程师的语义和服务质量保证不清楚,难以定义;任务队列上的异步处理对于我们的系统架构是非常重要的,但实际上工程师们不愿意使用它。现有功能(如有限的重复数据删除)的更改也是非常高风险的,因为许多作业都依靠它们正常运行。

这些问题中的每一个都提出了各种解决方案,包括进一步扩大现有系统的投入到完全彻底改写。我们确定了我们认为能够解决最迫切需求的三个方面的架构:

1. 使用持久的存储(如Kafka)替换Redis内存中的数据结构,以提供在内存耗尽和任务丢失的缓冲区。

2.为任务开发一个新的调度程序,提高服务质量保证,并提供诸如限速和优先级等理想特性。

3.将任务执行从Redis中分离出来,使我们能够根据需要扩展任务执行,而不是进行困难且昂贵的权衡操作。

增量更改或完全重写?
我们知道,实施所有这些潜在的架构增强功能将会促使Web应用程序和任务队列的重大变化。团队希望专注于最关键的问题,并获得任何新的系统组件的生产经验,而不是一次尝试一切。渐进演进是最有效的方法。

我们决定解决的第一个问题是,我们无法保证队列建立期间的写入可用性。如果工作人员的排队速度低于出列速度,Redis集群本身最终会耗尽内存。在Slack的规模下,这是可能会很快发生。此时,Redis群集将无法接受任何其他任务。

我们曾想过用Kafka完全替代Redis,但很快就意识到这条路线需要对调度、执行和重复删除作业的应用程序逻辑进行重大改变。本着追求最小可行的改变的精神,我们决定在Redis之前添加Kafka,而不是用Kafka彻底取代Redis。这将缓解我们的系统中的关键瓶颈,同时保留现有的应用程序入队和出队接口。

以下是使用Kafka及其在Redis之前的支持组件的作业队列架构的增量更改草图。




将任务Job排入Kafka
我们面临的第一个挑战是如何有效地从我们的PHP / Hacklang网络应用程序找到任务然后放入到Kafka。尽管我们为此目的探讨了现有的解决方案,但是没有一个能够满足我们的需求,所以我们开发了Kafkagate,这是一个用Go编写的新的无状态服务,让任务排队到Kafka。

Kafkagate公开了一个简单的HTTP POST接口,每个请求都包含一个Kafka主题,分区和内容。对于Kafka 使用Sarama golang驱动程序,它只是将传入的HTTP请求中继到Kafka中,并返回操作结果成功/失败。通过这个设计,Kafkagate保持与各个客户端的持续连接,并可以跟随集群领导者的变化,同时为我们的PHP / Hack网络应用程序提供一个低延迟的简单界面。

Kafkagate专为:
1. 在可用性和一致性两者中偏向前者: 在写任务到Kafka时,我们只等待集群领导者那边承认这个请求,而不是把Job任务复制给其他队列。这种选择提供了尽可能低的延迟,但是如果Kafka队列主机在复制之前意外死亡,确实会造成丢失任务的风险。对于大多数Slack的应用语义而言,这是一个正确的权衡,尽管我们也正在考虑向Kafkagate添加一个选项,以允许关键的应用程序等待某些操作的更强一致性保证。

2.简单的客户端语义: Kafkagate使用同步写入到Kafka,它允许我们肯定地确认到达队列的任务(尽管存在上述丢失写入的风险),并在失败或超时的情况下返回错误。这样可以在不改变原有语义的情况下加强语义,使工程师可以放心地使用它,同时还能够在将来修改队列队列设计。

3.最小延迟:为了减少花费在排队工作上的时间,我们对性能进行了一些优化。例如,我们如何部署和路由到Kafkagates:Slack部署在AWS上,AWS 在每个独立区域提供了几个“ 可用区域 ”(AZ)。一个地区的地理信息系统具有低延迟链路,并提供一定程度的隔离,大多数故障不会影响其他地理信息系统。AZ之间的连接通常比保持在单个AZ中的连接更高的延迟,并且还会产生转移成本。现在,任务队列优先将请求路由到与主机排队任务相同的AZ中的Kafkagate实例,同时仍允许故障转移到其他AZ,这样可以提高延迟和成本,同时仍允许容错。

在未来,我们正在考虑进一步优化在网络应用主机上本地运行的Kafkagate服务,以避免在写给Kafka时额外的网络跳跃。

将任务从卡夫卡中继到Redis
架构中的下一个新组件解决了将任务从Kafka中继到Redis中。JQRelay是一个用Go编写的无状态服务,将来自Kafka主题的任务转发到其相应的Redis集群。在设计这项服务时,我们必须考虑以下几点:

1.数据编码:在早期的系统中,Web应用程序(用PHP和Hack编写)会在将任务存储在Redis中时对JSON进行编码。随后,任务队列工作者(也用PHP编写)将解码任务有效载荷以供执行。在新系统中,我们依靠JQRelay(用Go编写)来解码JSON编码的任务,检查任务,然后用JSON重新编码并将其写入相应的Redis集群。听起来很简单吧?

事实证明,golang和PHP JSON编码器都有一些意外奇怪现象,引起了我们的心痛。具体而言,在Go,<,>,和&字符被替换成对应的Unicode实体默认情况下,在PHP中,/字符以“escaped” \ 默认替换。这两种行为都会导致代表两个不同运行时间的JSON数据结构表示,这种情况在原来架构仅限于PHP的系统中是不存在的。

2.自配置:当JQRelay实例启动时,它将尝试获取对应于Kafka主题的键/值条目的Consul锁。如果获得锁定,则开始转发来自该主题的所有分区的作业。如果失去锁定,它将释放所有资源并重新启动,以便不同的实例可以提取该主题。我们在EC2自动扩展组中运行JQRelay,以便任何失败的机器自动更换到服务中,并通过此锁定流程。与此Consul锁定策略结合使用时,我们确保作业队列使用的所有Kafka主题都只有一个中继进程分配给它们,并且失败会自动恢复。

3.处理失败: JQRelay依靠Kafka提交偏移来跟踪每个主题分区中的任务作业。如果任务成功写入Redis,则分区使用者仅移动一下偏移。如果发生Redis问题,它将无限期地重试,直到Redis重新启动(或Redis服务本身被更换)。任务具体的错误是通过将任务重新安排到Kafka来处理,而不是悄悄地放弃任务。通过这种方式,我们可以防止特定于任务的错误会阻止相应队列上的所有任务的处理进度,我们保留这个任务,以便我们可以诊断并修复错误,而不会完全失去这个任务。

4.速率限制: JQRelay在写入Redis时尊重Consul中配置的速率限制。它会依靠Consul watch API来对速率限制的变化做出反应。

Kafka集群设置
我们的集群运行Kafka 0.10.1.2版本,有16个经纪人,并在i3.2xlarge EC2机器上运行。每个主题都有32个分区,复制因子为3,保留期为2天。我们使用支持rack的复制(其中“rack”对应于AWS可用性区域)进行容错,并且启用了不干净的领导者选举。

1.负载测试:我们建立了一个负载测试环境,以强化我们的Kafka集群,然后将其投入生产。作为负载测试的一部分,我们按预期的生产速率将任务排入各种卡夫卡主题。这个负载测试使我们能够适当规模化我们的Kafka生产集群,从而拥有足够的空间来处理单个队列主机的失败、集群领导层的变化和其他行政行为,并为我们未来增长的Slack服务提供空间。

2.失败测试:了解不同的Kafka集群故障情况在应用程序中将如何表现是很重要的,例如连接失败,任务作业入队失败,失踪的任务和重复的任务。为此,我们对以下故障情况进行了测试:

(1)努力杀死一个broker队列主机
(2)在一个单一的AZ中杀死两个broker并优雅地杀死他们
(3)杀死所有三个broker,迫使卡夫卡挑一个不洁的领导人
(4)重新启动群集

在所有这些情况下,系统都按照我们的预期运行,并达到了可用性目标。

3.数据迁移:我们使用我们的负载测试设置来确定跨代理安全数据迁移的最佳节流率。此外,我们尝试在迁移期间使用较低的保留期(因为我们在成功执行后不需要保留一份工作)。

我们每天有14亿个任务,我们宁愿有选择性地将broker业务的分割而不是主题迁移。这是未来工作的一部分。

生产部署
推出新系统包括以下步骤:

1.双写:我们开始两次写入任务到旧和新的系统(每个任务是排队到双方的Redis和Kakfa)。然而,JQRelay在“阴影”模式下运作,从卡夫卡(Kafka)中读取所有的工作。这个设置让我们安全地测试从web应用程序到JQRelay的实际生产流量的新排队路径。

2.保证系统正确性:为确保新系统的正确性,我们追踪和比较了通过系统各个部分的工作数量:从Web应用程序到Kafkagate,Kafkagate到Kafka,最后是Kafka到Redis。

3.心跳:为了确保新系统为50个Redis群集和1600个Kafka分区(50个主题×32个分区)进行端到端的工作,我们每隔一分钟就为每个Kafka分区加入心跳机制。然后,我们监测并警告这些心跳的端到端流量和时间。

4. 最终推出:一旦我们确信我们的系统正确性,我们就在内部为Slack启用了几个星期。在没有问题的情况下,我们针对客户的各种任务类型逐一推出。


结论
将Kafka添加到任务队列中,在保护我们的基础设施免于局限于Redis内存方面的限制取得了巨大的成功。让我们来重新构建队列:在旧系统中,如果Web应用程序的排队速率比作业队列出队速率更高,则Redis集群本身最终会耗尽内存并导致中断。在新系统中,当应用程序被写入持久存储(Kafka)时,Web应用程序可以维持其高入队率。相反,我们调整JQRelay中的速率限制,以匹配出队速率或暂停队列到Redis。

从更广的角度来看,这项工作还改善了任务队列的可操作性,可配置的速率限制和任务排队超出执行能力的持久存储 - 比我们以前处理的更细的工具。更清晰的客户端语义将帮助我们的应用程序和平台团队更加自信地使用任务队列。而且基础架构团队还可以继续改进任务队列,从将JQRelay的速率限制与Redis内存容量绑定到改进系统调度和执行方面的更大目标。

Scaling Slack’s Job Queue – Several People Are Cod

[该贴被admin于2017-12-18 17:48修改过]

1