在Spring中优雅关闭Pulsar消息消费者?


这个github创建的示例应用程序以演示如何使用 Spring Boot 在 Java 中正确实现 Apache Pulsar 队列消费者的正常关闭。

队列消费者实施强大的优雅关闭策略:

  • 我们是立即停止处理飞行中的队列消息,还是等待它们完成?
  • 我们是否停止接受新的队列消息?
  • 我们该如何处理本地排队的消息?

想象一下,您的应用程序是一组汽车(容器的部署)愉快地行驶——灯是绿色的。现在,您需要停止应用程序,以便部署新版本。你可以告诉应用程序它需要立即停止(立即红灯)——但是,就像汽车接近十字路口一样,这可能会导致事故。
那么我们应该怎么做才能让汽车有机会安全通过十字路口呢?
因此我们有黄灯——给汽车一个宽限期,让他们有时间在到达十字路口之前放慢速度停下来。

对于我们的应用程序容器/服务器,我们希望做同样的事情——如果它们当前正在工作(在写入数据库的过程中,发出出站 API 请求等),无论是处理 API 请求还是消费队列消息,通常希望允许此正在进行的工作完成,但停止承担新的工作。

不过,要做到这一点,我们需要一种将两种不同的信号发送到计算机进程的方法,它们相当于一个黄灯和一个红灯。值得庆幸的是,这些信号存在并且是标准化的:
标准信号是:

  • SIGTERM(黄灯)——指示应用程序启动正常关机
  • SIGINT / SIGKILL(红灯)——强制中断/终止进程

在容器领域,像Kubernetes这样的协调器利用这些信号作为推出新的应用程序部署的一部分。

在本文中,我们将介绍在 Spring Boot 环境中使用 Java 客户端时如何为Apache Pulsar处理这个问题——但这里讨论的原则和模式适用于任何框架中的任何队列使用者。
如何在 Spring Boot 中优雅地关闭 Pulsar 消费者
我创建了一个示例应用程序来帮助演示我将要介绍的原则,因此,如果您想继续学习,请查看此 repo并按照自述文件中的步骤执行以下操作:

  1. 使用 Docker Compose 启动本地 Pulsar 集群
  2. 以生产者模式运行应用程序,以向 Pulsar 主题生成消息
  3. 以消费者模式运行应用程序,以使用来自主题的消息

在新的应用程序pod启动和运行后(以及在负载均衡器开始将请求路由到新的pod后),Kubernetes开始清理旧的pod的过程:
首先,一个SIGTERM被发送到所有的旧pod(黄灯信号),这就启动了 "终止宽限期"。你可以在你的笔记本电脑上通过点击Ctrl + C(如果通过命令行运行你的应用程序)来模拟这种行为,或者通过停止你的应用程序,如果你使用的是IntelliJ这样的IDE。

关键是要理解这个信号实际上并没有做任何事情--它是由应用程序来处理SIGTERM信号的。

许多Web框架对SIGTERM都有一些内置的处理方法,它们会用503拒绝任何新进入的Web请求,但允许飞行中的请求完成。在Java中,Spring Boot 2.3+会通过在application.properties文件中设置server.shutdown=graceful属性(默认为立即)来自动做到这一点,Quarkus(quarkus.shutdown.timeout)和其他框架中也有类似的属性。

然而,如果你的应用程序正在做非Web工作,比如消费队列消息。

详细点击标题