Java玩转大数据

Hudi 概念和特性

2022-07-04  本文已影响0人  AlienPaul

背景

本篇为Hudi概念和特性相关介绍。依据于官网和相关博客资料,融入了个人理解。内容可能会有疏漏,欢迎大家指正和补充。

Hudi概念

Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi具有如下基本特性/能力:

来源:Hudi数据湖简介_阿福Chris的博客-CSDN博客_hudi 数据湖

时间线

https://hudi.apache.org/docs/next/timeline/

Hudi内部维护了时间线,支持按照数据的到达时间顺序来获取数据。Hudi确保时间线上的动作是原子性的。

Hudi 表类型

https://hudi.apache.org/docs/next/table_types/#table-and-query-types

Copy on Write

数据使用列存储形式存放,每次提交都会产生新版本的列存储文件。将原有数据文件copy一份,合并入变更的数据后保存一份新的数据。因此写入放大很高,读取放大基本为0。适用于读取多写入少的场景。写入存在延迟,Flink每次checkpoint或者Spark每次微批处理才会commit新数据。读取延迟很低。可以通过hoodie.cleaner.commits.retained配置项确定需要保存的最近commit个数,防止存储空间占用无限放大。可以提供较好的并发控制,因为读取数据的时候,无法读取到正在写入但尚未commit或者是写入失败的数据。

Merge on Read

数据使用行存储(avro)和列存储(parquet)共同存放。其中新变更的数据使用行存储,历史数据采用列存储。每当满足一定条件的时候(经过n个commit或者经过特定时间)Hudi开始compact操作,将行存储的数据和列存储的合并,生成新的列存储文件。适用于写多读少的场景。写入延迟很小,读取的时候需要合并新文件avro和parquet,存在一定延迟。需要使用定期的online compaction或者是手工执行的offline compaction将avro格式和parquet格式文件合并。

通过配置项table.type指定表类型。

两种表类型的特性对比:

Trade-off CopyOnWrite MergeOnRead
Data Latency Higher Lower
Query Latency Lower Higher
Update cost (I/O) Higher (rewrite entire parquet) Lower (append to delta log)
Parquet File Size Smaller (high update(I/0) cost) Larger (low update cost)
Write Amplification Higher Lower (depending on compaction strategy)

查询类型

https://hudi.apache.org/docs/next/table_types/#table-and-query-types

Snapshot Queries : Queries see the latest snapshot of the table as of a given commit or compaction action. In case of merge on read table, it exposes near-real time data(few mins) by merging the base and delta files of the latest file slice on-the-fly. For copy on write table, it provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features.
Incremental Queries : Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines.
Read Optimized Queries : Queries see the latest snapshot of table as of a given commit/compaction action. Exposes only the base/columnar files in latest file slices and guarantees the same columnar query performance compared to a non-hudi columnar table.

总结:

Table Type Supported Query types
Copy On Write Snapshot Queries + Incremental Queries
Merge On Read Snapshot Queries + Incremental Queries + Read Optimized Queries

通过hoodie.datasource.query.type参数控制查询类型。配置项对应为:

Trade-off Snapshot Read Optimized
Data Latency Lower Higher
Query Latency Higher (merge base / columnar file + row based delta / log files) Lower (raw base / columnar file performance)

索引

Hudi通过HoodieKey(recordKey和poartition path)和file id的对应关系来加速upsert操作。这正是Hudi的索引机制。这种对应关系在初始记录写入之后不会在改变。

对于COW表插入数据的场景,索引可以快速的过滤掉不涉及数据修改的file。对于MOR表插入数据的场景,索引能够很快定位到需要合并的文件。不需要像Hive ACID一样,合并所有的base file。

Hudi支持下面4种Index选项:

所有具有GLOBAL和非GLOBAL两种(HBase本来就是global的)。其中:

使用场景:

hoodie.index.type用来修改Index选项。

注意:使用GLOBAL_BLOOM的时候需要留意hoodie.bloom.index.update.partition.path配置。源码给出的解释如下:

  /**
   * Only applies if index type is GLOBAL_BLOOM.
   * <p>
   * When set to true, an update to a record with a different partition from its existing one
   * will insert the record to the new partition and delete it from the old partition.
   * <p>
   * When set to false, a record will be updated to the old partition.
   */

设置为true的话,如果record更新操作修改了partition,则会在新partition插入这条数据,然后在旧partition删除这条数据。
如果设置为false,会在就parititon更新这条数据。

hoodie.simple.index.update.partition.path对于GLOBAL_SIMPLE也同理。

文件结构

https://hudi.apache.org/docs/next/file_layouts/

元数据表(Metadata Table)

https://hudi.apache.org/docs/next/metadata/

用于提高读写性能,避免使用list file操作。

0.10.1以后metadata table默认启用(hoodie.metadata.enable)。

为了确保metadata table保持最新,针对同一张Hudi表的写操作需要根据不同的场景增加相应配置。

单个writer同步表服务(清理,聚簇,压缩),只需配置hoodie.metadata.enable=true,重启writer。

单个writer异步表服务(同一进程内),需要配置乐观并发访问控制:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider

多个writer(不同进程)异步表服务,需要配置乐观并发访问控制:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<distributed-lock-provider-classname>

外部分布式锁提供方有:ZookeeperBasedLockProvider, HiveMetastoreBasedLockProviderDynamoDBBasedLockProvider

Hudi 写类型

https://hudi.apache.org/docs/next/write_operations/

总结:

通过write.operation配置项指定。

写入步骤

The following is an inside look on the Hudi write path and the sequence of events that occur during a write.

  1. Deduping(去重)
    同一批中的数据先去重,需要combine或者reduce by key。
  2. Index Lookup(查询索引)
    查找插入的数据位于哪些file slice中。
  3. File Sizing(文件大小控制)
    Hudi基于之前提交数据的平均大小,制定计划将小文件中插入足够的数据,使它的大小接近配置的容量上限限制。
  4. Partitioning(分区)
    确定update和insert的数据属于哪个file group,可能伴随新file group的创建。
  5. Write I/O(写入数据)
    写入数据过程。根据表类型写入base file或者是log file。
  6. Update Index(更新索引)
    Now that the write is performed, we will go back and update the index.
    写入操作已完成,更新索引。
  7. Commit(提交)
    原子提交所有的更改。
  8. Clean (if needed)(清理,如果需要的话)
    如果需要,运行清理步骤。
  9. Compaction(压缩)
    如果使用MOR表,压缩会同步运行,或者是异步调度。
  10. Archive(归档)
    最后归档步骤将老旧的timeline项目移动到归档目录。

Schema变更

https://hudi.apache.org/docs/next/schema_evolution

试验功能,Spark 3.1.x和3.2.x支持Schema变更。

Key Generation

https://hudi.apache.org/docs/next/key_generation

Primary key由RecordKey和Partition path组成。

RecordKey由hoodie.datasource.write.recordkey.field决定。Partition path由hoodie.datasource.write.partitionpath.field决定。

并发控制

https://hudi.apache.org/docs/next/concurrency_control

Hudi支持MVCC和乐观并发访问两种方式。MVCC方式所有的table service都使用同一个writer来保证没有冲突,避免竟态条件。新版本的Hudi增加了乐观并发访问控制(OCC)。支持文件级别的乐观锁。需要依赖外部组件实现乐观锁,例如Zookeeper,Hive metastore等。

启用并发控制:

hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=<lock-provider-classname>

使用Zookeeper分布式锁:

hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url
hoodie.write.lock.zookeeper.port
hoodie.write.lock.zookeeper.lock_key
hoodie.write.lock.zookeeper.base_path

使用HiveMetastore分布式锁:

hoodie.write.lock.provider=org.apache.hudi.hive.HiveMetastoreBasedLockProvider
hoodie.write.lock.hivemetastore.database
hoodie.write.lock.hivemetastore.table

禁用并发写入:

hoodie.write.concurrency.mode=single_writer
hoodie.cleaner.policy.failed.writes=EAGER
上一篇 下一篇

猜你喜欢

热点阅读