使用Spring WebFlux和Reactive Kafka实现后端服务器发送事件

22-10-06 banq

以非阻塞方式从基于 Kafka 的响应式 WebFlux REST 服务器到 Webflux 客户端的数据流。
以下设计的架构可用于:
  • 近乎实时地将数据推送到外部或内部应用程序。
  • 将数据推送到文件上并将它们安全地复制到云服务。
  • 将相同的数据从 Kafka 主题推送到多个客户端。


在我们执行示例应用程序来演示使用 Spring WebFlux 和 Reactive Kafka 的服务器发送事件 (SSE) 之前,让我们了解基本概念:

什么是服务器发送事件?
服务器发送事件 (SSE) 是一种服务器推送技术,它允许客户端通过 HTTP 连接接收自动服务器更新。

SSE 可用于:

通过维护单个连接并保持连续的事件流通过它来替换长轮询(为每次拉取创建一个新连接)。
启用使用单向数据通信的应用程序
(例如:电子商务网站、实时股票价格更新)。

什么是 Spring WebFlux?
Spring WebFlux框架是一个完全异步且非阻塞的响应式 Web 堆栈,可以处理大量并发连接。WebFlux 支持Reactive Streams背压,运行在 Netty 等服务器上。它使我们能够垂直扩展服务以处理相同硬件上的更大负载。

什么是Reactive Kafka反应式卡夫卡?
Reactive Kafka是基于项目 Reactor 和 Kafka 生产者/消费者 API 的 Kafka 反应式 API。它允许使用具有非阻塞背压和低开销的功能性 API 从 Kafka 发布和消费数据,这允许反应式 Kafka 与其他反应器系统集成并提供端到端反应式管道。

注意:要彻底掌握 Webflux 和 Reactive Kafka,请务必了解术语。

我们将使用 Spring WebFlux 框架和反应式 Kafka 构建 WebFlux 服务器,为客户端公开一个 REST API 以发出安全的 HTTP 请求。
一旦在客户端和 Web Flux 服务器之间建立了安全连接,它就会使用来自 Kafka 主题的消息并异步推送数据,除非需要,否则不会关闭与客户端的连接。
我们将利用反应器存储库中现有的生产者示例,而不是构建反应式 Kafka 生产者。此外,我们将通过在终端上使用 curl 命令来测试服务器的 SSE 响应,而不是构建 Web Flux 客户端。

先决条件:

  • Java v1.8+
  • Apache Kafka + 基本理解
  • Intellij 或 Eclipse 或 Sprint 工具套件
  • Kafka Conduktor tool


依赖:
 dependencies


1、让我们创建一个Kafka Receiver配置,它是一个消费者:Kafka Receiver Configuration,

它的配置是通用的GROUP_ID_CONFIG,因为我们现在正在处理一个单一的客户端,启用自动提交和总是阅读最早的消息,但我们也可以将其更新为最新的。如果我们启用多个客户端,每个客户端可以根据最后提交的偏移量接收来自同一主题的消息。为了保持简单,我们将处理字符串反序列化器,它可以扩展到通用的JSON/AVRO模式。

2、配置准备好后,我们将创建一个REST控制器:REST controller
从Kafka主题中消耗消息,并将响应作为数据的通量送回来。使用MediaType.TEXT_EVENT_STREAM_VALUE作为内容类型。这告诉客户端,将建立一个连接,并且流是开放的,可以从服务器向客户端发送事件。

3、测试,在我们测试这个应用程序之前,这里有一个我们从reactor资源库中利用的StringSerializer的样本producer生产者。

现在,在localhost:9092启动Kafka服务器,并创建一个主题,如上述配置所示。
下面的命令会有帮助:

$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/kafka-server-start.sh config/server.properties
$ bin/kafka-topics.sh — create — bootstrap-server localhost:9092 — replication-factor 1 — partitions 1 — topic <topic_name>
$ bin/kafka-topics.sh — list — bootstrap-server localhost:9092



Kafka Conduktor tool显示了用分区1创建的主题,并且没有从Kafka发送器产生任何消息(Count=0)。

接下来,让我们在localhost:8080上运行Spring-boot应用程序,并使用终端上的curl命令向该服务器发出一个样本请求,这样可以保持连接的一直活着:

curl — location — request GET 'localhost:8080/sse' \
 — header 'Content-Type: text/event-stream;charset=UTF-8' \
 — header 'Accept: text/event-stream;charset=UTF-8'


一旦curl命令在终端上被执行,一个Kafka接收器就被注册了(如Kafka Conduktor的控制台所示)。

现在,让我们通过运行Kafka Sender推送一些数据到Kafka主题上,看看终端是如何接收数据的,终端在这里充当客户端。

一旦消息被发布到主题上,数据就会被推送到终端上--这就是服务器发送事件(SSE)。
用Conduktor工具跟踪消费者,可显示正在被各自的客户端消费的消息。

我们可以看到,有40条消息被发布,所有的消息都被消耗了(Lag=0,End-Current=40-40),并被发送到客户端。

总结
这个项目可以通过使GROUP_ID_CONFIG唯一化和为每个客户设置最新的偏移量来扩展支持多个客户,这通过保持连接活力和异步流式数据为每个客户创建一个新的消费组。如果任何客户端失去了与服务器的连接,并且能够在中断后重新建立安全连接,那么该客户端将重新加入现有的消费者组,并从之前承诺的偏移量接收数据。

这种架构也可以用来创建一个批处理调度器,以消费和传输消息到文件,并将它们安全地复制到云服务上,允许客户端访问文件进行进一步处理。

源码:Refer code here.

反应式模型有一些缺陷需要考虑。

  • 反应式很难调试。如果您没有正确构建管道,则发生错误时,它可能会默默地丢失。
  • 非反应式代码和反应式代码就像油和水。这需要像 R2DBC 这样的疯狂的东西。任何阻塞代码都很难正确集成到反应式管道中。(想想,像http客户端这样的东西)。反应式实际上要求一切都是端到端的反应式。
  • 从性能的角度来看,反应式是很难分析的。这是因为方法实际花费的时间和它实际被调用的地方可能相距甚远,而且不明显。检测诸如 "这实际上是一个反应式n2 "这样的事情是非常困难的。
  • 认知复杂性、嵌套代码难以阅读、难以调试、划分生态系统工作量。如果 Loom 成功,希望看到 Spring 团队摆脱 Webflux。Reactor 应该用作 Spring MVC 中的可选库来处理可组合性和背压。

1