Flink

Flink在加载文件数据源时,如何创建分片呢?

2019-12-01  本文已影响0人  LZhan

主要分析FileInputFormat类的createInputSplits方法

参数minNumSplits,通常是readFile等读取文件操作的并行度决定的。

(1)minNumSplits = Math.max(minNumSplits, this.numSplits);
先获取这个minNumSplits变量,在并行度与numSplits中取最大值(numSplits通常是在FileInputFormat类的configure方法中定义)

(2)获取这个路径下的所有文件的总长度 totalLength


(3)之后进行unsplittable分支判断
这个unsplittable用来判断文件格式是否能够spliit,有些文件格式如avro,deflate等在块级别上是不能分割的(are not splittable),只能整个文件读取。


所以,如果该文件格式是不支持split的,那么分片的数目就等于该路径下文件的数目。


this.minSplitSize是在FileInputFormat类configure方法中定义的,如果该值小于等于block大小,
那么minSplitSize就是configure中定义的minSplitSize;
否则的话minSplitSize就是block size。

splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize));
halfSplit就是splitSize右移1位(相当于除以2)

定义最后一个分片的大小,可以是一般的1.1倍
final long maxBytesForLastSplit = (long) (splitSize * MAX_SPLIT_SIZE_DISCREPANCY);

c、开始对一个文件开始分片

先去遍历该文件的每一个block(按照offset排序过),获取该block的start位置和end位置;
如果当前文件内容offset在start和end之间,则进入if分支;
接下来再去判断当前block的分片内容是不是超过一半了,没有超过一半在返回下一个block的索引位置,否则返回当前索引位置(即返回包含大部分数据的block索引位置)。

<2>的第二部分:
blockIndex = getBlockIndexForPosition(blocks, position, halfSplit, blockIndex);
返回当前split超过一半内容所在的block,并进行初始化分片即FileInputSplit实例

<3> 分配最后一个分片split


上一篇下一篇

猜你喜欢

热点阅读