在虚拟线程中处理 Kafka 记录
Quarkus 中的虚拟线程支持不仅限于 REST 和 HTTP。 许多其他部分支持虚拟线程,例如 gRPC、计划任务和消息传递。 在这篇文章中,我们将了解如何在虚拟线程上处理 Kafka 记录,从而提高处理的并发性。
Quarkus Reactive Messaging 扩展支持虚拟线程。 与HTTP类似,要在虚拟线程上执行处理,需要使用@RunOnVirtualThread注解:
@Incoming("input-channel") |
每条消息的处理都在单独的虚拟线程上运行。 因此,对于来自 input-channel 的每条消息,都会创建一个新的虚拟线程
此执行模型可与任何 Quarkus 反应式消息连接器一起使用,包括 AMQP 1.0、Apache Pulsar 和 Apache Kafka。 此处理的并发性不再像 @Blocking 注释那样受到工作线程数量的限制。 因此,这种新颖的执行模型简化了高并发数据流应用程序的开发。
在虚拟线程上处理 Kafka 记录
为了说明如何在虚拟线程上处理 Kafka 记录,让我们考虑一个简单的应用程序。 该应用程序是一个虚假欺诈检测器。 它分析银行交易,如果给定账户在给定时间内的交易金额达到阈值,我们就认为存在欺诈。 该代码可在此 GitHub 存储库中获取。
当然,你可以使用更复杂的检测算法,甚至使用AI/ML。 在这种情况下,我们低效地使用了 Redis 时间序列 命令,引入了不必要的 I/O。 这是有目的地利用虚拟线程的阻塞能力:
@Incoming("tx") |
如果您运行此应用程序并有大量事务,它将无法工作。 处理在虚拟线程上正确执行。 但是,Redis 连接池尚未进行调整以处理该并发级别。 很快,没有可用的 Redis 连接,它开始将命令排入等待列表。 当该队列已满时,它开始拒绝命令。 幸运的是,您可以使用以下命令配置等待队列的最大大小:
# Increase Redis pool size (and waiting queue size) as we will have a lot of concurrency |
当我们在此应用程序中使用 Redis 时,您将在许多其他客户端(包括 HTTP 客户端)中面临相同的问题。 因此,请正确配置它们以处理这种新级别的并发性。
如果运行应用程序并打开 UI,您将看到并发数达到了预期的最大值 1024。
总结
这篇文章解释了如何在虚拟线程上执行消息处理。 虽然该示例使用 Kafka,但您可以对 Quarkus 提供的其他消息连接器使用相同的方法。 不要忘记这种应用程序:
- 需要调优连接池,并发度比之前高很多
- 如果您的处理是 CPU 密集型的,可能会导致垄断