批处理最佳实践 - Vlad Mihalcea

20-01-31 banq

大多数应用程序至少具有一个批处理任务,在后台执行特定的逻辑。编写批处理作业并不复杂,但是您需要了解一些基本规则,这里将列举一些我发现最重要的规则。

从输入类型的角度来看,处理项目可以通过轮询数据库来实现,也可以将数据通过队列推送到系统中来实现。下面显示了典型批处理系统的三个主要组件:

  • 输入组件(通过轮询或从输入队列加载数据项目)
  • 处理器:主要业务处理逻辑组件
  • 输出组件:输出结果的输出通道或存储位置

分批轮询

您一次只能检索一批数据项目。我最近在尝试检索所有可能的数据项目进行处理时,调度作业抛出了OutOfMemoryError。

系统集成测试使用的是少量数据,因此容易通过,但是由于某些部署问题,当计划调度的作业脱机两天时,由于没有被触发使用,因此要处理的数据项目数会大量累积。 ,并且当调度程序重新联机时,由于数据太大,不适合调度程序的内存堆,因此也无法运行。因此,仅设置高的调度频率还是不够的。

为避免这种情况,您只需要获取一批数据,将它们处理消费掉即可,然后您可以重新运行该过程,直到没有剩余要处理的东西为止。

编写线程安全的批处理程序

通常,无论您选择并行运行多少个作业,计划调度作业都应正确运行。因此,批处理处理器应该是无状态的,使用本地作业执行的上下文将数据状态从一个组件传递到另一个组件。毕竟,即使是安全的全局变量也不是那么安全,因为作业的数据可能在并发执行时混合在一起。

节流

使用队列时(输入或在批处理程序中),您应该始终有一个限制策略。如果物品的生产率始终高于被消费的数据量,那么您将遭受灾难。如果排队的数据项目保留在内存中,最终将消耗完内存。如果项目存储在持久队列中,则队列空间将用完。因此,您需要一种平衡生产者和消费者的机制。要确保有合适的消费者数量来平衡生产数据。(可通过reactive的背压)

当队列大小超过给定阈值时,自动扩展消费者就像启动一个新消费者一样,是一种合适的自适应策略。当队列大小低于其他阈值时杀死使用者,可以释放不必要的空闲线程。

create-new-consumer阈值应大于kill-idle阈值,因为如果它们相等,则当队列大小在阈值大小附近波动时,您将获得create-kill抖动。

存储工作结果

只将工作结果存储在内存中是不好的选择,选择一个持久性存储(MongoDB限制的集合)是一个更好的选择。

如果结果保存在内存中,而不限制一个上限,则批处理处理器最终将耗尽内存。重新启动计划程序将清除您以前的工作结果,这是非常有价值的处理结果,因为这是您获得的唯一反馈。

疯狂的外部服务提供商

for(GeocodeRequest geocodeRequest : batchRequests) {
   mapsService.resolveLocation(geocodeRequest);
}

这段循环代码会疯狂调用您的地图服务提供商,这给他们的服务器带来了很大压力。如果这样批处理请求数很高,那么您可能会被禁止访问。

您应该在两次请求之间添加一个短暂的延迟,但是不要让您的当前睡眠中断,而应使用EIP延迟器。

对批处理程序使用EIP样式编程

尽管程序风格的编程是大多数程序员的默认心态,但许多批处理任务更适合企业集成模式设计。所有上述规则都可以使用EIP工具轻松实现,例如:

  • 消息队列
  • 投票渠道
  • 变形金刚
  • 分离器/聚合器
  • 延迟器

结论

使用EIP组件可以简化测试,因为您一次只专注于一项职责。EIP组件通过队列传递的消息进行通信,因此将一个同步处理通道更改为调度的线程池只是一个配置细节。

有关EIP的更多信息,请查看出色的Spring Integration框架。我已经使用了三年了,接受其强迫性以后,您将更喜欢它而不是过程编程。

相关:

https://www.jdon.com/53742

 

                   

1
猜你喜欢