paimon sink 源码之 StoreSinkWriteIm

2024-05-08  本文已影响0人  loukey_j

StoreSinkWriteImpl#构造方法

  1. FileStoreTable table : PrimaryKeyFileStoreTable 或者 AppendOnlyFileStoreTable, paimon sink 源码 之 paimon table 创建
  2. String commitUser: 是 UUID
  3. StoreSinkWriteState state: paimon sink 源码之 RowDataStoreWriteOperator
    这篇有讲他是从状态中恢复出来的 listState 的一个包装里面核心 是一个 map 数据结构,map 里面装的啥在 StoreSinkWriteImpl 里面也没有提到,因为 StoreSinkWriteImpl 的 这个 listState 是空的。map 里面存的啥 要看 StoreSinkWriteImpl 的子类
  4. IOManager ioManager: 来自于 flink runtime, paimon 中利用 flink IOManager 获取 spill 路径用来构建 paimon 自己的 IOManager ,paimon 自己的 IOManager 先不看
  5. boolean ignorePreviousFiles:是 overwritePartition 场景才使用所以可以认为是 false
  6. boolean waitCompaction 逻辑如下:非 writeOnly 情况下 && (开启了 DV || 在 changlogProducer 是 lookup 时开启了 changelog-producer.lookup-wait) waitCompaction 就为 true 否则就是 false


    waitCompaction
  7. MemorySegmentPool memoryPool : 开了 sink.use-managed-memory-allocator 就会有基于 managed-memory 的内存池
  8. 根据第一步的 FileStoreTable table 创建 TableWriteImpl write。 所以从现在开始 write 里面开始套 write 了。琢磨下来总共是 套了 7 层 write 如下图


    image.png

StoreSinkWriteImpl#write(org.apache.paimon.data.InternalRow)

  1. StoreSinkWriteImpl#write --> TableWriteImpl.writeAndReturn(rowData)
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
       //如果配置了 ignore-delete 则舍弃撤回流, 
        if (ignoreDelete && row.getRowKind().isRetract()) {
            return null;
        }
       //进行一次包装 SinkRecord 包含 主键、分区、bucket、和原本的 InternalRow 把一些重要属性先抽取出来方便使用
        SinkRecord record = toSinkRecord(row);
        write.write(record.partition(), record.bucket(), recordExtractor.extract(record));
        return record;
    }

StoreSinkWriteImpl#prepareCommit

CommitMessage committable = write.prepareCommit(this.waitCompaction || waitCompaction, checkpointId)

StoreSinkWriteImpl 有 write 、prepareCommit 的方法都是调用 里面 write 变量的 write 和 prepareCommit 方法
那直接看 TableWriteImpl 的 write 、prepareCommit 方法
在 StoreSinkWriteImpl#构造方法 第 8 步是构造 TableWriteImpl 的地方,所以先看下他时如何构建的然后再看他的 的 write 、prepareCommit 方法

TableWriteImpl#构造方法

// StoreSinkWriteImpl 初始化 TableWriteImpl
private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
TableWriteImpl<?> tableWrite =
                table.newWrite( //不同的  table 创建不同的 write 等下在看
                                commitUser,
                                (part, bucket) -> // 这里竟然用了 state 过滤器,[在上篇](https://www.jianshu.com/p/8153c43a4170) 有详细讲这个过滤器的可能用处,这里只是一个函数,要看函数的调用放过才知道具体的  (part, bucket) 是怎么来的
                                        state.stateValueFilter().filter(table.name(), part, bucket)
                        )
                        .withIOManager(paimonIOManager) // 设置其他属性
                        .withIgnorePreviousFiles(ignorePreviousFiles) // false
                        .withExecutionMode(isStreamingMode) // true
                        .withBucketMode(table.bucketMode()); 

        if (metricGroup != null) {
            tableWrite.withMetricRegistry(new FlinkMetricRegistry(metricGroup));
        }

        if (memoryPoolFactory != null) {
            return tableWrite.withMemoryPoolFactory(memoryPoolFactory); // mamaged memory
        } else {
            return tableWrite.withMemoryPool( // 堆内存
                    memoryPool != null
                            ? memoryPool
                            : new HeapMemorySegmentPool(
                                    table.coreOptions().writeBufferSize(),
                                    table.coreOptions().pageSize()));
        }
}

// 从上面函数来看首先是通过 table 创建出了 TableWriteImpl 然后设置了 各种属性, 
// 接下来看 table 是怎么 newWrite 的。主键表为例 如下

 public TableWriteImpl<KeyValue> newWrite(
            String commitUser, ManifestCacheFilter manifestFilter) {
        TableSchema schema = schema();
        CoreOptions options = store().options();
       //rowKindGenerator 用来解析 '+I', '-U', '+U' or '-D' , 
       //如果有指定 rowkind.field 那就从数据中抽取这个字段来获取 rowKind 
       //否则直接获取 row 的 rowkind 这个是 flink 层面加上的,而 rowkind.field 是数据层面的。
       //暂时没有 get 到数据里面带 rowkind.field 的场景。
        RowKindGenerator rowKindGenerator = RowKindGenerator.create(schema, options);
       //主键表写的数据格式是 KeyValue, append 表写的格式直接是 InternalRow
        KeyValue kv = new KeyValue();
      //调用 TableWriteImpl 构造函数
        return new TableWriteImpl<>(
                 // TableWriteImpl 里面又嵌套了一个 write 对于主键表是 KeyValueFileStoreWrite 
                 // append 表是 AppendOnlyFileStoreWrite 
                 // manifestFilter 是那个 state 过滤器。 ok 这里命名了 manifestFilter,  
                 // manifest 应该是 Paiom 的元数据,文件列表信息,一个 Paimon 表会有很多文件列表信息,
                 // 可能对某个 task 只需要和自己 partition bucket 相关的 manifest
                 // (以上个人猜测,边看边猜,后面到 KeyValueFileStoreWrite 再细看)
                store().newWrite(commitUser, manifestFilter), 
                createRowKeyExtractor(), //用来读取数据的主键的
                record -> { //又是一个函数用来定义如何把 输入的 InternalRow 转化成 KeyValue, 然后后面用 KeyValue 结构去写
                    InternalRow row = record.row(); // 原始 row
                    RowKind rowKind =  // 抽取 rowkind
                            rowKindGenerator == null
                                    ? row.getRowKind()
                                    : rowKindGenerator.generate(row);
                     // 构建 KeyValue 放了主键,sequence, rowKind , row 本身
                    return kv.replace(record.primaryKey(), KeyValue.UNKNOWN_SEQUENCE, rowKind, row);
                },
               //获取是否有配置 ignore-delete ,如果配置了则会对 rowKind 为撤回类型的过滤掉 这个在上面 StoreSinkWriteImpl#write 里面也有提到
                CoreOptions.fromMap(tableSchema.options()).ignoreDelete());
    }

//到这里我们看了 TableWriteImpl 构造的地方了再看 TableWriteImpl 的构造函数

// TableWriteImpl 自身构造函数
public TableWriteImpl(
            FileStoreWrite<T> write, // 来自于 table.store().newWrite() 主键表是 KeyValueFileStoreWrite
            KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor, //主键和bucket 的抽取器给一个InternalRow 之后用来方便对 key bucket 等的抽取
            RecordExtractor<T> recordExtractor, //数据转化器 write 的不同 数据转化也不同,KeyValueFileStoreWrite 需要把 InternalRow 转化成 KeyValue, append 表 不需要转化
            boolean ignoreDelete) { //是否忽略撤回数据
        this.write = write;
        this.keyAndBucketExtractor = keyAndBucketExtractor;
        this.recordExtractor = recordExtractor;
        this.ignoreDelete = ignoreDelete;
    }

FINAL

上一篇下一篇

猜你喜欢

热点阅读