异步编程在开发反应式和响应式应用程序方面带来了许多优点。然而,它也存在缺点和挑战,其中主要的问题之一是背压问题。
什么是背压?
在物理学中定义是:
它是与管道中所需的流体流动相反的阻力或力
我们可以把这个问题转化为一个已知的场景:
从总线上轮询的消息并将其保存到数据库中,但是在总线上有大量的消息,我们的应用程序轮询的速度非常快,但保存消息到的数据库却非常慢。
在同步情况下,没有背压的问题,计算的同步性阻止了来自总线的轮询,直到当前消息被处理。
但是,在异步情况下,轮询的执行不是与数据库保存发生在同一线程。因此,如果数据库不能处理来自总线的所有消息,这些消息就会滞留在 "中间",也就是在我们服务的内存中。
这可能会导致失败,或者在最坏的情况下,导致服务故障。
自动轮询
起初,我们的轮询会做这些操作:
- 初始化JDBC客户端
- 初始化Kafka客户端
- 订阅主题
- 持久记录
这段代码非常简单,而且在处理少量消息时效果很好。
当负载越来越大时,问题就出现了:使用Vertx Kafka消费者的处理程序意味着没有对消息比例的控制,所以它会连续轮询而不考虑持久化率,导致内存过载。让我们尝试开发一个将消息持久化在数据库中的应用程序,并使其进化到处理背压的程度:
public class MainVerticle extends AbstractVerticle {
@Override public void start(Promise<Void> startPromise) throws Exception { JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration()); KafkaConsumer .<String, String>create(vertx, kafkaConsumerConfiguration()) .subscribe("topic.name", startPromise) .handler(record -> { persist(jdbc, record) .onSuccess(result -> System.out.println("Message persisted")) .onFailure(cause -> System.err.println("Message not persisted " + cause)); }); }
private Map<String, String> kafkaConsumerConfiguration() { final Map<String, String> config = new HashMap<>(); config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092"); config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return config; }
private Future<UpdateResult> persist(JDBCClient jdbc, KafkaConsumerRecord<String, String> record) { Promise<UpdateResult> promise = Promise.promise(); JsonArray params = toParams(record); jdbc.updateWithParams("insert or update query to persist record", params, promise); return promise.future(); }
private JsonObject datasourceConfiguration() { // TODO datasource configuration return null; }
private JsonArray toParams(KafkaConsumerRecord<String, String> record) { // TODO: convert the record into params for the sql command return null; } }
|
显式轮询
为了处理背压,应使用显式轮询,这可以通过避开kafka消费者的处理程序设置和手动调用轮询(在下面的例子中,每100ms)来实现。
通过使用这种方法,可以使每次轮询只有在先前轮询的消息批次被持久化时才被执行。
这种行为可以通过处理每个消息的持久化未来,并用CompositeFuture.all收集所有的消息来实现,只有当所有的消息都完成时才会成功,而且只有在这种情况下才能进行下一次轮询。
如果至少有一个未来失败了,那么一切都会失败,轮询也会停止。
有各种解决方案可以使服务处理失败,例如,将消息发送到死信队列,但我们不会涉及这种情况。
这段代码的问题是,如果一个消息失败了,我们将失去这个消息,因为消费者被设置为自动提交,所以,是vertx提交了主题偏移。
public class MainVerticle extends AbstractVerticle {
@Override public void start(Promise<Void> startPromise) throws Exception { JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration()); KafkaConsumer<String, String> consumer = KafkaConsumer .<String, String>create(vertx, kafkaConsumerConfiguration()) .subscribe("topic.name", startPromise);
poll(jdbc, consumer); }
private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) { Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise(); consumer.poll(100, pollPromise);
pollPromise.future() .compose(records -> { List<Future<UpdateResult>> futures = IntStream.range(0, records.size()) .mapToObj(records::recordAt) .map(record -> persist(jdbc, record)) .collect(toList());
return CompositeFuture.all(new ArrayList<>(futures)); }) .onSuccess(composite -> { System.out.println("All messages persisted"); poll(jdbc, consumer); }) .onFailure(cause -> System.err.println("Error persisting messages: " + cause)) ; }
private Map<String, String> kafkaConsumerConfiguration() { final Map<String, String> config = new HashMap<>(); config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092"); config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return config; }
... }
|
手动提交
将ENABLE_AUTO_COMMIT_CONFIG属性设置为false,我们的服务就会掌管对主题偏移提交的所有权。
只有当每条消息都将被持久化时,才会进行提交,通过这种技巧,至少实现了一次交付。
public class MainVerticle extends AbstractVerticle {
@Override public void start(Promise<Void> startPromise) throws Exception { JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration()); KafkaConsumer<String, String> consumer = KafkaConsumer .<String, String>create(vertx, kafkaConsumerConfiguration()) .subscribe("topic.name", startPromise);
poll(jdbc, consumer); }
private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) { Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise(); consumer.poll(100, pollPromise);
pollPromise.future() .compose(records -> { List<Future<UpdateResult>> futures = IntStream.range(0, records.size()) .mapToObj(records::recordAt) .map(record -> persist(jdbc, record)) .collect(toList());
return CompositeFuture.all(new ArrayList<>(futures)); }) .compose(composite -> { Promise<Void> commitPromise = Promise.promise(); consumer.commit(commitPromise); return commitPromise.future(); }) .onSuccess(any -> { System.out.println("All messages persisted and committed"); poll(jdbc, consumer); }) .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause)) ; }
private Map<String, String> kafkaConsumerConfiguration() { final Map<String, String> config = new HashMap<>(); config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092"); config.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return config; } ... }
|
奖励功能:实现排序
只要稍加努力,就有可能实现排序:
future组合允许强迫每个持久化操作等待其前一个操作的完成。
这可以通过将异步计算一个接一个地串联起来来实现,所以每一个都会在前一个未来成功时被执行。
这是一个聪明的模式,在需要序列化的时候使用。
public class MainVerticle extends AbstractVerticle { ... private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) { Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise(); consumer.poll(100, pollPromise);
pollPromise.future() .compose(records -> IntStream.range(0, records.size()) .mapToObj(records::recordAt) .reduce(Future.<UpdateResult>succeededFuture(), (acc, record) -> acc.compose(it -> persist(jdbc, record)), (a,b) -> a ) ) .compose(composite -> { Promise<Void> commitPromise = Promise.promise(); consumer.commit(commitPromise); return commitPromise.future(); }) .onSuccess(any -> { System.out.println("All messages persisted and committed"); poll(jdbc, consumer); }) .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause)); } ... }
|
结论
在使用异步编程时,背压是一个基本话题。
它不是从vert.x的免费功能,但它可以通过一些简单的技巧来实现。