Flink Source Code Analysis

What Happens When a Snapshot Fails

2020-10-10  shengjk1

At work I ran into a problem related to snapshot exceptions, so I am summarizing it here. The flow chart for the snapshot path is as follows:


[Figure: snapshot flow chart (image not included)]

When AbstractUdfStreamOperator.snapshotState is called, it actually calls StreamingFunctionUtils.snapshotFunctionState:

public static void snapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        Preconditions.checkNotNull(context);
        Preconditions.checkNotNull(backend);

        while (true) {

            if (trySnapshotFunctionState(context, backend, userFunction)) {
                break;
            }

            // inspect if the user function is wrapped, then unwrap and try again if we can snapshot the inner function
            if (userFunction instanceof WrappingFunction) {
                userFunction = ((WrappingFunction<?>) userFunction).getWrappedFunction();
            } else {
                break;
            }
        }
    }

    private static boolean trySnapshotFunctionState(
            StateSnapshotContext context,
            OperatorStateBackend backend,
            Function userFunction) throws Exception {

        // call the snapshotState method of the CheckpointedFunction (user code)
        if (userFunction instanceof CheckpointedFunction) {
            ((CheckpointedFunction) userFunction).snapshotState(context);

            return true;
        }
......
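The unwrapping loop above can be modeled in plain Java without any Flink dependency. This is only a sketch: `Fn`, `Snapshottable` and `Wrapper` are hypothetical stand-ins for Flink's `Function`, `CheckpointedFunction` and `WrappingFunction`, and the method returns a boolean (the original returns void) and uses unchecked exceptions so that the behavior is easy to observe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Flink's Function marker interface.
interface Fn {}

// Hypothetical stand-in for CheckpointedFunction: user code that can snapshot itself.
interface Snapshottable extends Fn {
    void snapshot(List<String> log);
}

// Hypothetical stand-in for WrappingFunction: holds one inner function.
class Wrapper implements Fn {
    private final Fn inner;

    Wrapper(Fn inner) {
        this.inner = inner;
    }

    Fn getWrapped() {
        return inner;
    }
}

class UnwrapSketch {
    // Try the current function; if it cannot snapshot, unwrap one level and retry.
    // Returns true if some function in the chain was snapshotted.
    static boolean snapshotFunctionState(Fn fn, List<String> log) {
        while (true) {
            if (fn instanceof Snapshottable) {
                // Any exception thrown here propagates to the caller unchanged.
                ((Snapshottable) fn).snapshot(log);
                return true;
            }
            if (fn instanceof Wrapper) {
                fn = ((Wrapper) fn).getWrapped();
            } else {
                return false;
            }
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Fn user = (Snapshottable) l -> l.add("user snapshot");
        boolean done = snapshotFunctionState(new Wrapper(new Wrapper(user)), log);
        System.out.println(done + " " + log);
    }
}
```

Note that nothing in this loop catches exceptions, which matches the original code: whatever the user's snapshot method throws leaves the loop immediately.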

When the user-defined snapshotState method throws an exception, the exception propagates all the way up to the Task.triggerCheckpointBarrier method:

public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {
        
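        // invokable is in fact a StreamTask. The Task class delegates the checkpoint to a more specific class,
        // and StreamTask delegates further down, until user code is reached
        // source ->flatMap
        // the invokable (StreamTask) drives the operator chain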
        final AbstractInvokable invokable = this.invokable;
        final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
        
        if (executionState == ExecutionState.RUNNING && invokable != null) {
            
            // build a local closure
            final String taskName = taskNameWithSubtask;
            final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();
            
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
                    
                    try {
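                        // invokable is in fact a StreamTask. The Task class delegates the checkpoint to a more specific class,
                        // and StreamTask delegates further down, until user code is reached
                        // only exceptions raised while performing the checkpoint are handled here
                        // when the checkpoint throws, the ExecutionState transitions to FAILED, which leads to a restart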
                        boolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                getJobID(), getExecutionId(), checkpointID,
                                new CheckpointDeclineTaskNotReadyException(taskName));
                        }
                    } catch (Throwable t) {
                        if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                    taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                    "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
                    } finally {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                    }
                }
            };
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        } else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", taskNameWithSubtask, executionId);
            
            // send back a message that we did not do the checkpoint
            checkpointResponder.declineCheckpoint(jobId, executionId, checkpointID,
                new CheckpointDeclineTaskNotReadyException(taskNameWithSubtask));
        }
    }

The key part is actually the catch block:

if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                    taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                    "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
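The decision made around the trigger call can be summarized as a small plain-Java model. It is a sketch under assumptions, not Flink code: `Outcome`, `TriggerSketch` and the `running` flag are hypothetical names introduced here, and the real method reports its result through `checkpointResponder`, `failExternally` and the log rather than a return value.

```java
import java.util.concurrent.Callable;

// Hypothetical labels for the four paths through the try/catch shown above.
enum Outcome { TRIGGERED, DECLINED, TASK_FAILED, LOGGED_ONLY }

class TriggerSketch {
    // checkpoint: stands in for invokable.triggerCheckpoint(...)
    // running:    stands in for getExecutionState() == ExecutionState.RUNNING
    static Outcome trigger(Callable<Boolean> checkpoint, boolean running) {
        try {
            boolean success = checkpoint.call();
            // A false return only declines this checkpoint; the task keeps running.
            return success ? Outcome.TRIGGERED : Outcome.DECLINED;
        } catch (Throwable t) {
            // An exception fails the whole task, but only if it is still RUNNING;
            // otherwise the error is merely logged at debug level.
            return running ? Outcome.TASK_FAILED : Outcome.LOGGED_ONLY;
        }
    }

    public static void main(String[] args) {
        System.out.println(trigger(() -> true, true));
        System.out.println(trigger(() -> false, true));
        System.out.println(trigger(() -> { throw new IllegalStateException("boom"); }, true));
        System.out.println(trigger(() -> { throw new IllegalStateException("boom"); }, false));
    }
}
```

The distinction worth noticing is that returning false and throwing are treated very differently: the former declines a single checkpoint, the latter takes the task down.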

failExternally, in turn, calls

cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);

Looking at the details:

private void cancelOrFailAndCancelInvokable(ExecutionState targetState, Throwable cause) {
        while (true) {
            ExecutionState current = executionState;
            
            // if the task is already canceled (or canceling) or finished or failed,
            // then we need not do anything
            if (current.isTerminal() || current == ExecutionState.CANCELING) {
                LOG.info("Task {} is already in state {}", taskNameWithSubtask, current);
                return;
            }
            
            if (current == ExecutionState.DEPLOYING || current == ExecutionState.CREATED) {
                if (transitionState(current, targetState, cause)) {
                    // if we manage this state transition, then the invokable gets never called
                    // we need not call cancel on it
                    this.failureCause = cause;
                    return;
                }
            } else if (current == ExecutionState.RUNNING) {
                if (transitionState(ExecutionState.RUNNING, targetState, cause)) {
                    // we are canceling / failing out of the running state
                    // we need to cancel the invokable
                    
                    // copy reference to guard against concurrent null-ing out the reference
                    final AbstractInvokable invokable = this.invokable;
                    
                    if (invokable != null && invokableHasBeenCanceled.compareAndSet(false, true)) {
                        this.failureCause = cause;
                        
                        LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
                        
                        // because the canceling may block on user code, we cancel from a separate thread
                        // we do not reuse the async call handler, because that one may be blocked, in which
                        // case the canceling could not continue
                        
                        // The canceller calls cancel and interrupts the executing thread once
                        Runnable canceler = new TaskCanceler(
                            LOG,
                            invokable,
                            executingThread,
                            taskNameWithSubtask,
                            producedPartitions,
                            inputGates);
                        
                        Thread cancelThread = new Thread(
                            executingThread.getThreadGroup(),
                            canceler,
                            String.format("Canceler for %s (%s).", taskNameWithSubtask, executionId));
                        cancelThread.setDaemon(true);
                        cancelThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                        cancelThread.start();
                        
                        // the periodic interrupting thread - a different thread than the canceller, in case
                        // the application code does blocking stuff in its cancellation paths.
                        if (invokable.shouldInterruptOnCancel()) {
                            Runnable interrupter = new TaskInterrupter(
                                LOG,
                                invokable,
                                executingThread,
                                taskNameWithSubtask,
                                taskCancellationInterval);
                            
                            Thread interruptingThread = new Thread(
                                executingThread.getThreadGroup(),
                                interrupter,
                                String.format("Canceler/Interrupts for %s (%s).", taskNameWithSubtask, executionId));
                            interruptingThread.setDaemon(true);
                            interruptingThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                            interruptingThread.start();
                        }
                        
                        // if a cancellation timeout is set, the watchdog thread kills the process
                        // if graceful cancellation does not succeed
                        if (taskCancellationTimeout > 0) {
                            Runnable cancelWatchdog = new TaskCancelerWatchDog(
                                executingThread,
                                taskManagerActions,
                                taskCancellationTimeout,
                                LOG);
                            
                            Thread watchDogThread = new Thread(
                                executingThread.getThreadGroup(),
                                cancelWatchdog,
                                String.format("Cancellation Watchdog for %s (%s).",
                                    taskNameWithSubtask, executionId));
                            watchDogThread.setDaemon(true);
                            watchDogThread.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
                            watchDogThread.start();
                        }
                    }
                    return;
                }
            } else {
                throw new IllegalStateException(String.format("Unexpected state: %s of task %s (%s).",
                    current, taskNameWithSubtask, executionId));
            }
        }
    }
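The core of this method is a retry loop around an atomic state transition. The sketch below models just that part with `AtomicReference`; it is an assumption-laden simplification, not Flink's implementation. `TaskState` is a hypothetical, reduced version of `ExecutionState`, and the canceler, interrupter and watchdog threads are collapsed into a single boolean flag.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, reduced version of ExecutionState.
enum TaskState {
    CREATED, DEPLOYING, RUNNING, CANCELING, CANCELED, FINISHED, FAILED;

    boolean isTerminal() {
        return this == CANCELED || this == FINISHED || this == FAILED;
    }
}

class TransitionSketch {
    private final AtomicReference<TaskState> state;
    // Stands in for starting the canceler threads in the real method.
    private boolean cancelRequested = false;

    TransitionSketch(TaskState initial) {
        this.state = new AtomicReference<>(initial);
    }

    // Returns true if this call moved the task into the target state.
    boolean cancelOrFail(TaskState target) {
        while (true) {
            TaskState current = state.get();
            // Already finished, failed, canceled or canceling: nothing to do.
            if (current.isTerminal() || current == TaskState.CANCELING) {
                return false;
            }
            if (state.compareAndSet(current, target)) {
                // Only a task that was actually running has user code to cancel.
                if (current == TaskState.RUNNING) {
                    cancelRequested = true;
                }
                return true;
            }
            // Another thread changed the state in between; re-read and retry.
        }
    }

    TaskState current() {
        return state.get();
    }

    boolean cancelRequested() {
        return cancelRequested;
    }

    public static void main(String[] args) {
        TransitionSketch task = new TransitionSketch(TaskState.RUNNING);
        System.out.println(task.cancelOrFail(TaskState.FAILED) + " " + task.current());
    }
}
```

The loop exists because the state can change between the read and the compare-and-set; a failed compare-and-set simply means the method has to look at the new state and decide again.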

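cancelOrFailAndCancelInvokable mainly transitions the ExecutionState to FAILED and then performs a series of cancellation steps. Because the ExecutionState becomes FAILED, Flink's restart mechanism is triggered; if no restart strategy is configured, the job fails outright.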
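The difference between "restart" and "fail outright" can be illustrated with a toy model of a bounded restart budget. This is a sketch only and does not use or reproduce Flink's restart strategy classes; `RestartSketch`, `runWithRestarts` and `maxRestarts` are hypothetical names, and delays between attempts are left out.

```java
import java.util.function.IntPredicate;

class RestartSketch {
    // attempt:     runs the job once; receives the 0-based attempt number and
    //              returns true if the job ran without a task failure
    // maxRestarts: how many restarts are allowed after the first run (0 = none)
    // Returns the number of restarts that were needed, or -1 if the job failed for good.
    static int runWithRestarts(IntPredicate attempt, int maxRestarts) {
        for (int n = 0; n <= maxRestarts; n++) {
            if (attempt.test(n)) {
                return n;
            }
            // The attempt failed (e.g. a snapshot threw); loop to restart if budget remains.
        }
        return -1;
    }

    public static void main(String[] args) {
        // A job whose snapshot keeps failing until the third run.
        System.out.println(runWithRestarts(n -> n >= 2, 3));
        // The same job without any restart budget.
        System.out.println(runWithRestarts(n -> n >= 2, 0));
    }
}
```

With a budget of zero, the first failed snapshot is also the end of the job, which is the "fails outright" case described above.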
