paimon sink 源码 之 paimon table 创建

2024-05-04  本文已影响0人  loukey_j

在学习 paimon sink 的过程中本来只想快速梳理下 paimon 的 sink 时对 DataStream 操作的拓扑, 但是过程中发现 paimon 会有很多概念,并且这些概念都做了很好的抽象,一口吃不了大胖子,慢慢的边啃边理解吧。这篇记录在学习过程中的 paimon table,包含 paimon table 的创建和 table 本身。

paimon table 创建

在 sql 中 table 的创建少不了 catalog, catalog 的创建又离不开 flink 的 CatalogFactory。

Paimon 对于 flink CatalogFactory 的实现


Paimon 对于 flink CatalogFactory 实现类的不同点

  1. 核心方法 CatalogFactory#createCatalog 的实现不一样

Paimon 对于 Flink Catalog 的实现


从刚刚 createCatalog 方法中可以看到他们区别是 FlinkGenericCatalog 不仅仅含有 paimon 的 FlinkCatalog 还包含 flink hive connector 的 HiveCatalog,从 FlinkGenericCatalog 的实现来看,很多操作都会同时操作两个 catalog, 其中 HiveCatalog 是对 hive HMS 进行请求操作,FlinkCatalog 是对 paimon 进行操作,方法例举如下。

public CatalogBaseTable getTable(ObjectPath tablePath) {
     try {
         return paimon.getTable(tablePath); //  paimon 的 FlinkCatalog
     } catch (TableNotExistException e) {
         return flink.getTable(tablePath);  // hive connector 的 HiveCatalog

public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) {
     String connector = table.getOptions().get(CONNECTOR.key());
     if (FlinkCatalogFactory.IDENTIFIER.equals(connector)) { //是 paimon 表
         paimon.createTable(tablePath, table, ignoreIfExists); // 就用 paimon 的 FlinkCatalog 进行操作
     } else {
         flink.createTable(tablePath, table, ignoreIfExists); // 否则就用  hive connector 的 HiveCatalog

public List<String> listTables(String databaseName) {
     // flink list tables contains all paimon tables // 都包含?那他是怎么把 paimon 表同步到 HMS 的
     return flink.listTables(databaseName); // 为什么这里只用  hive connector 的 HiveCatalog 就行?

所以看起来 FlinkGenericCatalog 有如下特点

  1. 是可以自动识别是否为 paimon 表,优先用 paimon catalog 去尝试。
  2. 这个 catalog 可以兼容普通的 hive 表和 paimon 表

对于 listTables 等一些操作为什么只要用 HiveCatalog 就行?
既然都包含,那么 paimon 的 FlinkCatalog 是怎么把表同步到 HMS 的?

Paimon 的 FlinkCatalog

上面得知 FlinkCatalog 是 flink Catalog 的一个实现

FlinkCatalog extends org.apache.flink.table.catalog.AbstractCatalog {
  private final org.apache.paimon.catalog.Catalog catalog;
  public List<String> listTables(String databaseName) {
      return catalog.listTables(databaseName);
 ... ...

Paimon 的 Catalog


Paimon 的 Catalog 是如何创建的

Paimon 的 org.apache.paimon.hive.HiveCatalog

HiveCatalog extends AbstractCatalog implements Catalog{
      private final IMetaStoreClient client;  // hive client
      protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes) {
        final SchemaManager schemaManager = schemaManager(identifier);
        // first commit changes to underlying files
        TableSchema schema = schemaManager.commitChanges(changes);
        try {
            // sync to hive hms 表变更同步到 hive
            Table table = client.getTable(identifier.getDatabaseName(), identifier.getObjectName());
            updateHmsTablePars(table, schema);
            updateHmsTable(table, identifier, schema);
                    identifier.getDatabaseName(), identifier.getObjectName(), table, true);
        } catch (Exception te) {
            throw new RuntimeException(te);

到这里就说明了 paimon table 的创建前揍, 从 Flink 的 CatalogFactory 到 Flink 的 Catalog, 再到 Paimon 基于 Flink 的 Catalog, Paimon 基于 Flink 的 Catalog 是一个壳子实际是用的 Paimon 的 Catalog, Paimon 的 Catalog 又是通过 Paimon 的 CatalogFactory 创建而来。接下来看看主角 Paimon 的 Catalog 创建的 Paimon Table, Table 的创建是 org.apache.paimon.catalog.AbstractCatalog 实现的

org.apache.paimon.catalog.AbstractCatalog implements Catalog  {
  public Table getTable(Identifier identifier) throws TableNotExistException {
        if (isSystemDatabase(identifier.getDatabaseName())) { // 先忽略
        } else if (isSpecifiedSystemTable(identifier)) { //先忽略
        } else {
            return getDataTable(identifier);

private FileStoreTable getDataTable(Identifier identifier) throws TableNotExistException {
        TableSchema tableSchema = getDataTableSchema(identifier);
        return FileStoreTableFactory.create(
                new CatalogEnvironment(
                                lockFactory().orElse(null), lockContext().orElse(null), identifier),

然后在 FileStoreTableFactory.create 方法中根据是否有主键会创建 AppendOnlyFileStoreTable 或者 PrimaryKeyFileStoreTable

    public static FileStoreTable create(
         FileIO fileIO, // 这又是个啥玩意?
         Path tablePath,
         TableSchema tableSchema,
         Options dynamicOptions,
         CatalogEnvironment catalogEnvironment) {
     FileStoreTable table =
                     ? new AppendOnlyFileStoreTable(
                             fileIO, tablePath, tableSchema, catalogEnvironment)
                     : new PrimaryKeyFileStoreTable(
                             fileIO, tablePath, tableSchema, catalogEnvironment);
     return table.copy(dynamicOptions.toMap());

到这里 Paimon Table 就已经创建完成了,Table 提供了表的读取写入和表操作的一些抽象,涉及面较多
简单看看 Table 和写入相关的一些 方法 混个眼熟,后面了解更多再补充

Paimon Table 之 FileStoreTable

abstract class  AbstractFileStoreTable AbstractFileStoreTable implements FileStoreTable {
    protected final FileIO fileIO;
    public BucketMode bucketMode() { // 分桶模式很重要
        return store().bucketMode(); // store() 方法在子类实现
  ... ...

BucketMode 翻译自官网

Fixed Bucket 指的是 BucketMode.FIXED

Dynamic Bucket 有两种

Dynamic bucket mode can not work with log system

Normal Dynamic Bucket Mode 指的是 BucketMode.DYNAMIC

  1. 一般来说,没有性能损失,但会有一些额外的内存消耗,一个分区中的1 亿个 条目多占用1 GB内存,不再活动的分区不占用内存。
  2. 对于更新率较低的表,建议使用此模式,以显着提高性能。

Normal Dynamic Bucket Mode支持sort-compact以加快查询速度。请参阅紧凑排序

Cross Partitions Upsert Dynamic Bucket Mode 指的是 BucketMode.GLOBAL_DYNAMIC



