MuleSoft:批处理的最佳实践

在当今数据驱动的世界中,组织经常需要高效、准确地处理大量数据。无论是迁移大量数据集、在系统之间同步记录,还是执行复杂的数据转换,批处理在确保这些任务可靠、按时完成方面都发挥着关键作用。

本文深入探讨了 Mule 4 中批处理的最佳实践,并提供了有关如何设计、实施和优化批处理作业以满足企业需求的见解。从优化批处理大小和利用并行处理到确保数据安全和实施强大的错误处理,我们将介绍帮助您构建高效可靠的批处理解决方案的基本策略。

批处理场景
批处理是指将大量数据划分为更小、更易于管理的块或“批次”来处理和处理大量数据的技术。此方法允许应用程序高效地处理海量数据集(例如大文件、数据库记录或消息),方法是将工作负载划分为可独立和并行处理的较小部分。

  • 场景#1:整合来自多个来源的数据并发送到单个目的地。
  • 场景#2:两个系统/应用程序之间的同步。
  • 场景#3:源系统和目标系统之间的 ETL(提取、转换和加载)过程。
  • 场景#4:通过 API 将使用的数据推送到下游遗留系统。

Mule 中的批处理
Mule提供了多种处理批量数据的方法,例如:

  • — 批次范围
  • — For-Each
  • — 流媒体
选择正确的选项取决于数据负载的要求和大小。

最佳实践
批处理过程(一般)

  • 在处理阶段开始之前转换数据集,而不是逐条转换。这称为前端加载转换。
  • 在处理阶段之前,从输入有效负载中过滤任何不需要的字段或记录,因为完全不触碰它们可能比遍历整个有效负载更快、更可靠,以避免传递这些字段/记录。
  • 将批量部署与实时部署分开。
  • 当同时运行多个批处理作业时:
— 评估资源(计算、文件空间、内存)利用率。

— 链接批处理作业而不是并行运行。

— 注意故障点、恢复时间以及对其他工作的影响。

  • 就文件大小而言:
— 将文件拆分为多个较小的文件或限制字段 — 大文件会消耗工作内存。
— 评估分块模式、文件流模式。
  • 给您的源存储库添加水印,因为它允许:
— 提高流程的弹性。
— 简化对未处理数据的访问。

流媒体
写入 CSV、JSON 或 XML 等文件时使用流式传输。
默认情况下,整个有效负载都会加载到内存中,但流式有效负载的一小部分数据会在到达时加载,从而防止出现内存不足的问题。
流式传输确实会影响应用程序的性能,从而减慢其处理交易的速度。
流式传输限制对输出中的项目的访问。
注意:有关流策略及其配置的更多信息,请参阅此处

批调整

  • 尽量避免在批处理步骤的处理器组件内进行 DataWeave 转换: 步骤中的 DataWave 每次将处理一条记录,效率低下。尽可能在输入阶段或批处理提交(在步骤的批处理聚合器组件内)中进行转换。
  • 避免过多的批处理步骤:Mule 在批处理步骤之间使用队列,这会消耗内存。此外,禁用持久队列,因为这会在批处理步骤之间引入更多延迟。
  • 使用迭代器模式读取大文件而不会耗尽内存:您可以实现:
    — 使用自定义 Java 实现
    — 在 Dataweave 转换中
    在转换过程中,通过将输出强制转换为Iterator,它将输出一个不会将整个列表加载到内存中的java.util.Iterator 。
%dw 2.0 
output application/java 
--- 
payload as Iterator

然后,下一个消息处理器通过迭代有效负载,从 Iterator 中一次拉取一个 Java 对象,而无需读取整个内容。

  • 使用批量提交或流式提交对端点进行单独插入:通常建议使用固定大小(聚合器大小)的批量提交。否则,您可以使用流式提交,这可以减少内存消耗,但会影响性能。
  • 正确配置批处理线程:除非您在批处理作业范围内配置,否则最大并发数默认为(2 * CPU 核心数量)。增加线程将提高性能,但会消耗更多内存和 CPU。缓慢增加或减少,因为过多的线程也可能影响端点。
默认情况下会有 16 个线程,但您可以增加或减少以提高处理时间。
  • 考虑适当设置块大小(+ 或 -):增加块大小意味着性能提高,但会影响内存。此外,在使用单个批处理步骤时,请考虑将块大小设置为提交大小。
  • 避免将输入有效负载存储在变量中:将批处理的整个输入有效负载存储在变量(流、会话或记录变量)中可能会导致堆空间内存不足,因此请避免这样做。
切勿将批处理的有效负载保存在变量中。
  • 在单独的批量失败步骤中定义错误处理以减少完全重放。
  • 配置您的调度策略: 根据您的应用程序是单工作程序还是多工作程序托管,配置您的调度策略:
— ORDERED_SEQUENTIAL每次只能处理一个作业
— ROUND_ROBIN将在工作人员之间分配工作。


常规调整

  • 就性能而言,并非所有格式都相同。Bean (Java 对象)有效负载往往是最快的。因此,如果考虑到其他因素,这是一个可行的选择,请在 Java 对象中创建有效负载。
  • 使用Dataweave 2.0来操作消息对象的内容,而不是使用 Groovy 等脚本语言或 Java 等编程语言。使用 DataWeave 作为表达语言,您可以直接查询数据,而不必考虑这些转换。
  • 对不可消耗的有效负载使用缓存。好处是可以更快地处理消息。
  • 使用连接池作为数据库连接器。
  • 确定应用程序的正确大小。 有多种方法可以确定应用程序的正确大小,这些任务可能包括了解:
— 在给定时间内保存在内存中的对象的大小。
— 并发交易的数量。
— 交易量。
  • 通过更改正在处理的批处理数据的保留策略来避免“设备上没有剩余空间”错误 。具有大量或频繁批处理作业的应用程序需要大量磁盘空间。批处理进程将批处理实例的历史记录保留在 Mule Runtime 的临时目录中。默认情况下,保留策略设置为 7 天。监控进程将删除已满足到期条件的临时数据。您可以使用如下配置:
<batch:job jobName="JobName" doc:id="01d87864-3dc8-4dae-9696-1f57b42126c1" >
   ...
   <batch:history >
      <batch:expiration maxAge=
"1" ageUnit="DAYS" />
   </batch:history>
</batch:job>

这将导致历史数据在 1 天后被删除,并且可能适用于每天运行的批处理过程。
您需要根据批处理运行的频率和处理的数据量来选择到期策略。

  • 修改默认频率以尽快删除过期的批处理作业实例元数据。默认情况下,每 60 分钟清除一次过期实例及其元数据。您可以通过更改wrapper.conf中的以下属性来修改它:
mule.batch.historyExpirationFrequency={milliseconds}
  • 避免因大量 DataWeave 临时文件而导致“设备上没有剩余空间”错误。DataWeave 引擎在处理一些大负载时会在内部生成临时文件。有时,大量临时文件会耗尽 Mule 运行时服务器机器上的磁盘空间。这些临时文件通常会在 JVM 中触发完整 GC 过程后被删除,但是,如果 Mule 应用程序的内存消耗实际上并不高,并且 GC 过程不经常触发,则临时文件将累积在磁盘上,无法快速删除,从而耗尽磁盘空间。解决方案是:
— 垂直增加应用程序的大小。
— 或者,在涉及的流程上添加以下代码以明确触发 GC:
<logger level="INFO" doc:name="Trigger full GC" doc:id="31669a1e-2246-4d3c-823e-9460e5162d33" message='#[%dw 2.0&#10;output application/json&#10;var rv = java!java::lang::Runtime::getRuntime().gc()&#10;---&#10;"Explicitly trigger System.gc() " ++ if (rv == null) "finished" else "failed"]' />

结论
实施这些最佳实践将帮助您在 MuleSoft 中设计高效、可靠且可扩展的批处理解决方案。正确设计的批处理作业可以以最少的资源消耗处理大量数据,从而确保集成过程顺利有效。