使用 Vert.x 处理 Kafka 和数据库之间的背压


异步编程在开发反应式和响应式应用程序方面带来了许多优点。然而,它也存在缺点和挑战,其中主要的问题之一是背压问题。


什么是背压?
在物理学中定义是:
它是与管道中所需的流体流动相反的阻力或力

我们可以把这个问题转化为一个已知的场景:
从总线上轮询的消息并将其保存到数据库中,但是在总线上有大量的消息,我们的应用程序轮询的速度非常快,但保存消息到的数据库却非常慢。

在同步情况下,没有背压的问题,计算的同步性阻止了来自总线的轮询,直到当前消息被处理。

但是,在异步情况下,轮询的执行不是与数据库保存发生在同一线程。因此,如果数据库不能处理来自总线的所有消息,这些消息就会滞留在 "中间",也就是在我们服务的内存中。

这可能会导致失败,或者在最坏的情况下,导致服务故障。

自动轮询
起初,我们的轮询会做这些操作:

  • 初始化JDBC客户端
  • 初始化Kafka客户端
  • 订阅主题
  • 持久记录

这段代码非常简单,而且在处理少量消息时效果很好。
当负载越来越大时,问题就出现了:使用Vertx Kafka消费者的处理程序意味着没有对消息比例的控制,所以它会连续轮询而不考虑持久化率,导致内存过载。

让我们尝试开发一个将消息持久化在数据库中的应用程序,并使其进化到处理背压的程度:

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise)
      .handler(record -> {
        persist(jdbc, record)
          .onSuccess(result -> System.out.println(
"Message persisted"))
          .onFailure(cause -> System.err.println(
"Message not persisted " + cause));
      });
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG,
"kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }


  private Future<UpdateResult> persist(JDBCClient jdbc, KafkaConsumerRecord<String, String> record) {
    Promise<UpdateResult> promise = Promise.promise();
    JsonArray params = toParams(record);
    jdbc.updateWithParams(
"insert or update query to persist record", params, promise);
    return promise.future();
  }

  private JsonObject datasourceConfiguration() {
   
// TODO datasource configuration
    return null;
  }

  private JsonArray toParams(KafkaConsumerRecord<String, String> record) {
   
// TODO: convert the record into params for the sql command
    return null;
  }
}

显式轮询
为了处理背压,应使用显式轮询,这可以通过避开kafka消费者的处理程序设置和手动调用轮询(在下面的例子中,每100ms)来实现。
通过使用这种方法,可以使每次轮询只有在先前轮询的消息批次被持久化时才被执行。

这种行为可以通过处理每个消息的持久化未来,并用CompositeFuture.all收集所有的消息来实现,只有当所有的消息都完成时才会成功,而且只有在这种情况下才能进行下一次轮询。

如果至少有一个未来失败了,那么一切都会失败,轮询也会停止。

有各种解决方案可以使服务处理失败,例如,将消息发送到死信队列,但我们不会涉及这种情况。

这段代码的问题是,如果一个消息失败了,我们将失去这个消息,因为消费者被设置为自动提交,所以,是vertx提交了主题偏移。

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .onSuccess(composite -> {
        System.out.println(
"All messages persisted");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println(
"Error persisting messages: " + cause))
    ;
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG,
"kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }

  ...
}

手动提交
将ENABLE_AUTO_COMMIT_CONFIG属性设置为false,我们的服务就会掌管对主题偏移提交的所有权。

只有当每条消息都将被持久化时,才会进行提交,通过这种技巧,至少实现了一次交付。

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());

        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .compose(composite -> {
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println(
"All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println(
"Error persisting and committing messages: " + cause))
    ;
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG,
"kafkahost:9092");
    config.put(ENABLE_AUTO_COMMIT_CONFIG,
"false");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }
  ...
}

奖励功能:实现排序
只要稍加努力,就有可能实现排序:
future组合允许强迫每个持久化操作等待其前一个操作的完成。

这可以通过将异步计算一个接一个地串联起来来实现,所以每一个都会在前一个未来成功时被执行。

这是一个聪明的模式,在需要序列化的时候使用。

public class MainVerticle extends AbstractVerticle {
  ...
  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> IntStream.range(0, records.size())
        .mapToObj(records::recordAt)
        .reduce(Future.<UpdateResult>succeededFuture(),
          (acc, record) -> acc.compose(it -> persist(jdbc, record)),
          (a,b) -> a
        )
      )
      .compose(composite -> {
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println(
"Error persisting and committing messages: " + cause));
  }
  ...
}

结论
在使用异步编程时,背压是一个基本话题。
它不是从vert.x的免费功能,但它可以通过一些简单的技巧来实现。