iceberg源码-FlinkSplitGenerator

2021-09-24  本文已影响0人  神奇的考拉

一、概述

主要是为了flink进行读取datafile提供读取性能以及整体吞吐量,来进行combine split的操作;并基于此来生成对应的CombineScanTask,进而生成FlinkInputSplit的。

二、实现

// 关键代码
// 生成CombineScanTask用于完成每个FlinkInputSplit
private static List<CombinedScanTask> tasks(Table table, ScanContext context) {
  TableScan scan = table
      .newScan()  // 创建一个新的scan
      .caseSensitive(context.caseSensitive()) // 当指定数据列是否需要考虑忽略大小写
      .project(context.project());            // 是否进行下推读取

  if (context.snapshotId() != null) { // 指定本次读取的snapshot
    scan = scan.useSnapshot(context.snapshotId());
  }

  if (context.asOfTimestamp() != null) { // 指定本次读取的timestamp
    scan = scan.asOfTime(context.asOfTimestamp());
  }

  if (context.startSnapshotId() != null) { 
    if (context.endSnapshotId() != null) { // 指定本次读取的[startSnapshot, endSnapshot)
      scan = scan.appendsBetween(context.startSnapshotId(), context.endSnapshotId());
    } else { // 流式读取 未指定对应的endSnapshot
      scan = scan.appendsAfter(context.startSnapshotId());
    }
  }
    
  if (context.splitSize() != null) { // 指定split时的大小
    scan = scan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());
  }

  if (context.splitLookback() != null) { // 指定combine split时的文件个数
    scan = scan.option(TableProperties.SPLIT_LOOKBACK, context.splitLookback().toString());
  }

  if (context.splitOpenFileCost() != null) { // 指定combine split时对应的文件大小
    scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, context.splitOpenFileCost().toString());
  }

  if (context.filters() != null) { // 指定本次scan的filter内容,便于在进行读取下推 在读取时过滤对应的内容
    for (Expression filter : context.filters()) {
      scan = scan.filter(filter);
    }
  }

  // 获取到对应的CombineScanTask集
  try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
    return Lists.newArrayList(tasksIterable);
  } catch (IOException e) {
    throw new UncheckedIOException("Failed to close table scan: " + scan, e);
  }
}
上一篇下一篇

猜你喜欢

热点阅读