Flink Iceberg 0.11
背景
我们在使用Flink+Kafka做实时数仓以及数据传输过程中,遇到了一些问题,Iceberg 0.11的新特性解决了这些业务场景,基于Iceberg我们做了一些实践,对比Kafka来说,Iceberg在某些特定场景有自己的优势,在这里做一些小的实践分享
分享主题
- 痛点
1.kafka数据具有时效性,在消费积压的时候容易造成数据过期丢失
2.Flink结合Hive做近实时系统越来越慢 - 选择iceberg的原因
1.解决了kafka因数据过期而导致的数据不完整问题,0.11版本支持了实时读取的功能
2.解决了Hive做为近实时数仓的性能问题,比如hive元数据文件过多,执行计划慢 - 使用总结
痛点1:KAFKA数据丢失
存储上,通常我们会选择kafka做实时数仓,以及日志传输。kafka本身存储成本其实蛮高的,数据保留时间有时效性,一旦消费积压,数据达到过期时间后,会导致数据丢失
什么数据适合入湖
对实时有适当放宽的,能接受1-10分钟的延迟,比如业务方可以接受近实时数据,比如日志类数据这样时效性不是特别敏感的
为什么Iceberg只能做近实时入湖?
物理数据写入Iceberg后,直到触发了checkpoint,这个时候才会写元数据,当元数据写入完毕后,数据由不可见变为可见,这也是实时性为什么不能像kafka一样,只能做近实时
image.png
Flink SQL入湖流程
Flink Iceberg实时读写
实时入湖流程
入湖流程分析
术语解析
-
数据文件(data files)
Iceberg 表真实存储数据的文件,一般存储在data目录下 -
清单文件(Manifest file)
每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、通过该文件、可过滤掉无关数据、提高检索速度 -
快照(Snapshot)
快照代表一张表在某个时刻的状态。每个快照里面会列出表在某个时刻的所有数据文件列表。Data files 是存储在不同的 manifest files 里面, manifest files 是存储在一个 Manifest list 文件里面,而一个 Manifest list 文件代表一个快照。
Flink入湖流程
image.png组件介绍
-
IcebergStreamWriter
主要用来写入记录到对应的 avro、parquet、orc 文件,生成一个对应的 Iceberg DataFile,并发送给下游算子;另外一个叫做 IcebergFilesCommitter,主要用来在 checkpoint 到来时把所有的 DataFile 文件收集起来,并提交 Transaction 到 Apache iceberg,完成本次 checkpoint 的数据写入、生成DataFile -
IcebergFilesCommitter
为每个checkpointId 维护了一个 DataFile 文件列表,即 map<Long, List<DataFile>>,这样即使中间有某个 checkpoint的transaction 提交失败了,它的 DataFile 文件仍然维护在 State 中,依然可以通过后续的 checkpoint 来提交数据到 Iceberg 表中。
踩坑记录
我之前在SQL Clinet写数据到Iceberg、data目录数据一直在更新,但是metadata没有数据,导致查询的时候没有数,因为Iceberg的查询计划是需要元数据来索引真实数据的,本质上时候因为IcebergFilesCommitter这个组件需要状态来存储某个checkpoint对应的数据文件,而SQL Clinet是不支持状态开启的。所以会导致data写入数据而metadata目录不写入元数据
PS:写数据必须开启checkpoint
实时读取Demo
前期工作
- 开启实时读写功能
set execution.type = streaming - 开启table sql hint功能来使用OPTIONS属性
set table.dynamic-table-options.enabled=true
//注册Iceberg catalog用于操作Iceberg表
CREATE CATALOG iceberg_catalog WITH (\n" +
" 'type'='iceberg',\n" +
" 'catalog-type'='hive'," +
" 'uri'='thrift://localhost:9083'" +
");
//实时数据入湖
insert into iceberg_catalog.iceberg_db.tbl1 \n
select * from kafka_tbl;
//实时查询入湖数据、也可以sink到
insert into iceberg_catalog.iceberg_db.tbl2
select * from iceberg_catalog.iceberg_db.tbl
/*+ OPTIONS('streaming'='true', 'monitor-interval'='10s', snapshot-id'='3821550127947089987')*/ ;
参数解释
- monitor-interval:连续监视新提交的数据文件的时间间隔(默认值:1s)
- start-snapshot-id:从指定的快照ID开始读取数据、每个快照ID关联的是一组mainfest元数据文件,每个元数据文件映射着自己的真实数据文件,通过快照ID,可以读取到某个版本的数据
-
一秒前的数据
image.png -
一秒后刷新的数据
image.png
小文件处理
Iceberg 0.11新特性,对小文件合并支持了处理
- 通过分区/存储桶键使用哈希混洗方式写记录、这样的好处在于,一个task会处理某个分区的数据,提交自己的Datafile文件,比如一个task只处理对应分区的数据,这样避免了多个task处理提交很多小文件的问题
write.distribution-mode: 该参数与其它引擎是通用的、比如spark等
CREATE TABLE city_table (
province BIGINT,
city STRING
) PARTITIONED BY (province, city) WITH (
'write.distribution-mode'='hash'
);
Iceberg动态更新Schema
如果数据入hive table,上游数据字段变更,需要重建表、以及重启作业、这是一个相当麻烦的工作,Iceberg通过捕捉上游Schema变更,将元数据信息写入最新的快照版本,通过版本可以动态的读取到最新的Schema
PS:社区暂时不支持的该功能,目前只在商业版做了
痛点2:Flink结合Hive的近实时越来越慢
image.png随着表和分区增多,将会面临以下问题
-
元数据过多
hive将分区改为小时/分钟级、虽然提高了数据的准实时性,但是metestore的压力也是显而易见的,进而导致查询计划变慢 -
数据库压力变大
随着元数据增加,存储hive元数据的数据库压力也会增加,一段时间后,还需要对该库进行升级。比如存储空间
Iceberg 查询计划
- 查询计划是在表中查找查询所需文件的过程。
- 元数据过滤
清单文件包括分区数据元组和每个数据文件的列级统计信息。
在计划期间,查询谓词会自动转换为分区数据上的谓词,并首先应用于过滤数据文件。接下来,使用列级值计数,空计数,下限和上限来消除与查询谓词不匹配的文件。 - 查询检索的时候,会根据当前snapshot ID查询关联到的maintalifilses,这是个文件清单列表,每个
maintalifilse又记录了当前data数据块的元数据信息,其中就包含了文件列的最大值和最小值,然后根据这个元数据信息,索引到具体的文件块,从而更快的查询到数据
Iceberg 0.11 排序
-
排序介绍
在Iceberg 0.11之前,Flink是不支持iceberg排序功能的,所以之前只能结合spark以批模式来支持排序功能,0.11新增了排序特性的支持,也意味着,我们在实时也可以体会到这个好处
image.png
排序demo
insert into iceberg_table select days from kafka_tbl order by days, province_id
Iceberg manifest详解
参数解释
- file_path: 物理文件位置
- partition: 文件所对应的分区
- lower_bounds: 该文件中,多个排序字段的最小值,下图是我的days和province_id最小值
-
upper_bounds: 该文件中,多个排序字段的最大值,下图是我的days和province_id最大值
通过分区、列的上下限信息来确定是否读取file_path的文件,数据排序后,文件列的信息也会记录在元数据中,查询计划从manifest去定位文件,不需要把信息记录在hive metadata,从而减轻hive metadata压力,提升查询效率
image.png
总结
Apache Iceberg0.11有很多实用的新特性、比如实时读取数据,商业版的捕获Schema动态变更发送下游,通过hash的方法让task处理一个区域的数据,避免了小文件问题,以及排序功能提升了查询速度。