在虚拟线程中处理 Kafka 记录

Quarkus 中的虚拟线程支持不仅限于 REST 和 HTTP。 许多其他部分支持虚拟线程,例如 gRPC、计划任务和消息传递。 在这篇文章中,我们将了解如何在虚拟线程上处理 Kafka 记录,从而提高处理的并发性。

Quarkus Reactive Messaging 扩展支持虚拟线程。 与HTTP类似,要在虚拟线程上执行处理,需要使用@RunOnVirtualThread注解:


@Incoming("input-channel")
@Outgoing(
"output-channel")
@RunOnVirtualThread
public Fraud detect(Transaction tx) {
   
// Run on a virtual thread
}

每条消息的处理都在单独的虚拟线程上运行。 因此,对于来自 input-channel 的每条消息,都会创建一个新的虚拟线程

此执行模型可与任何 Quarkus 反应式消息连接器一起使用,包括 AMQP 1.0、Apache Pulsar 和 Apache Kafka。 此处理的并发性不再像 @Blocking 注释那样受到工作线程数量的限制。 因此,这种新颖的执行模型简化了高并发数据流应用程序的开发。

在虚拟线程上处理 Kafka 记录
为了说明如何在虚拟线程上处理 Kafka 记录,让我们考虑一个简单的应用程序。 该应用程序是一个虚假欺诈检测器。 它分析银行交易,如果给定账户在给定时间内的交易金额达到阈值,我们就认为存在欺诈。 该代码可在此 GitHub 存储库中获取。

当然,你可以使用更复杂的检测算法,甚至使用AI/ML。 在这种情况下,我们低效地使用了 Redis 时间序列 命令,引入了不必要的 I/O。 这是有目的地利用虚拟线程的阻塞能力:

@Incoming("tx")
@Outgoing(
"frauds")
@RunOnVirtualThread
public Fraud detect(Transaction tx) {
    String key =
"account:transactions:" + tx.account;

   
// Add sample
    long timestamp = tx.date.toInstant(ZoneOffset.UTC).toEpochMilli();
    timeseries.tsAdd(key, timestamp, tx.amount, new AddArgs()
        .onDuplicate(DuplicatePolicy.SUM));

   
// Retrieve the last sum.
    var range = timeseries.tsRevRange(key, TimeSeriesRange.fromTimeSeries(),
           
// 1 min for demo purpose.
            new RangeArgs().aggregation(Aggregation.SUM, Duration.ofMinutes(1))
                    .count(1));

    if (!range.isEmpty()) {
       
// Analysis
        var sum = range.get(0).value;
        if (sum > 10_000) {
            Log.warnf(
"Fraud detected for account %s: %.2f", tx.account, sum);
            return new Fraud(tx.account, sum);
        }
    }
    return null;
}

如果您运行此应用程序并有大量事务,它将无法工作。 处理在虚拟线程上正确执行。 但是,Redis 连接池尚未进行调整以处理该并发级别。 很快,没有可用的 Redis 连接,它开始将命令排入等待列表。 当该队列已满时,它开始拒绝命令。 幸运的是,您可以使用以下命令配置等待队列的最大大小:

# Increase Redis pool size (and waiting queue size) as we will have a lot of concurrency
quarkus.redis.max-pool-size=100 # Number of connection in the pool
quarkus.redis.max-pool-waiting=10000 # Waiting queue max size

当我们在此应用程序中使用 Redis 时,您将在许多其他客户端(包括 HTTP 客户端)中面临相同的问题。 因此,请正确配置它们以处理这种新级别的并发性。
如果运行应用程序并打开 UI,您将看到并发数达到了预期的最大值 1024。

总结
这篇文章解释了如何在虚拟线程上执行消息处理。 虽然该示例使用 Kafka,但您可以对 Quarkus 提供的其他消息连接器使用相同的方法。 不要忘记这种应用程序:

  • 需要调优连接池,并发度比之前高很多
  • 如果您的处理是 CPU 密集型的,可能会导致垄断