Flink(1.13) 核心概念

2021-08-20  本文已影响0人  万事万物

TaskManager与Slots

Flink中每一个worker(TaskManager)都是一个JVM进程,它可能会在独立的线程上执行一个Task。为了控制一个worker能接收多少个task,worker通过Task Slot来进行控制(一个worker至少有一个Task Slot)。
这里的Slot如何来理解呢?很多的文章中经常会和Spark框架进行类比,将Slot类比为Core,其实简单这么类比是可以的,可实际上,可以考虑下,当Spark申请资源后,这个Core执行任务时有可能是空闲的,但是这个时候Spark并不能将这个空闲下来的Core共享给其他Job使用,所以这里的Core是Job内部共享使用的。接下来我们再回想一下,之前在Yarn Session-Cluster模式时,其实是可以并行执行多个Job的,那如果申请两个Slot,而执行Job时,只用到了一个,剩下的一个怎么办?那我们自认而然就会想到可以将这个Slot给并行的其他Job,对吗?所以Flink中的Slot和Spark中的Core还是有很大区别的。
每个task slot表示TaskManager拥有资源的一个固定大小的子集。假如一个TaskManager有三个slot,那么它会将其管理的内存分成三份给各个slot。资源slot化意味着一个task将不需要跟来自其他job的task竞争被管理的内存,取而代之的是它将拥有一定数量的内存储备。需要注意的是,这里不会涉及到CPU的隔离,slot目前仅仅用来隔离task的受管理的内存。

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 1
  1. 一个worker至少有个一个task slot
  2. slot之间可以共享资源
  3. slot之间是内存隔离不是cpu隔离

4.2.2Parallelism(并行度)

并行度

一个特定算子的子任务(subtask)的个数被称之为这个算子的并行度(parallelism),一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1

并行度:

优先级:
算子指定 > env全局指定 > 提交参数 > 配文件

同一个算子的并行实例,不能在同一个slot里,

slot数量必须大于等于最大并行度,才可以正常运行,总结slot的数量是由最大并行度所决定的。

并行对与subtask的关系:

共享组

在默认情况下,JobManager 会将志同道合算子划分到一起,他们都属于在一个组内(default)

示例图

如上图:socket 算子和 FlatMap 分在一个组内。若想把它们拆开可以自定义共享组,每个算子都有slotSharingGroup函数,作用就是将其独立划分成一个组。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 监听端口
        DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999);


        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // 切割每行数据,并收集到 Collector中
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT)).slotSharingGroup("flatMapGroup")
                
                .keyBy(0).sum(1);

        sum.print("test");

        env.execute();

    }

注意这段代码,将flatMap 重新划分一个组,这样它就单独使用slot了。

     SingleOutputStreamOperator<Tuple2<String, Integer>> sum = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (lines, out) -> {
            // 切割每行数据,并收集到 Collector中
            Arrays.stream(lines.split(" ")).forEach(s -> out.collect(Tuple2.of(s, 1)));
        }).returns(Types.TUPLE(Types.STRING, Types.INT)).slotSharingGroup("flatMapGroup")

再运行;flat map就单独拎出去了。


共享组的好处:同一个slot中,其他subtask之间就变争抢slot中资源,好比5个人在一个房子了,什么东西都是五个人再用,共享组相当于单独一个房间,房间里的资源都是属于他一个人的。实际工作中,通常会将任务比较重的,单独划分成一个组。

  1. 只有属于同一个slot共享组的subtask,才可以共享同一个slot
  2. 属于同一个算子的subtask,不能共享同一个slot。
    比如Map算子并发度为3,相当于把任务分成了三份,若他们都在同一个slot中执行,将毫无意义,也是不允许的。所以会将它们分在不同的slot中,这样才能更好提高执行的并发度
上一篇下一篇

猜你喜欢

热点阅读