Taipy:将数据和人工智能算法转变为可投入生产的 Web 应用


Taipy 是一个开源 Python 库,用于构建 Web 应用程序前端和后端。
立即将数据和 AI 算法转化为可投入生产的 Web 应用程序。

将 PySpark 与 Taipy 结合使用
Taipy 是一款功能强大的工作流程编排工具,具有易于使用的框架,可以轻松应用于您现有的数据应用程序。Taipy 建立在坚实的概念基础上 -场景、任务和数据 节点- 这些概念非常强大,允许开发人员轻松建模他们的管道,即使在没有明确支持的情况下使用第 3 方包也是如此。

通过一个简单的示例来演示如何将PySpark与Taipy集成,以将您的大数据处理需求与智能作业执行结合起来。
palmerpenguins数据集为例:

>>> penguin_df
┌───────┬─────────┬───────────┬────────────────┬───────────────┬───────────────────┬─────────────┬────────┬──────┐
│ index │ species │  island   │ bill_length_mm │ bill_depth_mm │ flipper_length_mm │ body_mass_g │  sex   │ year │
├───────┼─────────┼───────────┼────────────────┼───────────────┼───────────────────┼─────────────┼────────┼──────┤
│     0 │ Adelie  │ Torgersen │ 39.1           │ 18.7          │ 181.0             │ 3750.0      │ male   │ 2007 │
│     1 │ Adelie  │ Torgersen │ 39.5           │ 17.4          │ 186.0             │ 3800.0      │ female │ 2007 │
│     2 │ Adelie  │ Torgersen │ 40.3           │ 18.0          │ 195.0             │ 3250.0      │ female │ 2007 │
│     3 │ Adelie  │ Torgersen │ NaN            │ NaN           │ NaN               │ NaN         │ NaN    │ 2007 │
│     4 │ Adelie  │ Torgersen │ 36.7           │ 19.3          │ 193.0             │ 3450.0      │ female │ 2007 │
│   ... │ ...     │ ...       │ ...            │ ...           │ ...               │ ...         │ ...    │  ... │
└───────┴─────────┴───────────┴────────────────┴───────────────┴───────────────────┴─────────────┴────────┴──────┘

该数据集仅包含 344 条记录——几乎不是一个需要 Spark 处理的数据集。但是,该数据集是可访问的,并且其大小与演示 Spark 与 Taipy 的集成无关。如果必须使用更大的数据集进行测试,您可以根据需要多次复制数据。

我们将设计一个执行两项主要任务的工作流程:

1- Spark 任务 (spark_process):

加载数据;
按“物种”、“岛屿”和“性别”对数据进行分组;
求其他列的平均值(“ bill_length_mm ”、“ bill_depth_mm ”、“ Flipper_length_mm ”、“ body_mass_g ”);
保存数据。

2- Python 任务(过滤器):

加载Spark任务之前保存的输出数据;
给定“物种”、“岛屿”和“性别”,返回聚合值。
我们的小项目将包含 4 个文件:


我们的小项目将包含 4 个文件:

app/
├─ penguin_spark_app.py  # the spark application
├─ config.py  # the configuration for our taipy workflow
├─ main.py  # the main script (including our application gui)
├─ penguins.csv  # the data as downloaded from the palmerpenguins git repo

可从palmerpenguins 存储库获取

1. Spark 应用程序 (penguin_spark_app.py)
通常,我们使用 Spark-submit 命令行实用程序运行 PySpark 任务。

当使用 Taipy 进行工作流程编排时,我们可以继续做同样的事情。唯一的区别是,我们不是在命令行中运行命令,而是让工作流管道生成一个子进程,该子进程使用 Spark-submit 运行 Spark 应用程序。

在开始讨论之前,我们首先看一下我们的 Spark 应用程序。只需浏览一下代码,然后继续阅读以获取有关此脚本功能的简要说明:

<strong>app/penguin_spark_app.py</strong>
import argparse
import os
import sys

parser = argparse.ArgumentParser()
parser.add_argument("--input-csv-path", required=True, help="Path to the input penguin CSV file.")
parser.add_argument(
"--output-csv-path", required=True, help="Path to save the output CSV file.")
args = parser.parse_args()

import pyspark.pandas as ps
from pyspark.sql import SparkSession

def read_penguin_df(csv_path: str):
    penguin_df = ps.read_csv(csv_path)
    return penguin_df

def clean(df: ps.DataFrame) -> ps.DataFrame:
    return df[df.sex.isin([
"male", "female"])].dropna()

def process(df: ps.DataFrame) -> ps.DataFrame:
   
"""The mean of measured penguin values, grouped by island and sex."""

    mean_df = df.groupby(by=[
"species", "island", "sex"]).agg("mean").drop(columns="year").reset_index()
    return mean_df

if <strong>name</strong> ==
"<strong>main</strong>":
    spark = SparkSession.builder.appName(
"Mean Penguin").getOrCreate()

    penguin_df = read_penguin_df(args.input_csv_path)
    cleaned_penguin_df = clean(penguin_df)
    processed_penguin_df = process(cleaned_penguin_df)
    processed_penguin_df.to_pandas().to_csv(args.output_csv_path, index=False)

    sys.exit(os.EX_OK)

我们可以通过在终端中输入以下命令来提交此 Spark 应用程序以供执行:

spark-submit --master local[8] app/penguin_spark_app.py \
--input-csv-path app/penguins.csv \
--output-csv-path app/output.csv

它将执行以下操作:

  1. 提交penguin_spark_app.py应用程序以在8个CPU核心上本地执行;
  2. 从 app/penguins.csv CSV 文件加载数据;
  3. 按“物种”、“岛屿”和“性别”分组,然后按平均值聚合其余列;
  4. 将生成的 DataFrame 保存到 app/output.csv。

另请注意,我们已对Spark 应用程序进行了编码以接收 2 个命令行参数:

  1. — input-csv-path:输入企鹅penguin  CSV 文件的路径;和
  2. —output-csv-path:Spark 应用程序处理后保存输出 CSV 文件的路径。

2. Taipy配置(config.py)
此时,我们已经有了 penguin_spark_app.py PySpark 应用程序,并且需要创建一个 Taipy 任务来运行此 PySpark 应用程序。
再次快速浏览一下 app/config.py 脚本,然后继续阅读:


```python
<strong>app/config.py</strong>
import datetime as dt
import os
import subprocess
import sys
from pathlib import Path

import pandas as pd
import taipy as tp
from taipy import Config

SCRIPT_DIR = Path(<strong>file</strong>).parent
SPARK_APP_PATH = SCRIPT_DIR / "penguin_spark_app.py"

input_csv_path = str(SCRIPT_DIR /
"penguins.csv")

# -------------------- Data Nodes --------------------

input_csv_path_cfg = Config.configure_data_node(id=
"input_csv_path", default_data=input_csv_path)
# Path to save the csv output of the spark app
output_csv_path_cfg = Config.configure_data_node(id=
"output_csv_path")

processed_penguin_df_cfg = Config.configure_parquet_data_node(
    id=
"processed_penguin_df", validity_period=dt.timedelta(days=1)
)

species_cfg = Config.configure_data_node(id=
"species")  # "Adelie", "Chinstrap", "Gentoo"
island_cfg = Config.configure_data_node(id=
"island")  # "Biscoe", "Dream", "Torgersen"
sex_cfg = Config.configure_data_node(id=
"sex")  # "male", "female"

output_cfg = Config.configure_json_data_node(
    id=
"output",
)

# -------------------- Tasks --------------------

def spark_process(input_csv_path: str, output_csv_path: str) -> pd.DataFrame:
    proc = subprocess.Popen(
        [
            str(Path(sys.executable).with_name(
"spark-submit")),
            str(SPARK_APP_PATH),
           
"--input-csv-path",
            input_csv_path,
           
"--output-csv-path",
            output_csv_path,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    try:
        outs, errs = proc.communicate(timeout=15)
    except subprocess.TimeoutExpired:
        proc.kill()
        outs, errs = proc.communicate()

    if proc.returncode != os.EX_OK:
        raise Exception(
"Spark training failed")

    df = pd.read_csv(output_csv_path)

    return df

def filter(penguin_df: pd.DataFrame, species: str, island: str, sex: str) -> dict:
    df = penguin_df[(penguin_df.species == species) & (penguin_df.island == island) & (penguin_df.sex == sex)]
    output = df[[
"bill_length_mm", "bill_depth_mm", "flipper_length_mm", "body_mass_g"]].to_dict(orient="records")
    return output[0] if output else dict()

spark_process_task_cfg = Config.configure_task(
    id=
"spark_process",
    function=spark_process,
    skippable=True,
    input=[input_csv_path_cfg, output_csv_path_cfg],
    output=processed_penguin_df_cfg,
)

filter_task_cfg = Config.configure_task(
    id=
"filter",
    function=filter,
    skippable=True,
    input=[processed_penguin_df_cfg, species_cfg, island_cfg, sex_cfg],
    output=output_cfg,
)

scenario_cfg = Config.configure_scenario(
    id=
"scenario", task_configs=[spark_process_task_cfg, filter_task_cfg]
)


您还可以使用Taipy Studio构建 Taipy 配置,Taipy Studio 是一个 Visual Studio Code 扩展,它提供了用于构建 Taipy .toml 配置文件的图形编辑器。

Taipy 中的 PySpark 任务
让我们提取并检查 config.py 脚本的相关部分,该部分在 Taipy 中创建“ spark_process ”Spark 任务(及其 3 个关联的数据节点)

<strong>Code snippet: Spark task in Taipy</strong>

# -------------------- Data Nodes --------------------

input_csv_path_cfg = Config.configure_data_node(id="input_csv_path", default_data=input_csv_path)
# Path to save the csv output of the spark app
output_csv_path_cfg = Config.configure_data_node(id=
"output_csv_path")

processed_penguin_df_cfg = Config.configure_parquet_data_node(
    id=
"processed_penguin_df", validity_period=dt.timedelta(days=1)
)

# -------------------- Tasks --------------------

def spark_process(input_csv_path: str, output_csv_path: str) -> pd.DataFrame:
    proc = subprocess.Popen(
        [
            str(Path(sys.executable).with_name(
"spark-submit")),
            str(SPARK_APP_PATH),
           
"--input-csv-path",
            input_csv_path,
           
"--output-csv-path",
            output_csv_path,
        ],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )

    try:
        outs, errs = proc.communicate(timeout=15)
    except subprocess.TimeoutExpired:
        proc.kill()
        outs, errs = proc.communicate()

    if proc.returncode != os.EX_OK:
        raise Exception(
"Spark training failed")

    df = pd.read_csv(output_csv_path)

    return df

spark_process_task_cfg = Config.configure_task(
    id=
"spark_process",
    function=spark_process,
    skippable=True,
    input=[input_csv_path_cfg, output_csv_path_cfg],
    output=processed_penguin_df_cfg,
)

由于我们设计的 penguin_spark_app.py Spark 应用程序需要接收 2 个参数(input_csv_path 和 output_csv_path),因此我们选择用 Taipy 数据节点来表示这 2 个参数。请注意,您的使用情况可能有所不同,您可以(也应该!)根据自己的需要修改任务、函数和相关数据节点。例如,您可以

  • 有一个执行一些常规 ETL 的 Spark 任务,但不返回任何内容;
  • 倾向于硬编码输入和输出路径,而不是将它们持久化为数据节点;或
  • 将额外的应用程序参数保存为数据节点并传递给 Spark 应用程序。

然后,我们像这样以 Python 子进程的形式运行 spark-submit:

subprocess.Popen(
    [
        str(Path(sys.executable).with_name("spark-submit")),
        str(SPARK_APP_PATH),
       
"--input-csv-path",
        input_csv_path,
       
"--output-csv-path",
        output_csv_path,
    ],
)

请注意,列表元素的顺序应保留以下格式,就像在命令行上执行一样:


$ spark-submit [spark-arguments]  [application-arguments]

同样,根据我们的使用情况,我们可以指定不同的 spark-submit 脚本路径、Spark 参数(我们的示例中没有提供任何参数),或者根据我们的需要指定不同的应用程序参数。

读取并返回 output_csv_path
请注意,spark_process 函数是这样结束的:

def spark_process(input_csv_path: str, output_csv_path: str) -> pd.DataFrame:
    ...

    df = pd.read_csv(output_csv_path)

    return df
    

在我们的案例中,我们希望 Taipy 任务在 Spark 处理完数据后输出数据,以便将其写入 processed_penguin_df_cfg Parquet 数据节点。其中一种方法是手动读取输出目标(本例中为 output_csv_path),然后将其作为 Pandas DataFrame 返回。

不过,如果你不需要 Spark 应用程序的返回数据,你可以直接让你的 Taipy 任务(通过 spark_process 函数)返回 None。

缓存Spark 任务
由于我们在配置 spark_process_task_cfg 时将 skippable 属性设置为 True,因此在重新执行场景时,Taipy 将跳过 spark_process 任务的重新执行,并重用持久化任务输出:processed_penguin_df_cfg Pandas DataFrame。

不过,我们也为 processed_penguin_df_cfg 数据节点定义了 1 天的有效期(validity_period),因此如果数据帧上次缓存的时间超过一天,Taipy 仍会重新执行任务。

Taipy 的 GUI 功能,可以在此处找到快速入门

您可以在此存储库中找到所有代码和数据。