使用ConnectableFlux在elasticsearch上进行后台批处理 - Jeroen van Wilgenburg


我们有一个Project Reactor应用程序,有很多通过id进行的单个get / insert操作,导致Elasticsearch集群上的负载非常高,根据ID添加批处理读取是一件非常繁琐的事情,以至于我正在寻找其他解决方案。我最终想出了一个使用ConnectableFlux的解决方案。
场景是:每条消息都会从elasticsearch中检索文档,然后使用这个消息中的信息进行更新,然后再插入Elasticsearch中。当消息在10秒钟之内得到处理时,延迟不是什么大问题。不好的是,我们的Elasticsearch集群承受着压力。峰值负载下的响应时间超过250毫秒。最终导致消息处理过程中的几分钟延迟。
所有代码示例均在github上可用。我在本文中使用的代码段是经过简化的版本(具有更少的日志记录和文档),以提高可读性。源存储库还包含用于创建测试数据和运行性能测试的所有脚本。它基本上是一个标准的Spring Initializr – Spring Boot应用程序。
详细点击标题见原文。