flink run任务提交流程<命令行方式>
2019-10-24 本文已影响0人
Watchi
运行模式是 flink-on-yarn per-job模式,每个任务有独立的yarn session,启动任务的方式是CLI方式。所以我们任务启动命令像是这样:
flink run -ynm your_jobName -yn 7 -ys 2 -p 14 -ytm 2048m -yjm 2048m -m yarn-cluster -yD metrics.reporter.influxdb_reporter.db=metrics_flink -c com.xxx.K2kExtractStream k2k-extractor-1.0-SNAPSHOT_jobName.jar
那么任务具体是如何启动的呢?
让我们先看下flink启动脚本:
image2019-10-24 13_42_41.png
再看下CliFrontend的实现:
image2019-10-24 12_57_14.png
获取配置,最终调用:parseParameters方法
image2019-10-24 12_56_44.png
这个方法里构建了 PackagedProgram 对象,对象内容如下:
image2019-10-24 13_0_17.png
然后把PackagedProgram以及几个参数传入runProgram
image2019-10-24 12_58_35.png
这个方法很关键,里面创建了ClusterClient (on-yarn模式即RestClusterClient) 和 jobGraph(通过program构建),并把jobGraph部署到JobCluster。
image2019-10-24 13_28_27.png
deploySessionCluster是调用的父类AbstractYarnClusterDescriptor的deploySessionCluster,并返回一个client对象。这里对配置和yarn集群相关的做了一定的检查,最主要调用了两个方法:
1 startAppMaster里面封装了大量yarn客户端代码,最终yarnClient.submitApplication(appContext);
和 2 createYarnClusterClient,分别是创建appMaster和启动一个与yarn进行Http 交互的Client(启动了两个LeaderRetrievalService)
image2019-10-24 13_19_26.png
至此发生了什么呢? 上面我们提到了返回一个client,其实上面的过程执行后,yarn上已经有了一个flink任务(通过yarn任务list可查),只不过这个flink任务还没有执行env.execute的后续流程。
client的执行包含了env.execute(),我们继续往下看
image2019-10-24 13_35_57.png
这个里面的逻辑是先切换当前classloader为userCodeClassloader。然后开始通过反射去执行任务jar包里的main函数。
image2019-10-24 13_37_43.png
最后执行的方法:
mainMethod.invoke(null, (Object) args);
至此任务提交jar到执行jar内main函数已经分析完,有些流程不那么重点的就忽略了,比如如何启动的appMaster。先到这里,之后env.execute 是如何转化成可执行任务的。后续补充......