湖仓一体技术调研(Apache Hudi、Iceberg和Delta lake对比)
作者:程哥哥、刘某迎 、杜某安、刘某、施某宇、严某程
1 引 言
随着当前的大数据技术逐步革新,企业对单一的数据湖和数仓架构并不满意。越来越多的企业开始融合数据湖和数据仓库的平台,不仅可以实现数据仓库的功能,还实现了各种不同类型数据的处理功能、数据科学、用于发现新模型的高级功能,这就是所谓的"湖仓一体"。湖仓一体(Data LakeHouse)是一种新型开放式架构,将数据湖和数据仓库的优势充分结合,它构建在数据湖低成本的数据存储架构之上,又继承了数据仓库的数据处理和管理功能。作为新一代大数据技术架构,将逐渐取代单一数据湖和数据仓库架构。
图1‑1 大数据技术架构
1.1数据仓库
数据仓库,英文名称为Data Warehouse,可简写为DW或DWH。数据仓库的目的是构建面向分析的集成化数据环境,为企业提供决策支持(Decision Support)。它出于分析性报告和决策支持目的而创建。数据仓库本身并不"生产"任何数据,同时自身也不需要"消费"任何的数据,数据来源于外部,并且开放给外部应用,这也是为什么叫"仓库",而不叫"工厂"的原因。
1.1.1 基本特征
数据仓库是数据仓库是面向主题的(Subject Oriented)、集成的(Integrated)、非易失的(Non-Volatile)和时变的(Time Variant)的数据集合,用于支持管理决策。
- 面向主题
传统数据库中,最大的特点是面向应用进行数据的组织,各个业务系统可能是相互分离的。而数据仓库则是面向主题的。主题是一个抽象的概念,是较高层次上企业信息系统中的数据综合、归类并进行分析利用的抽象。在逻辑意义上,它是对应企业中某一宏观分析领域所涉及的分析对象。
- 集成性
通过对分散、独立、异构的数据库数据进行抽取、清理、转换和汇总便得到了数据仓库的数据,这样保证了数据仓库内的数据关于整个企业的一致性。数据仓库中的综合数据不能从原有的数据库系统直接得到。因此在数据进入数据仓库之前,必然要经过统一与综合,这一步是数据仓库建设中最关键、最复杂的一步,所要完成的工作有:
-
统一源数据中所有矛盾之处,如字段的同名异义、异名同义、单位不统一、字长不一致,等等。
-
进行数据综合和计算。数据仓库中的数据综合工作可以在从原有数据库抽取数据时生成,但许多是在数据仓库内部生成的,即进入数据仓库以后进行综合生成的。
- 非易失性(不可更新性)
数据仓库的数据反映的是一段相当长的时间内历史数据的内容,是不同时点的数据库快照的集合,以及基于这些快照进行统计、综合和重组的导出数据。数据非易失性主要是针对应用而言。数据仓库的用户对数据的操作大多是数据查询或比较复杂的挖掘,一旦数据进入数据仓库以后,一般情况下被较长时间保留。数据仓库中一般有大量的查询操作,但修改和删除操作很少。因此,数据经加工和集成进入数据仓库后是极少更新的,通常只需要定期的加载和更新。
- 时变性
数据仓库包含各种粒度的历史数据。数据仓库中的数据可能与某个特定日期、星期、月份、季度或者年份有关。数据仓库的目的是通过分析企业过去一段时间业务的经营状况,挖掘其中隐藏的模式。虽然数据仓库的用户不能修改数据,但并不是说数据仓库的数据是永远不变的。分析的结果只能反映过去的情况,当业务变化后,挖掘出的模式会失去时效性。因此数据仓库的数据需要更新,以适应决策的需要。从这个角度讲,数据仓库建设是一个项目,更是一个过程。数据仓库的数据随时间的变化表现在以下几个方面:
-
数据仓库的数据时限一般要远远长于操作型数据的数据时限
-
操作型系统存储的是当前数据,而数据仓库中的数据是历史数据
-
数据仓库中的数据是按照时间顺讯追加的,它们都带有时间属性
1.2数据湖
数据湖(Data Lake)是一个以原始格式存储数据的存储库或系统。它按原样存储数据,而无需事先对数据进行结构化处理。一个数据湖可以存储结构化数据(如关系型数据库中的表),半结构化数据(如CSV、日志、XML、JSON),非结构化数据(如电子邮件、文档、PDF)和二进制数据(如图形、音频、视频)。
1.2.1 基本特征
数据湖的特征可以从数据和计算两个方面进行分析,首先是在数据方面,主要具有以下四个特性:
-
保真性:数据湖中对于业务系统中的数据都会存储一份"一模一样"的完整拷贝。与数据仓库不同的地方在于,数据湖中必须要保存一份原始数据,无论是数据格式、数据模式、数据内容都不应该被修改。同时,数据湖应该能够存储任意类型/格式的数据。
-
灵活性:对于任何数据应用来说,schema的设计都是必不可少的。数据湖强调的是"读取型schema",背后的潜在逻辑认为业务的不确定性是常态:我们无法预期业务的变化,那么我们就保持一定的灵活性,将设计去延后,让整个基础设施具备使数据"按需"贴合业务的能力。既然没办法预估业务的变化,那么就将数据保持为最原始的状态,一旦需要时,可以根据需求对数据进行加工处理。因此,数据湖更加适合创新型企业、业务高速变化发展的企业。同时,数据湖的用户也相应的要求更高,数据科学家、业务分析师(配合一定的可视化工具)是数据湖的目标客户。
-
可管理性:既然数据湖中的数据要求"保真性"和"灵活性",那么数据湖中至少会存在两类数据:原始数据和处理后的数据。数据湖中的数据会不断地累积、演化。因此,对于数据管理能力也会要求很高,至少应该包含以下数据管理能力:数据源、数据连接、数据格式、数据schema(库/表/列/行)。同时,数据湖是单个企业/组织中统一的数据存放场所,因此,还需要具备一定的权限管理能力。
-
可追溯性:数据湖是一个组织/企业中全量数据的存储场所,需要对数据的全生命周期进行管理,包括数据的定义、接入、存储、处理、分析、应用的全过程。一个强大的数据湖实现,需要能做到对其间的任意一条数据的接入、存储、处理、消费过程是可追溯的,能够清楚的重现数据完整的产生过程和流动过程。
在计算方面。数据湖对于计算能力的要求相当广泛,主要取决于业务对于计算的要求,总的来说可以概括为以下两点:
-
丰富的计算引擎:从批处理、流式计算、交互式分析到机器学习,各类计算引擎都属于数据湖应该囊括的范畴。一般情况下,数据的加载、转换、处理会使用批处理计算引擎;需要实时计算的部分,会使用流式计算引擎;对于一些探索式的分析场景,可能又需要引入交互式分析引擎。随着大数据技术与人工智能技术的结合越来越紧密,各类机器学习、深度学习算法也被不断引入。因此,对于一个合格的数据湖项目而言,计算引擎的可扩展/可插拔,应该是一类基础能力。
-
多模态的存储引擎:理论上,数据湖本身应该内置多模态的存储引擎,以满足不同的应用对于数据访问需求(综合考虑响应时间/并发/访问频次/成本等因素)。但是,在实际的使用过程中,数据湖中的数据通常并不会被高频次的访问,而且相关的应用也多在进行探索式的数据应用,为了达到可接受的性价比,数据湖建设通常会选择相对便宜的存储引擎(如S3/OSS/HDFS/OBS),并且在需要时与外置存储引擎协同工作,满足多样化的应用需求。
1.3现有数据平台及问题
数据仓库(data warehouse)诞生之初的使命是:从业务数据库搜集数据,然后统一进行中心式维护,并满足业务方的决策和BI需求。数据在入仓时,需要遵循 Schema-on-Write原则(入库前定义Schema,传统数据库的用法)。这就需要提前进行建模,从而为下游BI应用的查询分析提供性能保障。这就是第一代数据分析平台的架构。但是使用一段时间后,初代架构暴露出了这么几个问题:
-
弹性缺失:存储和计算都在相同的本地设备上,导致企业需要按照业务负载的峰值进行付费,一旦数据规模膨胀,成本很高。
-
异构数据:Warehouse架构不能存储和查询非结构化数据(视频/音频/文本)
为了解决这些问题,企业开始把原始数据存储到数据湖(datalake)当中。此处的Datalake指:基于常见文件格式(如Parquet/ORC)的低成本文件存储,同时还具备基础的文件API。这套架构常见于hadoop生态(使用HDFS作为文件存储)。Datalake是一种Scheme-on-Read模式。虽然能够快速低成本的存储几乎任何数据,但这种方案把数据质量监控和数据管理的成本转嫁给了下游应用(因为下游应用读时,才能感知数据质量情况)。使用数据时,企业需要从Datalake中把目标数据的子集ETL到下游的数据仓库,再进行分析决策。这个架构中的Datalake因为使用了开放的文件格式,因此其他分析应用(如ML系统),可以直接访问数据。这是相比于初代的封闭数据格式,是一种优势。
从2015年起,云上Datalake(如S3)具备低成本&高SLA等优势,开始逐步替代HDFS。但整体架构和上面提到的第二代架构基本相似。Datalake下游是云厂商自己的数据仓库,如Redshift。这种双层架构(Datalake + Warehouse)目前在企业中是一种主流架构。双层架构看起来成本较低(云上对象存储/算力成本 本身不高),但问题在于双层架构的复杂度。双层架构需要两轮ETL,这其中存在着延时/复杂度/任务失败等多种情况。此外,对于现阶段企业出现的一些高级分析用例(比如机器学习)前面几种架构对其支持的都不是很理想。
总的来说,当今的架构存在这几个问题:
-
可靠性问题:数据仓库和数据湖保持一致性的成本很高,需要持续ETL,而每次ETL都存在作业失败或者脏数据的风险
-
数据陈旧:一般数据仓库的数据时效性都落后于数据湖。双层架构在数据时效性上,甚至不如第一代架构
-
高级分析用例无法支持:目前没有ML系统(如tf/pytorch)可以在数据仓库系统中运行。不同于BI场景。这类系统通过非SQL的方式 从大数据集中获取一个小的子集。而Warehouse一般只支持JDBC/ODBC协议。暴露出的问题:1)数据访问很低效;2)ML应用无法访Warehouse内的封闭文件格式。因此,目前基于Warehouse架构来满足这些用例,只能把数据再次导出为文件。这使得复杂度和延时进一步增加。如果基于Datalake直接运行这些分析用例,流程虽然可以跑通。但失去了数据仓库作为DBMS的管理Feature,如 ACID/事务/索引 等。简单来说,ML系统对数据访问的诉求是兼顾:性能Performance + DBMS管理Feature + 兼容性好的数据访问方式。 第二代架构显然满足不了需求
-
维护成本:体现在ETL执行两次的维护开销+双倍存储开销。同时在Warehouse内的数据,一般是DBMS厂专有格式,后续作业迁移和数据迁移成本很高
2 湖仓一体化
所谓湖仓一体,可概括为将数据仓库和数据湖进行整合,融为一体。数据仓库是信息的中央存储库,建立在传统的数据库技术上,存储了大量的结构化数据。通常数据定期从数据定期从事务系统、关系数据库和其他来源流入数据仓库。数据湖基于 HDFS 和 OSS 存储技术。是一种不断演进、可扩展的大数据存储、处理、分析的基础设置。它就像一个大型仓库,可以存储任何形式(包括结构化和非结构化)和任何格式(包括文本、音频、视频和图像)的原始数据,虽然数据仓库和数据湖的应用场景和架构不同,但它们并不是对立关系。数据仓库存储结构化的数据,适用于快速的BI和决策支撑,而数据湖可以存储任何格式的数据,往往通过挖掘能够发挥出数据的更大作为,因此 在一些场景上二者的并存可以给企业带来更多收益。
承载着新的需求,湖仓一体应运而生,又被称为Lake House,其出发点是通过数据仓库和数据湖的打通和融合,让数据流动起来,减少重复建设,直接在数据湖的低成本存储上实现与数据仓库中类似的数据结构和数据管理功能。Lake House架构最重要的一点,是实现数据仓库和数据湖的数据/元数据无缝打通和自由流动。湖里的"显性价值"数据可以流到仓里,甚至可以直接被数仓使用;而仓里的"隐性价值"数据,也可以流到湖里,低成本长久保存,供未来的数据挖掘使用。
目前,数据仓库和数据湖的融合有两个方向,一种是在数据湖基础上增加数据仓库能力,另一种是数据湖和数据仓库并行融合形成混合式的逻辑数据仓库。接下来,我们将介绍三个开源的数据湖系统,这些系统在最近的更新中,添加了一些功能,使得其实现与数据仓库中类似的数据结构和数据管理功能。
2.1 Apache Hudi
Hudi是Hadoop Updates and Incrementals的简写,用于管理分布式文件系统上大型分析数据集存储。Hudi具有如下基本特性/能力:
-
Hudi能够摄入(Ingest)和管理(Manage)基于HDFS之上的大型分析数据集,主要目的是高效的减少入库延时
-
Hudi在HDFS数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)
-
Hudi可以对HDFS上的parquet格式数据进行插入/更新操作
-
Hudi通过自定义InputFormat与Hadoop生态系统(Spark、Hive、Parquet)集成
-
Hudi通过Savepoint来实现数据恢复
2.1.1基本架构
Hudi能够整合Batch和Streaming处理的能力,主要通过Spark自身支持的基本来能来实现的。一个数据处理Pipeline通常由Source、Processing、Sink三个部分组成,Hudi可以作为Source、Sink,它把数据存储到分布式文件系统(如HDFS)中。
Apache Hudi在大数据应用场景中,所处的位置,如下图所示:
图 2‑1 Apache Hudi
2.1.2 Timeline
Hudi内部对每个表都维护了一个Timeline,这个Timeline是由一组作用在某个表上的Instant对象组成。Instant表示在某个时间点对表进行操作的从而达到某一个状态的表示,所以Instant包含Instant Action、Instant Time和Instant State三个内容,它们的含义如下所示:
-
Instant Action:对Hudi表执行的操作类型,目前包括COMMITS、CLEANS、DELTA_COMMIT、COMPACTION、ROLLBACK、SAVEPOINT这6种操作类型
-
Instant Action:对Hudi表执行的操作类型,目前包括COMMITS、CLEANS、DELTA_COMMIT、COMPACTION、ROLLBACK、SAVEPOINT这6种操作类型
-
Instant Action:对Hudi表执行的操作类型,目前包括COMMITS、CLEANS、DELTA_COMMIT、COMPACTION、ROLLBACK、SAVEPOINT这6种操作类型
下面是官网给出的一个例子来解释一下Timeline,如图2-2所示:
图 2 ‑2 Hudi Timeline
上面的示例显示了Hudi表上的10:00到10:20之间发生的upserts,大约每5分钟一次。每次操作执行完成,会看到对应这个Hudi表的Timeline上,有一系列的COMMIT元数据生成。当满足一定条件时,会在指定的时刻对这些COMMIT进行CLEANS和COMPACTION操作,这两个操作都是在后台完成,其中在10:05之后执行了一次CLEANS操作,10:10之后执行了一次COMPACTION操作。可以看到,从数据生成到最终到达Hudi系统,可能存在延迟如图中数据大约在07:00、08:00、09:00时生成,数据到达大约延迟了分别3、2、1小时多,最终生成COMMIT的时间才是upsert的时间。对于数据到达时间(Arrival Time)和事件时间(Event Time)相关的数据延迟(Latency)和完整性(Completeness)的权衡,Hudi可以将数据upsert到更早时间的Buckets或Folders下面。通过使用Timeline来管理,当增量查询10:00之后的最新数据时,可以非常高效的找到10:00之后发生过更新的文件,而不必根据延迟时间再去扫描更早时间的文件,比如这里,就不需要扫描7:00、8:00或9:00这些时刻对应的文件(Buckets)。
2.1.3 Hudi存储
Hudi将表组织成HDFS上某个指定目录(basepath)下的目录结构,表被分成多个分区,分区是以目录的形式存在,每个目录下面会存在属于该分区的多个文件,类似Hive表,每个Hudi表分区通过一个分区路径(partitionpath)来唯一标识。在每个分区下面,通过文件分组(File Group)的方式来组织,每个分组对应一个唯一的文件ID。每个文件分组中包含多个文件分片(File Slice),每个文件分片包含一个Base文件(*.parquet),这个文件是在执行COMMIT/COMPACTION操作的时候生成的,同时还生成了几个日志文件(*.log.*),日志文件中包含了从该Base文件生成以后执行的插入/更新操作。
Hudi采用MVCC设计,当执行COMPACTION操作时,回合并日志文件和Base文件,生成新的文件分片。CLEANS操作会清理掉不用的/旧的文件分片,释放存储空间。
Hudi会通过记录Key与分区Path组成Hoodie Key,即Record Key+Partition Path,通过将Hoodie Key映射到前面提到的文件ID,具体其实是映射到file_group/file_id,这就是Hudi的索引。一旦记录的第一个版本被写入文件中,对应的Hoodie Key就不会再改变了。
2.1.4 Hudi表类型
Hudi主要有两种类型的表:Copy-On-Write表和Merge-On-Road表。
Copy-On-Write表使用专门的列式文件格式存储数据,例如Parquet。更新时保存多版本,并且在写的过程中通过异步的Merge来实现重写(Rewrite)数据文件。Copy-On-Write表只包含列式格式的Base文件,每次执行COMMIT操作会生成新版本的Base文件。所以,Copy-ON-Write表存在写放大的问题,因为每次有更新操作都会重写(Rewrite)整个Base文件。通过官网给出的一个例子,来说明写入Copy-On-Write表,并进行查询操作的基本流程,如下图所示:
图 2 ‑3 COW表查询
每次执行INSERT或UPDATE操作,都会在Timeline上生成一个新的COMMIT,同时对应着一个文件分片(File Slice)。如果是INSERT操作则生成文件分组的第一个新的文件分片,如果是UPDATE操作则会生成一个新版本的文件分片。
写入过程中可以进行查询,如果查询COMMIT未10:10之前的数据,则会首先查询Timeline上最新的COMMIT,通过过滤掉只会小于10:10的数据查询出来,即把文件ID为1、2、3且版本为10:05的文件分片查询出来。
Merge-On-Read表使用列式和行式文件格式混合的方式来存储数据,列式文件格式比如Parquet,行式文件格式比如Avro。更新时写入到增量(Delta)文件中,之后通过同步或异步的COMPACTION操作,生成新版本的列式格式文件。
Merge-On-Read表存在列式格式的Base文件,也存在行式格式的增量(Delta)文件,新到达的更新都会写到增量日志文件中,根据实际情况进行COMPACTION操作来将增量文件合并到Base文件上。通常,需要有效的控制增量日志文件的大小,来平衡读放大和写放大的影响。Merge-On-Read表可以支持Snapshot Query和Read Optimized Query,下面的例子展示了Merge-On-Read表读写的基本流程,如下图所示:
图 2‑4 MOR表读写流程
上图中,每个文件分组都对应一个增量日志文件(Delta Log File)。COMPACTION操作在后台定时执行,会把对应的增量日志文件合并到文件分组的Base文件中,生成新版本的Base文件。对于查询10:10之后的数据的Read Optimized Query,只能查询到10:05及其之前的数据,看不到之后的数据,查询结果只包含版本为10:05、文件ID为1、2、3的文件;但是Snapshot Query是可以查询到10:05之后的数据的。
由于COW表和MOR表的存储格式的不同,它们的数据写入延迟和查询延迟如表1所示:
表格 1 Hudi数据表属性
Copy-On-Write | Merge-On-Read | |
---|---|---|
数据延迟 | 高 | 低 |
查询延迟 | 低 | 高 |
更新成本 | 高,需要重写整个parquet文件 | 低,append方式写增量文件 |
2.1.5 Hudi查询类型
Hudi支持三种查询类型:
-
Snapshot Query:只能查询到给定COMMIT或COMPACTION后的最新快照数据。对于Copy-On-Write表,Snapshot Query能够查询到,已经存在的列式格式文件(Parquet文件);对于Merge-On-Read表,Snapshot Query能够查询到通过合并已存在的Base文件和增量日志文件得到的数据。
-
Incremental Query:只能查询到最新写入Hudi表的数据,也就是给定的COMMIT/COMPACTION之后的最新数据。
-
Read Optimized Query:只能查询到给定的COMMIT/COMPACTION之前所限定范围的最新数据。也就是说,只能看到列式格式Base文件中的最新数据。
2.1.6 搜索引擎支持能力
Copy-On-Write表
Merge-On-Read表
2.2 Apache Iceberg
Apache Iceberg 是开源的数据湖方案之一,最初由 Netflix 开发。它对自己的定位是一种用于大型分析数据集的开放表格式。我们可以简单理解为它是基于计算层(flink、spark)和存储层(orc、parquet)的一个中间层,我们可以把它定义成一种"数据组织格式",Iceberg将其称之为"表格式"也是表达类似的含义。它与底层的存储格式(比如ORC、Parquet之类的列式存储格式)最大的区别是,它并不定义数据存储方式,而是定义了数据、元数据的组织方式,向上提供统一的"表"的语义。它构建在数据存储格式之上,其底层的数据存储仍然使用Parquet、ORC等进行存储。在hive建立一个iceberg格式的表。用flink或者spark写入iceberg,然后再通过其他方式来读取这个表,比如spark、flink、presto等。Iceberg的架构和实现并未绑定于某一特定引擎,它实现了通用的数据组织格式,利用此格式可以方便地与不同引擎(如Flink、Hive、Spark)对接。
2.2.1 Iceberg特性
-
数据存储、计算引擎插件化:Iceberg在设计之初的目标就是提供一个开放通用的表格式(Table Format)实现方案.因此, 它不和特定的数据存储、计算引擎绑定. 目前大数据领域的常见数据存储(HDFS、S3…), 计算引擎(Flink、Spark…)都可以接入Iceberg. 在生产环境中, 技术人员可以根据公司的实际情况, 选择不同的组件搭使用.甚至, 可以不通过计算引擎,直接读取存在文件系统上的数据.
-
数据表演化(Table Evolution):Iceberg可以通过SQL的方式进行表级别模式演进. 进行这些操作的时候, 代价极低. 不存在读出数据重新写入或者迁移数据这种费时费力的操作.比如在常用的Hive中, 如果我们需要把一个按天分区的表, 改成按小时分区. 此时, 不能再原表之上直接修改, 只能新建一个按小时分区的表, 然后再把数据Insert到新的小时分区表. 而且, 即使我们通过Rename的命令把新表的名字改为原表, 使用原表的上次层应用, 也可能由于分区字段修改, 导致需要修改 SQL. 这样花费的经历是非常繁琐的.
-
分区演化(Partition Evolution):Iceberg可以在一个已存在的表上直接修改, 因为Iceberg的查询流程并不和分区信息直接关联.当我们改变一个表的分区策略时, 对应修改分区之前的数据不会改变, 依然会采用老的分区策略, 新的数据会采用新的分区策略, 也就是说同一个表会有两种分区策略, 旧数据采用旧分区策略, 新数据采用新新分区策略, 在元数据里两个分区策略相互独立,不重合. 在技术人员查询数据的时候, 如果存在跨分区策略的情况, 则会解析成两个不同执行计划, 如Iceberg官网提供图所示:
图 2-5 Iceberg Query
图中booking_table表2008年按月分区, 进入2009年后改为按天分区, 这两中分区策略共存于该表中.得益于Iceberg的隐藏分区(Hidden Partition), 技术人员在写SQL 查询的时候, 不需要在SQL中特别指定分区过滤条件, Iceberg会自动分区, 过滤掉不需要的数据.Iceberg分区演化操作同样是一个元数据操作, 不会重写数据文件.
-
列顺序演化(Sort Order Evolution):Iceberg可以在一个已经存在的表上修改排序策略.修改了排序策略之后, 旧数据依旧采用老排序策略不变.往Iceberg里写数据的计算引擎总是会选择最新的排序策略, 但是当排序的代价极其高昂的时候, 就不进行排序了.
-
隐藏分区(Hidden Partition):Iceberg的分区信息并不需要人工维护, 它可以被隐藏起来. 不同其他类似Hive 的分区策略, Iceberg的分区字段/策略(通过某一个字段计算出来), 可以不是表的字段和表数据存储目录也没有关系. 在建表或者修改分区策略之后, 新的数据会自动计算所属于的分区.在查询的时候同样不用关系表的分区是什么字段/策略, 只需要关注业务逻辑, Iceberg会自动过滤不需要的分区数据.
2.2.2 Iceberg原理
-
优化数据入库流程:Iceberg 提供 ACID 事务能力,上游数据写入即可见,不影响当前数据处理任务,这大大简化了 ETL;Iceberg 提供了 upsert、merge into 能力,可以极大地缩小数据入库延迟
-
支持更多的分析引擎:目前 Iceberg 支持的计算引擎有 Spark、Flink、Presto 以及 Hive。
-
统一数据存储和灵活的文件组织:提供了基于流式的增量计算模型和基于批处理的全量表计算模型。批处理和流任务可以使用相同的存储模型,数据不再孤立;Iceberg 支持隐藏分区和分区进化,方便业务进行数据分区策略更新。支持 Parquet、Avro 以及 ORC 等存储格式
-
增量读取处理能力:Iceberg 支持通过流式方式读取增量数据,支持 Structed Streaming 以及 Flink table Source
2.3 Delta lake
Delta Lake的核心概念很简单:Delta Lake使用存储在云对象中的预写日志,以ACID 的方式维护了哪些对象属于Delta table这样的信息。对象本身写在parquet文件中,使已经能够处理Parquet格式的引擎可以方便地开发相应的connectors。这样的设计可以让客户端以串行的方式一次更新多个对象,替换一些列对象的子集,同时保持与读写parquet文件本身相同的高并发读写性能。日志包含了为每一个数据文件维护的元数据,如min/max统计信息。相比"对象存储中的文件"这样的方式,元数据搜索相关数据文件速度有了数量级的提升。 最关键的是,Delta Lake的设计使所有元数据都在底层对象存储中,并且事务是通过针对对象存储的乐观并发协议实现的(具体细节因云厂商而异)。这意味着不需要单独的服务来维护 Delta table 的状态,用户只需要在运行查询时启动服务器,享受存储计算扩展分离带来的好处。
基于这样的事务性设计,Delta Lake 能够提供在传统云数据湖上无法提供的解决用户痛点的特性,包括:
-
Time travel:允许用户查询具体时间点的数据快照或者回滚错误的数据更新。
-
Upsert,delete 以及 merge 操作:高效重写相关对象实现对存储数据的更新以及合规工作流(比如 GDPR)
-
高效的流I/O:流作业以低延迟将小对象写入表中,然后以事务形式将它们合并到大对象中来提高查询性能。支持快速"tail"读取表中新加入数据,因此作业可以将 Delta表作为一个消息队列。
-
缓存:由于Delta表中的对象以及日志是不可变的,集群节点可以安全地将他们缓存在本地存储中。
-
数据布局优化:在不影响查询的情况下,自动优化表中对象的大小,以及数据记录的聚类(clustering)(将记录存储成Zorder实现多维度的本地化)。
-
Schema 演化:当表的schema变化时,允许在不重写parquet文件的情况下读取旧的parquet文件。
-
日志审计:基于事务日志的审计功能。
这些特性改进了数据在云对象存储上的可管理性和性能,并且结合了数仓和数据湖的关键特性创造了"湖仓"的典范:直接在廉价的对象存储上使用标准的 DBMS 管理功能。
图 2-6 Storage
2.3.1 数据存储格式
本质上来说,delta Lake就是一种数据格式,介于分析引擎和存储层之间。存储层可以使用云存储或者HDFS等分布式文件系统。一张Delta Lake 表就是一个目录,如图2所示。目录中包含数据文件(Data Objects),以及一系列的日志(log)。
数据文件在被存储以Apache Parquet 数据格式的形式存储,每个文件都有全局统一的命名GUID,并且按照日期进行放到不同的子目录中。
日志文件,存储在_delta_log目录下,每一次操作记录为json文件,按照递增的顺序编号。操作类型有:修改元数据,增加或删除文件。增加文件的操作,在json文件里还包含一些关于文件的统计信息。删除操作采用lazy删除的方式,保留在log以及log checkpoint中,直到文件被物理删除。
日志检查点。周期性的将日志压缩为日志检查点,并且检查点会散去一些冗余的log,例如添加之后的删除操作,改变元数据的操作只会保留最后一个。日志检查点也按照Parquet 数据格式进行存储,并且使用 _last_checkpoint 来记录最后一个检查点的位置。
Delta Lake 表是云对象存储或文件系统上的一个目录,其中包含了表数据对象和事务操作日志(以及 checkpoint 日志)对象。客户端使用乐观并发控制协议来更新这些数据结构。Delta Lake 中,数据对象采用 parquet 格式存储,数据对象可分区,并且名称为唯一的 GUID。存储格式示例如下所示:
图 2-7 文件存储在Delta table
2.3.2 日志
Delta Lake 日志对象存储在表的 _delta_log 子目录中,它包含一系列连续的递增数字作为 ID 的 JSON 对象用于存储日志记录,以及某些特定日志对象的检查点,这些检查点将检查点之前的日志合并为 Parquet 格式。每个日志记录对象(比如 000003.json)包含了在前一个版本的表基础上进行的操作数组。这些操作数组用于保存数据对象信息,数据对象的添加记录还可以包括数据统计信息,例如总记录条数以及每列的最小/最大值和空计数。
另外 Delta Lake 的日志对象还可以保存一些额外信息,比如更新应用事务 ID。Delta Lake 为应用程序提供了一种将应用程序的数据包括在日志记录中的方法,允许应用程序在其日志记录对象中写入带有 appId 和版本字段的自定义 txn 操作,这样该日志对象就可以用来跟踪应用程序特定的信息,例如应用程序输入流的对应偏移量。这对于实现端到端事务性应用很有用。例如,写入 Delta 表的流处理系统需要知道先前已经提交了哪些写入,才能实现Exactly Once 语义:如果流作业崩溃,则需要知道其哪些写入先前已写入表中,以便它可以从输入流中的正确偏移处开始重播后续写入。
日志 checkpoint 的主要作用是对日志对象(json 文件)进行定期压缩,删除冗余,冗余的操作包括:
-
对同一数据对象先执行添加操作,然后执行删除操作。可以删除添加项,因为数据对象不再是表的一部分。根据表的数据retention配置,应将删除操作保留为墓碑具体来说,客户端使用在删除操作中的时间戳来决定何时从存储中删除对象。
-
同一对象的多个添加项可以被最后一个替换,因为新添加项只能添加统计信息。
-
来自同一appId的多个txn操作可以被最新的替换,因为最新的txn操作包含其最新版本字段
-
changeMetadata以及协议操作也可以进行合并以仅保留最新的元数据。
-
checkpoint 采用 parquet 格式保存,这种面向列的文件对于查询表的元数据以及基于数据统计信息查找哪些对象可能包含与选择性查询相关的数据来说是非常理想的存储格式。默认情况下,客户端每10个事务会写入一个检查点。为了便于查找,另外还有一个_last_checkpoint文件用于保存最新的Checkpoint ID。
2.3.3 读写访问协议
Delta Lake 按以下步骤对表进行读取:
1. 在table的log目录读取_last_checkpoint 对象,如果对象存在,读取最近一次的 checkpoint ID
2. 在对象存储table的log目录中执行一次LIST操作,如果"最近一次 checkpoint ID"存在,则以此ID做start key;如果它不存在,则找到最新的 .parquet 文件以及其后面的所有 .json 文件。这个操作提供了数据表从最近一次"快照"去恢复整张表所有状态所需要的所有文件清单。
3. 使用"快照"(如果存在)和后续的"日志记录"去重新组成数据表的状态(即,包含 add records,没有相关remove records的数据对象)和这些数据对象的统计信息。
4. 使用统计信息去定位读事务的query相关的数据对象集合。
5. 可以在启动的spark cluster或其他计算集群中,并行的读取这些相关数据对象。这个访问协议的每一步中都有相关的设计去规避对象存储的最终一致性。比如,客户端可能会读取到一个过期的_last_checkpoint文件,仍然可以用它的内容,通过LIST命令去定位新的"日志记录"文件清单,生产最新版本的数据表状态。这个 _last_checkpoint 文件主要是提供一个最新的快照 ID,帮助减少 LIST 操作的开销。同样的,客户端能容忍在LIST 最近对象清单时的不一致(比如,日志记录 ID 之间的 gap),也能容忍在读取日志记录中的数据对象时,还不可见,通过等一等/重试的方式去规避。
Delta Lake中写事务
一个写入数据的事务,一般会涉及最多5个步骤,具体有几步取决事务中的具体操作:
1. 找到一个最近的日志记录ID,比如r,使用读事务协议的1-2 步(比如,从最近的一次 checkpoint ID 开始往前找)。事务会读取表数据的第r个版本(按需),然后尝试去写一个r+1版本的日志记录文件。
2. 读取表数据的r版本数据,如果需要,使用读事务相同的步骤(比如,合并最新的checkpoint .parquet 和 较新的所有.json日志记录文件,生成数据表的最新状态,然后读取数据表相关的数据对象清单)
3. 写入事务相关的数据对象到正确的数据表路径,使用GUID生成对象名。这一步可以并行化。最后这些数据对象会被最新的日志记录对象所引用。
4. 尝试去写本次写事务的日志记录到 r+1 版本的 .json 日志记录对象中 ,如果没有其他客户端在尝试写入这个对象(乐观锁)。这一步需要是原子的(atomic),如果失败需要重试。
5. 此步可选。为r+1版本的日志记录对象,写一个新的.parquet快照对象。然后,在写事务完成后,更新 _last_checkpoint 文件内容,指向r+1的快照。
2.3.4隔离级别
Delta Lake写事务实现了线性隔离级别,也使得事务的日志记录ID可以线性的增长。Delta Lake的读事务是snapshot isolation级别或者serializability的。根据默认的读数据流程(如上所述),读取过程是快照隔离级别的,但是客户端如果想达到线性化(serializable)的读取,可以发出一个"read-write"的事务,假装mock一次写事务然后再读,来达到线性化。
2.3.5 局限性
-
Delta Lake目前只支持单表的序列化级别的事务,因为每张表都有它自己的事务日志。如果有跨表的事务日志将能打破这个局限,但这可能会显著的增加并发乐观锁的竞争(在给日志记录文件做append时)。在高TPS的事务场景下,一个coordinator是可以承接事务 log 写入的,这样能解决事务直接读写对象存储。
-
在流式工作负载下,Delta Lake受限于云对象存储的latency。比如,使用对象存储的 API很难达到毫秒级的流式延迟要求。另外一边看,我们发现大企业的用户一般都跑并行的任务,在使用 Delta Lake 去提供秒级的服务延迟在大多数场景下也是能够接受的。
-
Delta Lake 目前不支持二级索引(只有数据对象级别的min/max统计),我们已经开始着手开发一个基于Bloom filter的index了。Delta的ACID事务能力,允许我们以事务的方式更新这些索引。
3 实验及分析
3.1基于Apache Hudi的查询测试
3.1.1 写入测试数据到kafka
- 查看当前kafka集群信息
kafkacat -b kafkabroker -L -J | jq .
- 生产数据,将Hudi源码目录下的测试数据写入到kafka中
cat docker/demo/data/batch_1.json | kafkacat -b kafkabroker -t stock_ticks -P
- 查看写入数据后kafka集群信息
kafkacat -b kafkabroker -L -J | jq .
- 消费数据
kafkacat -C -b kafkabroker -t stock_ticks -p 0 -o -10
3.1.2 Spark写入数据到Hudi
- 执行 spark-submit
执行以下spark-submit命令以启动delta-streamer,从kafka集群消费数据,采用COPY_ON_WRITE模式写入到HDFS,表名stock_ticks_cow
spark-submit \
–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
–table-type COPY_ON_WRITE \
–source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
–source-ordering-field ts \
–target-base-path /user/hive/warehouse/stock_ticks_cow \
–target-table stock_ticks_cow --props /var/demo/config/kafka-source.properties \
–schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
执行以下spark-submit命令以启动delta-streamer,从kafka集群消费数据,采用MERGE_ON_READ模式写入到HDFS,表名stock_ticks_mor
spark-submit \
–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
–table-type MERGE_ON_READ \
–source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
–source-ordering-field ts \
–target-base-path /user/hive/warehouse/stock_ticks_mor \
–target-table stock_ticks_mor \
–props /var/demo/config/kafka-source.properties \
–schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
–disable-compaction
- 查看hdfs文件
Stock_ticks_cow表:stock_ticks_cow以日期分区,分区目录下有一个元数据文件和parquet格式的数据文件。在.hoodle目录下可以看见commit信息。
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_cow
Stock_ticks_mor
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_cow
- 同步到Hive元数据
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
–jdbc-url jdbc:hive2://hiveserver:10000 \
–user hive \
–pass hive \
–partitioned-by dt \
–base-path /user/hive/warehouse/stock_ticks_cow \
–database default \
–table stock_ticks_cow
/var/hoodie/ws/hudi-sync/hudi-hive-sync/run_sync_tool.sh \
–jdbc-url jdbc:hive2://hiveserver:10000 \
–user hive \
–pass hive \
–partitioned-by dt \
–base-path /user/hive/warehouse/stock_ticks_mor \
–database default \
–table stock_ticks_mor
3.1.3 Spark查询数据
- 进入spark-sehll:
$SPARK_INSTALL/bin/spark-shell \
–jars $HUDI_SPARK_BUNDLE \
–master local[2] \
–driver-class-path $HADOOP_CONF_DIR \
–conf spark.sql.hive.convertMetastoreParquet=false \
–deploy-mode client \
–driver-memory 1G \
–executor-memory 3G \
–num-executors 1 \
–packages org.apache.spark:spark-avro_2.11:2.4.4
- 查询当前所有的表
scala> spark.sql(“show tables”).show(100, false)
- 对COW表运行最大时间戳查询
scala> spark.sql(“select symbol, max(ts) from stock_ticks_cow group by symbol HAVING symbol = ‘GOOG’”).show(100, false)
- 对COW表进行投影查询
scala> spark.sql(“select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_cow where symbol = ‘GOOG’”).show(100, false)
- 对MOR表进行最大时间戳查询,使用读优化
scala> spark.sql(“select symbol, max(ts) from stock_ticks_mor_ro group by symbol HAVING symbol = ‘GOOG’”).show(100, false)
- 对MOR表进行投影查询,使用读优化查询
scala> spark.sql(“select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = ‘GOOG’”).show(100, false)
- 对MOR表进行投影查询,使用快照查询
scala> spark.sql(“select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = ‘GOOG’”).show(100, false)
3.1.4 写入第二批测试数据
- 将第二批数据写入到kafka
cat docker/demo/data/batch_2.json | kafkacat -b kafkabroker -t stock_ticks -P
- 将第二批数据写入Hudi COW表
spark-submit \
–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
–table-type COPY_ON_WRITE \
–source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
–source-ordering-field ts \
–target-base-path /user/hive/warehouse/stock_ticks_cow \
–target-table stock_ticks_cow \
–props /var/demo/config/kafka-source.properties \
–schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider
查看hdfs目录:
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_cow
- 将第二批数据写入Hudi MOR表
spark-submit \
–class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer $HUDI_UTILITIES_BUNDLE \
–table-type MERGE_ON_READ \
–source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
–source-ordering-field ts \
–target-base-path /user/hive/warehouse/stock_ticks_mor \
–target-table stock_ticks_mor \
–props /var/demo/config/kafka-source.properties \
–schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
–disable-compaction
查看hdfs目录:
hdfs dfs -ls -R /user/hive/warehouse/stock_ticks_mor
3.1.5 数据读取
- 进入spark-sehll:
$SPARK_INSTALL/bin/spark-shell \
–jars $HUDI_SPARK_BUNDLE \
–master local[2] \
–driver-class-path $HADOOP_CONF_DIR \
–conf spark.sql.hive.convertMetastoreParquet=false \
–deploy-mode client \
–driver-memory 1G \
–executor-memory 3G \
–num-executors 1 \
–packages org.apache.spark:spark-avro_2.11:2.4.4
- 对于MOR表,进行读优化查询
scala> spark.sql(“select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = ‘GOOG’”).show(100, false)
- 对于MOR表,进行快照查询
scala> spark.sql(“select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = ‘GOOG’”).show(100, false)
- 对于MOR表,进行增量查询
scala> val hoodieIncViewDF = spark.read.format(“org.apache.hudi”).option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, “20220211064632000”).load(“/user/hive/warehouse/stock_ticks_mor”)
scala> hoodieIncViewDF.registerTempTable(“stock_ticks_mor_incr_tmp1”)
scala> spark.sql(“select `_hoodie_commit_time`, symbol, ts, volume, open, close from stock_ticks_mor_incr_tmp1 where symbol = ‘GOOG’”).show(100, false);
由于读优化查询只查询存量文件,所以即使插入第二批数据之后,查询到的数据也还是尚未合并的第一批数据的内容;而快照查询则会合并存量文件和增量文件,因此查询到的数据是第一批和第二批数据合并后的数据;而增量查询我指定了只查询第二次提交的数据,因此最后的结果中只包含第二次写入的数据。
3.2基于Delta lake的读写测试
3.2.1 tpch测试
tpch测试数据集 1GB大小数据量,测试数据以DeltaLake的存储形式放到hdfs中
3.2.2 并发读写
-
使用hdparm工具,测得磁盘IO性能为142.45MB/S
-
单进程条件下,写入3.3GB的csv文件,耗时249.53s
-
1个生产者分批写入3.3GB csv文件,2个消费者读取并输出top10数据,耗时940.85s
3.3基于Iceberg的读写测试
3.3.1 基础环境搭建
- Hive
spark.sql.catalog.zdm=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.zdm.type=hive #hive
spark.sql.catalog.zdm.uri=thrift://metastore-host:port
#hive.metastore.uris 可以去hive-site.xml里查看
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions #SQL扩展,开启支持merge及CALL操作
- HDFS
spark.sql.catalog.zdm=org.apache.iceberg.spark.SparkCatalog
conf spark.sql.catalog.zdm.type=hadoop
conf spark.sql.catalog.zdm.warehouse=hdfs://HDFS81339//tmp/iceberg
- 其他
查看目录和空间名称:SHOW CURRENT NAMESPACE
切换本地/Hive库:use zdm.test
- 启动
spark3-sql --master yarn \
–conf spark.sql.catalog.zdm=org.apache.iceberg.spark.SparkCatalog \
–conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
–conf spark.sql.catalog.zdm.type=hive \
–conf spark.sql.catalog.zdm.uri=thrift://metastore-host:port
3.3.2 DDL操作
- 建表
create table iceberg_spark(id int ,name string,dt String) using iceberg PARTITIONED by (dt);
- 插入数据
hive> insert into table iceberg_table values(5,‘e’);
- 删除表
hive> insert into table iceberg_table values(5,‘e’);
- 重命名
ALTER TABLE zdm.test.iceberg_spark RENAME TO zdm.test.iceberg_spark1;
- 新增列(FIRST和AFTER更改顺序):
ALTER TABLE zdm.test.iceberg_spark1 ADD COLUMN sex String AFTER id;
- 删除列:
ALTER table iceberg_spark1 DROP COLUMN flag;
- 分区操作
新增/更改/删除分区字段:添加分区字段是元数据操作,不会更改任何现有表数据。新数据将使用新分区写入,旧数据还在改之前分区中。对于元数据表中的新分区字段,旧数据文件将具有空值。
ALTER TABLE zdm.test.iceberg_spark ADD PARTITION FIELD catalog;
ALTER TABLE zdm.test.iceberg_spark ADD PARTITION FIELD truncate(dt, 4);
ALTER TABLE zdm.test.iceberg_spark ADD PARTITION FIELD years(dt); --dt必须是timestamp类型
ALTER TABLE zdm.test.iceberg_spark DROP PARTITION FIELD truncate(dt, 4); --慎重
3.3.3 DML操作
- 插入数据
insert into/overwrite table iceberg_spark values(1,“a”,“2021”),(2,“b”,“2021”),(3,“c”,“2021”);
- 删除数据
delete from zdm.test.iceberg_spark where id=4;
4. 参考文献
-
Michael Armbrust, Ali Ghodsi, Reynold Xin, Matei Zaharia, Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics. In 11th Annual Conference on Innovative Data Systems Research (CIDR '21), January 11–15, 2021.
-
Armbrust et al. Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores. PVLDB, 13(12): 3411-3424, 2020.
-
Dražen Oreščanin, Tomislav Hlupić, Data Lakehouse - a Novel Step in Analytics Architecture. In 44th International Convention on Information, Communication and Electronic Technology (MIPRO), pp. 1242-1246, 2021.
转载请注明:湖仓一体技术调研(Apache Hudi、Iceberg和Delta lake对比) | 胖虎的工具箱-编程导航