构建 Streaming Lakehouse:使用 Paimon

2024-02-01  本文已影响0人  Flink中文社区

一、背景信息

数据湖与传统的数据仓库相比,可以更灵活地处理各种类型的数据,并支持高度可扩展的存储,通常被用于大数据分析。为了支持准实时乃至实时的数据处理,数据湖需要能够快速地接收和存储数据(数据入湖),同时提供低延迟的查询性能以满足分析需求。

Apache Paimon 和 Apache Hudi 作为数据湖存储格式,有着高吞吐的写入和低延迟的查询性能,是构建数据湖的常用组件。本文将在阿里云EMR上,针对数据实时入湖场景,对 Paimon 和 Hudi 的性能进行比对,然后分别以 Paimon 和 Hudi 作为统一存储搭建准实时数仓。

二、集群环境

本文使用的集群环境是最新的阿里云 EMR 5.16.0,集群节点的属性如下:

使用的组件及版本如下:

本文主要由两部分组成,分别是 Paimon 和 Hudi 数据实时入湖性能测试(Flink),以及 Paimon 和 Hudi 准实时数仓全链路搭建(Flink + Spark),测试数据均存储在 EMR 的 OSS-HDFS 中。

三、数据实时入湖

数据实时入湖是数据湖格式的一个重要应用场景,也是构建实时湖仓的第一步。本节测试参考的是 paimon-cluster-benchmark。按照实际的业务情况,划分了两个具体场景:upsert 场景(数据存在更新与订正)和纯 append 场景,在两个场景上分别检验 Paimon 和 Hudi 的读写能力。

本节测试将使用 Flink 流式入湖,部署模式是 Flink Standalone 模式,Flink 配置如下,由于 TM 运行内存对测试结果影响较大,分别统计 8g/16g/20g 下的测试结果。并且由于本测试不需要用到 TM 的 managed 内存,将其设为 1m。

parallelism.default: 16
jobmanager.memory.process.size: 4g
taskmanager.numberOfTaskSlots: 1
taskmanager.memory.process.size: 8g/16g/20g
execution.checkpointing.interval: 2min
execution.checkpointing.max-concurrent-checkpoints: 3
taskmanager.memory.managed.size: 1m
state.backend: rocksdb
state.backend.incremental: true
table.exec.sink.upsert-materialize: NONE

3.1 upsert 场景

数据湖 upsert 用于更新或插入新数据。在进行 upsert 时,会检查待写的数据是否已存在于数据湖中。如果数据已存在,则更新该数据;如果数据不存在,则插入新数据。upsert 通常是基于某种唯一标识符或主键来判断数据是否已存在。

本节测试数据源由 Flink datagen 产生,随机生成主键范围为 0~100,000,000 的数据,然后使用 Flink 将数据分别流式写入 Paimon 和 Hudi 表中,并统计写入 5 亿条数据(经统计,此时单个 bucket 内的 parquet 文件总大小在 2GB 内)的总耗时。同时,我们还使用 Flink 以批读的方式读取写入的 Paimon 和 Hudi 表,并统计总耗时。

对于 upsert 场景,Paimon 选择 primary-key 表,Hudi 选择 merge-on-read 表,由于它们都支持 compaction,所以测试将进一步划分为关闭和开启 compaction。

  1. 关闭 compaction

Paimon 表的配置如下,bucket 个数与 Flink 的并行度一致,设置为 16。由于 Hudi 默认文件格式为 parquet 格式,为了与 Hudi 保持一致,后续均采用 parquet 作为文件输出格式,压缩方式统一设为 snappy。

'bucket' = '16',
'file.format' = 'parquet',
'file.compression' = 'snappy',
'write-only' = 'true'

Hudi 表的配置如下,采用 BUCKETindex,桶个数为 16,与 Flink 并行度一致。由于 Hudi MOR 表的读取会受到参数 compaction.max_memory 的影响,将其配置为 taskmanager.memory.process.size 的一半。

'table.type' = 'MERGE_ON_READ',
'metadata.enabled' = 'false',
'index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = '16',
'write.operation' = 'upsert',
'write.tasks' = '16',
'hoodie.parquet.compression.codec' = 'snappy',
'read.tasks' = '16',
'compaction.schedule.enabled' = 'false',
'compaction.async.enabled' = 'false',
'compaction.max_memory' = '4096/8192/10240' -- TM process memory的一半

测试结果如下:

可以发现在 upsert 场景,关闭 compaction 时,Paimon 读写性能均优于 Hudi,且 Hudi 对 TM 的内存要求更高。

  1. 开启 compaction

Paimon 配置:

'bucket' = '16',
'file.format' = 'parquet',
'file.compression' = 'snappy',
'num-sorted-run.compaction-trigger' = '5' -- 默认配置

Hudi 配置:

由于测试所需的总耗时不多(checkpoint 个数也相应较少),并且随着未 compaction 的 log 文件增加,Hudi 需要的 compaction 内存将变得更大,因此配置 compaction.delta_commits 为 2 来保证在写入期间有 compaction 执行完成。

'table.type' = 'MERGE_ON_READ',
'metadata.enabled' = 'false',
'index.type' = 'BUCKET',
'hoodie.bucket.index.num.buckets' = '16',
'write.operation' = 'upsert',
'write.tasks' = '16',
'hoodie.parquet.compression.codec' = 'snappy',
'read.tasks' = '16',
'compaction.schedule.enabled' = 'true',
'compaction.async.enabled' = 'true',
'compaction.tasks' = '16',
'compaction.delta_commits' = '2'
'compaction.max_memory' = '4096/8192/10240' -- TM process memory的一半

测试结果如下:

在 upsert 场景,开启 compaction 时,Paimon 读写性能均优于 Hudi。对比前面的关闭 compaction 测试,Paimon 和 Hudi 的写性能均有所下降,但读性能得到提升。

Hudi 的 compaction 比较消耗内存,运行时间较长,并且它是异步执行,当写入任务完成时,未完成的 compaction 是不会继续执行的。观察发现,当 TM 内存给到 20G 时,Hudi 仍有 4 个 delta commits 未被 compaction(即使配置了 compaction.delta_commits=2)。并且, Paimon 的 compaction 默认也不是 full compaction。因此,我们还做了以下补充测试,手动对 Paimon 和 Hudi 做一次 full compaction,然后对比读取数据的时间,结果如下:

3.2 append 场景

数据入湖的另一种场景是数据 append 写,比如日志入湖。

本节测试数据源同样由 Flink datagen 产生,然后使用 Flink 写入 Paimon 和 Hudi 表中,同样统计使用 Flink 写入 5 亿条数据(在 append 场景 Paimon 和 Hudi 均不需要 bucket)的总耗时;以及使用 Flink 批读已写入的 Paimon 和 Hudi 表的总耗时。

Paimon 表的配置:

'bucket' = '-1',
'file.format' = 'parquet',
'file.compression' = 'snappy'

Hudi 表的配置:

由于单个批次数据量足够大,不存在小文件问题,因此关闭 clustering:

'table.type' = 'COPY_ON_WRITE',
'metadata.enabled' = 'false',
'write.operation' = 'insert',
'write.tasks' = '16',
'hoodie.parquet.compression.codec' = 'snappy',
'read.tasks' = '16',
'write.insert.cluster' = 'false',
'clustering.schedule.enabled' = 'false',
'clustering.async.enabled' = 'false'

测试结果如下:

在 append 场景,Paimon 读写性能优于 Hudi,且二者都对 TM 内存要求均不高。

四、准实时数仓

在数据入湖之后,基于数据湖格式+流式引擎的强大能力,可以进一步构建一体化实时数仓。本节将分别以 Paimon 和 Hudi 为统一存储,在经典的电商场景下搭建一套准实时数仓,数仓具体有以下几层:

  1. ODS 层:通过 Flink 的 datagen connector 产生 orders(订单表,包含原始订单信息),再通过 Flink 实时写入,作为 ODS 层。

  2. DWM 层:通过 Spark streaming 实时消费 ODS 层,产出 DWM 层 dwm_shop_users(用户-商户聚合中间表,包含中间聚合指标)。

  3. DWS 层:通过 Spark streaming 实时消费 DWM 层的 changelog 数据,构建 DWS 层 dws_users(用户聚合指标表)以及 dws_shops(商户聚合指标表)。

4.1 datagen -> ODS

该层使用 Flink 实时入湖,为了更贴近生产环境,Flink 以 Yarn Session 模式启动,同时由于数据链路的增加,为了合理分配资源,对内存和并行度做出以下调整:

yarn-session.sh -Dparallelism.default=8 \
                -Djobmanager.memory.process.size=2g \
                -Dtaskmanager.numberOfTaskSlots=2 \
                -Dtaskmanager.memory.process.size=8g \
                -Dtaskmanager.memory.managed.size=1m \
                -Dexecution.checkpointing.interval=2min \
                -Dexecution.checkpointing.max-concurrent-checkpoints=3 \
                -Dstate.backend=rocksdb \
                -Dstate.backend.incremental=true \
                -Dtable.exec.sink.upsert-materialize=NONE \
                --detached

datagen 建表语句如下,rows-per-second 调整为10000

CREATE TEMPORARY TABLE datagen_orders
(
  order_name         STRING
  ,order_user_id     BIGINT
  ,order_shop_id     BIGINT
  ,order_product_id  BIGINT
  ,order_fee         DECIMAL(20, 2)
  ,order_state       INT
)
WITH (
  'connector' = 'datagen'
  ,'rows-per-second' = '10000'
  ,'fields.order_user_id.kind' = 'random'
  ,'fields.order_user_id.min' = '1'
  ,'fields.order_user_id.max' = '10000'
  ,'fields.order_shop_id.kind' = 'random'
  ,'fields.order_shop_id.min' = '1'
  ,'fields.order_shop_id.max' = '10000'
  ,'fields.order_product_id.kind' = 'random'
  ,'fields.order_product_id.min' = '1'
  ,'fields.order_product_id.max' = '1000'
  ,'fields.order_fee.kind' = 'random'
  ,'fields.order_fee.min' = '0.1'
  ,'fields.order_fee.max' = '10.0'
  ,'fields.order_state.kind' = 'random'
  ,'fields.order_state.min' = '1'
  ,'fields.order_state.max' = '5'
);

Paimon 建表和写入语句如下:

CREATE TABLE IF NOT EXISTS paimon_catalog.order_dw.ods_orders
(
  order_id           STRING
  ,order_name        STRING
  ,order_user_id     BIGINT
  ,order_shop_id     BIGINT
  ,order_product_id  BIGINT
  ,order_fee         DECIMAL(20, 2)
  ,order_create_time TIMESTAMP(3)
  ,order_update_time TIMESTAMP(3)
  ,order_state       INT
)
WITH (
  'bucket' = '-1',
  'file.format' = 'parquet',
  'file.compression' = 'snappy'
);

INSERT INTO paimon_catalog.order_dw.ods_orders
SELECT
  UUID() AS order_id
  ,order_name
  ,order_user_id
  ,order_shop_id
  ,order_product_id
  ,order_fee
  ,NOW() AS order_create_time
  ,NOW() AS order_update_time
  ,order_state
FROM datagen_orders;

Hudi 建表和写入语句如下:

create TEMPORARY table ods_orders
(
  order_id           STRING
  ,order_name        STRING
  ,order_user_id     BIGINT
  ,order_shop_id     BIGINT
  ,order_product_id  BIGINT
  ,order_fee         DECIMAL(20, 2)
  ,order_create_time TIMESTAMP(3)
  ,order_update_time TIMESTAMP(3)
  ,order_state       INT
)
WITH (
    'connector' = 'hudi'
    ,'path' = '/xxx/hudi/order_dw.db/ods_orders'
    ,'precombine.field' = 'order_update_time'
    ,'table.type' = 'COPY_ON_WRITE'
    ,'hoodie.database.name' = 'order_dw'
    ,'hoodie.table.name' = 'ods_orders'
    ,'hoodie.datasource.write.recordkey.field' = 'order_id'
    ,'metadata.enabled' = 'false'
    ,'write.operation' = 'insert'
    ,'write.tasks' = '8'
    ,'hoodie.parquet.compression.codec' = 'snappy'
    ,'write.insert.cluster' = 'false'
    ,'clustering.schedule.enabled' = 'false'
    ,'clustering.async.enabled' = 'false'
)
;

INSERT INTO ods_orders
SELECT
  UUID() AS order_id
  ,order_name
  ,order_user_id
  ,order_shop_id
  ,order_product_id
  ,order_fee
  ,NOW() AS order_create_time
  ,NOW() AS order_update_time
  ,order_state
FROM datagen_orders;

4.2 ODS -> DWM

对于 Paimon 表,依靠其本身的聚合引擎能力,通过简单的配置(merge-engine)即可方便地聚合消费 pv 和总金额,从而构建用户-商户聚合中间表。同时由于下游需要读取 changelog,配置 changelog-producer 为 lookup。

CREATE TABLE paimon_catalog.order_dw.dwm_shop_users
(
  shop_id  BIGINT
  ,user_id BIGINT
  ,ds      STRING COMMENT '小时'
  ,pv      BIGINT COMMENT '该小时内,该用户在该商户的消费次数'
  ,fee_sum DECIMAL(20, 2) COMMENT '该小时内,该用户在该商户的消费总金额'
)
tblproperties (
  'primary-key' = 'shop_id, user_id, ds'
  ,'bucket' = '8'
  ,'changelog-producer' = 'lookup'
  ,'file.format' = 'parquet'
  ,'file.compression' = 'snappy'
  ,'merge-engine' = 'aggregation'
  ,'fields.pv.aggregate-function' = 'sum'
  ,'fields.fee_sum.aggregate-function' = 'sum'
  ,'metadata.stats-mode' = 'none'
);

Paimon Spark Streaming 作业示例代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{date_format, lit}

object PaimonOds2DwmJob {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().getOrCreate()
    val sourceLocation = "/xxx/paimon/order_dw.db/ods_orders"
    val targetLocation = "/xxx/paimon/order_dw.db/dwm_shop_users"
    val checkpointDir = "/xxx/paimon/order_dw.db/dwm_shop_users_checkpoint"
    import spark.implicits._

    spark.readStream
      .format("paimon")
      .load(sourceLocation)
      .select(
        $"order_shop_id",
        $"order_user_id",
        date_format($"order_create_time", "yyyyMMddHH").alias("ds"),
        lit(1L),
        $"order_fee"
      )
      .writeStream
      .format("paimon")
      .option("checkpointLocation", checkpointDir)
      .start(targetLocation)
    
    spark.streams.awaitAnyTermination()
  }
}

对于 Hudi 表,想要实现类似的聚合操作,则需要依赖于自定义 Payload 或者 Merger 来实现,本文采用自定义 merger 实现,对 key 相同的记录的 uv、pv、fee_sum 字段进行聚合,核心逻辑如下:

public class OrdersLakeHouseMerger extends HoodieAvroRecordMerger {
  @Override
  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
    // ...
    Object oldData = older.getData();
    GenericData.Record oldRecord = (oldData instanceof HoodieRecordPayload)
        ? (GenericData.Record) ((HoodieRecordPayload) older.getData()).getInsertValue(oldSchema).get()
        : (GenericData.Record) oldData;

    Object newData = newer.getData();
    GenericData.Record newRecord = (newData instanceof HoodieRecordPayload)
        ? (GenericData.Record) ((HoodieRecordPayload) newer.getData()).getInsertValue(newSchema).get()
        : (GenericData.Record) newData;

    // merge uv
    if (HoodieAvroUtils.getFieldVal(newRecord, "uv") != null && HoodieAvroUtils.getFieldVal(oldRecord, "uv") != null) {
      newRecord.put("uv", (Long) oldRecord.get("uv") + (Long) newRecord.get("uv"));
    }

    // merge pv
    if (HoodieAvroUtils.getFieldVal(newRecord, "pv") != null && HoodieAvroUtils.getFieldVal(oldRecord, "pv") != null) {
      newRecord.put("pv", (Long) oldRecord.get("pv") + (Long) newRecord.get("pv"));
    }

    // merge fee_sum
    if (HoodieAvroUtils.getFieldVal(newRecord, "fee_sum") != null && HoodieAvroUtils.getFieldVal(oldRecord, "fee_sum") != null) {
      BigDecimal l = new BigDecimal(new BigInteger(((GenericData.Fixed) oldRecord.get("fee_sum")).bytes()), 2);
      BigDecimal r = new BigDecimal(new BigInteger(((GenericData.Fixed) newRecord.get("fee_sum")).bytes()), 2);
      byte[] bytes = l.add(r).unscaledValue().toByteArray();
      byte[] paddedBytes = new byte[9];
      System.arraycopy(bytes, 0, paddedBytes, 9 - bytes.length, bytes.length);
      newRecord.put("fee_sum", new GenericData.Fixed(((GenericData.Fixed) newRecord.get("fee_sum")).getSchema(), paddedBytes));
    }
    HoodieAvroIndexedRecord hoodieAvroIndexedRecord = new HoodieAvroIndexedRecord(newRecord);
    return Option.of(Pair.of(hoodieAvroIndexedRecord, newSchema));
  }
}

Hudi Spark Streaming 作业示例代码如下,由于下游需要读取 changelog,配置 hoodie.table.cdc.enabled 为 true。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{date_format, lit}

object Ods2DwmJob {

  def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder().getOrCreate()
    val sourceLocation ="/xxx/hudi/order_dw.db/ods_orders"
    val targetLocation = "/xxx/hudi/order_dw.db/dwm_shop_users"
    val checkpointDir = "/xxx/hudi/order_dw.db/dwm_shop_users_checkpoint"

    import spark.implicits._

    spark.readStream
      .format("hudi")
      .load(sourceLocation)
      .select(
        $"order_shop_id".alias("shop_id"),
        $"order_user_id".alias("user_id"),
        date_format($"order_create_time", "yyyyMMddHH").alias("ds"),
        lit(1L).alias("pv"),
        $"order_fee".alias("fee_sum")
      )
      .writeStream
      .format("hudi")
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.datasource.write.recordkey.field", "shop_id, user_id, ds")
      .option("hoodie.datasource.write.precombine.field", "ds")
      .option("hoodie.database.name", "order_dw")
      .option("hoodie.table.name", "dwm_shop_users")
      .option("hoodie.metadata.enable", "false")
      .option("hoodie.index.type", "BUCKET")
      .option("hoodie.bucket.index.num.buckets", "8")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.common.model.merger.OrdersLakeHouseMerger")
      .option("hoodie.parquet.compression.codec", "snappy")
      .option("hoodie.table.cdc.enabled", "true")
      .option("hoodie.table.cdc.supplemental.logging.mode", "data_before_after")
      .option("checkpointLocation", checkpointDir)
      .start(targetLocation)
    
    spark.streams.awaitAnyTermination()
  }
}

最后将作业分别提交任务到 yarn:

spark-submit --class Ods2DwmJob \
             --master yarn \
             --deploy-mode cluster \
             --name PaimonOds2DwmJob \
             --conf spark.driver.memory=2g \
             --conf spark.driver.cores=2 \
             --conf spark.executor.instances=4 \
             --conf spark.executor.memory=16g \
             --conf spark.executor.cores=2 \
             --conf spark.yarn.submit.waitAppCompletion=false \
             ./paimon-spark-streaming-example.jar

spark-submit --class Ods2DwmJob \
             --master yarn \
             --deploy-mode cluster \
             --name HudiOds2DwmJob \
             --conf spark.driver.memory=2g \
             --conf spark.driver.cores=2 \
             --conf spark.executor.instances=4 \
             --conf spark.executor.memory=16g \
             --conf spark.executor.cores=2 \
             --conf spark.yarn.submit.waitAppCompletion=false \
             --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
             --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
             --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
             ./hudi-spark-streaming-example.jar

在上述资源下,当作业稳定运行 100 个 batch(3小时左右)后的 Streaming 作业 UI 如下:

此时,Paimon 单个 batch 写入时间为 40s 左右

Hudi 单个 batch 写入时间为 65s 左右

4.3 DWM -> DWS

Paimon SparkSQL 建表语句如下,仍然配置聚合引擎对指定字段进行聚合:

CREATE TABLE paimon_catalog.order_dw.dws_users
(
  user_id  BIGINT
  ,ds      STRING COMMENT '小时'
  ,fee_sum DECIMAL(20, 2) COMMENT '该小时内,该用户的消费总金额'
)
tblproperties (
  'primary-key' = 'user_id, ds'
  ,'bucket' = '8'
  ,'merge-engine' = 'aggregation'
  ,'fields.fee_sum.aggregate-function' = 'sum'
);

CREATE TABLE paimon_catalog.order_dw.dws_shops
(
  shop_id  BIGINT
  ,ds      STRING COMMENT '小时'
  ,uv      BIGINT COMMENT '该小时内,该商户的消费总人数'
  ,pv      BIGINT COMMENT '该小时内,该商户的消费总次数'
  ,fee_sum DECIMAL(20, 2) COMMENT '该小时内,该商户的消费总金额'
)
tblproperties (
  'primary-key' = 'shop_id, ds'
  ,'bucket' = '8'
  ,'merge-engine' = 'aggregation'
  ,'fields.uv.aggregate-function' = 'sum'
  ,'fields.pv.aggregate-function' = 'sum'
  ,'fields.fee_sum.aggregate-function' = 'sum'
);

Paimon Spark Streaming Dwm2DwsJob 如下,由于需要流读上游 changelog,配置 read.changelog 为 true。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, when}

object Dwm2DwsJob {
  def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder().getOrCreate()
    val sourceLocation = "/xxx/paimon/order_dw.db/dwm_shop_users"
    val targetLocation1 = "/xxx/paimon/order_dw.db/dws_users"
    val checkpointDir1 = "/xxx/paimon/order_dw.db/dws_users_checkpoint"
    val targetLocation2 = "/xxx/paimon/order_dw.db/dws_shops"
    val checkpointDir2 = "/xxx/paimon/order_dw.db/dws_shops_checkpoint"
    
    import spark.implicits._

    val df = spark.readStream
      .format("paimon")
      .option("read.changelog", "true")
      .load(sourceLocation)

    df.select(
      $"user_id",
      $"ds",
      when($"_row_kind" === "+I" || $"_row_kind" === "+U", $"fee_sum")
        .otherwise($"fee_sum" * -1)
        .alias("fee_sum"))
      .writeStream
      .format("paimon")
      .option("checkpointLocation", checkpointDir1)
      .start(targetLocation1)

    df.select(
      $"shop_id",
      $"ds",
      when($"_row_kind" === "+I" || $"_row_kind" === "+U", lit(1L)).otherwise(lit(-1L)).alias("uv"),
      when($"_row_kind" === "+I" || $"_row_kind" === "+U", $"pv").otherwise($"pv" * -1).alias("pv"),
      when($"_row_kind" === "+I" || $"_row_kind" === "+U", $"fee_sum")
        .otherwise($"fee_sum" * -1)
        .alias("fee_sum")
      .writeStream
      .format("paimon")
      .option("checkpointLocation", checkpointDir2)
      .start(targetLocation2)
    
    spark.streams.awaitAnyTermination()
  }
}

Hudi Spark Streaming Dwm2DwsJob 如下,可复用上一层定义的 Merger。由于 Hudi 也需要流读 changelog,配置 hoodie.datasource.query.type 为 incremental 以及 hoodie.datasource.query.incremental.format 为 cdc。Hudi 的 changelog 格式和 Paimon 不同,数据处理逻辑和 Paimon 略有不同。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, get_json_object, lit, when}
import org.apache.spark.sql.types.{DecimalType, LongType}

object Dwm2DwsJob {

  def main(args: Array[String]): Unit = {
    
    val spark = SparkSession.builder().getOrCreate()
    val sourceLocation ="/xxx/hudi/order_dw.db/dwm_shop_users"
    val targetLocation1 = "/xxx/hudi/order_dw.db/dws_users"
    val checkpointDir1 = "/xxx/hudi/order_dw.db/dws_users_checkpoint"
    val targetLocation2 = "/xxx/hudi/order_dw.db/dws_shops"
    val checkpointDir2 = "/xxx/hudi/order_dw.db/dws_shops_checkpoint"

    import spark.implicits._

    val df = spark.readStream
      .format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.query.incremental.format", "cdc")
      .load(sourceLocation)

    df.select(
      get_json_object($"after", "$.user_id").cast(LongType).alias("user_id"),
      get_json_object($"after", "$.ds").alias("ds"),
      when(get_json_object($"before", "$.fee_sum").isNotNull, get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)) - get_json_object($"before", "$.fee_sum").cast(DecimalType(20, 2)))
        .otherwise(get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)))
        .alias("fee_sum"))
      .writeStream
      .format("hudi")
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.datasource.write.recordkey.field", "user_id, ds")
      .option("hoodie.datasource.write.precombine.field", "ds")
      .option("hoodie.database.name", "order_dw")
      .option("hoodie.table.name", "dws_users")
      .option("hoodie.metadata.enable", "false")
      .option("hoodie.index.type", "BUCKET")
      .option("hoodie.bucket.index.num.buckets", "8")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.common.model.merger.OrdersLakeHouseMerger")
      .option("hoodie.parquet.compression.codec", "snappy")
      .option("checkpointLocation", checkpointDir1)
      .start(targetLocation1)

    df.select(
      get_json_object($"after", "$.shop_id").cast(LongType).alias("shop_id"),
      get_json_object($"after", "$.ds").alias("ds"),
      when(get_json_object($"before", "$.fee_sum").isNotNull, lit(0L)).otherwise(lit(1L)).alias("uv"),
      when(get_json_object($"before", "$.fee_sum").isNotNull, get_json_object($"after", "$.pv").cast(LongType) - get_json_object($"before", "$.pv").cast(LongType))
        .otherwise(get_json_object($"after", "$.pv").cast(LongType))
        .alias("pv"),
      when(get_json_object($"before", "$.fee_sum").isNotNull, get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)) - get_json_object($"before", "$.fee_sum").cast(DecimalType(20, 2)))
        .otherwise(get_json_object($"after", "$.fee_sum").cast(DecimalType(20, 2)))
        .alias("fee_sum"))
      .writeStream
      .format("hudi")
      .option("hoodie.datasource.write.table.type", "COPY_ON_WRITE")
      .option("hoodie.datasource.write.recordkey.field", "shop_id, ds")
      .option("hoodie.datasource.write.precombine.field", "ds")
      .option("hoodie.database.name", "order_dw")
      .option("hoodie.table.name", "dws_shops")
      .option("hoodie.metadata.enable", "false")
      .option("hoodie.index.type", "BUCKET")
      .option("hoodie.bucket.index.num.buckets", "8")
      .option("hoodie.datasource.write.operation", "upsert")
      .option("hoodie.datasource.write.record.merger.impls", "org.apache.hudi.common.model.merger.OrdersLakeHouseMerger")
      .option("hoodie.parquet.compression.codec", "snappy")
      .option("checkpointLocation", checkpointDir2)
      .start(targetLocation2)
    
    spark.streams.awaitAnyTermination()
  }
}

最后将作业分别提交任务到 yarn:

spark-submit --class Dwm2DwsJob \
             --master yarn \
             --deploy-mode cluster \
             --name PaimonDwm2DwsJob \
             --conf spark.driver.memory=2g \
             --conf spark.driver.cores=2 \
             --conf spark.executor.instances=4 \
             --conf spark.executor.memory=8g \
             --conf spark.executor.cores=2 \
             --conf spark.yarn.submit.waitAppCompletion=false \
             ./paimon-spark-streaming-example.jar

spark-submit --class Dwm2DwsJob \
             --master yarn \
             --deploy-mode cluster \
             --name HudiDwm2DwsJob \
             --conf spark.driver.memory=2g \
             --conf spark.driver.cores=2 \
             --conf spark.executor.instances=4 \
             --conf spark.executor.memory=8g \
             --conf spark.executor.cores=2 \
             --conf spark.yarn.submit.waitAppCompletion=false \
             --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
             --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
             --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog \
             ./hudi-spark-streaming-example.jar

在上述资源下,当作业稳定运行 100 个 batch(3 小时左右)后的 Streaming 作业 UI(以 dws_shops 表为例)如下:

此时,Paimon 单个 batch 写入时间为 10s 左右

Hudi 单个 batch 写入时间为 13s 左右

4.4 SparkSQL 查询

在该场景下,我们可以查询 DWM 层的 dwm_shop_users 表作为其他业务场景的上游表,也可以查询 DWS 层数据直接用于应用分析或者报表展示,使用如下两个 SQL 查询:

-- SparkSQL 查询 ods_orders
select order_id, order_user_id, order_shop_id, order_fee, order_create_time
from order_dw.ods_orders 
order by order_create_time desc limit 10;

-- SparkSQL 查询 dws_shops
select shop_id, ds, uv, pv, fee_sum 
from order_dw.dws_shops 
where ds = '2023120100' order by ds, shop_id limit 10;

以上,我们分别以 Paimon 和 Hudi 完成了每小时增加 4 千万条记录(压缩后 10 GB)量级的实时 ETL 链路的搭建,均可以满足分钟级的生产场景的需求。

五、总结

  1. 在实时入湖场景中,Paimon 具有比 Hudi 更强的读写性能,并且对内存的需求更小。

  2. 在数仓 DWM、DWS 层构建过程中,由于 Paimon 内置了 mergeFunction 功能,可以通过配置参数直接构建聚合指标,而 Hudi 需要通过手动编写自定义 Payload 或者 Merger 来实现。

  3. 在基于 Spark 构建的准实时数仓的各层链路中,Paimon 计算单个 batch 的耗时均比 Hudi 更短。

上一篇下一篇

猜你喜欢

热点阅读