paimon sink 源码 之 paimon table 创建

2024-05-04  本文已影响0人  loukey_j

在学习 paimon sink 的过程中本来只想快速梳理下 paimon 的 sink 时对 DataStream 操作的拓扑, 但是过程中发现 paimon 会有很多概念,并且这些概念都做了很好的抽象,一口吃不了大胖子,慢慢的边啃边理解吧。这篇记录在学习过程中的 paimon table,包含 paimon table 的创建和 table 本身。

paimon table 创建

在 sql 中 table 的创建少不了 catalog, catalog 的创建又离不开 flink 的 CatalogFactory。

Paimon 对于 flink CatalogFactory 的实现

org.apache.flink.table.factories.CatalogFactory

Paimon 对于 flink CatalogFactory 实现类的不同点

  1. IDENTIFIER 不同
  1. 核心方法 CatalogFactory#createCatalog 的实现不一样

Paimon 对于 Flink Catalog 的实现

org.apache.flink.table.catalog.Catalog

从刚刚 createCatalog 方法中可以看到他们区别是 FlinkGenericCatalog 不仅仅含有 paimon 的 FlinkCatalog 还包含 flink hive connector 的 HiveCatalog,从 FlinkGenericCatalog 的实现来看,很多操作都会同时操作两个 catalog, 其中 HiveCatalog 是对 hive HMS 进行请求操作,FlinkCatalog 是对 paimon 进行操作,方法例举如下。

public CatalogBaseTable getTable(ObjectPath tablePath) {
     try {
         return paimon.getTable(tablePath); //  paimon 的 FlinkCatalog
     } catch (TableNotExistException e) {
         return flink.getTable(tablePath);  // hive connector 的 HiveCatalog
     }
}

public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) {
     String connector = table.getOptions().get(CONNECTOR.key());
     if (FlinkCatalogFactory.IDENTIFIER.equals(connector)) { //是 paimon 表
         paimon.createTable(tablePath, table, ignoreIfExists); // 就用 paimon 的 FlinkCatalog 进行操作
     } else {
         flink.createTable(tablePath, table, ignoreIfExists); // 否则就用  hive connector 的 HiveCatalog
     }
}

public List<String> listTables(String databaseName) {
     // flink list tables contains all paimon tables // 都包含?那他是怎么把 paimon 表同步到 HMS 的
     return flink.listTables(databaseName); // 为什么这里只用  hive connector 的 HiveCatalog 就行?
}

所以看起来 FlinkGenericCatalog 有如下特点

  1. 是可以自动识别是否为 paimon 表,优先用 paimon catalog 去尝试。
  2. 这个 catalog 可以兼容普通的 hive 表和 paimon 表

对于 listTables 等一些操作为什么只要用 HiveCatalog 就行?
既然都包含,那么 paimon 的 FlinkCatalog 是怎么把表同步到 HMS 的?

Paimon 的 FlinkCatalog

上面得知 FlinkCatalog 是 flink Catalog 的一个实现

FlinkCatalog extends org.apache.flink.table.catalog.AbstractCatalog {
  private final org.apache.paimon.catalog.Catalog catalog;
  public List<String> listTables(String databaseName) {
      return catalog.listTables(databaseName);
    } 
 ... ...
}

Paimon 的 Catalog

org.apache.paimon.catalog.Catalog

Paimon 的 Catalog 是如何创建的

Paimon 的 org.apache.paimon.hive.HiveCatalog

HiveCatalog extends AbstractCatalog implements Catalog{
      private final IMetaStoreClient client;  // hive client
      protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes) {
        final SchemaManager schemaManager = schemaManager(identifier);
        // first commit changes to underlying files
        TableSchema schema = schemaManager.commitChanges(changes);
        try {
            // sync to hive hms 表变更同步到 hive
            Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
            updateHmsTablePars(table, schema);
            updateHmsTable(table, identifier, schema);
            client.alter_table(
                    identifier.getDatabaseName(), identifier.getObjectName(), table, true);
        } catch (Exception te) {
            schemaManager.deleteSchema(schema.id());
            throw new RuntimeException(te);
        }
    }
}

到这里就说明了 paimon table 的创建前揍, 从 Flink 的 CatalogFactory 到 Flink 的 Catalog, 再到 Paimon 基于 Flink 的 Catalog, Paimon 基于 Flink 的 Catalog 是一个壳子实际是用的 Paimon 的 Catalog, Paimon 的 Catalog 又是通过 Paimon 的 CatalogFactory 创建而来。接下来看看主角 Paimon 的 Catalog 创建的 Paimon Table, Table 的创建是 org.apache.paimon.catalog.AbstractCatalog 实现的

org.apache.paimon.catalog.AbstractCatalog implements Catalog  {
  public Table getTable(Identifier identifier) throws TableNotExistException {
        if (isSystemDatabase(identifier.getDatabaseName())) { // 先忽略
        } else if (isSpecifiedSystemTable(identifier)) { //先忽略
        } else {
            return getDataTable(identifier);
        }
    }
}

private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistException {
        TableSchema tableSchema = getDataTableSchema(identifier);
        return FileStoreTableFactory.create(
                fileIO,
                getDataTableLocation(identifier),
                tableSchema,
                new CatalogEnvironment(
                        Lock.factory(
                                lockFactory().orElse(null), lockContext().orElse(null), identifier),
                        metastoreClientFactory(identifier).orElse(null),
                        lineageMetaFactory));
    }

然后在 FileStoreTableFactory.create 方法中根据是否有主键会创建 AppendOnlyFileStoreTable 或者 PrimaryKeyFileStoreTable

    public static FileStoreTable create(
         FileIO fileIO, // 这又是个啥玩意?
         Path tablePath,
         TableSchema tableSchema,
         Options dynamicOptions,
         CatalogEnvironment catalogEnvironment) {
     FileStoreTable table =
             tableSchema.primaryKeys().isEmpty()
                     ? new AppendOnlyFileStoreTable(
                             fileIO, tablePath, tableSchema, catalogEnvironment)
                     : new PrimaryKeyFileStoreTable(
                             fileIO, tablePath, tableSchema, catalogEnvironment);
     return table.copy(dynamicOptions.toMap());
 }

到这里 Paimon Table 就已经创建完成了,Table 提供了表的读取写入和表操作的一些抽象,涉及面较多
简单看看 Table 和写入相关的一些 方法 混个眼熟,后面了解更多再补充

Paimon Table 之 FileStoreTable

org.apache.paimon.table.Table
abstract class  AbstractFileStoreTable AbstractFileStoreTable implements FileStoreTable {
    protected final FileIO fileIO;
    @Override 
    public BucketMode bucketMode() { // 分桶模式很重要
        return store().bucketMode(); // store() 方法在子类实现
    }
  ... ...
}

BucketMode 翻译自官网

Fixed Bucket 指的是 BucketMode.FIXED

Dynamic Bucket 有两种

动态Bucket仅支持单个写入作业。请不要启动多个作业来写入同一分区(这可能会导致重复数据)。即使您启用'write-only'并启动专用的压缩作业,它也不会起作用。
Dynamic bucket mode can not work with log system

Normal Dynamic Bucket Mode 指的是 BucketMode.DYNAMIC

  1. 一般来说,没有性能损失,但会有一些额外的内存消耗,一个分区中的1 亿个 条目多占用1 GB内存,不再活动的分区不占用内存。
  2. 对于更新率较低的表,建议使用此模式,以显着提高性能。

Normal Dynamic Bucket Mode支持sort-compact以加快查询速度。请参阅紧凑排序

Cross Partitions Upsert Dynamic Bucket Mode 指的是 BucketMode.GLOBAL_DYNAMIC

FINAL

上一篇下一篇

猜你喜欢

热点阅读