
A Practical Exploration of Building a Lakehouse Architecture with Spark, Flink and Iceberg

2022-03-16  架构师老狼

[Figure: the data lake as the finishing piece of the big data ecosystem]

Pain points of the data warehouse

Comparing the three leading data lake formats

Hudi
Delta
Iceberg
Summary
Iceberg terminology

Spark + Iceberg offline data warehouse

> controller
val sparkConf = new SparkConf()
  // Register an Iceberg catalog named "hadoop_prod", backed by a Hadoop warehouse on HDFS
  .set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .set("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .set("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://hadoop01:9820/spark/warehouse")
  .set("spark.sql.catalog.hadoop_prod.default-namespace", "db")
  // Overwrite only the partitions touched by each write
  .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .set("spark.sql.session.timeZone", "GMT+8")
  .setMaster("local[*]")
  .setAppName("dwd_app")
val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
DwdIcebergService.readOdsData(sparkSession)
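The service code below writes with writeTo(...).overwritePartitions(), which expects the target Iceberg tables to already exist in the hadoop_prod catalog. A minimal DDL sketch for one of them, run through the same session; apart from uid and ad_id (which the code casts to int), the column list and the dt partition column are assumptions for illustration only:

// Hypothetical schema: only uid and ad_id come from the code; the other columns
// and the dt partition column are placeholders, not the article's actual DDL.
sparkSession.sql(
  """
    |CREATE TABLE IF NOT EXISTS hadoop_prod.db.dwd_member (
    |  uid       INT,
    |  ad_id     INT,
    |  fullname  STRING,
    |  dt        STRING
    |) USING iceberg
    |PARTITIONED BY (dt)
    |""".stripMargin)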
> service 
  // Load the member log into the DWD layer
  def loadMember(sparkSession: SparkSession): Unit ={
    sparkSession.read.json("/datasource/iceberg/member.log").drop("dn")
      .withColumn("uid", col("uid").cast("int"))
      .withColumn("ad_id", col("ad_id").cast("int"))
      .writeTo("hadoop_prod.db.dwd_member").overwritePartitions()
  }
  def getDwsMemberData(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._
    ....
    val result = dwdMember.join(dwdMemberRegtype.drop("dt"), Seq("uid"), "left")
      .join(dwdPcentermempaymoney.drop("dt"), Seq("uid"), "left")
      .join(dwdBaseAd, Seq("ad_id", "dn"), "left")
      .join(dwdBaseWebsite, Seq("siteid", "dn"), "left")
      .join(dwdVipLevel, Seq("vip_id", "dn"), "left_outer")
      .select("...").as[DwsMemberResult]

    val resultData = result.groupByKey(item => item.uid + "_" + item.dn)
      .mapGroups { case (key, iters) =>
        val keys = key.split("_")
        val uid = Integer.parseInt(keys(0))
        val dn = keys(1)
        val dwsMembers = iters.toList
        val paymoney = dwsMembers.filter(_.paymoney != null)
          .map(item => BigDecimal.apply(item.paymoney))
          .reduceOption(_ + _)
          .getOrElse(BigDecimal.apply(0.00)).toString
   ....
    // Partition columns must not be null; in the Spark SQL in-memory table, nulls come through as the string "null"
    resultData.where($"dn" =!= "null").show()
    resultData.where($"dn" =!= "null")
      .write.format("iceberg")
      .mode("overwrite").save("hadoop_prod.db.dws_member")
  }
 def queryDetails(sparkSession: SparkSession, dt: String) = {
    import sparkSession.implicits._
    val result = DwsIcebergDao.queryDwsMemberData(sparkSession).as[QueryResult].where(s"dt='${dt}'")
    result.cache()

    // Count registered users per appregurl (a word-count style aggregation)
    result.mapPartitions(partition => {
      partition.map(item => (item.appregurl + "_" + item.dn + "_" + item.dt, 1))
    }).groupByKey(_._1)
      .mapValues(item => item._2).reduceGroups(_ + _)
      .map(item => {
        val keys = item._1.split("_")
        val appregurl = keys(0)
        val dn = keys(1)
        val dt = keys(2)
        (appregurl, item._2, dt, dn)
      }).toDF("appregurl", "num", "dt", "dn")
      .writeTo("hadoop_prod.db.ads_register_appregurlnum").overwritePartitions()

    // Top 3 users by payment amount within each memberlevel. MySQL, Oracle, Hive, Phoenix and Iceberg do not support window functions inside WHERE, but Spark can compute row_number() on the Dataset and filter afterwards.
    result.withColumn("rownum", row_number().over(Window.partitionBy("memberlevel").orderBy(desc("paymoney"))))
      .where("rownum<4")
      .orderBy("memberlevel", "rownum")
      .select("...")
      .writeTo("hadoop_prod.db.ads_register_top3memberpay").overwritePartitions()
  }
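Every overwrite above commits a new Iceberg snapshot, so the results of these batch jobs can be inspected through Iceberg's metadata tables. A minimal Spark SQL sketch against the dws_member table written earlier (the queries are illustrative only):

// List the snapshots produced by the overwrites of the DWS table.
sparkSession.sql("SELECT committed_at, snapshot_id, operation FROM hadoop_prod.db.dws_member.snapshots").show(false)
// Spot-check the current state of the table.
sparkSession.sql("SELECT * FROM hadoop_prod.db.dws_member").show(false)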
Flink + Iceberg unified stream-batch architecture
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(6000); // the Iceberg sink commits its buffered data files each time a checkpoint completes
        ...
        kafkaSource.setStartFromLatest();
        DataStream<RowData> result = env.addSource(kafkaSource).map(item -> {
          ....
            rowData.setField(0, uid);
            rowData.setField(1, courseid);
            rowData.setField(2, deviceid);
            rowData.setField(3, StringData.fromString(array[3].trim()));
            return rowData;
        });

        result.print(">>> processed record:");
        TableLoader testtopicTable = TableLoader.fromHadoopTable("hdfs://hadoop01:9820/flink/warehouse/iceberg_db/dwd_view_log");
        FlinkSink.forRowData(result).tableLoader(testtopicTable).build();

        env.execute();
        // Batch read: scans the current snapshot of the Iceberg table and then finishes
        DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
        // Streaming read: keeps running and emits rows from newly committed snapshots
        DataStream<RowData> stream = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(true).build();
        Table table = dwsIcebergDao.queryDwsMemberData(env, tableEnv).where($("dt").isEqual(dt));
        DataStream<QueryResult> queryResultDataStream = tableEnv.toAppendStream(table, QueryResult.class);

        tableEnv.createTemporaryView("tmpA", queryResultDataStream);
        String sql = "select * from (select uid,memberlevel,register,appregurl" +
                ",regsourcename,adname,sitename,vip_level,cast(paymoney as decimal(10,4)),row_number() over" +
                " (partition by memberlevel order by cast(paymoney as decimal(10,4)) desc) as rownum,dn,dt from tmpA where dt='" + dt + "') " +
                " where rownum<4";
        Table table1 = tableEnv.sqlQuery(sql);
        DataStream<RowData> top3DS = tableEnv.toRetractStream(table1, RowData.class).filter(item -> item.f0).map(item -> item.f1);

        String sql2 = "select appregurl,count(uid),dn,dt from tmpA where dt='" + dt + "' group by appregurl,dn,dt";
        Table table2 = tableEnv.sqlQuery(sql2);
        DataStream<RowData> appregurlnumDS = tableEnv.toRetractStream(table2, RowData.class).filter(item -> item.f0).map(item -> item.f1);

        TableLoader top3Table = TableLoader.fromHadoopTable(warehouseDir + "/ads_register_top3memberpay");
        TableLoader appregurlnumTable = TableLoader.fromHadoopTable(warehouseDir + "/ads_register_appregurlnum");

        FlinkSink.forRowData(top3DS).tableLoader(top3Table).overwrite(true).build();
        FlinkSink.forRowData(appregurlnumDS).tableLoader(appregurlnumTable).overwrite(true).build();
        env.execute(); // the sinks only run once the job is submitted
Optimization practices
1 Small file handling
// Compact small data files with Iceberg's rewrite action
// (the 10 KB target size is only for demonstration; production targets are normally much larger)
Table table = findTable(options, conf);
Actions.forTable(table).rewriteDataFiles()
        .targetSizeInBytes(10 * 1024) // 10 KB
        .execute();
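Rewriting only writes the compacted files into a new snapshot; the original small files are physically deleted once no snapshot references them any more. A minimal Scala sketch of that follow-up step using the core Table API (the one-day retention window is an arbitrary example, not the article's setting):

import java.util.concurrent.TimeUnit
import org.apache.iceberg.Table

// Expire snapshots older than one day so that data files no longer referenced by any
// snapshot -- including the compacted-away small files -- become eligible for removal.
def expireOldSnapshots(table: Table): Unit =
  table.expireSnapshots()
    .expireOlderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1))
    .commit()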

Iceberg 0.11 adds support for merging small files in streaming writes. Data is hash-shuffled by the partition/bucket key before it is written, so files are merged at the source: each task handles the data of one partition and commits its own data files, instead of many tasks each committing small files for the same partition. No extra maintenance code is needed; you only set the table property write.distribution-mode when creating the table, and the property is understood by other engines such as Spark as well.

CREATE TABLE city_table ( 
     province BIGINT,
     city STRING
) PARTITIONED BY (province, city) WITH (
    'write.distribution-mode'='hash' 
);
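Because write.distribution-mode is an ordinary Iceberg table property, it can also be set (or changed) from another engine, as noted above. A minimal sketch from the Spark side, reusing the dws_member table from the earlier section purely for illustration:

// Setting the same property on an existing table through Spark SQL;
// 'hash' shuffles rows by the partition key before they are written out.
sparkSession.sql("ALTER TABLE hadoop_prod.db.dws_member SET TBLPROPERTIES ('write.distribution-mode'='hash')")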
2 Sorting on write

Iceberg 0.11 also supports sorting records as they are written; in Flink SQL the sort is expressed with an ORDER BY on the INSERT statement:
insert into Iceberg_table select days from Kafka_tbl order by days, province_id;
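For comparison, with Iceberg's Spark SQL extensions enabled a persistent sort order can be declared on the table itself; a minimal sketch, where the table name is hypothetical and the columns are reused from the Flink statement above (availability depends on the Iceberg version and on the extensions being configured):

// Assumption: spark.sql.extensions includes org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions.
// Declares a table-level sort order; subsequent writes lay rows out sorted by these columns.
sparkSession.sql("ALTER TABLE hadoop_prod.db.iceberg_table WRITE ORDERED BY days, province_id")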
Summary

The checkpoint log below, captured while the Flink job was running, shows checkpoints completing and the IcebergFilesCommitter flushing its snapshot state for the dwd_view_log table on each checkpoint; this checkpoint-driven commit is what makes the streaming writes visible in the Iceberg table:
2022-03-16 16:09:24,486   INFO --- [                        jobmanager-future-thread-2]  org.apache.flink.runtime.checkpoint.CheckpointCoordinator                       (line: 1250)  :  Completed checkpoint 60 for job c7a6d8df0b422bb4c27a35b21a9142de (9169 bytes in 5 ms).
2022-03-16 16:09:30,481   INFO --- [                                  Checkpoint Timer]  org.apache.flink.runtime.checkpoint.CheckpointCoordinator                       (line:  741)  :  Triggering checkpoint 61 (type=CHECKPOINT) @ 1647418170480 for job c7a6d8df0b422bb4c27a35b21a9142de.
2022-03-16 16:09:30,483   INFO --- [IcebergFilesCommitter -> Sink: IcebergSink hdfs://hadoop01:9820/flink/warehouse/iceberg_db/dwd_view_log (1/1)#0]  org.apache.iceberg.flink.sink.IcebergFilesCommitter                             (line:  162)  :  Start to flush snapshot state to state backend, table: hdfs://hadoop01:9820/flink/warehouse/iceberg_db/dwd_view_log, checkpointId: 61
2022-03-16 16:09:30,483   INFO --- [                        jobmanager-future-thread-6]  org.apache.flink.runtime.checkpoint.CheckpointCoordinator                       (line: 1250)  :  Completed checkpoint 61 for job c7a6d8df0b422bb4c27a35b21a9142de (9169 bytes in 3 ms).