Flink Source Code Analysis

The Complete Flink Task Submission and Execution Flow

2020-07-11  shengjk1

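Preface

The previous post, "The Complete Flink Job Submission Flow", showed that each operator chain is submitted as a single unit, a task. This post follows that task from submission through to the point where user code runs.

Main text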

@Override
    // Execution submits the task to this method (TaskExecutor.submitTask)
    public CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd,
            JobMasterId jobMasterId,
            Time timeout) {

        try {
            ......
            // Intermediate partition state checker to query the JobManager about the state
            // of the producer of a result partition.
            PartitionProducerStateChecker partitionStateChecker = jobManagerConnection.getPartitionStateChecker();

            // local state restore
            final TaskLocalStateStore localStateStore = localStateStoresManager.localStateStoreForSubtask(
                jobId,
                tdd.getAllocationId(),
                taskInformation.getJobVertexId(),
                tdd.getSubtaskIndex());

            final JobManagerTaskRestore taskRestore = tdd.getTaskRestore();

            final TaskStateManager taskStateManager = new TaskStateManagerImpl(
                jobId,
                tdd.getExecutionAttemptId(),
                localStateStore,
                taskRestore,
                checkpointResponder);

            Task task = new Task(
                jobInformation,
                taskInformation,
                tdd.getExecutionAttemptId(),
                tdd.getAllocationId(),
                tdd.getSubtaskIndex(),
                tdd.getAttemptNumber(),
                tdd.getProducedPartitions(),
                tdd.getInputGates(),
                tdd.getTargetSlotNumber(),
                taskExecutorServices.getMemoryManager(),
                taskExecutorServices.getIOManager(),
                taskExecutorServices.getNetworkEnvironment(),
                taskExecutorServices.getBroadcastVariableManager(),
                taskStateManager,
                taskManagerActions,
                inputSplitProvider,
                checkpointResponder,
                aggregateManager,
                blobCacheService,
                libraryCache,
                fileCache,
                taskManagerConfiguration,
                taskMetricGroup,
                resultPartitionConsumableNotifier,
                partitionStateChecker,
                getRpcService().getExecutor());

            log.info("Received task {}.", task.getTaskInfo().getTaskNameWithSubtasks());

            boolean taskAdded;

            try {
                taskAdded = taskSlotTable.addTask(task);
            } catch (SlotNotFoundException | SlotNotActiveException e) {
                throw new TaskSubmissionException("Could not submit task.", e);
            }

            if (taskAdded) {
                // start the task thread
                task.startTaskThread();

                return CompletableFuture.completedFuture(Acknowledge.get());
            } else {
                final String message = "TaskManager already contains a task for id " +
                    task.getExecutionId() + '.';

                log.debug(message);
                throw new TaskSubmissionException(message);
            }
        } catch (TaskSubmissionException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }
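
The control flow of submitTask above can be condensed into a small, self-contained sketch: register the task first, start its thread only if registration succeeded, and report a duplicate through the returned future rather than by throwing. Everything here (SubmitSketch, SketchTask, SketchSlotTable, the "ACK" string) is a hypothetical stand-in for illustration, not a Flink class or API.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins; none of these are Flink classes.
final class SubmitSketch {

    /** Minimal stand-in for a task: an id plus the thread that will run it. */
    static final class SketchTask implements Runnable {
        final String executionId;
        // Completed by run(), so callers can observe that the task thread executed.
        final CompletableFuture<Void> done = new CompletableFuture<>();
        private final Thread executingThread;

        SketchTask(String executionId) {
            this.executionId = executionId;
            this.executingThread = new Thread(this, "task-" + executionId);
        }

        void startTaskThread() {
            executingThread.start();
        }

        @Override
        public void run() {
            done.complete(null);
        }
    }

    /** Stand-in for the slot table: refuses a second task with the same execution id. */
    static final class SketchSlotTable {
        private final Map<String, SketchTask> tasks = new ConcurrentHashMap<>();

        boolean addTask(SketchTask task) {
            return tasks.putIfAbsent(task.executionId, task) == null;
        }
    }

    private final SketchSlotTable slotTable = new SketchSlotTable();

    /** Register first, start second; a duplicate is reported via an exceptional future. */
    CompletableFuture<String> submitTask(SketchTask task) {
        if (slotTable.addTask(task)) {
            task.startTaskThread();
            return CompletableFuture.completedFuture("ACK");
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException(
            "already contains a task for id " + task.executionId));
        return failed;
    }
}
```

Returning a failed future instead of throwing keeps the caller's handling uniform: success and failure both arrive through the same CompletableFuture.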

Here a Task object is created and started. Let's look at what the Task does when it starts:

/**
     * The core work method that bootstraps the task and executes its code.
     */
    // Executes one subtask, i.e. one parallel instance of a vertex (operator chain)
    @Override
    public void run() {
        
        // ----------------------------
        //  Initial State transition
        // ----------------------------
        while (true) {
            ExecutionState current = this.executionState;
            if (current == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    // success, we can start our work
                    break;
                }
            } else if (current == ExecutionState.FAILED) {
                // we were immediately failed. tell the TaskManager that we reached our final state
                notifyFinalState();
                if (metrics != null) {
                    metrics.close();
                }
                return;
            }
            else if (current == ExecutionState.CANCELING) {
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    // we were immediately canceled. tell the TaskManager that we reached our final state
                    notifyFinalState();
                    if (metrics != null) {
                        metrics.close();
                    }
                    return;
                }
            } else {
                if (metrics != null) {
                    metrics.close();
                }
                throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
            }
        }

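        /*
         Next, the user-code class loader is created and the user code is loaded.
         Then the task is registered with the network stack (Flink operators rely on it
         to exchange data at runtime), which allocates buffers to hold the data.
         Then the files of the distributed cache are fetched.
         Then the many variables passed in when the task was created are used to build
         an execution Environment.
         Finally, a task that is not running for the first time (for example one restarted
         after a failure) has its state restored.
         */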
        // all resource acquisitions and registrations from here on
        // need to be undone in the end
        Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
        AbstractInvokable invokable = null;
        
        try {
            // ----------------------------
            //  Task Bootstrap - We periodically
            //  check for canceling as a shortcut
            // ----------------------------
            
            // activate safety net for task thread
            LOG.info("Creating FileSystem stream leak safety net for task {}", this);
            FileSystemSafetyNet.initializeSafetyNetForThread();
            
            blobService.getPermanentBlobService().registerJob(jobId);
            
            // first of all, get a user-code classloader
            // this may involve downloading the job's JAR files and/or classes
            LOG.info("Loading JAR files for task {}.", this);
            
            userCodeClassLoader = createUserCodeClassloader();
            final ExecutionConfig executionConfig = serializedExecutionConfig.deserializeValue(userCodeClassLoader);
            
            if (executionConfig.getTaskCancellationInterval() >= 0) {
                // override task cancellation interval from Flink config if set in ExecutionConfig
                taskCancellationInterval = executionConfig.getTaskCancellationInterval();
            }
            
            if (executionConfig.getTaskCancellationTimeout() >= 0) {
                // override task cancellation timeout from Flink config if set in ExecutionConfig
                taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
            }
            
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            
            // ----------------------------------------------------------------
            // register the task with the network stack
            // this operation may fail if the system does not have enough
            // memory to run the necessary data exchanges
            // the registration must also strictly be undone
            // ----------------------------------------------------------------
            
            LOG.info("Registering task at network: {}.", this);
            
            // registerTask requests one BufferPool for every ResultPartition of the task
            // and one BufferPool for every InputGate of the task
            network.registerTask(this);
            
            // add metrics for buffers
            this.metrics.getIOMetricGroup().initializeBufferMetrics(this);
            
            // register detailed network metrics, if configured
            if (taskManagerConfig.getConfiguration().getBoolean(TaskManagerOptions.NETWORK_DETAILED_METRICS)) {
                // similar to MetricUtils.instantiateNetworkMetrics() but inside this IOMetricGroup
                MetricGroup networkGroup = this.metrics.getIOMetricGroup().addGroup("Network");
                MetricGroup outputGroup = networkGroup.addGroup("Output");
                MetricGroup inputGroup = networkGroup.addGroup("Input");
                
                // output metrics
                for (int i = 0; i < producedPartitions.length; i++) {
                    ResultPartitionMetrics.registerQueueLengthMetrics(
                        outputGroup.addGroup(i), producedPartitions[i]);
                }
                
                for (int i = 0; i < inputGates.length; i++) {
                    InputGateMetrics.registerQueueLengthMetrics(
                        inputGroup.addGroup(i), inputGates[i]);
                }
            }
            
            // next, kick off the background copying of files for the distributed cache
            // copy the files registered in the distributed cache (config files, jars, etc.) to local temp files
            try {
                for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                    DistributedCache.readFileInfoFromConfig(jobConfiguration)) {
                    LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
                    Future<Path> cp = fileCache.createTmpFile(entry.getKey(), entry.getValue(), jobId, executionId);
                    distributedCacheEntries.put(entry.getKey(), cp);
                }
            } catch (Exception e) {
                throw new Exception(
                    String.format("Exception while adding files to distributed cache of task %s (%s).", taskNameWithSubtask, executionId), e);
            }
            
            if (isCanceledOrFailed()) {
                throw new CancelTaskException();
            }
            
            // ----------------------------------------------------------------
            //  call the user code initialization methods
            // ----------------------------------------------------------------
            
            TaskKvStateRegistry kvStateRegistry = network.createKvStateTaskRegistry(jobId, getJobVertexId());
            
            Environment env = new RuntimeEnvironment(
                jobId,
                vertexId,
                executionId,
                executionConfig,
                taskInfo,
                jobConfiguration,
                taskConfiguration,
                userCodeClassLoader,
                memoryManager,
                ioManager,
                broadcastVariableManager,
                taskStateManager,
                aggregateManager,
                accumulatorRegistry,
                kvStateRegistry,
                inputSplitProvider,
                distributedCacheEntries,
                producedPartitions,
                inputGates,
                network.getTaskEventDispatcher(),
                checkpointResponder,
                taskManagerConfig,
                metrics,
                this);
            
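            /*
             The information about the invokable is generated when the JobGraph is built;
             here it is turned into the actual executable object.
             */
            // now load and instantiate the task's invokable code
            // nameOfInvokableClass ---> jobVertex.getInvokableClassName()
            // the object is created via reflection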
            invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
            
            // ----------------------------------------------------------------
            //  actual task core work
            // ----------------------------------------------------------------
            
            // we must make strictly sure that the invokable is accessible to the cancel() call
            // by the time we switched to running.
            this.invokable = invokable;
            
            // switch to the RUNNING state, if that fails, we have been canceled/failed in the meantime
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }
            
            // notify everyone that we switched to running
            taskManagerActions.updateTaskExecutionState(new TaskExecutionState(jobId, executionId, ExecutionState.RUNNING));
            
            // make sure the user code classloader is accessible thread-locally
            executingThread.setContextClassLoader(userCodeClassLoader);
            
            /*
             This call is the real entry point for user code. Logic such as a
             new MapFunction() that we write ends up being executed from here.
             */
            // run the invokable
            invokable.invoke();
            
            ......
    }
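
The initial while (true) loop in run() is a compare-and-set state machine: another thread may cancel or fail the task at any moment, so each transition only succeeds if the state is still the expected one. Below is a minimal sketch of that idea using an AtomicReference. The class TaskStateSketch, its reduced state set, and the cancel() helper are assumptions made for illustration; they are not the actual Task implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a CAS-guarded task state machine; not Flink's Task class.
final class TaskStateSketch {

    enum State { CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.CREATED);

    /** Succeeds only if nobody changed the state in the meantime. */
    boolean transitionState(State expected, State next) {
        return state.compareAndSet(expected, next);
    }

    State current() {
        return state.get();
    }

    /** May be called from another thread; returns true if the cancel request was recorded. */
    boolean cancel() {
        while (true) {
            State s = state.get();
            if (s == State.CREATED || s == State.DEPLOYING || s == State.RUNNING) {
                if (transitionState(s, State.CANCELING)) {
                    return true;
                }
                // lost a race with another transition: re-read the state and retry
            } else {
                return false;
            }
        }
    }

    /** Mirrors the initial loop of run(): true means "go on and bootstrap the task". */
    boolean beginDeployment() {
        while (true) {
            State s = state.get();
            if (s == State.CREATED) {
                if (transitionState(State.CREATED, State.DEPLOYING)) {
                    return true;
                }
            } else if (s == State.FAILED) {
                return false; // failed before we even started
            } else if (s == State.CANCELING) {
                if (transitionState(State.CANCELING, State.CANCELED)) {
                    return false; // canceled before we even started
                }
            } else {
                throw new IllegalStateException("Invalid state for beginning of operation: " + s);
            }
        }
    }
}
```

The same pattern explains the later check in run(): if the DEPLOYING to RUNNING transition fails, the task was canceled or failed in the meantime and must not invoke user code.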

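run() first registers the job with the blobService and the task with the network stack, adds metrics, and fetches the distributed-cache files (jars and the like). It then calls invoke(), which is where the task really starts executing. We take StreamTask as the example: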

    // Called from Task.run(); this is the entry point through which a subtask executes user code
    @Override
    public final void invoke() throws Exception {

        boolean disposed = false;
        try {
            // -------- Initialize ---------
            LOG.debug("Initializing {}.", getName());

            asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", FatalExitExceptionHandler.INSTANCE));

            CheckpointExceptionHandlerFactory cpExceptionHandlerFactory = createCheckpointExceptionHandlerFactory();

            synchronousCheckpointExceptionHandler = cpExceptionHandlerFactory.createCheckpointExceptionHandler(
                getExecutionConfig().isFailTaskOnCheckpointError(),
                getEnvironment());

            asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);

            // obtain the stateBackend and the checkpointStorage
            // priority: application-defined > config > default MemoryStateBackend
            stateBackend = createStateBackend();
            checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());

            // if the clock is not already set, then assign a default TimeServiceProvider
            if (timerService == null) {
                ThreadFactory timerThreadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP,
                    "Time Trigger for " + getName(), getUserCodeClassLoader());

                timerService = new SystemProcessingTimeService(this, getCheckpointLock(), timerThreadFactory);
            }

            // build the task's operator chain
            operatorChain = new OperatorChain<>(this, recordWriters);
            headOperator = operatorChain.getHeadOperator();

            // task specific initialization
            // initialization, e.g. creating the StreamInputProcessor, so that run() can execute directly
            init();

            // save the work of reloading state, etc, if the task is already canceled
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invoke --------
            LOG.debug("Invoking {}", getName());

            // we need to make sure that any triggers scheduled in open() cannot be
            // executed before all operators are opened
            synchronized (lock) {

                // both the following operations are protected by the lock
                // so that we avoid race conditions in the case that initializeState()
                // registers a timer, that fires before the open() is called.
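                // if there is checkpointed state, restore the operators from it; otherwise treat them as brand-new operators
                initializeState();
                // call open() on the operators (this is where a rich function's open() runs)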
                openAllOperators();
            }

            // final check to exit early before starting to run
            if (canceled) {
                throw new CancelTaskException();
            }

            // let the task do its work
            isRunning = true;
            // a non-source task ends up in: while (running && inputProcessor.processInput()) {
            // a source task ends up in: headOperator.run(getCheckpointLock(), getStreamStatusMaintainer());
            // this ties in with the earlier posts on barrier handling, the full message flow, and Kafka messages
            run();

            // if this left the run() method cleanly despite the fact that this was canceled,
            // make sure the "clean shutdown" is not attempted
            if (canceled) {
                throw new CancelTaskException();
            }

            LOG.debug("Finished task {}", getName());

            // make sure no further checkpoint and notification actions happen.
            // we make sure that no other thread is currently in the locked scope before
            // we close the operators by trying to acquire the checkpoint scope lock
            // we also need to make sure that no triggers fire concurrently with the close logic
            // at the same time, this makes sure that during any "regular" exit where still
            synchronized (lock) {
                // this is part of the main logic, so if this fails, the task is considered failed
                closeAllOperators();

                // make sure no new timers can come
                timerService.quiesce();

                // only set the StreamTask to not running after all operators have been closed!
                // See FLINK-7430
                isRunning = false;
            }

            // make sure all timers finish
            timerService.awaitPendingAfterQuiesce();

            LOG.debug("Closed operators for task {}", getName());

            // make sure all buffered data is flushed
            operatorChain.flushOutputs();

            // make an attempt to dispose the operators such that failures in the dispose call
            // still let the computation fail
            tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            // Canceling a job also ends up here, releasing resources such as buffers and calling function.close();
            // this is why a job should be canceled rather than killed.
            
            // clean up everything we initialized
            isRunning = false;

            // Now that we are outside the user code, we do not want to be interrupted further
            // upon cancellation. The shutdown logic below needs to make sure it does not issue calls
            // that block and stall shutdown.
            // Additionally, the cancellation watch dog will issue a hard-cancel (kill the TaskManager
            // process) as a backup in case some shutdown procedure blocks outside our control.
            setShouldInterruptOnCancel(false);

            // clear any previously issued interrupt for a more graceful shutdown
            Thread.interrupted();

            // stop all timers and threads
            tryShutdownTimerService();

            // stop all asynchronous checkpoint threads
            try {
                cancelables.close();
                shutdownAsyncThreads();
            }
            catch (Throwable t) {
                // catch and log the exception to not replace the original exception
                LOG.error("Could not shut down async checkpoint threads", t);
            }

            // we must! perform this cleanup
            try {
                cleanup();
            }
            catch (Throwable t) {
                // catch and log the exception to not replace the original exception
                LOG.error("Error during cleanup of stream task", t);
            }

            // if the operators were not disposed before, do a hard dispose
            if (!disposed) {
                disposeAllOperators();
            }

            // release the output resources. this method should never fail.
            if (operatorChain != null) {
                // beware: without synchronization, #performCheckpoint() may run in
                //         parallel and this call is not thread-safe
                synchronized (lock) {
                    operatorChain.releaseOutputs();
                }
            }
        }
    }
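
Stripped of its details, invoke() above follows a fixed lifecycle: initialize, restore state and open under the lock, run, close under the lock, dispose, and then a finally block that always cleans up and falls back to a hard dispose if the regular path did not get that far. The sketch below only records the order of those steps. LifecycleSketch and its Runnable body are hypothetical illustrations, not Flink code, and the real method does far more in each step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the invoke() lifecycle; step names follow the excerpt above.
final class LifecycleSketch {

    /** Records the order in which lifecycle steps were reached. */
    final List<String> calls = new ArrayList<>();

    private final Object lock = new Object();
    private final Runnable body; // stands in for the task-specific run() logic

    LifecycleSketch(Runnable body) {
        this.body = body;
    }

    void invoke() {
        boolean disposed = false;
        try {
            calls.add("init");

            // state restore and open() happen under the lock so that
            // no timer or checkpoint can interleave with them
            synchronized (lock) {
                calls.add("initializeState");
                calls.add("openAllOperators");
            }

            calls.add("run");
            body.run();

            synchronized (lock) {
                calls.add("closeAllOperators");
            }

            calls.add("tryDisposeAllOperators");
            disposed = true;
        } finally {
            // always executed: normal finish, failure, or cancellation
            calls.add("cleanup");
            if (!disposed) {
                calls.add("disposeAllOperators");
            }
        }
    }
}
```

When the body throws, closeAllOperators is skipped but cleanup and the hard dispose still run, which matches the point made in the finally block's comment about canceling rather than killing a job.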

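After init(), the open() method of any Rich Function is called first, and then run() starts, which is where data is actually consumed. We take flatMap as the example.
Executing run() first lands in OneInputStreamTask.run: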


    @Override
    protected void run() throws Exception {
        // cache processor reference on the stack, to make the code more JIT friendly
        final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
        // process the incoming messages
        while (running && inputProcessor.processInput()) {
            // all the work happens in the "processInput" method
        }
    }
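
The empty loop body above works because processInput() both does the work and reports whether there is more to do. Here is a minimal sketch of that contract, under the assumption that the input is a plain Iterator and the operator a Consumer. InputLoopSketch and SketchInputProcessor are hypothetical names, and the real StreamInputProcessor also handles watermarks, barriers, and network buffers, which are left out.

```java
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical sketch of the "loop until processInput() returns false" contract.
final class InputLoopSketch {

    /** Handles at most one record per call and reports whether input remains. */
    static final class SketchInputProcessor<T> {
        private final Iterator<T> input;
        private final Consumer<T> operator;

        SketchInputProcessor(Iterator<T> input, Consumer<T> operator) {
            this.input = input;
            this.operator = operator;
        }

        boolean processInput() {
            if (!input.hasNext()) {
                return false; // input exhausted: the task's run() loop ends
            }
            operator.accept(input.next());
            return true;
        }
    }

    /** Set to false by a cancel request from another thread. */
    volatile boolean running = true;

    /** Returns the number of records processed before the loop ended. */
    <T> int run(SketchInputProcessor<T> inputProcessor) {
        int processed = 0;
        while (running && inputProcessor.processInput()) {
            processed++; // the real loop body is empty; counting is only for the sketch
        }
        return processed;
    }
}
```

Checking the running flag on every iteration is what lets a cancel request stop the loop between two records.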

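For the details of this logic, see "The Complete Flow of Flink Consuming Messages", "The Complete Flink Checkpoint Barrier Flow", and "Understanding How Flink Processes Barriers".
We know that when a record is passed downstream for processing, this code runs: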

                        // this is where the user's code is really about to be executed
                        // now we can do the actual processing
                        StreamRecord<IN> record = recordOrMark.asRecord();
                        synchronized (lock) {
                            numRecordsIn.inc();
                            // set the current key on the key context via the KeySelector
                            streamOperator.setKeyContextElement1(record);
                            streamOperator.processElement(record);
                        }

Following the call further leads to StreamFlatMap.processElement:

@Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // copy the input record's timestamp to the collector if it has one; otherwise the output carries no timestamp
        collector.setTimestamp(element);
        // the code we wrote ourselves
        userFunction.flatMap(element.getValue(), collector);
    }
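
What processElement does can be shown in isolation: the collector remembers the input record's timestamp, and every value the user function emits is stamped with it. In this sketch, Rec, CollectorSketch, and the BiConsumer standing in for the user's flatMap are hypothetical simplifications, not Flink's StreamRecord, TimestampedCollector, or FlatMapFunction.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical sketch of "set timestamp, then call the user function".
final class FlatMapSketch {

    /** A value plus an optional event timestamp (null means "no timestamp"). */
    static final class Rec<T> {
        final T value;
        final Long timestamp;

        Rec(T value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Collects emitted values and stamps each one with the remembered timestamp. */
    static final class CollectorSketch<T> implements Consumer<T> {
        final List<Rec<T>> out = new ArrayList<>();
        private Long timestamp;

        void setTimestamp(Rec<?> base) {
            this.timestamp = base.timestamp;
        }

        @Override
        public void accept(T value) {
            out.add(new Rec<>(value, timestamp));
        }
    }

    /** Same two steps as the operator above: propagate the timestamp, then run user code. */
    static <I, O> void processElement(
            Rec<I> element,
            BiConsumer<I, Consumer<O>> userFunction,
            CollectorSketch<O> collector) {
        collector.setTimestamp(element);
        userFunction.accept(element.value, collector);
    }
}
```

One input record may produce zero, one, or many output records, and all of them inherit the timestamp of the input that caused them.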

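Other operators work the same way. A Kafka source task instead calls headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()) and then consumes the data in Kafka. This connects with the earlier posts "The Complete Flink Job Submission Flow", "Flink Consuming Kafka, for Busy People", "The Complete Flow of Flink Consuming Messages", and "The Complete Flink Checkpoint Barrier Flow". With that, the analysis of Flink's overall flow, apart from restore, is more or less complete.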
