Flink Parallelism和Slot理解

2019-11-22  本文已影响0人  LZhan

相关博客:Flink工作原理

image.png
1 问题出现

Caused by: akka.pattern.AskTimeoutException:
Ask timed out on [Actor[akka://flink/user/taskmanager_0#15608456]] after [10000 ms].
Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalRpcInvocation".

该问题的导致是因为TaskManager的slot数量不足的原因,导致job提交失败。在Flink 1.63中修复变成异常了。

2 什么是parallelism?

一个Flink程序是由多个任务组成(source、transformation和sink)。一个任务由多个并行的实例(线程)来执行,一个任务的并行实例(线程)数目就被称为该任务的并行度。

并行的意思,在Flink中代表每个任务的并行度,适当的提高并行度可以大大提高job的执行效率,比如当你的job消费kafka的速度过慢,适当调大就消费正常了。
flink配置文件中可以查看到默认并行度是1。

如何设置并行度?
<1>命令行:./bin/flink run -p 10 ../wordcount.jar
<2>代码中:env.setParallelism(10)

这里设置的并行度,是整个程序的并行度,那么如果后面的每个算子不单独设置并行度覆盖的话,那么后面每个算子的并行度都是这里设置的值了。
可以在每个算子后面给每个算子单独设置并行度:

data.keyBy(new xxxKey())
    .flatMap(new XxxFlatMapFunction()).setParallelism(5)
    .map(new XxxMapFunction).setParallelism(5)
    .addSink(new XxxSink()).setParallelism(1)
2.1 任务的并行度可以从多个层次指定

优先级由高到低:

.flatMap(new XxxFlatMapFunction()).setParallelism(5)
.map(new XxxMapFunction).setParallelism(5)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
2.2 operator subtask

在程序执行期间,一个流会生成一个或者多个stream的partition,并且一个operator会生成一个或者多个operator subtask。operator的subtask彼此之间是独立的,分别在不同的线程里去执行并且可能分布在不同的机器上或者containers上。
operator的subtasks的数量等于该操作算子的并行度的数量。
流的并行度有总是取决于产生它的操作算子的并行度决定的。同一个flink程序中的不同的operators可能有不同的并行度。

image.png
3.Slot

默认,如果subtask是来自相同的job,但不是相同的task,Flink允许subtask共享slot。这样就会出现一个slot可能容纳一个job中的整个pipeline。允许slot共享有以下两个好处:
① Flink集群需要的task slots的数量和作业中的最高并行度的一致。不需要计算一个程序总共包含多少个task。
②更好的利用资源。如果没有slot共享,非密集型source/map()子任务将阻塞与资源密集型窗口子任务一样多的资源;在slot共享的话,将我们图6的示例中的基本并行度从2提高到6,可以充分利用slot资源,同时确保繁重的subtasks在Taskmanager中公平分配。

image.png

注意:这里的job就是一个flink任务,task就是该任务里面的source,map,sink等,而subtask就是每个task的多个并行实例(实例个数就是并行度的大小),如果并行度为2,那么source[1],source[2]就是两个属于source的subtask。

4.TaskManager的运行个数

Job的最大并行度除以每个TaskManager分配的任务槽数。
yn:TaskManager的实际个数 ;ys:每个TaskManager分配的slot数;
p:最大并行度
TaskManager的任务槽个数在使用flink run脚本提交on YARN作业时用-ys/--yarnslots参数来指定,另外在flink-conf.yaml文件中也有默认值taskManager.numberOfTaskSlots

即yn(实际)=Math.ceil(p/ys)
ys(总共)=yn(实际)*ys(指定)
实际需要的slot数目=p

上一篇 下一篇

猜你喜欢

热点阅读