Pinterest使用MemQ、Singer和Kafka优化大数据摄取


在 Pinterest,Logging Platform 团队维护着每天摄取数 TB 数据的数据摄取基础设施的骨干。

MemQ:使用 Netty 实现内存高效的批量数据交付
MemQ是内部构建的下一代数据摄取平台,最近由 Logging Platform 团队开源。
在设计服务时,我们努力最大化我们的资源效率,特别是我们专注于通过使用堆外内存来减少 GC。
Netty 被选为我们的低级网络框架,因为它在灵活性、性能和复杂的开箱即用功能之间取得了很好的平衡。
例如,我们在整个项目中大量使用了 ByteBuf。ByteBufs 是 Netty 中数据的构建块。它们类似于 Java NIO ByteBuffers,但通过提供“智能指针”方法使用手动引用计数进行自定义内存管理,允许开发人员更多地控制对象的生命周期. 通过使用 ByteBufs,我们设法通过传递堆外网络缓冲区指针来传输带有单个数据副本的消息,从而进一步减少了垃圾收集所使用的周期。

消息在MemQ代理中的典型旅程:

  • 每个从网络上收到的消息都将通过一个长度编码的协议进行重构,该协议将被分配到JVM堆外的ByteBuf中(用Netty的话说是直接内存),并且将是整个管道中唯一存在的有效载荷副本。
  • 这个ByteBuf引用将被传递到主题处理器中,并与其他也在等待被上传到存储目的地的消息一起放入一个Batch。
  • 一旦满足上传限制,无论是由于时间阈值还是大小阈值,Batch将被分派。
  • 在上传到S3这样的远程对象存储的情况下,整批消息将被保存在一个CompositeByteBuf中(这是一个由多个ByteBuf组成的虚拟包装ByteBuf),并使用netty-reactor库上传到目的地,允许我们在处理路径中不创建额外的数据副本。
  • 通过建立在ByteBufs和其他Netty结构之上,我们能够在不牺牲性能的情况下快速迭代,避免重蹈覆辙。

Singer:利用异步处理减少线程开销
Singer已经在 Pinterest 工作了很长时间,可靠地将消息传递到 PubSub 后端。随着越来越多的用例载入 Singer,我们开始遇到内存使用瓶颈,导致频繁出现 OOM 问题和事件。

随着越来越多的用例进入Singer,我们开始遇到了内存使用的瓶颈,导致了频繁的OOM问题和事件。
Singer在Pinterest的几乎所有团队都限制了内存和CPU资源,以避免对主机服务的影响,例如我们的API服务层。
在检查了代码并利用VisualVM、本地内存跟踪(NMT)和pmap等调试工具后,我们注意到了各种潜在的改进措施,最明显的是减少线程的数量。在进行NMT结果分析后,我们注意到了线程的数量以及这些线程(由于Singer执行器和生产者线程池的分配)导致的堆栈所使用的内存。

深入研究这些线程的来源,这些线程大部分来自Singer发布的每个Kafka集群的线程池。这些线程池中的线程是用来等待Kafka完成向分区写入消息,然后报告写入的状态。当线程完成工作时,JVM中的每个线程(默认情况下)将分配1MB的内存用于线程的堆栈。

KafkaWriteTask的每次提交都会占用一个线程。完整的代码可以在这里here找到

通过仔细检查这些线程的使用情况,我们很快就会发现,这些线程大多在做非阻塞操作,比如更新指标,而且完全适合使用Java 8中提供的CompletableFutures进行异步处理。
CompletableFuture允许我们通过异步连锁阶段来解决阻塞性调用,从而取代了这些线程的使用,这些线程必须等待结果从Kafka回来。
通过利用KafkaProducer.send(record, callback)方法中的回调,我们依靠Kafka生产者的网络客户端来完全控制网络的复用。

使用CompletableFutures后的结果代码的一个简单例子。完整的代码可以在这里here找到

一旦我们将原来的逻辑转换为几个连锁的非阻塞阶段,很明显,使用一个单一的公共线程池来处理它们,而不考虑日志流,所以我们使用JVM已经提供的公共ForkJoinPool。这极大地减少了Singer的线程使用量,从几百个线程到几乎没有额外的线程。这一改进表明了异步处理的力量,以及受网络约束的应用程序如何从中受益。


Kafka和Singer:用可控的差异来平衡性能和效率
操作我们的Kafka集群一直都是在性能、容错和效率之间取得微妙的平衡。我们的日志代理Singer处于向Kafka发布消息的第一线,是一个关键的组件,在这些因素中起着重要的作用,特别是在通过决定我们为一个主题提供数据的分区来路由流量。

1、默认分区器。均匀分布的流量
在Singer中,来自一台机器的日志会被拾取并路由到它所属的相应主题,并发布到Kafka中的该主题。在早期,Singer会使用我们的默认分区器,以轮流的方式统一发布到该主题的所有分区。例如,如果某台主机上有3000条消息需要发布到30个分区的主题上,每个分区大概会收到100条消息。这对大多数用例来说效果很好,而且有一个很好的好处,即所有分区都收到相同数量的消息,这对这些主题的消费者来说是很好的,因为工作负载在他们之间平均分配。

随着Pinterest的发展,我们的团队扩展到了数千台主机,这种均匀分布的方法开始给我们的Kafka经纪人带来一些问题:高连接数和大量的生产请求开始提升经纪人broker的CPU使用率,而分散消息意味着每个分区的批处理量更小,或者压缩效率更低,导致聚合的网络流量更大。

为了解决这个问题,我们实施了一个新的分区器:SinglePartitionPartitioner。这个分区器通过强迫Singer在每个主机的每个主题上只写一个随机分区来解决这个问题,将所有经纪商broker的扇出减少到一个经纪商。这个分区在生产者的整个生命周期中保持不变,直到Singer重新启动。

对于拥有大量生产者队伍和各主机间相对统一的消息速率的管道来说,这是非常有效的。大数法则对我们有利,从统计学上看,如果生产者的数量明显大于分区,每个分区仍然会收到类似的流量。连接数从(服务于该主题的经纪商数量)乘以(生产者数量)下降到只有(生产者数量),对于较大的主题,这可能会减少100倍。同时,将每个生产者的所有消息批量化到一个分区,在大多数使用情况下,压缩率至少提高了10%。

固定分区分区器:用于调整权衡的可配置方差
尽管想出了这个新的解决方案,但仍有一些管道处于两种解决方案都不理想的中间地带,例如当生产者的数量不足以超过分区的数量时。
在这种情况下,SinglePartitionPartitioner会在分区之间引入明显的倾斜:有些分区会有多个生产者向其写入,而有些分区则被分配到非常少甚至没有生产者。这种倾斜可能会导致下游消费者的工作负载不平衡,也会增加我们团队管理集群的负担,特别是在存储紧张的时候。
因此,我们最近推出了一个新的分区器,可以用在这些情况上,甚至涵盖了原来的用例:FixedPartitionsPartitioner,它基本上允许我们不只是像SinglePartitionPartitioner那样发布到一个固定的分区,而是在固定数量的分区中随机发布。

这种方法有点类似于一致性散列中的虚拟节点的概念,我们人为地创造更多的 "有效生产者 "以实现更连续的分布。
由于每个主机的分区数量可以配置,我们可以将其调整到效率和性能都达到理想水平的甜蜜点。
这种分区器也可以通过分散流量,同时保持合理的连接数,来帮助解决 "热生产者 "的问题。
虽然是一个简单的概念,但事实证明,有能力配置差异程度可能是管理权衡的一个强大工具。