玩转大数据Flink

Flink 源码之分布式快照

2019-12-16  本文已影响0人  AlienPaul

Flink源码分析系列文档目录

请点击:Flink 源码分析系列文档目录

分布式快照

在处理算系统中,数据源源不断从数据源流入系统,向下游传播。在同一时刻,系统内各个operator不可能在处理同一条数据。上游和下游operator正在处理的数据有时间差。因此无法对整个系统同时进行快照操作。只有当上游operator生成快照结束之后,下游operator再生成快照才有意义。
为了满足这个要求,协调上下游各个operator生成快照的时间。Flink引入了快照屏障(checkpoint barrier)。下面讨论下Flink如何根据checkpoint barrier协调快照生成时间。

CheckpointBarrier

下图是一个Flink处理数据流的数据流向图。Flink通过Kafka接收到的数据依次通过Data Source -> Window -> Data Sink,最终写入到Kafka输出。为了协调快照生成的时机,Flink引入了CheckpointBarrier。在数据流中,CheckpointBarrier和普通的数据不同,它是Flink内部事件(event)的一种。JobManager通过配置(env.enableCheckpointing(间隔时间毫秒)),定期通知data source在数据流中加入CheckpointBarrier。这样,CheckpointBarrier会随着数据流流经下游的各个节点。

发送Checkpoint barrier

当一个节点接收到CheckpointBarrier之后,会立即进行快照操作(先不考虑下文将要说明的barrier对齐的情况)。完毕之后会向所有下游operator广播这个barrier。下游每个节点以此类推。
各个节点的checkpoint过程成功会发送确认消息到JobManager的CheckpointCoordinator。如果JobManager收到了所有operator的确认消息,会向所有operator通知快照操作已完成。Operator接收到此通知可以做一些额外逻辑,例如两阶段提交操作,接收到快照已完成通知之时会向kafka broker提交事务(详细过程请查阅 Flink 源码之两阶段提交)。

通知快照操作完成

CheckpointBarrier类有如下三个成员变量:

什么时候Barrier向下游传播

OperatorChain中的broadcastCheckpointBarrier方法将自己接收到的CheckpointBarrier向下游传播。该方法在performCheckpoint(创建快照)方法中调用。
该方法创建一个checkpoint barrier并广播barrier到所有下游。如下所示:

public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
    CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
    for (RecordWriterOutput<?> streamOutput : streamOutputs) {
        streamOutput.broadcastEvent(barrier);
    }
}

InputProcessorUtil

该类中有一个重要的方法:createCheckpointBarrierHandler。作用为根据不同的数据一致性语义,创建对应的checkpoint barrier处理器(handler)。该方法在创建Task的时候调用,调用,数据源源不断从数据源流入系统,向下游传播。在同一时刻,系统内各个operator不可能在处理同一条数据。上游和下游operator正在处理的数据有时间差。因此无法对整个系统同时进行快照操作。只有当上游operator生成快照结束之后,下游operator再生成快照才有意义。
为了满足这个要求,协调上下游各个operator生成快照的时间。Flink引入了快照屏障(checkpoint barrier)。下面讨论下Flink如何根据checkpoint barrier协调快照生成时间。

CheckpointBarrier

下图是一个Flink处理数据流的数据流向图。Flink通过Kafka接收到的数据依次通过Data Source -> Window -> Data Sink,最终写入到Kafka输出。为了协调快照生成的时机,Flink引入了CheckpointBarrier。在数据流中,CheckpointBarrier和普通的数据不同,它是Flink内部事件(event)的一种。JobManager通过配置(env.enableCheckpointing(间隔时间毫秒)),定期通知data source在数据流中加入CheckpointBarrier。这样,CheckpointBarrier会随着数据流流经下游的各个节点。

发送Checkpoint barrier

当一个节点接收到CheckpointBarrier之后,会立即进行快照操作(先不考虑下文将要说明的barrier对齐的情况)。完毕之后会向所有下游operator广播这个barrier。下游每个节点以此类推。
各个节点的checkpoint过程成功会发送确认消息到JobManager的CheckpointCoordinator。如果JobManager收到了所有operator的确认消息,会向所有operator通知快照操作已完成。Operator接收到此通知可以做一些额外逻辑,例如两阶段提交操作,接收到快照已完成通知之时会向kafka broker提交事务(详细过程请查阅 Flink 源码之两阶段提交)。

通知快照操作完成

CheckpointBarrier类有如下三个成员变量:

什么时候Barrier向下游传播

OperatorChain中的broadcastCheckpointBarrier方法将自己接收到的CheckpointBarrier向下游传播。该方法在performCheckpoint(创建快照)方法中调用。
该方法创建一个checkpoint barrier并广播barrier到所有下游。如下所示:

public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
    CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
    for (RecordWriterOutput<?> streamOutput : streamOutputs) {
        streamOutput.broadcastEvent(barrier);
    }
}

InputProcessorUtil

该类中有一个重要的方法:createCheckpointBarrierHandler。作用为根据不同的数据一致性语义,创建对应的checkpoint barrier处理器(handler)。该方法在创建Task的时候调用,调用链如下:

OneInputStreamTask => init => createInputGate => createCheckpointedInputGate => createCheckpointBarrierHandler

createCheckpointBarrierHandler方法判断checkpoint的模式:如果为EXACTLY_ONCE使用CheckpointBarrierAlignerAT_LEAST_ONCE使用CheckpointBarrierTracker。这两个类均实现了CheckpointBarrierHandler接口,主要负责checkpoint barrier到来时候的处理逻辑。主要目的为处理下文中提到的barrier对齐问题。代码如下所示:

private static CheckpointBarrierHandler createCheckpointBarrierHandler(
        CheckpointingMode checkpointMode,
        int numberOfInputChannels,
        String taskName,
        AbstractInvokable toNotifyOnCheckpoint) {
    switch (checkpointMode) {
        case EXACTLY_ONCE:
            return new CheckpointBarrierAligner(
                numberOfInputChannels,
                taskName,
                toNotifyOnCheckpoint);
        case AT_LEAST_ONCE:
            return new CheckpointBarrierTracker(numberOfInputChannels, toNotifyOnCheckpoint);
        default:
            throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
    }
}

Barrier对齐问题

只有operator具有多个数据源的时候才需要处理barrier对齐问题。
Barrier对齐的是保证operator只处理各个输入数据流中id相同的barrier之后的数据。此时进行checkpoint,可以保证operator中没有提前处理属于下一个checkpoint的数据。此时进行checkpoint,不会将下一个checkpoint阶段的数据处理状态记录到本次checkpoint中。因此barrier对齐操作可以保证数据的精准一次投送(exactly once)。


Barrier对齐示例图

如图所示。如果一个operator具有多个输入通道。系统无法保证多个输入端中id相同的barrier同时到来。当接收到一个数据输入通道的barrier之时,记录下该barrier的id。同时会暂停处理该通道的数据处理过程。该通道在暂停处理过程中到来数据会存入缓存。后续收到barrier的通道同样处理。直到所有的输入通道都收到这个id的barrier,这个时候barrier对齐操作完成,触发checkpoint操作。之后继续计算缓存中的数据,放行所有暂停的通道。
由上述分析可知barrier对齐虽然可以确保数据的精准一次投送,但是等待其他通道barrier到来的过程降低了数据的吞吐量。如果不需要精准一次投送,可以不使用barrier对齐,增加数据的吞吐量。在不对齐的场景中,若一个输入通道接收到barrier,记录下barrier id后其他通道不会暂停,数据仍然正常处理。直到各个通道都接收到这个id的barrier,触发operator的snapshot操作。可以想到的是,如果系统出现问题,从这个checkpoint恢复,本应属于下一个checkpoint保存的状态数据会被重新计算,数据会出现重复。只能够满足数据至少一次投送(at least once)。

CheckpointBarrierAligner

CheckpointBarrierAligner类实现了barrier对齐的逻辑。Exactly once模式会使用此handler。对齐的逻辑位于该类processBarrier方法中。如果之前被阻塞的input channel需要放行,方法返回true。下面是该方法的源代码:

@Override
public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel cases
    // 如果输入通仅有一个,无需barrier对齐操作,直接进行checkpoint操作
    if (totalNumberOfInputChannels == 1) {
        // 如果barrierId 大于currentCheckpointId,意味着需要进行新的checkpoint操作
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos);
        }
        return false;
    }

    boolean checkpointAborted = false;

    // -- general code path for multiple input channels --

    // numBarriersReceived 意思为当前接收到的barrier数量,和被阻塞的input
channel数量一致
    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already progress and was not canceled

        // barrier id相等,说明同一次checkpoint的barrier从另一个数据流到来
        if (barrierId == currentCheckpointId) {
            // regular case
            // 将这个channel 阻塞住,numBarriersReceived 加一
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // 新的barrier到来,但是id比当前正在处理的要新,说明当前处理的checkpoint尚未完成之时,又要开始处理新的checkpoint
            // 这种情况需要终止当前正在处理的checkpoint
            // we did not complete the current checkpoint, another started before
            LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.",
                taskName,
                barrierId,
                currentCheckpointId);

            // let the task know we are not completing this
            // 通知StreamTask,当前checkpoint操作被终止
            notifyAbort(currentCheckpointId,
                new CheckpointException(
                    "Barrier id: " + barrierId,
                    CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));

            // abort the current checkpoint
            // 恢复被阻塞的channel,重置barrier计数
            // 重置对齐操作开始时间
            releaseBlocksAndResetBarriers();
            // 标记checkpoint已被终止
            checkpointAborted = true;

            // begin a the new checkpoint
            // 开始新的对齐流程
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            // 忽略比当前正在处理的checkpoint更早的barrier
            return false;
        }
    }
    else if (barrierId > currentCheckpointId) {
        // first barrier of a new checkpoint
        // 如果当前没有已到达的barrier,并且到来的barrier id比当前的新,说明需要开始新的对齐流程
        beginNewAlignment(barrierId, channelIndex);
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        // 忽略更早的checkpoint barrier
        return false;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    // 如果已阻塞通道数+已关闭通道数=总的输入通道数,说明所有通道的checkpoint barrier已经到齐,对齐操作完成
    // 此时可以触发checkpoint操作,并且恢复被阻塞的channel,重置barrier计数
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // actually trigger checkpoint
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
                taskName,
                receivedBarrier.getId(),
                receivedBarrier.getTimestamp());
        }

        releaseBlocksAndResetBarriers();
        notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos);
        return true;
    }
    return checkpointAborted;
}

整个流程如图所示:


Barrier对齐流程

CheckpointBarrierTracker

CheckpointBarrierTracker类是at least once模式使用的checkpoint handler。
再这之前需要先介绍下CheckpointBarrierCount这个类。它记录了同一个checkpoint barrier到来的次数,还有该checkpoint是否被终止。
CheckpointBarrierTracker中还维护了pendingCheckpoints变量,用于存储已接收到barrier但是仍未触发通知进行checkpoint操作的所有checkpoint。

processBarrier方法如下所示。

@Override
public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel trackers
    // 如果输入channel只有一个,立即触发通知进行checkpoint
    if (totalNumberOfInputChannels == 1) {
        notifyCheckpoint(receivedBarrier, 0, 0);
        return false;
    }

    // general path for multiple input channels
    if (LOG.isDebugEnabled()) {
        LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
    }

    // find the checkpoint barrier in the queue of pending barriers
    CheckpointBarrierCount barrierCount = null;
    int pos = 0;

    // 顺序遍历找到pendingCheckpoints中的barrierId为当前接收到的barrier id的CheckpointBarrierCount对象
    // 同时记录下它在pendingCheckpoints中的位置到pos
    for (CheckpointBarrierCount next : pendingCheckpoints) {
        if (next.checkpointId == barrierId) {
            barrierCount = next;
            break;
        }
        pos++;
    }

    // 如果找到了id相同的CheckpointBarrierCount
    if (barrierCount != null) {
        // add one to the count to that barrier and check for completion
        // 记录的barrier数量加1
        int numBarriersNew = barrierCount.incrementBarrierCount();
        // 如果barrier数量和输入channel数量相等,说明已接收到所有input channel的barrier,可以进行checkpoint操作
        if (numBarriersNew == totalNumberOfInputChannels) {
            // checkpoint can be triggered (or is aborted and all barriers have been seen)
            // first, remove this checkpoint and all all prior pending
            // checkpoints (which are now subsumed)
            // 移除此barrier之前的所有未完成的checkpoint
            for (int i = 0; i <= pos; i++) {
                pendingCheckpoints.pollFirst();
            }

            // notify the listener
            // 如果checkpoint没有终止,通知进行checkpoint操作
            if (!barrierCount.isAborted()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Received all barriers for checkpoint {}", barrierId);
                }

                notifyCheckpoint(receivedBarrier, 0, 0);
            }
        }
    }
    else {
        // 在pendingCheckpoints中没有找到id相同的checkpoint,说明这次到来的barrier对应新的checkpoint
        // first barrier for that checkpoint ID
        // add it only if it is newer than the latest checkpoint.
        // if it is not newer than the latest checkpoint ID, then there cannot be a
        // successful checkpoint for that ID anyways
        // barrier ID是自增的,如果barrierId > latestPendingCheckpointID说明barrier比pending的所有checkpoint都要新
        // 反之,说明barrier来迟了,直接忽略
        if (barrierId > latestPendingCheckpointID) {
            // 更新latestPendingCheckpointID
            latestPendingCheckpointID = barrierId;
            // 增加当前的checkpoint到pendingCheckpoints
            pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));

            // make sure we do not track too many checkpoints
            // 如果pendingCheckpoints中保存的数量大于MAX_CHECKPOINTS_TO_TRACK,删除最早未保存的checkpoint
            if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
                pendingCheckpoints.pollFirst();
            }
        }
    }
    return false;
}

整个过程的流程图如下所示:


非对齐checkpoint barrier操作流程

CheckpointedInputGate

CheckpointBarrierHandlerprocessBarrier方法在CheckpointedInputGate中调用。CheckpointedInputGateInputGate的一个包装类,除了负责读取上游节点的数据外(InputGate的基本功能)。还会对接收到的checkpoint barrier做出响应。
CheckpointedInputGatepollNext方法如下所示:

@Override
public Optional<BufferOrEvent> pollNext() throws Exception {
    while (true) {
        // process buffered BufferOrEvents before grabbing new ones
        Optional<BufferOrEvent> next;
        // 只有当bufferStorage为空的时候才从inputGate读取数据
        if (bufferStorage.isEmpty()) {
            next = inputGate.pollNext();
        }
        else {
            next = bufferStorage.pollNext();
            if (!next.isPresent()) {
                return pollNext();
            }
        }

        if (!next.isPresent()) {
            return handleEmptyBuffer();
        }

        BufferOrEvent bufferOrEvent = next.get();
        // 如果channel被block,则数据不会发往下游,取而代之的是进入到bufferStorage中缓存起来
        if (barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex()))) {
            // if the channel is blocked, we just store the BufferOrEvent
            bufferStorage.add(bufferOrEvent);
            if (bufferStorage.isFull()) {
                barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
                // 稍后分析
                bufferStorage.rollOver();
            }
        }
        else if (bufferOrEvent.isBuffer()) {
            return next;
        }
        else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
            // 如果获取的数据是一个checkpointBarrier对象
            CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
            if (!endOfInputGate) {
                // process barriers only if there is a chance of the checkpoint completing
                // 调用barrierHandler的processBarrier方法
                if (barrierHandler.processBarrier(checkpointBarrier, offsetChannelIndex(bufferOrEvent.getChannelIndex()), bufferStorage.getPendingBytes())) {
                    bufferStorage.rollOver();
                }
            }
        }
        else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
            // 如果是取消Checkpoint的标记,调用processCancellationBarrier方法
            if (barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
                bufferStorage.rollOver();
            }
        }
        else {
            if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
                // 如果是分区结束,调用processEndOfPartition方法
                if (barrierHandler.processEndOfPartition()) {
                    bufferStorage.rollOver();
                }
            }
            return next;
        }
    }
}

注:bufferStorage.rollOver()方法。以CachedBufferStorage为例,其中维护了两个Buffer队列:rolledOverBuffers(队列的封装)和cachedBuffers。buffer加入到bufferStorage(add方法)是将buffer加入到cachedBuffers队列,而获取缓存(pollNext方法)是从rolledOverBuffers中获取。那么rollOver方法的作用简单理解就是将buffer从cachedBuffers移动到rolledOverBuffers中。

通过以上逻辑的分析可以总结出,如果需要读取数据的channel被barrierHandler阻塞,这个channel到来的数据会暂存在bufferStorage中。直到该通道取消阻塞。从该InputGate读取的时候会优先读取bufferStorage中的数据。

上一篇下一篇

猜你喜欢

热点阅读