paimon sink 源码之 dataStream 的拓扑梳理

2024-05-05  本文已影响0人  loukey_j

拓扑梳理

  1. 判断是否有 logStoreTableFactory 在创建 DynamicTableSink 是根据建表语句是否有配置 log.system 来决定是否有 logStoreTableFactory
    1.1 log.system 默认值是 none,可以配置成 kafka,配置成 kafka 数据不仅仅会写入 fileSystem 也会写入 kafka 相当于是双写,流读消费 paimon 表数据的时候 streaming-read-mode 可以配置成 log 从 kafka 进行消费

    1.2 log.system 设置为 kafka 举例

    CREATE TABLE T (i INT, j INT) WITH (
       'log.system'='kafka', 
       'log.system.partitions'='2', 
       'kafka.bootstrap.servers'='%s', 
       'kafka.topic'='Tt',
      'connector='paimon'
      ...
    )
    

    1.3 LogSinkFunction 的创建,对于 log.system 是 kafka 实际上就是利用 kafka connector 的 FlinkKafkaProducer 进行数据发送


    LogSinkFunction
  2. sink.parallelism 设置 sink 的并行度

  3. local-merge-buffer-size 如作业存在主键数据偏斜可以设置“local-merge-buffer-size”,在数据进行 shuffle 之前进行 buffer 和 merge, 当同一主键在快照之间频繁更新时,这特别有用。建议从“64 mb”开始调整缓冲区大小。不适用于 CDC ingestion
    3.1 如果设置了 local-merge-buffer-size 就会加入一个 LocalMergeOperator 算子
    3.2 LocalMergeOperator 算子并行度和上游保持一致,shuffle 方式是 forward

  4. 对 DataStream<RowData> 进行一次 map 将 org.apache.flink.table.data.RowData 转化成 org.apache.paimon.data.InternalRow
    4.1 算子并行度和上游一致
    4.2 paimon InternalRow 只是对 flink RowData 的包装实际还是操作的 flink RowData

  5. 判断 BucketMode ,在这篇里有讲述 Paimon Table BuketMode 的逻辑

到此在不同 bucketMode 下对应的算子梳理完毕了, 在这些处理完之后还有一些通用的 doCommit 逻辑

  1. 判断 sink.savepoint.auto-tag 是否为 true 默认为 false , 参数表示是否自动创建 tag 如果开了则添加 AutoTagForSavepointCommitterOperator 算子 并且这个算子里面是包含 CommitterOperator 的
    • 算子并行度 为 1
  2. 如果没有开启则直接是 CommitterOperator 算子 并行度也是 1
  3. 最后添加一个 空的 sink 节点 DiscardingSink 并行度为 1

FINAL

上一篇 下一篇

猜你喜欢

热点阅读