pipefunc:数据DAG管道快速构建工具


一个 Python 库,旨在让构建和运行复杂的计算工作流变得异常快速和简单。如果您曾经处理过函数之间复杂的依赖关系,为并行化而苦苦挣扎,或者希望以更简单的方式创建和管理DAG 管道,pipefunc那么这里可以为您提供帮助。


pipefunc使您能够轻松地在 Python 中构建有向无环图 (DAG)管道。它处理:

  1. 自动依赖解析: pipefunc智能地确定函数的正确执行顺序,无需手动进行依赖管理。
  2. 闪电般快速的执行:以最小的开销(每个函数调用约 15 µs),pipefunc确保您的管道以极快的速度运行。
  3. 轻松并行化: pipefunc自动并行化独立任务,无论是在本地机器还是 SLURM 集群上。它支持任何concurrent.futures.Executor!
  4. 直观的可视化:生成交互式图表来可视化您的管道结构并了解数据流。
  5. 简化的参数扫描: pipefunc该mapspec功能可让您轻松定义和运行 N 维参数扫描,非常适合科学计算、模拟和超参数调整。
  6. 资源分析:通过详细的 CPU、内存和时间报告深入了解管道的性能。
  7. 缓存:避免使用多个缓存后端进行冗余计算。
  8. 类型注释验证:确保整个管道的类型一致性,以便尽早发现错误。
  9. 错误处理:包括ErrorSnapshot捕获有关错误的详细信息的功能,使调试更容易。

pipefunc适用于:

  • 科学计算:简化模拟、数据分析和复杂的计算工作流程。
  • 机器学习:构建强大且可重复的 ML 管道,包括数据预处理、模型训练和评估。
  • 数据工程:通过自动依赖管理和并行执行创建高效的 ETL 流程。
  • HPC:在 SLURM 集群上运行pipefunc,对代码的更改很少。
  • 任何使用互连功能并希望改善代码组织、性能和可维护性的人。

pipefunc专为生产用途而设计,但它也是一个很好的原型设计和实验工具。
比较:

  • Dask相比: pipefunc提供了一种更高级别、更具声明性的方式来定义管道。它会根据您的函数定义和mapspecs 自动管理任务调度和执行,而无需您编写显式并行代码。
  • 与 Luigi/Airflow/Prefect/Kedro 相比:虽然这些工具在 ETL 和事件驱动工作流方面表现出色,但pipefunc专注于科学计算、模拟和计算工作流,其中对执行和资源分配的细粒度控制至关重要。此外,它的设置和开发方式更容易,依赖性最小!
  • Pandas相比:您可以轻松地与 结合使用pipefunc!Pandas用于pipefunc管理操作的执行Pandas并并行化数据处理管道。但它也可以与Polars、Xarray和其他库很好地配合使用!
  • Joblib 相比:与 pipefunc相比,它具有多项优势Joblib。pipefunc它可自动确定函数的执行顺序、生成管道的交互式可视化效果、分析资源使用情况并支持多个缓存后端。此外,pipefunc它还允许您使用 s 指定输入和输出之间的映射mapspec,从而实现复杂的 map-reduce 操作。

例子:
pipefunc 提供了一个 Pipeline 类,您可以使用它来定义函数管道。您可以使用pipefunc装饰器将函数添加到管道,装饰器还允许您指定函数的输出名称。定义管道后,您可以针对特定输出值执行它,通过组合函数节点来简化它,将其可视化为有向图,并分析管道函数的资源使用情况。有关更详细的使用说明和示例,请查看包中提供的使用示例。

下面是 pipefunc 的一个简单示例用法,用于说明其主要功能:

from pipefunc import pipefunc, Pipeline

@pipefunc(output_name="c")
def add(a, b):
    return a + b

@pipefunc(output_name=
"d")
def multiply(b, c):
    return b * c

pipeline = Pipeline([add, multiply])
result = pipeline(
"d", a=2, b=3)  # Automatically executes 'add' first
print(result)  # Output: 15

pipeline.visualize() # Visualize the pipeline

使用 mapspec 的并行示例:

import numpy as np
from pipefunc import pipefunc, Pipeline
from pipefunc.map import load_outputs

@pipefunc(output_name="c", mapspec="a<i>, b[j] -> c[i, j]")
def f(a: int, b: int):
    return a + b

@pipefunc(output_name=
"mean") # no mapspec, so receives 2D <code>c[:, :]</code>
def g(c: np.ndarray):
    return np.mean(c)

pipeline = Pipeline([f, g])
inputs = {
"a": [1, 2, 3], "b": [4, 5, 6]}
result_dict = pipeline.map(inputs, run_folder=
"my_run_folder", parallel=True)
result = load_outputs(
"mean", run_folder="my_run_folder") # can load now too
print(result)  # Output: 7.0