导读 本次分享内容是数据湖 Iceberg 在小米的应用场景。
(资料图)
主要包括以下三方面内容:
1. Iceberg 核心特性
2. Iceberg 在小米的应用场景
3. 未来规划
分享嘉宾|李培殿 小米 研发工程师
编辑整理|赵超 世纪高通
出品社区|DataFun
01
Iceberg 核心特性
Iceberg 是具有 SQL 行为的表的开放式标准,此定义由 Ryan Blue 提出。这个定义中包含了两点:
第一点,Iceberg 有 SQL 行为,意味着 Iceberg 是针对于结构化数据的,具有结构化数据的特性,如 Schema 等。
第二点,Iceberg 是一个开放性的标准,开放性标准体现在两方面。第一方面体现在设计上,Iceberg 支持多种文件格式,在存储介质上可以选择各种分布式存储或者云存储(如公有云),在上层应用上支持了 Flink、Spark、Hive 和 Trino 等多种查询引擎。第二方面则体现在社区上,目前已经有多家公司参与设计和建设。
接下来介绍 Iceberg 的几个特点。
1. Iceberg 可以避免意外发生
Iceberg 表可以放心使用,无需考虑太多不愉快的事情发生。
(1)Iceberg 提供了事务性
对表的任何操作都是原子性操作,同时使用多快照提供了读写分离的特性。
(2)Iceberg 提供了 Full Schema Evolution
可以对 Iceberg 表进行 Schema 修改,比如字段类型提升、增加列、删除列、重命名列、调整列顺序等。这里需要说明的是,字段类型不是可以随意更改的,Iceberg 只支持字段类型提升。例如,int 改成 long,float 改成 double,或者精度增大等。
2. Iceberg 支持隐式分区
Iceberg 有多种分区函数供选择,如下图所示。当我们需要根据某个 timestamp 类型的字段提取出的年、月、日或者小时进行分区时,可以直接使用 Iceberg 提供的分区函数。Iceberg 还支持多级分区,在分区选择上具有更高的灵活性。
与 Hive 进行对比,隐式分区体现在:
(1)Iceberg 写入时,不需要像 Hive 一样指定分区,写入哪个分区是由 Iceberg 自动管理的。这样的好处在于,可以保证数据分区是正确的,防止用户错误导致数据分区错误。
(2)用户查询时,不需要考虑分区的物理结构。假如一张表使用 date 字段做了分区,用户查询时不需要考虑这个字段是进行了月的分区,还是天的分区,只需要按照这个字段进行查询即可,Iceberg 会自动生成查询计划,如下图所示。
(3)在目录结构上,Iceberg 具有元数据层,通过记录分区和文件地址之间的关系,实现了物理结构和逻辑结构的分离。这样,可以非常方便地进行 partition evolution 操作。
3. Iceberg 的行级更新的能力
Fomat version 2 中提供了行级更新的能力,在 Iceberg 中使用了两类文件进行标记删除。第一类是 position delete file,这类文件可以指定文件和行号进行删除。第二类是 equality delete file,这类文件记录了被删除记录的唯一键进行删除。Iceberg 只是规定了可以使用这两类文件进行删除,但具体由哪一类文件或两类文件共同使用以达到删除目的,是由引擎层来决定的。下图中是 Flink 引擎实现行级删除的模式,对当前事务写入的文件使用 position delete file,而对于之前的事务写入的文件会使用 equality delete file 进行删除。查询时,使用 Merge On Read 模式,可以得到已经删除成功的结果。
02
Iceberg 在小米的应用场景
本节介绍 Iceberg 数据湖在小米的几个应用。
1. 日志入湖场景
小米原有的日志入湖的数据链路如下图所示,用户会在 Client 端使用 MQ 的 SDK,将数据发送到 MQ 中。小米使用 Talos 作为 MQ,对标于业界的 Kafka,MQ 中没有 Schema。之后使用 Spark streaming 将文件直接 flush 到 HDFS 上,然后使用 add partition 挂载到 Hive 上。
这个链路的特点是:
(1)使用了旧版本的 Spark streaming,实现的是 at least once 语义,数据可能会出现重复。
(2)由于 MQ 当中没有 Schema,只能使用上报的时间进行分区。这样,会在凌晨的时候出现分区漂移的问题。
(3)直接 flush 文件到 Hive 上时,Hive 的 schema 与文件 schema 可能不匹配,导致历史数据读取时可能会出现问题。
针对以上问题,我们使用 Iceberg 对日志入湖的流程重新进行了设计,修改后的数据链路如下图所示。在 MQ 上配置 Schema,使用 Flink SQL 进行解析,然后写入到 Iceberg 中。
这个数据链路有以下几个特性:
(1)使用 Flink SQL 的 exactly once,保证数据的不丢不重。
(2)使用了 Iceberg 的隐式分区特性,保证数据分区的正确性,避免了分区漂移问题。
(3)Schema On Write 以及 schema evolution 特性,保证数据在 schema 演变过程中也永远是正确的。
链路在实际落地中,可能会出现数据丢失的问题。数据丢失的根本原因是链路上的数据不规范。Talos 使用的 Schema On Read 模式,用户将 Schema 附加到 MQ 上,在 MQ 到 Iceberg 的过程中,有一个 Schema 同步的过程。但由于 MQ 中的 Schema 人为配置可能延迟,会导致 MQ 的 SDK 发送的数据与 MQ 中 Schema 不一致,使得 FlinkSQL 解析的时候可能会丢掉一些列。最终用户角度看到的就是数据丢失。想要解决这个问题,要在流程中进行规范,首先定义 Schema,然后发送数据。
2. Flink+ Iceberg 构建的近实时数仓
小米有很多的 IOT 设备,在这些设备上打点有两个痛点问题:
(1)设备打点数据延迟上报问题非常严重。 假设一台设备的一批数据没有上报,然后关机,过了一个月数据才上报,那么数据开发工程师需要将过去一个月的数据进行重新计算和存储。由于 Hive 不支持事务性,那么在进行重新计算然后覆盖过去一个月的数据的过程中,可能会导致下游读取的异常。
(2)由于 Spark 离线任务通常都是 T+1 的,所以凌晨时会启动很多的 Spark 作业做指标拆分,将 ODS 的数据拆分到 DWD 层,这会导致集群的资源紧张,数据产出的延迟风险非常大。
针对这些问题,我们使用了 Flink+Iceberg 对链路进行重构,重构后的数据链路如下图。
这个链路具有以下特点:
(1)首先在入湖侧,Iceberg 的隐式分区可以保证打点延迟的数据能够正确分区,以刚才的例子,一个月之前的数据不需要覆盖写入,只需要将下游的数据进行回溯即可。
(2)结合 Iceberg 的灵活分区,使用 date+event_name 进行了二级分区。这样,下游进行指标拆分时,只需要指定二级分区就可以进行消费,这样可以大大减少数据的扫描量,进而节省计算资源。
(3)整个链路中使用 Flink 来替换 Spark,这对用户来说非常重要,因为它意味着凌晨的计算量可以平摊到全天,这样产出延迟的风险可以大大降低。分摊到全天并不意味着风险变高了,相反,Flink 的 checkpoint 只有十几分钟到半个小时。这样,即使作业失败,恢复的代价也会比较小。
3. 离线场景下遇到的一些问题
Iceberg 的离线场景是比较完善的。但是,若需要数据链路稳定,仍然需要一些努力。
(1)分区完备性校验
分区完备性校验,即如何感知到上游的 T-1 数据已经写入完成,从而开启下游作业。这里分成两个场景。
① 离线形式的表, 之前 Hive 表的校验逻辑是校验 success 文件。但是 Iceberg 写入并没有 success 文件。同时 Iceberg 表的分区散落在各元数据文件当中,而 list partition 操作非常耗时。针对这一问题,我们使用了任务依赖,不是使用数据依赖来依赖分区的检测,而是依赖于上游的任务。当上游任务写入完成之后,下游任务就可以进行调度。
② 实时写入的表, Iceberg 表分区在写入第一条数据时就已经生成,这样也无法校验分区。并且,在实时场景下,经常会有数据延迟到达的问题。针对这个问题,我们参考了 Flink 的 watermark 机制,使用了 Iceberg 的 watermark,根据用户提供的时间列来生成一个时间戳,如下图所示,我们会在快照里增加一个时间戳,有一个单独的检查作业来对比分区和 watermark,当 watermark 超过分区时,即意味着分区写入完成,业界也称这种方式为流转批。
(2)离线场景的优化
① 试图将 z-order 应用于 ETL,在实践中,z-order 在整个分区中执行的代价很高。而且,对于 ETL 底层的一些表(如 ODS,DWD),查询的次数比较少,z-order 带来的收益不大。因此,建议用户使用 local sort 进行排序写入的方式。
② 我们在内部实现了 parquet 的 page column index,相比 parquet 之前的谓词下推的方式时 row group 级别的,一个 row group 是 128M 或 256M,而 parquet 最小的可读单位其实是一个 page,大概是 2MB 左右,page column index 会对 page 建立一个 min-max 索引,查询时可以利用查询谓词和 page 的 min-max 索引来对数据进行有效过滤,最终读入更少量的 page 进行计算,如下图所示。
在小米内部 benchmark 场景中,效果还是不错的。最好的情况下,可以过滤 80% 的数据。但若查询的是非排序列,比如下图的 Q7 到 Q9,基本上没有什么改善效果。
(3)隐式分区在离线场景的问题
当我们将 Iceberg 引入到离线场景之后,由 Iceberg 自带的隐式分区和 dynamic overwrite 带来的结果与用户期望有所不同。例如,假设表结构中含有四个字段(如下图所示),我们使用 date 按天分区之后再使用 hour 按小时分区。
当我们使用语句 insert overwrite _test values(1,‘a’,20230101,1),(2,‘b’,20230101,2) 进行覆盖写入后,会发现查询结果只覆盖了date=20230101/hour=1和date=20230101/hour=2分区,没有覆盖date=20230101/hour=3 的分区。这意味着 dynamic overwrite 对隐式分区操作时,不会覆盖所有的二级分区。此时,用户希望回归到 Hive 的使用方式,解决方法是使用 static overwrite 来指定分区进行覆盖。将覆盖语句修改为:
set =static;
insert overwrite _test partition(date=20230101) values(1,‘c’, 1), (2,‘d’,2);
(4)Spark timestamp 带来的问题
Iceberg 类型和多引擎类型的对齐上存在一些问题。如 Iceberg 当中的 timestamp 类型有两类,第一种是带有时区的 timestamptz,第二种是无时区的 timestamp。
而 Spark 的 timestamp 类型只有一类,即有时区的 timestamp 类型。这样就带来一个问题,如何使用 Spark DDL 来创建出 Iceberg 的无时区的 timestamp 类型呢?这时需要配置一个参数:
set‘’=true
当使用Spark来读取Iceberg timestamp类型时,则需要配置另一个参数:
Set ‘’=true
这时 Spark 会把无时区的当成有时区的进行处理,也就是说当时间戳是 UTC 的 0 点,那么 Spark 读出来的就已经加了 8 个小时了(这里假设系统时区为 UTC+8)。这样用起来似乎也没什么问题,但是与 Trino 比较起来就有问题了。当我们在平台上同时提供了 Spark 和 Trino 两种 adhoc 的查询方式,会发现结果是不同的。这个问题在 Spark 之后应该会有所改善,因为设计中会引入一个新的无时区的时间戳类型。
4. 实时集成入湖
我们将 MySQL、TiDB、Oracle 等关系型数据库的 binlog 日志采集到 MQ 当中,再使用 Flink 写入到 Iceberg 的 format v2 上,如下图所示。
这种数据链路的特点包括:
(1)整个链路借助于 Flink 的 Exactly Once 和 Iceberg 的事务性,可以到达一个端到端的 exactly once 的语义。
(2)Iceberg 对实时支持可以达到分钟级别。
(3)Iceberg 自身的 merge on read 设计,需要后台定时执行 compaction 任务。Iceberg 的 compaction 是一种插件式设计,到目前还未实现在 Flink 当中。目前,当需要使用 Flink 进行类似于 HBase 的限流或写停等操作时,尚需自己开发。假如 Compaction 任务异常终止,写链路是感知不到的。会造成写入时没有问题,但是查询时速度很慢的现象。
此外,我们在 v2 中发现更多 Iceberg 存在的问题:
(1)唯一键问题: Iceberg 本身并没有明确说明在表中可以配置一个主键,而是将这个权利交给引擎层去处理。这张表是否可以保证唯一主键,完全取决于引擎及使用方式。即使使用了支持声明主键的引擎,也很难保证声明的主键的唯一性。除非默认开启 Upsert 方式,但这种方式代价比较高。
(2)Upsert 问题: Iceberg 的文件组织实现方式的 Upsert 的代价比较高。因为 Iceberg 在设计时,希望数据尽可能入湖且没有索引,所以不会去校验这条数据是否已经存在了。Upsert 的实现方式为 delete+insert 方式,即写入两条记录,一条删除一条新增。当数据量比较大时,会导致 equality delete file 文件过多。解决方法有两种,一是增加 compaction 频次,二是通过 bloom filter 来过滤掉一些无用的 delete。
(3)并发冲突问题: 实时写入时,compaction 和写入会出现并发冲突,这往往是由于 compaction 过程中,有一条 position delete 数据写入了。这种方式下,Flink 是比较友好的,因为 position delete 只会指向一个新增的文件,不会对历史的文件进行引用。因此在校验时,可以对 position delete file 在快照中打标记,从而忽略由 position delete 带来的冲突进而导致 compaction 失败问题。
(4)完整 CDC 问题: Iceberg 与 Hudi 或 Paimon 不同,没有专门的 changelog 供 Flink 直接消费。我们需要从文件组织中将 changelog 自行解析出来,这样的解析代价很高,并且可能出现由于 Upsert 操作而带来的 changelog 不准确。小米内部实现了单事务中解析出删除的数据和插入的数据,然后以顺序的方式提供给下游消费。但是若单个快照中,先删后写的操作过多时,会导致下游波动。Changelog 不准确(尤其在非主键聚合的场景下),是通过配置 changelog CDC 去重来解决的,依赖于 Flink 内部的state 撤回的机制来解决,配置语句为:set =true。
5. 列级数据加密
Iceberg 由于元数据层的设计,可以在 Iceberg 表上实现数据加密。列级数据加密主要是利用了 parquet 高版本的加密能力。之前,小米内部的数据加密是依赖于隐私集群,单独的 IDC 机房的隔离会造成运维成本高,以及数据孤岛的问题。因此我们参照社区在 Iceberg 上实现了一个数据加密,这个方案称为单层数据加密。
与直接数据加密方式不同,直接数据加密的每条数据的写入都会调用一次 KeyCenter 进行加密,然后写入。单层数据加密会在 Iceberg 表中保存加密之后的一个密钥,当写入程序写入时,会调用一次 KeyCenter,对加密的密钥进行一次解密以获取明文密钥 DEK,然后对数据进行加密写入。读取过程与写入过程类似,读取时会对 Iceberg 元数据中保存的加密密钥进行解密,进而对数据进行解密处理。这里会涉及两个密钥,一个是 Iceberg 表自身保存的 DEK,另一个是对这个 DEK 加密的 KeyCenter 中的密钥。 单层包裹的加密方案的优点是:
(1)parquet 列级数据加密,不需要对所有的列进行加密,用户可以选择需要加密的列。
(2)对 KeyCenter 压力较小,写入和读取时只需要对 KeyCenter 访问有限次数。
这个方案在小米内部实现的是简化版本,我们会对一个 Iceberg 表维护一个 DEK 密钥。而社区的方案中,密钥粒度比较细,可以是分区粒度的密钥,也可以是文件级别的密钥。
6. Hive 升级 Iceberg 的调研
(1)方案 1:使用 migrate 原地升级
可以使用社区提供的 migrate 原地升级的方案进行升级。社区提供了 Spark 的 procedure 语法,使用 CALL migrate 语法可以直接将 Hive 表升级为 Iceberg 表。下面的例子中,将 Spark_ 表升级成了 Iceberg 表,同时将新增属性 foo 为 bar。
CALL catalog_(‘Spark_’, map(‘foo’, ‘bar’))
但这种方式在实际落地中存在一些问题:
① Iceberg 支持的文件只有 parquet/orc/avro 这三种格式,不支持 text、sequenceFile 等文件格式。导致一些 Hive 表无法支持升级为 Iceberg 表。
② 表下游消费离线作业的 Spark 必须是 以上的版本。而小米内部存在一些低版本的 HiveSQL 和低版本的 Spark 作业,因此这部分表是无法使用这个方案进行升级的。
(2)方案 2:复用 Hive location
出于减少下游作业的改动的目的,我们希望能够复用 Hive 的 location。写入的时候写入到 Iceberg 表,让 Iceberg 表和 Hive 表的存储地址相同。这样我们只需要升级上游作业,下游表在 catalog 层仍然存在,这样下游作业不需要改动,如下图所示。
这个想法是比较好的,但是实现过程有些取巧,因为 Iceberg 是多快照的,因此一个分区下,可能会有多个副本,而 Hive 是通过 list 目录来读取数据的。这样,Hive 在读取时,可能会读取到重复数据。若想要让 Hive 读取单快照,那只能及时清理 Iceberg 快照和残留文件。但是这样又使得 Iceberg 失去了事务性,而且受限于 Hive 下游消费作业,Iceberg 的一些特性(如 schema evolution)也都受到了限制。若是 Hive 的 parquet 版本和 Iceberg 的 parquet 版本不一致,那么改动会非常大。最终这个方案被放弃。
(3)方案 3:创建新表
这是业界使用最多的方案,这个方案的思路是:创建一张相同的 Iceberg 表,将 Hive 的历史数据回溯到 Iceberg 当中,然后升级上游作业,随后测试验证和升级 Hive 的所有下游作业,让其消费 Iceberg。
为什么这个方案比较麻烦,但是用户愿意迁移呢?主要有两个原因:
① 我们在 Iceberg 上使用了 ZSTD 的压缩算法,得益于 ZSTD 更高的压缩率,使得存储成本可以降低 30%。
② 在回溯历史数据的时候,我们对大字符串进行了排序,这样可以提高数据的相似度,进一步提升压缩率。对一张表来说,存量数据在存储中占有更大的比例。若是能够对历史数据的存储空间减少 30%,用户还是可以接受改造的。
7. Iceberg 在小米的应用现状
目前有 1 万 4 千多张表,日新增已经超过了 Hive,总的数据量已经达到 30PB。
03
未来规划
首先,我们将跟进物化视图的功能。在 OLAP 场景且没有谓词下推的情况下,我们期望通过预计算的方式来提高 Iceberg 的查询能力。
其次,我们将跟进 Iceberg 在 上的 changelog view。这个功能使得 Spark 可以获取到 Iceberg 的 changelog,我们希望在离线场景下也可以进行增量读取和更新。
最后,小米会在海外集群上探索数据上云。小米内部都是 EBS 挂载,EBS 本身比较贵,而 HDFS 本身有 3 个副本,相比直接使用公有云成本较高。
04
问答环节
Q1:为什么要 Spark streaming 切换为 Flink SQL,主要出于什么考虑?
A1:主要是内部架构考虑。第一是,Spark Streaming 的 版本的 At least once 语义会导致数据重复。第二是,引入 Flink 之后,开始积极向 Flink 方向靠拢,不再去维护 Spark streaming 的方向,在替换为 Flink SQL 之后,对整个数据链路进行了迭代。
Q2:watermark 是 Iceberg 已经存在的,还是业务自己加的?
A2:这个需要业务自己配置使用什么字段来作为 watermark 的生成字段,需要用户自己配置。然后 Flink 在写入时,会在快照中生成 watermark。
Q3:小米在强实时场景中用到了 Hudi 吗?
A3:没有,小米在强实时场景走的 MQ 那套数据链路。
Q4:选型上为什么是 Iceberg 而不是 Hudi?
A4:最初为使得 kappa 架构和 lambda 架构得到统一而调研了数据湖的组件,选择 Iceberg 的主要原因是 Iceberg 的开放性和多引擎支持。2021 年 4 月份,Iceberg 最先支持了 Flink。而当时,Hudi 和 Spark 还未解耦。我们出于使用 Flink 的角度而选择了 Iceberg。实践中,Iceberg 在实时数据的处理中,尤其在 CDC 处理方面,可能没有 Hudi 那么易用。我们也对 Iceberg 进行了二次开发,才把数据链路运行得稳定一些。
Q5:历史的离线作业仓库,数仓作业为 Hive 作业,如果切换到实时链路 Iceberg,如何做到无感知切换?比如说,SparkSQL 语法与 FlinkSQL 语法不同,以及 UDF 实现不同。
A5:目前没有办法做到无感知切换,SparkSQL 和 FlinkSQL 语义上就不大一样。若是切换到 Flink batch 还有可能,但若是想要离线切到实时,基本上要把整个逻辑的实现一遍。
Q6:目前实时数仓当中,append 模式和 Upsert 模式的数据延时可以做到几分钟?尽可能避免数据延迟到达。
A6:这两种模式,目前最低都是 1 分钟。 我们约束了用户配置的 checkpoint 时长,最低不能低于 1 分钟。
Q7:如何使用 local sort 进行多列查询?
A7:这个可以写入时在算法上使用 z-order 排序替换默认的排序算法来实现。
Q8:切换 Iceberg 带来的切换成本是怎样的,业务需求是否很强烈?
A8:Iceberg 带来的事务性、隐式分区、多引擎支持的特性可以切实解决用户的问题。即使切换过程中有很大的成本,当数据湖方案确实可以解决用户的痛点时,用户也会想用这个新架构去替换。
今天的分享就到这里,谢谢大家。
关键词: