什么是开放表格式OTF?

如果您从事数据领域,您可能听说过开放表格式,例如 Apache Iceberg、Apache Hudi 或 Delta Lake。

开放表格式是数据存储的包装器,并使用一系列文件来

  1. 跟踪表上的架构/分区 (DDL) 更改。
  2. 跟踪表的数据文件及其列统计信息。
  3. 跟踪表上的所有插入/更新/删除 (DML)。

我们将使用 Apache Iceberg 来说明使用 OTF 的好处。虽然实现上存在差异和细微差别,但以下部分也适用于 Apache Hudi 和 Delta 格式。

建议按照代码进行操作!请按照此 GitHub 存储库中的设置部分 创建数据并将其插入订单表中。
Apache Iceberg 可以与多种数据处理系统配合使用;spark-sql我们在示例中使用。

无需重新处理即可演化数据和分区架构
Apache Iceberg 使您能够更改表的数据模式或分区模式,而无需重新处理现有数据。元数据文件跟踪架构和分区更改,允许系统使用相应历史数据的适当数据/分区架构来处理数据。
模式schema和分区演化是分析表中的常见操作,对于传统的 OLAP 系统来说通常成本高昂或容易出错。Apache Iceberg 使数据和分区模式演变成为一项简单的任务。

-- schema evolution
ALTER TABLE local.warehouse.orders ALTER COLUMN cust_id TYPE bigint;
ALTER TABLE local.warehouse.orders DROP COLUMN order_status;

-- parititon evolution
ALTER TABLE local.warehouse.orders ADD PARTITION FIELD cust_id;
INSERT INTO local.warehouse.orders VALUES 
('e481f51cbdc54678b7cc49136f2d6af7',69,CAST('2023-11-14 09:56:33' AS TIMESTAMP)),
('e481f51cbdc54678b7cc49136f2d6af7',87,CAST('2023-11-14 10:56:33' AS TIMESTAMP));

-- check snapshots
select committed_at, snapshot_id, manifest_list from local.warehouse.orders.snapshots order by committed_at desc;
-- We will have two since we had two insert statements

-- See the partitions column statistics and data files added per snapshot
select added_snapshot_id, added_data_files_count, partition_summaries from local.warehouse.orders.manifests;


上面的代码显示表上的每个操作(插入/删除/更新集)都将被视为快照。我们还可以通过表的清单查看分区架构、列统计信息以及每个快照添加或删除的文件数。
我们还可以打开该文件./data/iceberg-warehouse/warehouse/orders/metadata/v<the latest number>.metadata.json(在项目目录中)以分别查看“schema”和“partition-specs”部分下的不同数据和分区模式。

系统如何处理分区已演变的数据的直观表示:


隐藏分区允许您根据列转换定义(并启用使用)分区。例如,您可能希望按时间戳列上的日期进行分区。在 HIVE(没有 OTF)中,您必须在数据中创建一个单独的日期列。但是,使用 Apache Iceberg,您可以在另一列的转换上定义分区,如下所示:

/* -- created in the setup section
CREATE TABLE local.warehouse.orders (
    order_id string,
    cust_id INT,
    order_status string,
    order_date timestamp
) USING iceberg
PARTITIONED BY (date(order_date));
*/

  
-- The below query automatically uses the partition to prune data files to scan.
SELECT cust_id, order_date FROM local.warehouse.orders WHERE order_date BETWEEN '2023-11-01 12:45:33' AND '2023-11-03 12:45:33';

查看之前的时间点表状态,也称为时间旅行
由于元数据文件跟踪数据(以及数据/分区架构)的所有更改,因此我们可以返回到时间点表状态(也称为时间旅行)。例如,如果您想查看四天前的表是什么样子,您可以使用时间旅行来实现。

-- get the time of the first data snapshot
select min(committed_at) as min_committed_at from local.warehouse.orders.snapshots;
-- e.g. 2023-11-21 12:03:08.833

-- Query data as of the oldest committed_at (min_committed_at from the above query) time or after
SELECT * FROM local.warehouse.orders TIMESTAMP AS OF '2023-11-21 12:03:08.833';
-- 15 rows

-- Query without time travel, and you will see all the rows
SELECT * FROM local.warehouse.orders;
-- 17 rows

类似于 Git 的表分支和标签
Apache Iceberg 通过管理每个分支的独立元数据文件来实现表的分支。例如,假设您必须在生产中运行管道一周才能验证您所做的更改是否有效。您可以使用分支来执行此操作,如下所示:


DROP TABLE IF EXISTS local.warehouse.orders_agg;
CREATE TABLE local.warehouse.orders_agg(
    order_date date,
    num_orders int
)  USING iceberg;
INSERT INTO local.warehouse.orders_agg
SELECT date(order_date) as order_date, count(order_id) as num_orders from local.warehouse.orders WHERE date(order_date) = '2023-11-02' GROUP BY 1;

-- Create two branches that are both stored for ten days
ALTER TABLE local.warehouse.orders_agg CREATE BRANCH `branch-v1` RETAIN 10 DAYS;
ALTER TABLE local.warehouse.orders_agg CREATE BRANCH `branch-v2` RETAIN 10 DAYS;

-- Use different logic for each of the branches

-- inserting into branch v1
INSERT INTO local.warehouse.orders_agg.`branch_branch-v1`
SELECT date(order_date) as order_date, count(order_id) as num_orders from local.warehouse.orders WHERE date(order_date) = '2023-11-03' GROUP BY 1;

INSERT INTO local.warehouse.orders_agg.`branch_branch-v1`
SELECT date(order_date) as order_date, count(order_id) as num_orders from local.warehouse.orders WHERE date(order_date) = '2023-11-04' GROUP BY 1;

-- inserting into branch v2
INSERT INTO local.warehouse.orders_agg.`branch_branch-v2`
SELECT date(order_date) as order_date, count(distinct order_id) as num_orders from local.warehouse.orders WHERE date(order_date) = '2023-11-03' GROUP BY 1;

INSERT INTO local.warehouse.orders_agg.`branch_branch-v2`
SELECT date(order_date) as order_date, count(distinct order_id) as num_orders from local.warehouse.orders WHERE date(order_date) = '2023-11-04' GROUP BY 1;

-- validate data, the v2 logic is correct
select * from local.warehouse.orders_agg.`branch_branch-v1` order by order_date;
select * from local.warehouse.orders_agg.`branch_branch-v2` order by order_date;

从上面的练习中,我们注意到branch-v2具有正确的逻辑,因此我们将主分支快进到branch-v2。主分支现在将拥有过去两天的准确数据。

select * from local.warehouse.orders_agg order by order_date desc; 
-- Push the main branch to branch v2's state
CALL local.system.fast_forward('warehouse.orders_agg', 'main', 'branch-v2');
select * from local.warehouse.orders_agg order by order_date desc;

同时处理多个读取和写入
在传统的OLAP系统(例如HIVE)中,如果多个进程在没有适当保护的情况下读/写同一个表,则可能会出现数据读取不一致,或者数据可能在写入过程中被覆盖。Apache Iceberg 以原子方式更新其元数据,这迫使编写器一次“提交”一个更改(如果多个编写器发生冲突,则会对失败的编写器进行重试 )。

读取数据时,Apache Iceberg 使用最新的快照(使用元数据文件)来确保进程内数据操作不会影响读取。

由于Apache Icerber是OSS,我们可以使用任何实现表格式的系统来读写。

例如,我们可以使用 DuckDB 来读取我们的数据。使用 退出 Spark shell,使用exit;docker退出exit。duckdb通过终端使用命令启动 DuckDB CLI 。

INSTALL iceberg;
LOAD iceberg;

-- Count orders by date
WITH orders as (SELECT * FROM iceberg_scan('data/iceberg-warehouse/warehouse/orders', ALLOW_MOVED_PATHS=true))
select strftime(order_date, '%Y-%m-%d') as order_date
, count(distinct order_id) as num_orders
from orders 
group by strftime(order_date, '%Y-%m-%d') 
order by 1 desc;

总结
下次使用 OTF 时,请记住它是一个元数据文件系统,存储对数据的每次更改以及有关数据文件的统计信息。OTF 可以显着改善您管理分析表的方式和开发人员体验。