使用Kafka 和 Spring Boot 实现并发编程


本文将教您如何使用 Spring Boot 和 Spring for Kafka 为 Kafka 消费者配置并发。Spring for Kafka 的并发与Kafka 分区和消费者组密切相关。消费者组中的每个消费者都可以从多个分区接收消息。组内的消费者使用单个线程,而消费者组使用多个线程来消费消息。虽然每个消费者都是单线程的,但记录的处理可以利用多个线程。我们将分析如何用Spring Boot和Spring for Kafka来实现。

GitHub 存储库。no-transactions-service目录

先决条件
我们将在今天的练习中使用三种不同的工具。当然,我们将使用最新版本的 Spring Boot 3 和 Java 19 创建 Spring Boot 消费者应用程序。为了在本地运行 Kafka,我们将使用 Redpanda——一个与 Kafka API 兼容的平台。您可以使用他们的 CLI 工具轻松启动和管理 Redpanda – rpk. 如果您想在笔记本电脑上安装,请按照此处rpk提供的安装说明进行操作。

最后,我们需要一个负载测试工具。我正在使用k6工具及其扩展来与 Kafka 集成。当然,这只是一个提议,你可以使用任何你喜欢的其他解决方案。k6我能够快速生成大量消息并将其发送到 Kafka 。为了使用 k6,您需要将其安装在笔记本电脑上。这是安装说明。之后,您需要安装xk6-kafka扩展。在以下文档中,您有完整的扩展列表k6。

介绍
出于本练习的目的,我们将创建一个连接到 Kafka 并从单个主题接收消息的简单 Spring Boot 应用程序。从业务逻辑的角度来看,它处理内存数据库中帐户和存储之间的事务。这是我们需要包含在 Maven 中的依赖项列表pom.xml:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>


让我们看看配置:

spring:
  application.name: no-transactions-service
  kafka:
    bootstrap-servers: ${KAFKA_URL}
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        "[spring.json.value.default.type]": "pl.piomin.services.common.model.Order"
        "[spring.json.trusted.packages]": "pl.piomin.services.common.model"
        "[spring.json.use.type.headers]": false
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

  • 我们的应用程序使用KAFKA_URL环境变量中设置的地址连接到Kafka代理。
  • 它希望得到JSON格式的消息。因此,我们需要设置JsonDeserializer作为解串器的值。
  • 传入的消息被序列化为pl.piomin.services.common.model.Order对象。
  • 为了使其工作,我们需要设置spring.json.value.default.type和spring.json.trusted.packages属性。
  • k6工具不会设置一个包含JSON目标类型信息的头,所以我们需要用spring.json.use.type.headers属性在Spring for Kafka上禁用该功能。

这里是代表传入信息的类。

public class Order {

   private Long id;
   private Long sourceAccountId;
   private Long targetAccountId;
   private int amount;
   private String status;

   // GETTERS AND SETTERS...
}

我们需要做的最后一件事是为Kafka启用Spring,并生成一些测试账户来进行交易事务。

@SpringBootApplication
@EnableKafka
public class NoTransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(NoTransactionsService.class, args);
   }

   private static final Logger LOG = LoggerFactory
      .getLogger(NoTransactionsService.class);

   Random r = new Random();

   @Autowired
   AccountRepository repository;

   @PostConstruct
   public void init() {
      for (int i = 0; i < 1000; i++) {
         repository.save(new Account(r.nextInt(1000, 10000)));
      }
   }

}

使用Redpanda运行Kafka
一旦我们成功安装了rpk CLI,我们就可以通过执行以下命令轻松地运行一个单节点的Kafka代理:

$ rpk container start

一旦我们创建了一个broker,我们就可以创建一个主题。
我们将在测试中使用带有交易名称的主题。在第一步中,我们用一个分区做测试。

$ rpk topic create transactions -p 1

为Kafka准备负载测试
我们的负载测试将生成和发送JSON格式的随机值的订单。k6工具允许我们用JavaScript编写测试。我们需要使用k6 Kafka扩展库。Kafka代理的地址是从KAFKA_URL环境变量中获取的。我们每次生成一个新的消息时,都会递增订单的id字段。

import {
  Writer,
  SchemaRegistry,
  SCHEMA_TYPE_JSON,
} from "k6/x/kafka";
import { randomIntBetween } from 'https://jslib.k6.io/k6-utils/1.2.0/index.js';

const writer = new Writer({
  brokers: [`${__ENV.KAFKA_URL}`],
  topic: "transactions",
});

const schemaRegistry = new SchemaRegistry();

export function setup() {
  return { index: 1 };
}

export default function (data) {
  writer.produce({
    messages: [
      {
        value: schemaRegistry.serialize({
          data: {
            id: data.index++,
            sourceAccountId: randomIntBetween(1, 1000),
            targetAccountId: randomIntBetween(1, 1000),
            amount: randomIntBetween(10, 50),
            status: "NEW"
          },
          schemaType: SCHEMA_TYPE_JSON,
        }),
      },
    ],
  });
}

export function teardown(data) {
  writer.close();
}

在运行测试之前,我们需要设置KAFKA_URL环境变量。然后我们可以使用k6运行命令来生成和发送大量的信息。

$ k6 run load-test.js -u 1 -d 30s 

场景一:单分区主题监听器

让我们从默认值开始。我们的主题只有一个分区。我们正在创建@KafkaListener,只用主题和消费者组的名字。一旦监听器收到一个传入的消息,它就会调用AccountService Bean来处理这个订单。

@Inject
AccountService service;
    
@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

我们的Spring Boot Kafka应用已经准备好了并发性。我们将在交易期间用PESSIMISTIC_WRITE模式锁定账户实体。

public interface AccountRepository extends CrudRepository<Account, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<Account> findById(Long id);
}


这里是我们的AccountService Bean的实现,用于处理传入的订单。

@Service
public class AccountService {

    private static final Logger LOG = LoggerFactory
            .getLogger(AccountService.class);
    private final Random RAND = new Random();

    KafkaTemplate<Long, Order> kafkaTemplate;
    AccountRepository repository;

    public AccountService(KafkaTemplate<Long, Order> kafkaTemplate, 
                          AccountRepository repository) {
        this.kafkaTemplate = kafkaTemplate;
        this.repository = repository;
    }

    @Transactional
    public void process(Order order) {
        Account accountSource = repository
                .findById(order.getSourceAccountId())
                .orElseThrow(); // (1)

        Account accountTarget = repository
                .findById(order.getTargetAccountId())
                .orElseThrow(); // (2)

        if (accountSource.getBalance() >= order.getAmount()) { // (3)
            accountSource.setBalance(accountSource.getBalance() - order.getAmount());
            repository.save(accountSource);
            accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
            repository.save(accountTarget);
            order.setStatus("PROCESSED");
        } else {
            order.setStatus("FAILED");
        }

        try {
            Thread.sleep(RAND.nextLong(1, 20)); // (4)
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        LOG.info("Processed: order->{}", new OrderDTO(order, accountSource, accountTarget));

        // (5)
        CompletableFuture<SendResult<Long, Order>> result = kafkaTemplate
           .send("orders", order.getId(), order);
        result.whenComplete((sr, ex) ->
                LOG.debug("Sent(key={},partition={}): {}",
                        sr.getProducerRecord().partition(),
                        sr.getProducerRecord().key(),
                        sr.getProducerRecord().value()));
    }

}


process(...)方法是@Transactional。在第一步,我们找到源(1)和目标(2)账户实体。然后,如果源账户(3)上有足够的资金,我们就在账户之间进行转账。我还模拟了一个延迟,只是为了测试目的(4)。最后,我们可以使用KafkaTemplate Bean(5)异步发送一个响应到另一个主题。

我们只有一个运行的应用程序实例和一个负责处理消息的线程。让我们为我们的消费者组验证分区上的当前Lag。

消息的处理速度非常慢。

场景二:多分区主题监听器
现在,transactions主题由 10 个分区组成。我们不会更改应用程序代码和配置中的任何内容。我们将删除之前创建的主题并使用以下命令创建一个包含 10 个分区的新主题:

$ rpk topic delete transactions
$ rpk topic create transaction -p 10

我们再次使用以下 Maven 命令启动应用程序:

$ mvn spring-boot:run

让我们分析应用程序日志:虽然我们有 10 个分区,但仍然有一个线程在监听它们。

场景 3:多个分区的消费者并发

现在,我们要在Kafka监听器层面上启用并发性。为了实现它,我们需要在@KafkaListener注解里面设置并发concurrency 字段。这个参数仍然与Kafka分区有关。所以,设置高于分区数量的值是没有意义的。在我们的案例中,有10个分区--与之前的方案相同。

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "10")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   service.process(order);
}

之后,我们可以启动Spring Boot应用。让我们看看会发生什么。正如你所看到的,我们有10个并发连接--每个都绑定在一个线程上。

但是,如果我们运行负载测试,分区上的延迟仍然很大。

理论上,我们可以通过增加实例数量来提高整体性能。但是,这种方法不会改变任何东西。为什么?让我们运行另一个并查看日志。现在,只有 5 个线程仍然绑定到分区。其他五个线程处于空闲状态。系统的整体性能没有改变。 

场景四:多线程处理
最后一个场景。我们将使用 Java 创建一个线程池ExecutorService。我们仍然可以使用具有 Kafka 消费者并发功能的自定义线程池,如下所示(通过参数concurrency)。每次侦听器收到新消息时,它都会在单独的线程中处理它们。

@Service
public class NoTransactionsListener {

    private static final Logger LOG = LoggerFactory
            .getLogger(NoTransactionsListener.class);

    AccountService service;
    ExecutorService executorService = Executors.newFixedThreadPool(30);

    public NoTransactionsListener(AccountService service) {
        this.service = service;
    }

    @KafkaListener(
            id = "transactions",
            topics = "transactions",
            groupId = "a",
            concurrency = "3")
    public void listen(Order order) {
        LOG.info("Received: {}", order);
        executorService.submit(() -> service.process(order));
    }

}

在那种情况下,应该澄清一件事。使用应用程序级别的自定义线程池,我们将丢失单个分区内的消息排序。之前的模型保证了顺序,因为我们每个分区都有一个线程。对于我们的 Spring Boot 应用来说,这并不重要,因为我们只是在独立处理消息。

让我们启动应用程序。有 3 个并发线程从分区接收消息。

有 30 个线程用于处理消息,3 个线程用于监听分区。一旦在消费者线程中接收到消息,它就由工作线程处理。

现在,我们可以再次运行负载测试。它生成并向我们的 Kafka 代理发送了约 85k 条消息(每秒约 2700 条)。

滞后Lag值看起来真的很好,尽管应用程序不能无延迟地处理所有请求。

这是因为Spring Kafka中提交偏移量的默认ack模式是BATCH。如果我们把它改为RECORD模式,即在监听器处理完记录后返回时提交偏移量,我们应该会得到一个更精确的滞后值。

为了覆盖该选项,我们需要定义ConcurrentKafkaListenerContainerFactory Bean,如下所示。

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
    kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
   ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerFactory);
   factory.setConcurrency(3);
   factory.getContainerProperties()
      .setAckMode(ContainerProperties.AckMode.RECORD);
   return factory;
}

详细点击标题