关于Delta Lake的ACID事务机制简介


近年来,随着大数据利用用例的多样化,需要为分布式存储添加更多功能。这几年诞生了几款OSS存储层SW,可以原样使用HDFS等分布式存储和Apache Spark等分布式处理框架,为分布式存储添加新功能。其中,Delta Lake通过将ACID事务的功能赋予分布式存储来提高数据集的可靠性,本文将进行介绍。我们将探索 Delta Lake 的 ACID 事务,从 Delta Lake 的概述到深入的内容,例如在源代码级别检查实现。
 
什么是Delta Lake?
Delta Lake 是 Databricks 在 2019 年宣布为 OSS 的存储层软件。Delta Lake 允许您通过在分布式存储(如 HDFS 和 Amazon S3)上启用 ACID 事务来保持数据集的可靠性。
部署 Delta Lake 很简单,只需在实现 Spark 应用程序时使用 Maven 或 SBT 安装包。(以下以Maven为例)

<dependency>
  <groupId>io.delta</groupId>
  <artifactId>delta-core_2.12</artifactId>
  <version>1.1.0</version>
</dependency>

如果您只想读写简单的数据,只需对您的 Spark SQL 应用程序进行一些更改。

// Write
val data = spark.range(0, 5)
data.write.format(
"delta").save("/tmp/delta-table")

// Read
val df = spark.read.format(
"delta").load("/tmp/delta-table")

它与 Spark SQL Read/Write 相同,只是格式已更改为"delta"。仅此一项就可以确保您仍然可以以 Parquet 的数据格式读取和写入数据,以及读取和写入 ACID 事务。
此外,Delta Lake 可以更轻松地为以前难以用分布式存储处理的数据实施 Upsert(在 SQL 术语中也称为 Merge)。
对于HDFS等存储,通过规范划分,实现了高扩展性。结果,如果我想更新数据,我必须做这样的事情:1. 读取所有要更新的数据集2. 向读取的数据集添加/更新/删除数据3. 写入新数据集4.删除旧数据集
使用 Delta Lake,这些现在可以通过一次操作完成。
 
分布式存储的 ACID 事务
首先,让我们看看每个 ACID 事务是什么。
  • 原子性:对于事务中包含的操作,要么保证“所有结果都反映”,要么“什么都不反映”。
  • 一致性:保证事务前后的数据一致性。
  • 隔离:每个事务的处理不影响其他事务。
  • 持久性:已完成事务的数据不应在系统故障时丢失。

分布式存储配置了大量机器,其中一台机器经常坏掉,所以有些机器是用 Dorability 作为标准实现的。例如,HDFS 通过默认创建三个副本并跨不同节点存储它们来实现 Durability。
其余三个通常不受分布式存储的保护。
关于原子性,我们会在更早的Upsert过程中考虑。(转贴)
  1. 读取所有要更新的数据集
  2. 对读取的数据集添加/更新/删除数据
  3. 写一个新的数据集
  4. 擦除旧数据集

现在假设应用程序在 3 完成时崩溃了。在这种情况下,旧数据集和新数据集都将保留在 HDFS 中。如果你立即注意到它,你可能没有任何问题,但如果它无人看管,其他应用程序将使用旧数据集,如果你注意到它,数据集很多,你不知道哪个是最新的。这个可能会导致问题。
关于一致性,可能是数据集的模式有问题。在HDFS中,写入的数据的内容是没有经过审查的,所以有可能,比如写Parquet有更多的列,json有更多的层次等等。如果模式不一致的地方很多,数据集可能无法建立,数据处理和分析也可能无法进行。
让我们在与原子性相同的情况下考虑隔离。(转贴)
  1. 读取所有要更新的数据集
  2. 对读取的数据集添加/更新/删除数据
  3. 写一个新的数据集
  4. 擦除旧数据集

在 HDFS 中,Isolation Level 是最低的 Read Uncommitted,即没有事务并发控制。在这种情况下,例如,1 和 3 之间的另一个事务可能会更新数据集,从而导致数据不一致。
  
关于 Delta Lake 事务
Delta Lake 假设您将使用 HDFS 和其他 Durability 分布式存储作为数据存储。因此,Durability 是由分布式存储来保证的。此外,通过使用 Delta Lake 来确保原子性、一致性和隔离性。
让我更详细地解释一下这个故事。通过将格式更改为"delta",Write 将按如下方式执行
  1. 将数据的实体Parquet文件写入分布式存储
  2. 将即1个数据的元数据写入分布式存储“Delta Log”(=提交)

读取执行如下。
  1. 通过阅读之前的“Delta Log”确定所需的 Parquet 文件
  2. 读取指定的 Parquet 文件

以下是 Delta Log 的示例:
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{
"metaData":{"id":"ac72d2ec-7bdc-42cd-88cc-72da634f0873","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1640262316180}}
{
"add":{"path":"part-00000-1c9604e7-6b21-4392-acf1-1fb45e31744b-c000.snappy.parquet","partitionValues":{},"size":296,"modificationTime":1640262319000,"dataChange":true}}

这个Delta Log的存在实际上保证了原子性、一致性和隔离性。
 
  • 首先是原子性,

这很简单。由于编写 Delta Log 本身就是一次提交,因此您可以创建一种情况,如果有 Delta Log,则反映所有更新,如果没有 Delta Log,则不反映任何更新。没有在任何 Delta Log 中列出的 Parquet 文件永远不会被 Read 读取,所以如果只将一些过渡处理作为 Parqeut 文件写入分布式存储,那也没关系。
 
  • 接下来是一致性:

说到前面提到的schema问题,这可以通过检查Delta Log上挂起的Metadata是否存在不一致来实现。如果它与元数据中保存的架构不匹配,则更新将被拒绝(= 架构验证),并且维护数据集完整性的机制将起作用。只要不冲突,也可以增加模式中的列数(=模式演化)和分区中的列数。(Schema Evolution 的具体行为是automatic-schema-evolution>https://docs.delta.io/latest/delta-update.htmlautomatic-schema-evolution)
 
  • 最后,隔离性:

Delta Lake 利用多版本并发控制 (MVCC) 和乐观独占控制。Delta Log 有版本编号,这意味着数据集的版本。通过在读取数据时在 Spark 中保留版本号,即使发生另一个事务的更新(从应用程序端读取),您也可以继续读取相同版本的数据集(=快照)。您也可以指定特定版本)。对于write,记下事务开始时数据集的最新版本是什么,如果在commit时已经有(latest version + 1)的Delta Log,就会和其他事务发生数据冲突。