探索 Kafka Producer 的内部结构 - Alex

22-07-01 banq

Adobe Experience Platform Pipeline是一个低延迟、基于 Kafka 的流系统。管道连接数百个 Adob​​e 组件和系统。我们的 Kafka 集群处理310B msg/天,300 TB/天的 IN和920 TB/天的 OUT流量。因此,了解 Kafka 客户端的内部结构对于能够在如此大的流量下进行扩展至关重要。
Kafka Producer 是一个向 Kafka 发布消息的客户端。

组成:
  • 生产者元数据——管理生产者所需的元数据:集群中的主题和分区、充当分区领导者的代理节点等。
  • Partitioner — 计算给定记录的分区。
  • 序列化器——记录键和值序列化器。序列化程序将对象转换为字节数组。
  • 生产者拦截器——可能会改变记录的拦截器。
  • 记录累加器Record Accumulator — 累积记录并按主题分区将它们分组为批次。
  • 事务管理器——管理事务并维护必要的状态以确保幂等生产。
  • Sender — 将数据发送到 Kafka 集群的后台线程。


向 Kafka 发送消息
Kafka Producer 异步发送消息并返回一个Future<RecordMetadata>,表示发送结果。此外,用户可以选择提供回调,以便在 Kafka 代理确认记录时调用。虽然看起来很简单,但幕后发生了一些事情:

  1. 生产者将消息传递给已配置的拦截器列表。例如,拦截器可能会改变消息并返回更新的版本。
  2. 序列化器将记录键和值转换为字节数组
  3. 如果未指定,则默认或配置的分区程序计算主题分区。
  4. 记录累加器Record Accumulator 使用配置的压缩算法将消息附加到生产者批次。

此时,消息还在内存中,并没有发送到 Kafka 代理:
Record Accumulator按主题和分区对内存中的消息进行分组。
发送者线程将具有相同代理作为领导者的多个批次组合成请求并发送它们。
直到这一步,消息才被发送到 Kafka。

配置 Kafka 生产者
Kafka Producer 具有三个必需的属性:

  • bootstrap.servers — 用于建立与 Kafka 集群的初始连接的主机/端口对列表。格式:“host1:port1,host2:port2,...”
  • key.serializer — 完全限定的类名,表示实现org.apache.kafka.common.serialization.Serializer接口的密钥序列化程序。
  • value.serializer — 完全限定的类名,表示实现org.apache.kafka.common.serialization.Serializer接口的值序列化器。



消息递交时间
Kafka Producer 提供配置参数来控制在各个阶段花费的时间:

  • - max.block.ms — 等待元数据获取和缓冲区分配的时间
  • - linger.ms — 等待发送其他记录的时间
  • - retry.backoff.ms — 重试失败请求之前的等待时间
  • - request.timeout.ms — 等待 Kafka 代理响应的时间
  • - delivery.timeout.ms - 稍后引入,是KIP-91的一部分,为用户提供有保证的超时上限,而无需调整生产者组件内部


数据持久性
用户可以通过acks配置参数控制写入 Kafka 的消息的持久性。允许的值为:
  • - 0,生产者不会等待经纪人的确认
  • - 1,生产者将只等待分区领导者写入消息,而不等待所有追随者
  • - all,生产者将等待所有同步副本确认消息。这是以延迟为代价的,并且代表了最强大的可用保证。
    使用acks=all有一些细微差别需要澄清同步副本。在 Kafka 方面,两个设置和当前状态会影响行为:
  • - 主题复制因子topic replication factor
  • - min.insync.replicas设置
  • - 当前同步副本的数量,包括领导者本身。

min.insync.replicas指定acks=all请求的同步副本的最小阈值。如果这个要求不能满足,Broker 将拒绝生产者的请求,甚至不尝试写入并等待acks。

在瞬时故障期间,同步复制可能低于复制的总数,但只要它大于或等于min.insync.replicas--acks=all的请求就会成功。

用户可以通过重新发送失败的请求来缓解瞬时故障并提高持久性。这可以通过retries(默认MAX_INT)和delivery.timeout.ms(默认120000)设置来实现。重试可能会导致重复的消息和消息顺序的改变。
这些副作用可以通过设置enable.idempotence=true来缓解,但它的代价是降低吞吐量。

分区器
主题中的消息被组织成分区。用户可以通过消息键或可插拔的ProducerPartitioner实现来控制分区分配。分区器可以使用partitioner.class配置来设置,它应该是一个实现org.apache.kafka.client.producer.Partitioner接口的完全合格的类名。

Kafka提供了三种开箱即用的实现。DefaultPartitioner、RoundRobinPartitioner和UniformStickyPartitioner。

  1. DefaultPartitioner--如果消息的键是空的--使用当前的分区,并在下一个批次中改变。对于非空键,它使用公式计算:Murmur2hash(key)% 主题分区总数。
  2. RoundRobinPartitioner - 忽略消息键,以轮流方式在所有活动分区中平均分配消息。如果分区有一个指定的经纪人作为领导者,则被认为是活跃的。
  3. UniformStickyPartitioner - 忽略消息键,使用当前分区,并在下一批中改变分区。


文章配图等详细点击标题

猜你喜欢