apache iceberg

iceberg源码-FlinkSource

2021-09-23  本文已影响0人  神奇的考拉

一、概述

FlinkSource适用于通过Flink方式来读取iceberg的table,类似TableScan(并且每个Scan都会关联一个ScanContext)。
FlinkSource能够读取有界静态数据,同时也是支持增量数据和流式读取。

二、实现

1、默认情况下FlinkSource是不允许构造的(私有化构造器)
2、通过Builder设计模式将复杂对象的构建与其表示分离;

// ============ 省略部分代码 ============== //
// flink stream环境
private StreamExecutionEnvironment env;
// iceberg table
private Table table;
// 用于loader iceberg表
private TableLoader tableLoader;
// iceberg的schema描述
private TableSchema projectedSchema;
// 进行icebeg 表读取的配置参数
private ReadableConfig readableConfig = new Configuration();
// 构建scan上下文(很关键);能够进行filter/limit/properties设置/snapshotid指定/是否区分大小写
// datafile split/数据回放等
private final ScanContext.Builder contextBuilder = ScanContext.builder();
// ============ 省略部分代码 ============== //
关键代码
// ============== buildFormat =============== //
public FlinkInputFormat buildFormat() {
  Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
  // 对应的iceberg表的schema 
  Schema icebergSchema;
  // 用于iceberg的读取(实例化的时候注意是可序列化的[写入的时候是串行的,读写可以同时存在])
  FileIO io;
  // iceberg datafile文件的加解密的(实例化的时候也是需要注意是可序列化的)
  EncryptionManager encryption;
  if (table == null) { // 加载iceberg表
    // load required fields by table loader.
    // 表加载器来完成内容的加载
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      table = loader.loadTable();
      icebergSchema = table.schema();
      io = table.io();
      encryption = table.encryption();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  } else {
    icebergSchema = table.schema();
    io = table.io();
    encryption = table.encryption();
  }

  if (projectedSchema == null) {
    contextBuilder.project(icebergSchema);
  } else {
    contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedSchema));
  }

  // 最终构建一个RichInputFormat 
  return new FlinkInputFormat(tableLoader, icebergSchema, io, encryption, contextBuilder.build());
}

构建DataStream

public DataStream<RowData> build() {
  Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
  // 1、构建source读取的input format
  FlinkInputFormat format = buildFormat();
  // 2、构建读取scan
  ScanContext context = contextBuilder.build();
  // 3、用于iceberg与flink的类型转换
  TypeInformation<RowData> typeInfo = FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(context.project()));
  // 4、构建datastream
  if (!context.isStreaming()) { // 4.1 批模式
    // 指定读取时的并行度
    int parallelism = inferParallelism(format, context);
    // 创建datastream
    return env.createInput(format, typeInfo).setParallelism(parallelism);
  } else { // 4.2 流模式
    // 根据指定monitor周期进行数据读取
    StreamingMonitorFunction function = new StreamingMonitorFunction(tableLoader, context);
    
    String monitorFunctionName = String.format("Iceberg table (%s) monitor", table);
    String readerOperatorName = String.format("Iceberg table (%s) reader", table);
    // 按照指定的monitor进行数据读取
    return env.addSource(function, monitorFunctionName)
        .transform(readerOperatorName, typeInfo, StreamingReaderOperator.factory(format));
  }
}

关于批模式读取时的并行度设置

// 用于flink批模式下读取时的parallelism的大小设置
// parallelism来自当前读取的split的个数 以及 手动指定的table.exec.resource.default-parallelism两个选项中最小的那个值
int inferParallelism(FlinkInputFormat format, ScanContext context) {
  int parallelism = readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM); // 手动指定的table.exec.resource.default-parallelism
  if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM)) { // 是否开启根据split个数来推断当前读取的并行度个数;若是未指定的时候,则使用手动指定
    int maxInferParallelism = readableConfig.get(FlinkConfigOptions
        .TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX);  // 默认指定读取时的并行度:100
    Preconditions.checkState(
        maxInferParallelism >= 1,
        FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX.key() + " cannot be less than 1");
    int splitNum;
    try {
      FlinkInputSplit[] splits = format.createInputSplits(0); // 对datafiles读取进行split的个数
      splitNum = splits.length;
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create iceberg input splits for table: " + table, e);
    }

    parallelism = Math.min(splitNum, maxInferParallelism); // 取split和默认MAX_parallelism的最小值
  }

  if (context.limit() > 0) { // 还需要注意limit参数的设置防止parallelism设置过大
    int limit = context.limit() >= Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) context.limit();
    parallelism = Math.min(parallelism, limit);
  }

  // parallelism must be positive.
  parallelism = Math.max(1, parallelism);
  return parallelism;
}

三、关于FlinkSource的批读取

相对比较简单通过Stream Execution Environment来创建InputFormat

四、关于FlinkSource的流读取

首先指定monitor function定时获取对应的datafiles内容,并执行split,再将生成的FlinkInputSplit下发给下游的task进行处理;

上一篇下一篇

猜你喜欢

热点阅读