正好一次(Exactly-once)消息传递在Kafka中已经完全支持

17-07-04 banq
                   

本文是来自Kafka的创始人Jay Kreps的一篇博文,回答了世面上怀疑Kafka是否支持正好一次(Exactly-once)的消息传递,从而说明了Kafka能支持分布式事务,保证微服务事务的完整性,关键是将偏移量和你要保存的状态通过JDBC事务或者JTA事务保存到数据库,失败恢复时从这个偏移量开始从卡夫卡中重新读取,保证了消息和你的业务状态数据的一致性,当然使用卡夫卡的Stream API则更方便。下面是原文大意翻译:

周四我们发布了一个新版本的Apache Kafka,大大加强了它提供的语义保证。

这个版本意味着几年消息幂等思考的尾声,也就是如何以快速,实用和正确的方式进行可靠的流处理,落地细节工作大约花费一年左右,其中包括长达一百页的详细设计文件:正好一次传递与事务性消息,在卡夫卡社区进行了各种讨论和批评,进行了广泛的性能测试,专门针对此功能添加了还有成千上万条的分布式折磨试验。

对这个版本发布的反应主要是“哇,那太棒了”。不幸的是,我犯了一个经典的错误:我读了评论。有人激动的同时有人声称我们是说谎的骗子。

这里一个质疑:

“它不可能是恰好一次传递...这是不可能的,因为一个非常简单的数学定理。它提升了了人们的怀疑,也许作者可能自己都会感到困惑,更使其他人不信任整个事情。“

“这是反经过数学证明事实的有趣文章。除非您改变本文中没有仔细规定的假设,否则在所有情况下都无法运行,包括系统是什么。“

如果你属于想到这个观点的人群之一,我会要求你实际看看我们真正知道的什么是可能的和什么是不可能的,以及卡夫卡已经如何建成实现的,希望能够得到一个更明智的意见。

所以让我们分两部分。首先,是一次理论上的不可能性?第二,卡夫卡如何可能实现它。

正好一次传递是完全不可能吗?

每个人似乎都非常确定,这是真的,不知道为什么,完全一次传递/语义在数学上是不可能的。然而,尽管这显然是常识,但是你很少看到人们出示相关某种类型的证据,甚至是精确定义了什么是正好一次。他们左顾言它地指向到其他事情,如FLP结果或两个拜占庭将军的问题作为证据,但没有一个是关于正好一次传递的。在分布式系统中,您不能在没有精确描述事情的情况下谈论可能或不可能的事情,

那么有没有正式定义的其他名词概念类似像我们想要谈论的正好一次问题吗?

是的,事实证明,有这样的概念。正好一次可以称为是“原子广播”或“总顺序广播”。这是来自更受欢迎的一本分布式系统教科书的定义。

在我看来,这是人们通过在pub / sub消息传递中正好一次传递的意思:即可以发布消息,并且它们将被一个或多个接收应用程序精确地传送一次。

那么可能解决原子广播吗?

简短的答案是肯定的,除了我拍摄的上面照片之外,您还可以阅读整个分类来比较数十种算法。

但是阅读分布式系统书后,怎么能说服自己这是真的?

事实证明,原子广播相当于consensus共识,所以也许我们可以缩小我们的问题,试图去了解共识是否可能的。这是有帮助的,因为共识可能是分布式系统中研究最多的问题。

共识是否可能?可能你有一种感觉,因为这是众所周知的算法主攻的问题,如Paxos和Raft,并且在现代分布式系统实践中得到广泛实现。但是,如果你想要一个理论结果,你需要具体说明你正在谈论的环境设置和故障模式。

..

共识是现代分布式系统发展的支柱。卡夫卡其中心抽象是分布式一致的日志,实际上是您可以想象成最纯粹的类似于多方共识的模拟。所以如果你不相信共识是可能的话,那么你也不相信卡夫卡是可能的,在这种情况下,你不用担心卡夫卡的正好一次支持的可能性!

好的,那么我们怎么才能建立一个在卡夫卡恰好一次传递的应用程序?

你可能记得卡夫卡有一个日志,如下所示:

请注意,它是一个强有序的记录序列,每个记录都被分配一个顺序的数字偏移,用于标识日志中的记录位置。

“生产者”将记录附加到此日志,零个或多个消费者应用程序从其控制的给定偏移量读取消息。


让我们想象一下这样的应用程序:

生产者想要发布消息,消费者想要读取这些消息,并将它们(或一些从它们派生的数量)放入数据库中。我们该怎么做才能得到正确答案?


您可以看到可能出现如下的两类问题:

1. 如果生产者写入日志后但无法通过网络获得确认,则会出现第一个问题。这将使这个生产者受到约束:这可能是实际上写成功了,或者可能是它从来没有写入到卡夫卡过。我们不知道!如果我们重试,结果是写成功了,我们可以重复一次; 如果我们不能重试,结果还是写不成功,我们则会丢失这个写操作。这实际上类似我们插入到没有主键或自动递增的主键的数据库表中的同样遇到的困境。

2. 第二个问题来自消费者。消费者可以从日志中读取一些消息,并将结果写入其数据库,但在更新其偏移标记其位置之前发生失败故障,当消费者重新启动(可能会自动使用Kafka组管理机制的不同机器)时,可能会导致消息读取重复(因为偏移位置没有更新)。如果应用程序首先更新其存储的偏移量,然后更新数据库,则故障导致的重新启动则会丢失数据库的更新。

我们来谈谈这两个问题。第一个问题是通过幂等来解决的。这允许生产者客户端始终重试直到成功,而不会有重复的可能性(Kafka将透明地检测它们并忽略它们)。

我们不深入讨论第二个问题,我们不深入是因为它已经是一个漫长的博客文章,对那些了解卡夫卡的人,我们坚持一个简短的简要描述。

这是一个更深入的讨论:

为了确保正好一次处理的需求,消费者需要确保其创建的派生状态和指向上游的偏移量保持同步。这里的一个关键是,消费者可以控制其在日志中的偏移量,并可以将其存储在任何需要的地方。

有两种常用的方法在Kafka之上来获得恰好一次的语义:

1.将偏移量存储在与派生状态相同的DB中,并在事务中更新两者。重新启动时,从DB读取当前偏移量,然后从偏移位置开始读取卡夫卡。

2.以幂等的方式将状态更新和偏移量一起写入。例如,如果您的派生状态是一个key和一个跟踪出现次数的计数器,则将偏移量与计数值一起存储,并忽略任何偏移量<=当前存储值的任何更新。

好的,但是你可能会反对:“那是很难的”,我实际上并不认为这是很难的。我的意思是事务不算什么难题,如果你更新多个表也会要使用事务。添加偏移表并将其包含在事务更新中并不是什么火箭科学。

我听到的另一个反对意见是,它不是真的“恰好一次”,而是“有效地一次effectively once”。我不得不同意这个词语更好(虽然不那么常见),但我指出,我们仍在辩论未定义术语的定义!如果我们想要在有关传递方面定义一个可以明确定义的属性,我实际上认为Atomic Broadcast原子广播是一个很好的定义(虽然是一个可怕的名字 - “原子”)。只要我们是非正式地说话,我认为说“正好一次”也是很好的,因为人们对它意味着什么有直观的理解(我猜,如果我们宣布支持Atomic Broadcast原子广播,大概会增加不少混乱)。

我认为更大的讽刺是,人们所期望的真正保障既不是“正好exactly”也不是“有效effectively”,也不与“一次”或“传递”有任何关系;

毕竟,我所描述的解决方案并不是那么复杂,但是您仍然必须考虑应用程序的语义。

我上面提供的例子实际上是将两个不同的问题混合在一起:数据处理以及处理后的结果与一些存储系统的集成。由于这些交织在一起,开发人员必须以一种很难解开的方式将它们合在一起。

改进这一点的想法是将应用程序分解为两个不同的部分:“流处理”部分,其转换一个或多个输入流(潜在地跨记录聚合或加入副数据)和将数据发送到数据存储的连接器(这些可以在相同的应用程序或进程中运行,但它们在逻辑上是不同的)。

连接器需要从卡夫卡到特定数据系统之间数据的事务性或幂等传递方面去求证考量。这需要思考和仔细照顾偏移量管理。 - 如果您有一个JDBC数据库连接器,就能正确处理“正好一次”,它将适用于支持JDBC的任何数据库,应用程序开发人员不必考虑它。

我们已经有了一些连接器,你可以下载它们

困难的部分是对数据流进行通用转换正确。这是事务支持的地方,同时要与卡夫卡的Streams API结合在一起。

对于那些不熟悉的人而言,Kakfa Streams API是生产者和消费者之上的一层,为输入和输出流之间的转换提供了非常通用的API:几乎可以在应用程序中做任何事情,您可以使用此API 。如果您习惯于古典消息传递系统API,那么这个功能是不那么强大。

一个完整的示例流应用程序如下所示:

public static void main(String[] args) throws Exception {
  Properties streamsConfiguration = new Properties();
  streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
  streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
  streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
  streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
  streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

  final Serde<String> stringSerde = Serdes.String();
  final Serde<Long> longSerde = Serdes.Long();

  KStreamBuilder builder = new KStreamBuilder();
  KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");
  KStream<String, Long> wordCounts = textLines
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
        .map((key, word) -> new KeyValue<>(word, word))
        // Required in Kafka 0.10.0 to re-partition the data because we re-keyed the stream in the `map` step.
        // Upcoming Kafka 0.10.1 does this automatically for you (no need for `through`).
        .through("RekeyedIntermediateTopic")
        .countByKey("Counts")
        .toStream();
  wordCounts.to(stringSerde, longSerde, "WordsWithCountsTopic");

  KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
  streams.start();

  Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

<p>

上面是使用Kafka的流API构建的流处理应用程序。

这个应用程序是计算分布式的“字数”,这是经典的大数据示例。这个字数是完全实时和连续的(每次创建新文档时,这个数字都会增加)。

请注意,这只是一个普通的,简单的Java应用程序的主要方法。此应用程序与任何其他应用程序相同。它像卡夫卡消费者一样工作,所有运行的实例协调处理传入的数据流。

我们如何确保这个应用程序的正确性?毕竟这一切都有你可以想象的所有复杂的事情:输入,输出,传入消息中的聚合和分布式处理。

卡夫卡的所有流处理都在做三件事情:

1. 阅读输入消息

2. 可能会对其状态进行更新(需要进行容错,如果应用程序实例失败并在其他地方恢复)

3. 可能产生输出消息

关键的要求是确保这三件事总是在一起发生,或者根本就不会发生。我们不能允许状态更新但没有产生输出这样的任何故障情况,反之亦然。

我们如何才能实现这一点?

在过去的几年里,我们非常艰难思考这个问题,并且已经有一段时间了。基础工作是过去几年版本的一些变化,你可能没有注意到:

1. 在0.8.1版本中,Kafka添加了日志压缩,允许将其用作状态更改的可浏览日志和快照。这意味着您可以将任意本地(磁盘或内存)数据结构的一组更新这样操作改为对Kafka的一系列写入。这允许我们使本地计算的状态容错。

2. 从卡夫卡读取数据相当于推动您的偏移。在0.8.2中,我们移动了偏移量的存储机制,使用Kafka本身来存储偏移量。“committing提交”一个偏移量相当于悄悄地对卡夫卡的写入(消费者客户端软件已经为你悄悄实现,因此您可能不知道)。

3.输出到卡夫卡...这一直是写入卡夫卡。

这些实现设置为我们现在刚刚添加的功能都准备好了:将这三个操作透明地包装到单个事务中。这样可以保证读取,处理,状态更新和输出全部发生在一起,或者完全不发生。

这不会很慢吗?许多人认为分布式事务本身会很慢。在这种情况下,我们不需要为每一个输入做一个事务,我们可以将它们一起批处理。批量越大,事务的有效开销越低(事务具有恒定的成本,事务并不考虑事务中的消息数量)。博客文章给出了非常有希望的性能结果。

结果是,如果我的应用程序成为使用流API,我下载并使用一个正好一次的数据库JDBC连接器与输出系统集成,现在我可以获得端到端的正确性,只需一个配置更改。

真的很酷的是,这种功能并不与Java API有关:Java API只是围绕通用网络协议的包装器,用于建模持续、有状态、正确的数据流处理。任何语言都可以使用此协议。我们认为这种方式能正确地通过任意进程将输入和输出topic链接在一起,执行转换和实现协议则是几乎添加了一种非常强大的“闭合”属性。

Exactly-once Support in Apache Kafka – Jay Kreps –

                   

8