Taipy 是一个开源 Python 库,用于构建 Web 应用程序前端和后端。
立即将数据和 AI 算法转化为可投入生产的 Web 应用程序。
将 PySpark 与 Taipy 结合使用
Taipy 是一款功能强大的工作流程编排工具,具有易于使用的框架,可以轻松应用于您现有的数据应用程序。Taipy 建立在坚实的概念基础上 -场景、任务和数据 节点- 这些概念非常强大,允许开发人员轻松建模他们的管道,即使在没有明确支持的情况下使用第 3 方包也是如此。
通过一个简单的示例来演示如何将PySpark与Taipy集成,以将您的大数据处理需求与智能作业执行结合起来。
以palmerpenguins数据集为例:
>>> penguin_df ┌───────┬─────────┬───────────┬────────────────┬───────────────┬───────────────────┬─────────────┬────────┬──────┐ │ index │ species │ island │ bill_length_mm │ bill_depth_mm │ flipper_length_mm │ body_mass_g │ sex │ year │ ├───────┼─────────┼───────────┼────────────────┼───────────────┼───────────────────┼─────────────┼────────┼──────┤ │ 0 │ Adelie │ Torgersen │ 39.1 │ 18.7 │ 181.0 │ 3750.0 │ male │ 2007 │ │ 1 │ Adelie │ Torgersen │ 39.5 │ 17.4 │ 186.0 │ 3800.0 │ female │ 2007 │ │ 2 │ Adelie │ Torgersen │ 40.3 │ 18.0 │ 195.0 │ 3250.0 │ female │ 2007 │ │ 3 │ Adelie │ Torgersen │ NaN │ NaN │ NaN │ NaN │ NaN │ 2007 │ │ 4 │ Adelie │ Torgersen │ 36.7 │ 19.3 │ 193.0 │ 3450.0 │ female │ 2007 │ │ ... │ ... │ ... │ ... │ ... │ ... │ ... │ ... │ ... │ └───────┴─────────┴───────────┴────────────────┴───────────────┴───────────────────┴─────────────┴────────┴──────┘
|
该数据集仅包含 344 条记录——几乎不是一个需要 Spark 处理的数据集。但是,该数据集是可访问的,并且其大小与演示 Spark 与 Taipy 的集成无关。如果必须使用更大的数据集进行测试,您可以根据需要多次复制数据。
我们将设计一个执行两项主要任务的工作流程:
1- Spark 任务 (spark_process):
加载数据;
按“物种”、“岛屿”和“性别”对数据进行分组;
求其他列的平均值(“ bill_length_mm ”、“ bill_depth_mm ”、“ Flipper_length_mm ”、“ body_mass_g ”);
保存数据。
2- Python 任务(过滤器):
加载Spark任务之前保存的输出数据;
给定“物种”、“岛屿”和“性别”,返回聚合值。
我们的小项目将包含 4 个文件:
我们的小项目将包含 4 个文件:
app/ ├─ penguin_spark_app.py # the spark application ├─ config.py # the configuration for our taipy workflow ├─ main.py # the main script (including our application gui) ├─ penguins.csv # the data as downloaded from the palmerpenguins git repo
|
可从palmerpenguins 存储库获取
1. Spark 应用程序 (penguin_spark_app.py)
通常,我们使用 Spark-submit 命令行实用程序运行 PySpark 任务。
当使用 Taipy 进行工作流程编排时,我们可以继续做同样的事情。唯一的区别是,我们不是在命令行中运行命令,而是让工作流管道生成一个子进程,该子进程使用 Spark-submit 运行 Spark 应用程序。
在开始讨论之前,我们首先看一下我们的 Spark 应用程序。只需浏览一下代码,然后继续阅读以获取有关此脚本功能的简要说明:
<strong>app/penguin_spark_app.py</strong> import argparse import os import sys
parser = argparse.ArgumentParser() parser.add_argument("--input-csv-path", required=True, help="Path to the input penguin CSV file.") parser.add_argument("--output-csv-path", required=True, help="Path to save the output CSV file.") args = parser.parse_args()
import pyspark.pandas as ps from pyspark.sql import SparkSession
def read_penguin_df(csv_path: str): penguin_df = ps.read_csv(csv_path) return penguin_df
def clean(df: ps.DataFrame) -> ps.DataFrame: return df[df.sex.isin(["male", "female"])].dropna()
def process(df: ps.DataFrame) -> ps.DataFrame: """The mean of measured penguin values, grouped by island and sex."""
mean_df = df.groupby(by=["species", "island", "sex"]).agg("mean").drop(columns="year").reset_index() return mean_df
if <strong>name</strong> == "<strong>main</strong>": spark = SparkSession.builder.appName("Mean Penguin").getOrCreate()
penguin_df = read_penguin_df(args.input_csv_path) cleaned_penguin_df = clean(penguin_df) processed_penguin_df = process(cleaned_penguin_df) processed_penguin_df.to_pandas().to_csv(args.output_csv_path, index=False)
sys.exit(os.EX_OK)
|
我们可以通过在终端中输入以下命令来提交此 Spark 应用程序以供执行:
spark-submit --master local[8] app/penguin_spark_app.py \ --input-csv-path app/penguins.csv \ --output-csv-path app/output.csv
|
它将执行以下操作:
- 提交penguin_spark_app.py应用程序以在8个CPU核心上本地执行;
- 从 app/penguins.csv CSV 文件加载数据;
- 按“物种”、“岛屿”和“性别”分组,然后按平均值聚合其余列;
- 将生成的 DataFrame 保存到 app/output.csv。
另请注意,我们已对Spark 应用程序进行了编码以接收 2 个命令行参数:
- — input-csv-path:输入企鹅penguin CSV 文件的路径;和
- —output-csv-path:Spark 应用程序处理后保存输出 CSV 文件的路径。
2. Taipy配置(config.py)
此时,我们已经有了 penguin_spark_app.py PySpark 应用程序,并且需要创建一个 Taipy 任务来运行此 PySpark 应用程序。
再次快速浏览一下 app/config.py 脚本,然后继续阅读:
```python <strong>app/config.py</strong> import datetime as dt import os import subprocess import sys from pathlib import Path
import pandas as pd import taipy as tp from taipy import Config
SCRIPT_DIR = Path(<strong>file</strong>).parent SPARK_APP_PATH = SCRIPT_DIR / "penguin_spark_app.py"
input_csv_path = str(SCRIPT_DIR / "penguins.csv")
# -------------------- Data Nodes --------------------
input_csv_path_cfg = Config.configure_data_node(id="input_csv_path", default_data=input_csv_path) # Path to save the csv output of the spark app output_csv_path_cfg = Config.configure_data_node(id="output_csv_path")
processed_penguin_df_cfg = Config.configure_parquet_data_node( id="processed_penguin_df", validity_period=dt.timedelta(days=1) )
species_cfg = Config.configure_data_node(id="species") # "Adelie", "Chinstrap", "Gentoo" island_cfg = Config.configure_data_node(id="island") # "Biscoe", "Dream", "Torgersen" sex_cfg = Config.configure_data_node(id="sex") # "male", "female"
output_cfg = Config.configure_json_data_node( id="output", )
# -------------------- Tasks --------------------
def spark_process(input_csv_path: str, output_csv_path: str) -> pd.DataFrame: proc = subprocess.Popen( [ str(Path(sys.executable).with_name("spark-submit")), str(SPARK_APP_PATH), "--input-csv-path", input_csv_path, "--output-csv-path", output_csv_path, ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, )
try: outs, errs = proc.communicate(timeout=15) except subprocess.TimeoutExpired: proc.kill() outs, errs = proc.communicate()
if proc.returncode != os.EX_OK: raise Exception("Spark training failed")
df = pd.read_csv(output_csv_path)
return df
def filter(penguin_df: pd.DataFrame, species: str, island: str, sex: str) -> dict: df = penguin_df[(penguin_df.species == species) & (penguin_df.island == island) & (penguin_df.sex == sex)] output = df[["bill_length_mm", "bill_depth_mm", "flipper_length_mm", "body_mass_g"]].to_dict(orient="records") return output[0] if output else dict()
spark_process_task_cfg = Config.configure_task( id="spark_process", function=spark_process, skippable=True, input=[input_csv_path_cfg, output_csv_path_cfg], output=processed_penguin_df_cfg, )
filter_task_cfg = Config.configure_task( id="filter", function=filter, skippable=True, input=[processed_penguin_df_cfg, species_cfg, island_cfg, sex_cfg], output=output_cfg, )
scenario_cfg = Config.configure_scenario( id="scenario", task_configs=[spark_process_task_cfg, filter_task_cfg] )
|
您还可以使用Taipy Studio构建 Taipy 配置,Taipy Studio 是一个 Visual Studio Code 扩展,它提供了用于构建 Taipy .toml 配置文件的图形编辑器。
Taipy 中的 PySpark 任务
让我们提取并检查 config.py 脚本的相关部分,该部分在 Taipy 中创建“ spark_process ”Spark 任务(及其 3 个关联的数据节点)
<strong>Code snippet: Spark task in Taipy</strong>
# -------------------- Data Nodes --------------------
input_csv_path_cfg = Config.configure_data_node(id="input_csv_path", default_data=input_csv_path) # Path to save the csv output of the spark app output_csv_path_cfg = Config.configure_data_node(id="output_csv_path")
processed_penguin_df_cfg = Config.configure_parquet_data_node( id="processed_penguin_df", validity_period=dt.timedelta(days=1) )
# -------------------- Tasks --------------------
def spark_process(input_csv_path: str, output_csv_path: str) -> pd.DataFrame: proc = subprocess.Popen( [ str(Path(sys.executable).with_name("spark-submit")), str(SPARK_APP_PATH), "--input-csv-path", input_csv_path, "--output-csv-path", output_csv_path, ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, )
try: outs, errs = proc.communicate(timeout=15) except subprocess.TimeoutExpired: proc.kill() outs, errs = proc.communicate()
if proc.returncode != os.EX_OK: raise Exception("Spark training failed")
df = pd.read_csv(output_csv_path)
return df
spark_process_task_cfg = Config.configure_task( id="spark_process", function=spark_process, skippable=True, input=[input_csv_path_cfg, output_csv_path_cfg], output=processed_penguin_df_cfg, )
|
由于我们设计的 penguin_spark_app.py Spark 应用程序需要接收 2 个参数(input_csv_path 和 output_csv_path),因此我们选择用 Taipy 数据节点来表示这 2 个参数。请注意,您的使用情况可能有所不同,您可以(也应该!)根据自己的需要修改任务、函数和相关数据节点。例如,您可以
- 有一个执行一些常规 ETL 的 Spark 任务,但不返回任何内容;
- 倾向于硬编码输入和输出路径,而不是将它们持久化为数据节点;或
- 将额外的应用程序参数保存为数据节点并传递给 Spark 应用程序。
然后,我们像这样以 Python 子进程的形式运行 spark-submit:
subprocess.Popen( [ str(Path(sys.executable).with_name("spark-submit")), str(SPARK_APP_PATH), "--input-csv-path", input_csv_path, "--output-csv-path", output_csv_path, ], )
|
请注意,列表元素的顺序应保留以下格式,就像在命令行上执行一样:
$ spark-submit [spark-arguments] [application-arguments]
同样,根据我们的使用情况,我们可以指定不同的 spark-submit 脚本路径、Spark 参数(我们的示例中没有提供任何参数),或者根据我们的需要指定不同的应用程序参数。
读取并返回 output_csv_path
请注意,spark_process 函数是这样结束的:
def spark_process(input_csv_path: str, output_csv_path: str) -> pd.DataFrame: ...
df = pd.read_csv(output_csv_path)
return df
|
在我们的案例中,我们希望 Taipy 任务在 Spark 处理完数据后输出数据,以便将其写入 processed_penguin_df_cfg Parquet 数据节点。其中一种方法是手动读取输出目标(本例中为 output_csv_path),然后将其作为 Pandas DataFrame 返回。
不过,如果你不需要 Spark 应用程序的返回数据,你可以直接让你的 Taipy 任务(通过 spark_process 函数)返回 None。
缓存Spark 任务
由于我们在配置 spark_process_task_cfg 时将 skippable 属性设置为 True,因此在重新执行场景时,Taipy 将跳过 spark_process 任务的重新执行,并重用持久化任务输出:processed_penguin_df_cfg Pandas DataFrame。
不过,我们也为 processed_penguin_df_cfg 数据节点定义了 1 天的有效期(validity_period),因此如果数据帧上次缓存的时间超过一天,Taipy 仍会重新执行任务。
Taipy 的 GUI 功能,可以在此处找到快速入门。
您可以在此存储库中找到所有代码和数据。