
Flink Job Submission (Part 3): Running the Job

2021-02-18  sj_91d7

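The previous article, "Source Analysis: How JobMaster Gets Running", ended at the JobMaster.start method. This method starts the RPC service and runs the job. So how is the job actually run? This article is based on Flink 1.9.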

1. First, the source of the JobMaster.start method

    /**
     * Start the rpc service and begin to run the job.
     *
     * @param newJobMasterId The necessary fencing token to run the job
     * @return Future acknowledge if the job could be started. Otherwise the future contains an exception
     */
    public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
        // make sure we receive RPC and async calls
        start();

        return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
    }
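To illustrate the idea behind callAsyncWithoutFencing, here is a minimal sketch of "run a callable on the endpoint's single main thread and hand back a future". It is not Flink's implementation: the class MainThreadCallSketch, the method callAsync, and the thread name are hypothetical, and a plain single-thread executor stands in for the RPC main thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadCallSketch {
    // Hypothetical stand-in for the RPC endpoint's main thread (assumption, not Flink code).
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "main-thread-sketch");
        t.setDaemon(true); // do not keep the JVM alive for this demo thread
        return t;
    });

    // Runs the callable on the main thread and exposes its outcome as a future.
    public <T> CompletableFuture<T> callAsync(Callable<T> callable) {
        CompletableFuture<T> result = new CompletableFuture<>();
        mainThread.execute(() -> {
            try {
                result.complete(callable.call());
            } catch (Throwable t) {
                // Failures surface through the future instead of being thrown to the caller.
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    // Returns the name of the thread that executed the callable.
    public static String demo() {
        MainThreadCallSketch endpoint = new MainThreadCallSketch();
        return endpoint.callAsync(() -> Thread.currentThread().getName()).join();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The caller gets a future immediately, while the work itself runs on the endpoint's own thread. This is why startJobExecution can call validateRunsInMainThread() as its first statement.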

2. Next, the JobMaster.startJobExecution method

    //-- job starting and stopping  -----------------------------------------------------------------

    private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {

        validateRunsInMainThread();

        checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

        if (Objects.equals(getFencingToken(), newJobMasterId)) {
            log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);

            return Acknowledge.get();
        }

        setNewFencingToken(newJobMasterId);

        startJobMasterServices();

        log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);

        resetAndStartScheduler();

        return Acknowledge.get();
    }

There are two important methods here: startJobMasterServices() and resetAndStartScheduler().
Both involve quite a lot, so they are covered separately.
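Before going into those two methods, the fencing-token check at the top of startJobExecution is worth a small illustration. The sketch below is an assumption-laden simplification: FencingTokenSketch is a hypothetical class, java.util.UUID stands in for JobMasterId, and a counter stands in for starting the real services.

```java
import java.util.Objects;
import java.util.UUID;

public class FencingTokenSketch {
    private UUID fencingToken;   // null until the first start (UUID stands in for JobMasterId)
    private int servicesStarted; // counts how often the start logic really ran

    // Returns true if this call started the services, false if it was a repeated start.
    public boolean startExecution(UUID newToken) {
        Objects.requireNonNull(newToken, "The new token must not be null.");
        if (Objects.equals(fencingToken, newToken)) {
            // Already started under this token: acknowledge and do nothing.
            return false;
        }
        fencingToken = newToken;
        servicesStarted++; // placeholder for starting services and the scheduler
        return true;
    }

    public int getServicesStarted() {
        return servicesStarted;
    }

    public static void main(String[] args) {
        FencingTokenSketch master = new FencingTokenSketch();
        UUID token = UUID.randomUUID();
        System.out.println(master.startExecution(token)); // first start
        System.out.println(master.startExecution(token)); // repeated start with the same token
    }
}
```

A repeated start with the same token is acknowledged without side effects, which makes the call safe to retry.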

2.1 Analyzing the startJobMasterServices() method

2.1.1 startHeartbeatServices();

2.1.2 slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

  1. Background: what does SlotPool do?
  2. What does slotPool.start do?

2.1.3 scheduler.start(getMainThreadExecutor());

  1. Background: what does the scheduler do?
  2. What does scheduler.start do?

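2.1.4 reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

This call makes the JobMaster (re)connect to the ResourceManager and register itself. On the ResourceManager side, the registration is handled by the registerJobManager RPC method: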

    // ------------------------------------------------------------------------
    //  RPC methods
    // ------------------------------------------------------------------------

    @Override
    public CompletableFuture<RegistrationResponse> registerJobManager(
            final JobMasterId jobMasterId,
            final ResourceID jobManagerResourceId,
            final String jobManagerAddress,
            final JobID jobId,
            final Time timeout) {

        checkNotNull(jobMasterId);
        checkNotNull(jobManagerResourceId);
        checkNotNull(jobManagerAddress);
        checkNotNull(jobId);

        if (!jobLeaderIdService.containsJob(jobId)) {
            try {
                jobLeaderIdService.addJob(jobId);
            } catch (Exception e) {
                ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
                    jobId + " to the job id leader service.", e);

                onFatalError(exception);

                log.error("Could not add job {} to job leader id service.", jobId, e);
                return FutureUtils.completedExceptionally(exception);
            }
        }

        log.info("Registering job manager {}@{} for job {}.", jobMasterId, jobManagerAddress, jobId);

        CompletableFuture<JobMasterId> jobMasterIdFuture;

        try {
            jobMasterIdFuture = jobLeaderIdService.getLeaderId(jobId);
        } catch (Exception e) {
            // we cannot check the job leader id so let's fail
            // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id
            ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
                "job leader id future to verify the correct job leader.", e);

            onFatalError(exception);

            log.debug("Could not obtain the job leader id future to verify the correct job leader.");
            return FutureUtils.completedExceptionally(exception);
        }

        CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class);

        CompletableFuture<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
            jobMasterIdFuture,
            (JobMasterGateway jobMasterGateway, JobMasterId leadingJobMasterId) -> {
                if (Objects.equals(leadingJobMasterId, jobMasterId)) {
                    return registerJobMasterInternal(
                        jobMasterGateway,
                        jobId,
                        jobManagerAddress,
                        jobManagerResourceId);
                } else {
                    final String declineMessage = String.format(
                        "The leading JobMaster id %s did not match the received JobMaster id %s. " +
                        "This indicates that a JobMaster leader change has happened.",
                        leadingJobMasterId,
                        jobMasterId);
                    log.debug(declineMessage);
                    return new RegistrationResponse.Decline(declineMessage);
                }
            },
            getMainThreadExecutor());

        // handle exceptions which might have occurred in one of the futures inputs of combine
        return registrationResponseFuture.handleAsync(
            (RegistrationResponse registrationResponse, Throwable throwable) -> {
                if (throwable != null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress, throwable);
                    } else {
                        log.info("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress);
                    }

                    return new RegistrationResponse.Decline(throwable.getMessage());
                } else {
                    return registrationResponse;
                }
            },
            getRpcService().getExecutor());
    }
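The core pattern in registerJobManager is "combine two futures, compare the leader id, and turn any failure into a decline". A stripped-down sketch with plain CompletableFuture follows. RegistrationSketch and its string-based responses are hypothetical simplifications; the real method returns RegistrationResponse objects and runs its callbacks on specific executors.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

public class RegistrationSketch {
    // gatewayFuture: connection to the JobMaster; leaderIdFuture: the currently leading JobMaster id.
    // Strings stand in for the gateway, the ids, and the response (assumption for brevity).
    static CompletableFuture<String> register(
            CompletableFuture<String> gatewayFuture,
            CompletableFuture<String> leaderIdFuture,
            String receivedId) {

        return gatewayFuture
            // Runs only once BOTH inputs have completed successfully.
            .thenCombine(leaderIdFuture, (gateway, leaderId) ->
                Objects.equals(leaderId, receivedId)
                    ? "SUCCESS via " + gateway
                    : "DECLINE: leader is " + leaderId + ", received " + receivedId)
            // Converts a failure of either input into a decline instead of an exception.
            .handle((response, failure) ->
                failure != null ? "DECLINE: " + failure.getMessage() : response);
    }

    public static void main(String[] args) {
        CompletableFuture<String> gateway = CompletableFuture.completedFuture("gateway");
        CompletableFuture<String> leader = CompletableFuture.completedFuture("id-1");
        System.out.println(register(gateway, leader, "id-1").join());
        System.out.println(register(gateway, leader, "id-2").join());
    }
}
```

The caller therefore always receives a response, never an exceptionally completed future, which mirrors the handleAsync step at the end of the real method.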

2.1.5 resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

2.2 Analyzing the resetAndStartScheduler() method

This method handles job scheduling, including requesting slots and deploying each Execution. First, the source of the method:

    private void resetAndStartScheduler() throws Exception {
        validateRunsInMainThread();

        final CompletableFuture<Void> schedulerAssignedFuture;

        if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
            schedulerAssignedFuture = CompletableFuture.completedFuture(null);
            schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
        } else {
            suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
            final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
            final SchedulerNG newScheduler = createScheduler(newJobManagerJobMetricGroup);

            schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle(
                (ignored, throwable) -> {
                    newScheduler.setMainThreadExecutor(getMainThreadExecutor());
                    assignScheduler(newScheduler, newJobManagerJobMetricGroup);
                    return null;
                }
            );
        }

        schedulerAssignedFuture.thenRun(this::startScheduling);
    }

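The member fields of ExecutionGraph show that its JobStatus defaults to CREATED when the graph is built. On the first run, resetAndStartScheduler therefore takes the if branch and then calls startScheduling. The scheduler's startScheduling method looks like this: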

    @Override
    public void startScheduling() {
        mainThreadExecutor.assertRunningInMainThread();

        try {
            executionGraph.scheduleForExecution();
        }
        catch (Throwable t) {
            executionGraph.failGlobal(t);
        }
    }

This calls executionGraph.scheduleForExecution() --> SchedulingUtils.scheduleEager.
SchedulingUtils.scheduleEager is the part to focus on. It does two main things.

2.2.1 Request a slot for each ExecutionVertex
Here is the source, taken from the SchedulingUtils#scheduleEager method:

        // collecting all the slots may resize and fail in that operation without slots getting lost
        final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>();

        final SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
        final Set<AllocationID> allPreviousAllocationIds = Collections.unmodifiableSet(
            computePriorAllocationIdsIfRequiredByScheduling(vertices, slotProviderStrategy.asSlotProvider()));

        // allocate the slots (obtain all their futures)
        for (ExecutionVertex ev : vertices) {
            // these calls are not blocking, they only return futures
            CompletableFuture<Execution> allocationFuture = ev.getCurrentExecutionAttempt().allocateResourcesForExecution(
                slotProviderStrategy,
                LocationPreferenceConstraint.ALL,
                allPreviousAllocationIds);

            allAllocationFutures.add(allocationFuture);
        }

The code that actually requests the slot underneath is in the SchedulerImpl#allocateSingleSlot method:

    private CompletableFuture<LogicalSlot> allocateSingleSlot(
            SlotRequestId slotRequestId,
            SlotProfile slotProfile,
            boolean allowQueuedScheduling,
            @Nullable Time allocationTimeout) {

        Optional<SlotAndLocality> slotAndLocality = tryAllocateFromAvailable(slotRequestId, slotProfile);

        if (slotAndLocality.isPresent()) {
            // already successful from available
            try {
                return CompletableFuture.completedFuture(
                    completeAllocationByAssigningPayload(slotRequestId, slotAndLocality.get()));
            } catch (FlinkException e) {
                return FutureUtils.completedExceptionally(e);
            }
        } else if (allowQueuedScheduling) {
            // we allocate by requesting a new slot
            return requestNewAllocatedSlot(slotRequestId, slotProfile, allocationTimeout)
                .thenApply((PhysicalSlot allocatedSlot) -> {
                    try {
                        return completeAllocationByAssigningPayload(slotRequestId, new SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
                    } catch (FlinkException e) {
                        throw new CompletionException(e);
                    }
                });
        } else {
            // failed to allocate
            return FutureUtils.completedExceptionally(
                new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.'));
        }
    }

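The slot request flow can be summarized as follows:

  1. SlotPoolImpl keeps an availableSlots map. It first checks whether a slot in availableSlots satisfies the request.
  2. If availableSlots has no usable slot, it requests resources from the RM.
      - In Flink 1.9, resources are requested on demand. If the slots the job needs are not available, YarnResourceManager requests new containers from the Yarn cluster's ResourceManager and starts TaskManagers in them.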
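The three branches of that flow (serve from available slots, queue a request for a new slot, or fail immediately) can be sketched as follows. SlotAllocationSketch is a hypothetical toy, not SlotPoolImpl: slots are plain strings, there is no slot profile matching, and the request to the RM is reduced to a counter.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

public class SlotAllocationSketch {
    private final Deque<String> availableSlots = new ArrayDeque<>();
    private final Deque<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();
    private int requestsSentToResourceManager;

    public CompletableFuture<String> allocate(boolean allowQueuedScheduling) {
        String slot = availableSlots.poll();
        if (slot != null) {
            // 1. Already satisfied from the available slots.
            return CompletableFuture.completedFuture(slot);
        } else if (allowQueuedScheduling) {
            // 2. Nothing available: ask the RM (here just a counter) and wait for a slot offer.
            requestsSentToResourceManager++;
            CompletableFuture<String> pending = new CompletableFuture<>();
            pendingRequests.add(pending);
            return pending;
        } else {
            // 3. Queuing not allowed: fail right away.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("No slot available."));
            return failed;
        }
    }

    // Called when a slot arrives, for example from a newly started TaskManager.
    public void offerSlot(String slotId) {
        CompletableFuture<String> pending = pendingRequests.poll();
        if (pending != null) {
            pending.complete(slotId); // the oldest waiting request gets the slot
        } else {
            availableSlots.add(slotId);
        }
    }

    public int getRequestsSentToResourceManager() {
        return requestsSentToResourceManager;
    }

    public static void main(String[] args) {
        SlotAllocationSketch pool = new SlotAllocationSketch();
        CompletableFuture<String> request = pool.allocate(true);
        System.out.println("done before offer: " + request.isDone());
        pool.offerSlot("slot-1");
        System.out.println("allocated: " + request.join());
    }
}
```

Because every branch returns a future, callers such as scheduleEager can collect all allocation futures without blocking.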

2.2.2 Deploy all Executions
Once every ExecutionVertex has obtained a slot, deployment begins. Here is the source, again from the SchedulingUtils#scheduleEager method:

        // this future is complete once all slot futures are complete.
        // the future fails once one slot future fails.
        final ConjunctFuture<Collection<Execution>> allAllocationsFuture = FutureUtils.combineAll(allAllocationFutures);

        return allAllocationsFuture.thenAccept(
            (Collection<Execution> executionsToDeploy) -> {
                for (Execution execution : executionsToDeploy) {
                    try {
                        execution.deploy();
                    } catch (Throwable t) {
                        throw new CompletionException(
                            new FlinkException(
                                String.format("Could not deploy execution %s.", execution),
                                t));
                    }
                }
            })
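The behaviour described in the two comments above (complete once all slot futures complete, fail as soon as one fails) can be reproduced with plain CompletableFuture. The sketch below is an approximation for illustration only: CombineAllSketch.combineAll is a hypothetical helper, not Flink's FutureUtils.combineAll, and strings stand in for Execution objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class CombineAllSketch {
    static <T> CompletableFuture<List<T>> combineAll(List<CompletableFuture<T>> futures) {
        CompletableFuture<List<T>> result = new CompletableFuture<>();

        // Fail fast: the first failing input fails the combined future immediately.
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((value, failure) -> {
                if (failure != null) {
                    result.completeExceptionally(failure);
                }
            });
        }

        // Success path: runs only after every input has completed normally.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenRun(() -> {
                List<T> values = new ArrayList<>();
                for (CompletableFuture<T> future : futures) {
                    values.add(future.join()); // does not block, all inputs are done
                }
                result.complete(values);
            });

        return result;
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> allocations = new ArrayList<>();
        allocations.add(CompletableFuture.completedFuture("execution-1"));
        allocations.add(CompletableFuture.completedFuture("execution-2"));

        // Deploy only after every allocation has succeeded.
        combineAll(allocations)
            .thenAccept(executions -> executions.forEach(e -> System.out.println("deploy " + e)))
            .join();
    }
}
```

With this shape, no Execution is deployed unless every slot request has succeeded.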

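The deploy method does the actual deployment work. In Flink 1.9 it moves the Execution to the DEPLOYING state, builds a TaskDeploymentDescriptor, and submits the task to the TaskManager through the TaskManagerGateway.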

At this point, the job is up and running.

Summary

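When JobMaster starts a job, there are two main steps:

  1. Start the services the job depends on, such as TM/RM heartbeat monitoring, the slotPool, and the scheduler.
  2. Allocate a slot for every ExecutionVertex, then deploy.

The JM and TM resource request flow, with the JobMaster#start method as its entry point, is:

  1. The JobMaster first connects to the RM and registers with it.
  2. Then, when JobMaster#resetAndStartScheduler runs, a slot is requested for each ExecutionVertex. If availableSlots can satisfy the request, those slots are used. If not, YarnResourceManager requests new containers from the Yarn cluster's ResourceManager and starts TaskManagers.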