什么是幂等数据管道? - Alaro


牛津词典中定义的幂等“是集合中的一个元素,当它自身相乘或以其他方式运算时,其值不变。”

1.什么是幂等数据管道
运行从源获取数据并将其多次加载到关系数据库中的管道可能会导致数据库中存在重复值,从而导致错误的指标和许多其他错误。使管道幂等将防止这种情况发生,并使您成为更好的工程师。
换句话说,使用相同的输入多次运行数据管道将始终产生相同的输出。

2. 幂等数据管道的优点
下面列出了幂等数据管道的一些优点

  • 确保在回填的情况下不会对存储位置产生重复数据
  • 它使管道中的转换结果可预测
  • 它有助于减少数据存储费用
  • 它还有助于删除旧的/不需要的数据

3.如何使数据管道具有幂等性
数据管道中最常见的步骤是

  • 从一个或多个源中提取数据
  • 执行一些转换
  • 加载到数据仓库

幂等管道将确保如果列出的步骤中发生任何错误,仍会产生预期的结果作为输出。

如果在加载阶段发生错误,数据没有完全加载到数据仓库中,我们的幂等管道应该从数据仓库中删除半加载的数据,并在管道重新运行时将新的数据存储为完全加载的数据. 仅当数据管道在重新运行时将生成所需的相同数据时才建议这样做。这种模式被称为删除-写入模式。Spark、Snowflake 等技术提供了其他幂等设计模式,如spark-overwrite

下面提供了一种在 python 中实现delete-write模式的方法:

import pandas as pd
import os
import shutil


def extract(path: str = "s3://my_bucket_name/file0.parquet") -> pd.DataFrame:
  df = pd.read_parquet(path)
  return df


# remove error rows
def transform(df: pd.DataFrame) -> pd.DataFrame:
  df_clean = df[df['customerId'] != 'A']
  return df_clean


def load_dwh(df: pd.DataFrame, output_location: str) -> None:
  if os.path.exists(output_location) == True:
    # removes the entire folder
    shutil.rmtree(output_location)
    os.makedir(output_location)
    df.to_csv(filename.csv)
  else:
    df.to_csv(filename.csv)
    
  return None

在上面的片段中,我实现了一个简单的ETL管道,从s3桶中获取parquet文件,并使用pandas read_parquet来读取它。然后根据确定的业务逻辑对数据进行转换,在我的例子中,它删除了所有customer_id等于A的记录。
在这里,load_dwh函数接受两个参数:数据框架和output_location,检查output_location是否已经存在,如果在加载数据时发生错误,就会出现这种情况,然后删除output_location中指定的文件夹,用新的数据重新创建它。

4。结论
拥有幂等数据管道可以避免数据工程师的很多麻烦,尤其是当数据管道由于错误或业务逻辑更改而需要多次重新运行时。