使用ELASTICSEARCH进行近实时索引 - bozho


选择索引策略很困难。虽然Elasticsearch 文档确实有一些一般建议,有一些小技巧,但它也取决于特定用例。在典型情况下,您有一个数据库作为事实的来源,并且有一个使事物可搜索的索引。您可以采用以下策略:

  • 随着数据的到来而建立索引–您可以同时插入数据库并建立索引。如果没有太多数据,这是有道理的。否则索引将变得非常低效。
  • 存储在数据库中,并与计划的作业一起建立索引–这可能是最常见的方法,并且易于实现。但是,如果要索引的数据很多,则可能会出现问题,因为必须使用数据库中的(从,到)条件来精确地获取该数据,并且索引落后于实际数据的秒数(或分钟)在计划的作业运行之间
  • 推送到消息队列并编写索引使用方–您可以运行RabbitMQ之类的东西,并让多个使用方轮询数据并为其编制索引。这不是容易实现的,因为您必须轮询多个项目才能利用批处理索引,然后仅在成功执行批处理时将它们标记为已使用-有点交易行为。
  • 对内存中的项目进行排队并定期对其进行刷新–这可能是好的且高效的,但是如果节点死亡,则可能会丢失数据,因此您必须根据数据库中的数据进行某种运行状况检查
  • 混合-结合以上内容;例如,如果需要在以后充实原始数据并更新索引,则可以在内存中对项目进行排队,然后使用“存储在数据库中,具有计划作业的索引”来更新索引并填写任何缺失的项目。或者,您可以在数据的某些部分出现时建立索引,并对更活跃的数据类型使用另一种策略

我们最近决定实施“内存中队列”方法(与另一方法结合使用,因为无论如何我们都必须进行一些计划的后处理)。最初的尝试是使用Elasticsearch客户端提供的类BulkProcessor。逻辑很清晰:如果达到一定限制或以固定的时间间隔,将索引请求存储在内存中并批量将其刷新到Elasticsearch。因此,最多每隔X秒,最多每隔Y个记录将有一个批处理索引请求。这样就可以实现近实时索引,而不会给Elasticsearch带来太大的压力。根据Elasticsearch的建议,它还允许同时多个批量索引请求。

但是,我们使用了BulkProcessor不支持的REST API(通过Jest)。我们尝试插入REST索引逻辑,而不是当前的本机逻辑,尽管它几乎可以正常工作,但在此过程中,我们注意到了一些令人担忧的问题- internalAdd每次将索引请求添加到批量请求时都会调用该方法synchronized。这意味着线程将阻塞,等待彼此添加内容。对于生产环境来说,这听起来不太理想且有风险。

因此我们进行了单独的实施:可以在这里看到– ESBulkProcessor

它允许多个线程同时刷新到Elasticsearch,但是只有一个线程(使用锁)要从队列中使用以形成批处理。由于这是一项快速的操作,因此最好对其进行序列化。并不是因为并发队列无法处理从中读取的多个线程,而是可以;但是达到同时由多个线程形成批量的条件将导致几个小批量而不是一个大批量,因此一次只需要一个消费者。这不是一个大问题,因此可以将其删除。但重要的是要注意它没有阻塞。

这已经生产了一段时间了,似乎没有任何问题。如果由于负载增加或边缘情况而发生更改,我将报告任何更改。
如果这是唯一的索引逻辑,则必须重申该问题–您的应用程序节点可能会失败,并且最终可能会导致Elasticsearch中的数据丢失。我们不在那种情况下,我不确定哪种方法是最好的补救方法–是在服务器发生故障的情况下对近期数据进行部分重新索引,或者通过批处理检查是否没有数据库和索引之间不匹配。当然,我们还应该说您可能并不总是拥有一个数据库–有时,Elasticsearch就是您用于数据存储的全部,在这种情况下,需要某种队列持久性。
最终目标是实现近乎实时的索引编制,因为用户期望尽快看到他们的数据,同时又不影响Elasticsearch集群。
“什么是对数据建立索引的最佳方法”这一主题非常重要,我希望至少已经澄清了一点,并且我们的贡献对于其他情况也很有意义。