Kafak消费者的使用

该文详细介绍了Kafka消费者原理和使用策略,如果我们将消费者的偏移量使用JDBC事务或JTA事务保存起来,就能实现分布式端到端的事务,也就是通常所说的分布式事务。

消费者是否活着

每个消费者作为消费者小组(consumer group)的一部分,都从已订阅的主题中被分配到一个分区partitions的子集。在这些分区上有一个组锁。只要这个锁被锁定,组中的其他成员就不能从该成员处再读取消息。当你的消费者健康运行时,这种设置就正是你想要的。这也是避免重复消费的唯一方法。但是如果消费者因服务器机器或应用程序故障而死亡,则需要释放该锁,以便将分区分配给其他健康成员。

卡夫卡的组协调协议(group coordination protocol)使用心跳机制解决了这个问题。在每次重新平衡之后,最新的所有成员开始定期向组协调员发送心跳。只要协调器继续接收到心跳,它就假定该成员是健康的。在每次接收到心跳时,协调器启动(或重设)定时器。如果定时器到期时还没有收到心跳,协调器会将该成员标记为死机,并向组中的其余部分发信号通知他们应重新加入,以便可以重新分配分区。定时器的持续时间称为会话超时,并在客户端上配置了session.timeout.ms 。

props.put(“session.timeout.ms”,“60000”);

会话超时确保如果服务器机器或应用程序崩溃或者网络分区将消费者与协调器隔离,则锁将被释放。然而,应用程序的故障通常会更加棘手。因为消费者仍然向协调器发送心跳并不一定意味着应用程序是健康的。

消费者的轮询循环旨在处理这个问题。当您调用消费者的poll方法 或其他阻塞API之一时,所有网络IO都在前台完成。消费者不使用任何后台线程。这意味着,当你调用poll方法,心跳才会发送。如果您的应用程序停止轮询(无论是因为处理代码已抛出异常或下游系统已崩溃),则不会发送心跳,会话超时将过期,并且该组将重新平衡。

唯一的问题是如果消费者花费超过会话超时来处理消息,则可能会触发虚假的重新平衡。因此,您应该将会话超时设置为足够大。默认值为30秒,但将其设置为高达数分钟并不是不合理的。较大会话超时的唯一缺点是协调器需要更长时间来检测真正的消费者崩溃。

传递语义

当消费组首次创建时,根据auto.offset.reset 配置设置定义的策略设置初始偏移量。一旦消费者开始处理,它会根据应用的需要定期进行偏移。在每次后续的重新分配平衡之后,该位置将被设置为该组中该分区的最后提交确认的偏移量。如果消费者在提交已成功处理消息的偏移量之前崩溃,则另一个消费者将会重复这个工作。因此,更频繁地提交偏移量,在崩溃中看到的这种重复次数就越少。

当将enable.auto.commit 设置为true(这是默认值)时,则认为自动提交策略已启用。,消费者会根据“auto.commit.interval.ms”配置的间隔周期性自动触发偏移提交。通过减少提交间隔时间,您可以限制消费者在发生崩溃时必须重新处理的次数。

如果要自己手动提交,就需要直接使用消费者提交API,首先需要通过在用户配置中将enable.auto.commit 设置为false来禁用自动提交。

props.put(“enable.auto.commit”,“false”);

消费提交API本身很简单,但最重要的一点是如何将其集成到轮询循环中。因此,以下示例包括以粗体显示提交详细信息的完整轮询循环。手动处理提交的最简单方法是使用同步提交API:


try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());

try {
consumer.commitSync();
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}

使用不带任何参数的commitSync方法,可以提交最后一次调用poll方法时返回的偏移量。此调用将无限期堵塞,直到提交成功或失败。您需要担心的主要错误是发生在当消息处理所花费的时间超过会话超时,当这种情况发生时,协调器将消费者从组中踢出,从而导致CommitFailedException 抛出。您的应用程序应该来处理此错误,通过尝试回滚自上次成功提交的偏移以后消费使用消息所引起的任何更改。

通常,您应确保仅在消息成功处理完毕后才会提交偏移量。如果消息在发送提交之前崩溃,那么消息将不得不被再次处理。如果提交策略保证最后一次提交的偏移量永远不会超出当前的位置,那么您具有“至少一次”传递语义,但是会有重复消息处理。


如果更改提交策略,让当前位置不会超过最后一次提交的偏移量,如上图所示,您将获得“最多一次”传递。如果消费者的当前位置时赶上最后一次提交的偏移量之前崩溃了,则该没赶上的这段差距中的所有消息都将“丢失”,但是您可以确保不会有多于一次处理消息,也就是没有重复消息处理。为了实现这个策略,我们只需要改变提交和消息处理的顺序。


try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);

try {
consumer.commitSync();
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}

请注意,使用自动提交可以给您“至少一次”处理,因为消费者保证仅对已经返回给应用程序的消息进行偏移。您在最坏情况下可能需要重新处理的消息数量受应用程序在提交间隔期间(由auto.commit.interval.ms 配置)处理的消息数量的限制。

但是,通过使用commit API,您可以更好地控制您愿意接受多少重复处理。在最极端的情况下,您可以在处理每个消息后提交偏移量,如以下示例所示:


try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);

try {
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.offset() + ": " + record.value());
consumer.commitSync(Collections.singletonMap(record.partition(), new OffsetAndMetadata(record.offset() + 1)));
}
} catch (CommitFailedException e) {
// application specific failure handling
}
}
} finally {
consumer.close();
}

在这个例子中,我们在调用commitSync时传递了我们要提交的显式偏移量。注意这个偏移量应该始终是应用程序读取的下一个消息的偏移量。当没有参数调用commitSync时,消费者提交返回给应用程序的最后一个偏移量(加上1),但是我们这里代码不能这样使用,因为它将提交的位置超出我们的实际进度。

显然,在每个消息之后提交可能不是大多数用例的好主意,因为处理线程必须堵塞直至从Kafka服务器返回的每个提交请求。这会降低吞吐量。一个更合理的方法可能是在每N个消息之后提交,N可以被调整以获得更好的性能。

此示例中commitSync 的参数是一个Map参数,从主题分区到OffsetAndMetadata 实例的映射。这个提交API允许您在每个提交中包含一些其他元数据。可以用于记录提交的时间,提交到的主机或应用程序所需的任何信息。在这个例子中,我们把它留空了。

相比每条消息接受后立即,一个更合理策略是处理完每个分区的消息后再提交。ConsumerRecords 集合提供访问该组包含的分区信息以及每个分区的消息。下面的例子说明了这个策略。


try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords)
System.out.println(record.offset() + ": " + record.value());

long lastoffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastoffset + 1)));
}
}
} finally {
consumer.close();
}

异步提交
上面示例主要集中在同步提交API上,但消费者还可使用异步APIcommitAsync 。使用异步提交通常会提供更高的吞吐量,因为应用程序可以在提交返回之前开始处理下一批消息。权衡之处在于,您可能稍后才会发现提交失败了。下面的例子显示了基本用法:


try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records)
System.out.println(record.offset() + ": " + record.value());

consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
Exception exception) {
if (exception != null) {
// application specific failure handling
}
}
});
}
} finally {
consumer.close();
}

请注意,我们已经为commitAsync 提供了一个回调,当提交完成(成功或不成功)时,它由消费者调用。如果你不需要这个,你也可以调用commitAsync 没有参数。

原文



[该贴被admin于2017-07-07 16:26修改过]