Flink Source Code Analysis

Understanding the Two-Phase-Commit-Like Behavior of the Flink Kafka Consumer in One Article

2020-08-07  shengjk1

From the article "一文搞懂 checkpoint 全过程" (Understanding the Whole Checkpoint Process in One Article), we know that executeCheckpointing ends up running an AsyncCheckpointRunnable:

@Override
public void run() {
    ......
    reportCompletedSnapshotStates(
        jobManagerTaskOperatorSubtaskStates,
        localTaskOperatorSubtaskStates,
        asyncDurationMillis);
    ......
}

reportCompletedSnapshotStates goes through taskStateManager.reportTaskStateSnapshots and is passed up, layer by layer, until it reaches acknowledgeCheckpoint on the JobMaster:

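    // acknowledgeCheckpoint: nominally the JobManager acknowledging the checkpoint, but in practice it just calls checkpointCoordinator.receiveAcknowledgeMessage(ackMessage)
    // notifyCheckpointComplete is sent only after every task has acknowledged, which can be seen as a form of two-phase commit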
    public void acknowledgeCheckpoint(
            final JobID jobID,
            final ExecutionAttemptID executionAttemptID,
            final long checkpointId,
            final CheckpointMetrics checkpointMetrics,
            final TaskStateSnapshot checkpointState) {

        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
        final AcknowledgeCheckpoint ackMessage = new AcknowledgeCheckpoint(
            jobID,
            executionAttemptID,
            checkpointId,
            checkpointMetrics,
            checkpointState);

        if (checkpointCoordinator != null) {
            getRpcService().execute(() -> {
                try {
                    checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
                } catch (Throwable t) {
                    log.warn("Error while processing checkpoint acknowledgement message", t);
                }
            });
        ......
    }

What ultimately gets called is the checkpointCoordinator.receiveAcknowledgeMessage method:

public boolean receiveAcknowledgeMessage(AcknowledgeCheckpoint message) throws CheckpointException {
        ......
        synchronized (lock) {
            // we need to check inside the lock for being shutdown as well, otherwise we
            // get races and invalid error log messages
            if (shutdown) {
                return false;
            }

            final PendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);

            if (checkpoint != null && !checkpoint.isDiscarded()) {

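                // notifyCheckpointComplete is sent only after every task has acknowledged, which can be seen as a form of two-phase commit
                // acknowledgement is tracked per subtask: every subtask of every operator chain must call acknowledgeCheckpoint before notifyCheckpointComplete is sent out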
                switch (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())) {
                    case SUCCESS:
                        LOG.debug("Received acknowledge message for checkpoint {} from task {} of job {}.",
                            checkpointId, message.getTaskExecutionId(), message.getJob());

                        // once all tasks have acknowledged, call completePendingCheckpoint
                        if (checkpoint.isFullyAcknowledged()) {
                            completePendingCheckpoint(checkpoint);
                        }
                        break;
                    case DUPLICATE:
                        LOG.debug("Received a duplicate acknowledge message for checkpoint {}, task {}, job {}.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());
                        break;
                    case UNKNOWN:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                                "because the task's execution attempt id was unknown. Discarding " +
                                "the state handle to avoid lingering state.", message.getCheckpointId(),
                            message.getTaskExecutionId(), message.getJob());

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());

                        break;
                    case DISCARDED:
                        LOG.warn("Could not acknowledge the checkpoint {} for task {} of job {}, " +
                            "because the pending checkpoint had been discarded. Discarding the " +
                                "state handle tp avoid lingering state.",
                            message.getCheckpointId(), message.getTaskExecutionId(), message.getJob());

                        discardSubtaskState(message.getJob(), message.getTaskExecutionId(), message.getCheckpointId(), message.getSubtaskState());
                }

                return true;
                ......
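
The flow above can be pictured as a small two-phase protocol: phase one collects acknowledgements, and phase two notifies every task once the last acknowledgement arrives. The following is a minimal sketch of that shape only, not Flink's CheckpointCoordinator. The class MiniCheckpointCoordinator, the nested RecordingTask, and the boolean return convention (true means the acknowledgement was accepted) are all hypothetical names and choices made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified coordinator for a single checkpoint; not Flink's implementation. */
class MiniCheckpointCoordinator {

    /** Hypothetical participant that records every completion notice it receives. */
    static final class RecordingTask {
        final String id;
        final List<Long> completedCheckpoints = new ArrayList<>();

        RecordingTask(String id) {
            this.id = id;
        }

        void notifyCheckpointComplete(long checkpointId) {
            completedCheckpoints.add(checkpointId);
        }
    }

    private final long checkpointId;
    private final List<RecordingTask> tasks;
    // ids of tasks whose acknowledgement is still outstanding
    private final Set<String> notYetAcknowledged = new HashSet<>();
    private boolean completed;

    MiniCheckpointCoordinator(long checkpointId, List<RecordingTask> tasks) {
        this.checkpointId = checkpointId;
        this.tasks = new ArrayList<>(tasks);
        for (RecordingTask t : tasks) {
            notYetAcknowledged.add(t.id);
        }
    }

    /** Phase one: collect acks. Phase two starts when the last ack arrives. */
    synchronized boolean receiveAcknowledgeMessage(long ackedCheckpointId, String taskId) {
        if (completed || ackedCheckpointId != checkpointId) {
            return false; // stale or unrelated acknowledgement
        }
        if (!notYetAcknowledged.remove(taskId)) {
            return false; // duplicate or unknown task
        }
        if (notYetAcknowledged.isEmpty()) {
            completed = true;
            // phase two: tell every task that the checkpoint is complete
            for (RecordingTask t : tasks) {
                t.notifyCheckpointComplete(checkpointId);
            }
        }
        return true;
    }

    public static void main(String[] args) {
        RecordingTask source = new RecordingTask("source-0");
        RecordingTask sink = new RecordingTask("sink-0");
        MiniCheckpointCoordinator c =
            new MiniCheckpointCoordinator(7L, Arrays.asList(source, sink));

        c.receiveAcknowledgeMessage(7L, "source-0");
        // nothing is notified yet because sink-0 has not acknowledged
        System.out.println(source.completedCheckpoints);

        c.receiveAcknowledgeMessage(7L, "sink-0");
        // both tasks have now been notified of checkpoint 7
        System.out.println(source.completedCheckpoints + " " + sink.completedCheckpoints);
    }
}
```

In this sketch a task that acknowledged early learns nothing until the slowest task has acknowledged too, which is the property that makes the checkpoint flow resemble a two-phase commit.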

The key pieces are checkpoint.acknowledgeTask and checkpoint.isFullyAcknowledged(). Every call to checkpoint.acknowledgeTask executes:

// remove the task that has just acknowledged from notYetAcknowledgedTasks
final ExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);

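This removes the executionAttemptId (which identifies the acknowledging subtask's execution attempt) from notYetAcknowledgedTasks. Once notYetAcknowledgedTasks is empty, every task has acknowledged, checkpoint.isFullyAcknowledged() returns true, and notifyCheckpointComplete is triggered. For how notifyCheckpointComplete is actually carried out, see "一文搞懂 checkpoint 全过程" (Understanding the Whole Checkpoint Process in One Article).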
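
The bookkeeping described here can be sketched in a few lines. This is a simplified model under stated assumptions, not Flink's PendingCheckpoint: the class MiniPendingCheckpoint, the use of plain String ids in place of ExecutionAttemptID, and the extra acknowledgedTasks set used to tell a duplicate from an unknown id are illustrative choices. Only the names notYetAcknowledgedTasks, acknowledgeTask, isFullyAcknowledged and the four result values come from the excerpts above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical, simplified model of per-checkpoint acknowledgement tracking. */
class MiniPendingCheckpoint {

    enum AckResult { SUCCESS, DUPLICATE, UNKNOWN, DISCARDED }

    // execution attempts that still have to acknowledge this checkpoint
    private final Set<String> notYetAcknowledgedTasks;
    // execution attempts that have already acknowledged (assumption: used to detect duplicates)
    private final Set<String> acknowledgedTasks = new HashSet<>();
    private boolean discarded;

    MiniPendingCheckpoint(Set<String> tasksToWaitFor) {
        this.notYetAcknowledgedTasks = new HashSet<>(tasksToWaitFor);
    }

    synchronized AckResult acknowledgeTask(String executionAttemptId) {
        if (discarded) {
            return AckResult.DISCARDED;
        }
        // remove the task that has just acknowledged from notYetAcknowledgedTasks
        if (notYetAcknowledgedTasks.remove(executionAttemptId)) {
            acknowledgedTasks.add(executionAttemptId);
            return AckResult.SUCCESS;
        }
        // not pending: either it acknowledged before, or it was never expected
        return acknowledgedTasks.contains(executionAttemptId)
            ? AckResult.DUPLICATE
            : AckResult.UNKNOWN;
    }

    synchronized boolean isFullyAcknowledged() {
        return !discarded && notYetAcknowledgedTasks.isEmpty();
    }

    synchronized void discard() {
        discarded = true;
    }

    public static void main(String[] args) {
        MiniPendingCheckpoint cp =
            new MiniPendingCheckpoint(new HashSet<>(Arrays.asList("source-0", "sink-0")));
        System.out.println(cp.acknowledgeTask("source-0")); // SUCCESS
        System.out.println(cp.acknowledgeTask("source-0")); // DUPLICATE
        System.out.println(cp.isFullyAcknowledged());       // false
        System.out.println(cp.acknowledgeTask("sink-0"));   // SUCCESS
        System.out.println(cp.isFullyAcknowledged());       // true
    }
}
```

The caller only needs to check isFullyAcknowledged() after a SUCCESS result, which mirrors the SUCCESS branch of the switch in receiveAcknowledgeMessage.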
