一文搞定 Flink Job 提交全流程
前言
前面,我们已经分析了 一文搞定 Flink 消费消息的全流程 、写给大忙人看的 Flink Window原理 还有 一文搞定 Flink Checkpoint Barrier 全流程 等等,接下来也该回归到最初始的时候,Flink Job 是如何提交的。
正文
我们知道 Flink 总共有两种提交模式:本地模式和远程模式( 当然也对应着不同的 environment,具体可以参考 Flink Context到底是什么?),我们以本地模式为主,两种模式基本上相似。
当我们执行 env.execute ,实际上调用了 LocalStreamEnvironment.execute 方法
/**
* Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
* specified name.
*
* @param jobName
* name of the job
* @return The result of the job execution, containing elapsed time and accumulators.
*/
@Override
// 本地模式执行方法 env.execute
public JobExecutionResult execute(String jobName) throws Exception {
// transform the streaming program into a JobGraph
//TODO 111
//获取 streamGraph
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
//获取 jobGraph
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");
// add (and override) the settings with what the user defined
configuration.addAll(this.configuration);
if (!configuration.contains(RestOptions.BIND_PORT)) {
configuration.setString(RestOptions.BIND_PORT, "0");
}
int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
.setNumSlotsPerTaskManager(numSlotsPerTaskManager)
.build();
if (LOG.isInfoEnabled()) {
LOG.info("Running job on local embedded Flink mini cluster");
}
MiniCluster miniCluster = new MiniCluster(cfg);
try {
//启动集群,包括启动JobMaster,进行leader选举等等
miniCluster.start();
configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().get().getPort());
// 提交任务到JobMaster
return miniCluster.executeJobBlocking(jobGraph);
}
finally {
transformations.clear();
miniCluster.close();
}
}
这里构建了 StreamGraph、JobGraph,到后面还会有 ExecutionGraph,关于这些图的一些东西,一张图就差不多了
在这里插入图片描述
当 miniCluster.start() 时
// start cluster
public void start() throws Exception {
synchronized (lock) {
checkState(!running, "MiniCluster is already running");
......
ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("mini-cluster-io"));
//创建 HA service
haServices = createHighAvailabilityServices(configuration, ioExecutor);
//启动 blobServer
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
blobCacheService = new BlobCacheService(
configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
);
// task executor
startTaskManagers();
......
resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
webMonitorLeaderRetrievalService.start(webMonitorLeaderRetriever);
......
}
创建了 HaService,启动了 blobCacheService、resourceManagerLeaderRetriever、dispatcherLeaderRetriever、webMonitorLeaderRetrievalService,我们重点看一下 startTaskManagers
@VisibleForTesting
void startTaskExecutor() throws Exception {
synchronized (lock) {
final Configuration configuration = miniClusterConfiguration.getConfiguration();
final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
configuration,
new ResourceID(UUID.randomUUID().toString()),
taskManagerRpcServiceFactory.createRpcService(),
haServices,
heartbeatServices,
metricRegistry,
blobCacheService,
useLocalCommunication(),
taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));
taskExecutor.start();
taskManagers.add(taskExecutor);
}
}
TaskExecutor 创建了 TaskExecutor 并启动了。最终的用来submitTask、cancalTask、stopTask 、执行 task 、confirmCheckpoint、requestSlot、freeSlot 等等。
一些必要的组件已经启动成功,接下来该提交 jobGraph 了 miniCluster.executeJobBlocking(jobGraph); 跟踪代码
public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
// we have to allow queued scheduling in Flip-6 mode because we need to request slots
// from the ResourceManager
jobGraph.setAllowQueuedScheduling(true);
final CompletableFuture<InetSocketAddress> blobServerAddressFuture = createBlobServerAddress(dispatcherGatewayFuture);
// cache jars and files
final CompletableFuture<Void> jarUploadFuture = uploadAndSetJobFiles(blobServerAddressFuture, jobGraph);
final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = jarUploadFuture
.thenCombine(
dispatcherGatewayFuture,
// 这里真正 submit 操作,交给了 dispatcher 去执行
(Void ack, DispatcherGateway dispatcherGateway) -> dispatcherGateway.submitJob(jobGraph, rpcTimeout))
.thenCompose(Function.identity());
return acknowledgeCompletableFuture.thenApply(
(Acknowledge ignored) -> new JobSubmissionResult(jobGraph.getJobID()));
}
最终交给了 dispatcher 来进行 jobGraph 的提交,最终到这里
private CompletableFuture<Void> runJob(JobGraph jobGraph) {
Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
//创建 job Manager runner
final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
// start job manager
return jobManagerRunnerFuture
.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
.thenApply(FunctionUtils.nullFn())
.whenCompleteAsync(
(ignored, throwable) -> {
if (throwable != null) {
jobManagerRunnerFutures.remove(jobGraph.getJobID());
}
},
getMainThreadExecutor());
}
这个时候开始创建 jobManagerRunner,在创建 jobManagerRunner 的同时也会创建 jobMaster
public JobMaster(
RpcService rpcService,
JobMasterConfiguration jobMasterConfiguration,
ResourceID resourceId,
JobGraph jobGraph,
HighAvailabilityServices highAvailabilityService,
SlotPoolFactory slotPoolFactory,
SchedulerFactory schedulerFactory,
JobManagerSharedServices jobManagerSharedServices,
HeartbeatServices heartbeatServices,
JobManagerJobMetricGroupFactory jobMetricGroupFactory,
OnCompletionActions jobCompletionActions,
FatalErrorHandler fatalErrorHandler,
ClassLoader userCodeLoader) throws Exception {
super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
this.resourceId = checkNotNull(resourceId);
this.jobGraph = checkNotNull(jobGraph);
this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
this.highAvailabilityServices = checkNotNull(highAvailabilityService);
this.blobWriter = jobManagerSharedServices.getBlobWriter();
this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
this.jobCompletionActions = checkNotNull(jobCompletionActions);
this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
this.userCodeLoader = checkNotNull(userCodeLoader);
this.heartbeatServices = checkNotNull(heartbeatServices);
this.jobMetricGroupFactory = checkNotNull(jobMetricGroupFactory);
final String jobName = jobGraph.getName();
final JobID jid = jobGraph.getJobID();
log.info("Initializing job {} ({}).", jobName, jid);
final RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
jobGraph.getSerializedExecutionConfig()
.deserializeValue(userCodeLoader)
.getRestartStrategy();
// 设置重启策略
this.restartStrategy = RestartStrategyResolving.resolve(restartStrategyConfiguration,
jobManagerSharedServices.getRestartStrategyFactory(),
jobGraph.isCheckpointingEnabled());
.....
//TODO 111
//createExecutionGraph 可能会 restore from checkpoint(savepoint)
this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup);
......
}
最关键的时在创建 jobMaster 的同时还 create executionGraph。然后开始启动 jobManagerRunner,最终会启动 jobMaster
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());
try {
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
} catch (IOException e) {
return FutureUtils.completedExceptionally(
new FlinkException(
String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
e));
}
final CompletableFuture<Acknowledge> startFuture;
try {
// 通过给定的 jobId start job master
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
} catch (Exception e) {
return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
}
final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
return startFuture.thenAcceptAsync(
(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
executor);
}
jobMaster 启动完,就会正式开始执行 job 了
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
// make sure we receive RPC and async calls
start();
// 正式 开始执行 Job
return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
开始正式执行 job
// start job execution
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
validateRunsInMainThread();
checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
if (Objects.equals(getFencingToken(), newJobMasterId)) {
log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
return Acknowledge.get();
}
setNewFencingToken(newJobMasterId);
startJobMasterServices();
log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
// 重新设置或者调度 executionGraph
resetAndScheduleExecutionGraph();
return Acknowledge.get();
}
然后就开始调度 executionGraph 了
// 调度 execution
public void scheduleForExecution() throws JobException {
assertRunningInJobMasterMainThread();
final long currentGlobalModVersion = globalModVersion;
//改变 job 的状态,由 CREATED 变为 RUNNING
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
final CompletableFuture<Void> newSchedulingFuture;
switch (scheduleMode) {
case LAZY_FROM_SOURCES:
newSchedulingFuture = scheduleLazy(slotProvider);
break;
case EAGER:
// 300000 ms default
//开始调度
newSchedulingFuture = scheduleEager(slotProvider, allocationTimeout);
break;
default:
throw new JobException("Schedule mode is invalid.");
}
if (state == JobStatus.RUNNING && currentGlobalModVersion == globalModVersion) {
schedulingFuture = newSchedulingFuture;
newSchedulingFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null && !(throwable instanceof CancellationException)) {
// only fail if the scheduling future was not canceled
failGlobal(ExceptionUtils.stripCompletionException(throwable));
}
});
} else {
newSchedulingFuture.cancel(false);
}
}
else {
throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
}
}
调度了之后就开始 deploy 了
/**
* Deploys the execution to the previously assigned resource.
*
* @throws JobException if the execution cannot be deployed to the assigned resource
*/
// 从 source 到 sink 循环部署
public void deploy() throws JobException {
assertRunningInJobMasterMainThread();
final LogicalSlot slot = assignedResource;
.....
// TaskDeploymentDescriptor 这个类保存了 task 执行所必须的所有内容,
// 例如序列化的算子,输入的 InputGate 和输出的 ResultPartition 的定义,该 task 要作为几个 subtask 执行等等。
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
taskRestore,
attemptNumber);
// null taskRestore to let it be GC'ed
taskRestore = null;
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
vertex.getExecutionGraph().getJobMasterMainThreadExecutor();
// We run the submission in the future executor so that the serialization of large TDDs does not block
// the main thread and sync back to the main thread once submission is completed.
// 提交 task 先 source
// 对于 TM 来说,执行 task 就是把收到的 TaskDeploymentDescriptor 对象转换成一个 task 并执行的过程。
CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
.thenCompose(Function.identity())
.whenCompleteAsync(
(ack, failure) -> {
// only respond to the failure case
if (failure != null) {
if (failure instanceof TimeoutException) {
String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';
markFailed(new Exception(
"Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
+ ") not responding after a rpcTimeout of " + rpcTimeout, failure));
} else {
markFailed(failure);
}
}
},
jobMasterMainThreadExecutor);
}
catch (Throwable t) {
markFailed(t);
ExceptionUtils.rethrow(t);
}
}
部署的过程当中可能会申请资源,然后就开始提交 task 了,再往下就开始执行 task 了。
总结
yarn 模式
在这里插入图片描述