Flink 源码之分布式快照
Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
分布式快照
在处理算系统中,数据源源不断从数据源流入系统,向下游传播。在同一时刻,系统内各个operator不可能在处理同一条数据。上游和下游operator正在处理的数据有时间差。因此无法对整个系统同时进行快照操作。只有当上游operator生成快照结束之后,下游operator再生成快照才有意义。
为了满足这个要求,协调上下游各个operator生成快照的时间。Flink引入了快照屏障(checkpoint barrier)。下面讨论下Flink如何根据checkpoint barrier协调快照生成时间。
CheckpointBarrier
下图是一个Flink处理数据流的数据流向图。Flink通过Kafka接收到的数据依次通过Data Source -> Window -> Data Sink,最终写入到Kafka输出。为了协调快照生成的时机,Flink引入了CheckpointBarrier。在数据流中,CheckpointBarrier和普通的数据不同,它是Flink内部事件(event)的一种。JobManager通过配置(env.enableCheckpointing(间隔时间毫秒)
),定期通知data source在数据流中加入CheckpointBarrier。这样,CheckpointBarrier会随着数据流流经下游的各个节点。
当一个节点接收到CheckpointBarrier之后,会立即进行快照操作(先不考虑下文将要说明的barrier对齐的情况)。完毕之后会向所有下游operator广播这个barrier。下游每个节点以此类推。
各个节点的checkpoint过程成功会发送确认消息到JobManager的CheckpointCoordinator。如果JobManager收到了所有operator的确认消息,会向所有operator通知快照操作已完成。Operator接收到此通知可以做一些额外逻辑,例如两阶段提交操作,接收到快照已完成通知之时会向kafka broker提交事务(详细过程请查阅 Flink 源码之两阶段提交)。
CheckpointBarrier类有如下三个成员变量:
- id:和checkpoint id对应。保持严格单调递增。后续代码逻辑会通过比较id值大小来确定checkpoint新旧。ID越大的checkpoint越新。
- timestamp:记录checkpoint barrier产生的时间。
ScheduledTrigger
的run
方法转入了系统当前时间为checkpoint的timestamp。 - checkpointOptions 进行checkpoint操作时的选项。包含checkpoint类型和checkpoint保存位置偏好设置。
什么时候Barrier向下游传播
OperatorChain
中的broadcastCheckpointBarrier
方法将自己接收到的CheckpointBarrier向下游传播。该方法在performCheckpoint(创建快照)方法中调用。
该方法创建一个checkpoint barrier并广播barrier到所有下游。如下所示:
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}
InputProcessorUtil
该类中有一个重要的方法:createCheckpointBarrierHandler
。作用为根据不同的数据一致性语义,创建对应的checkpoint barrier处理器(handler)。该方法在创建Task的时候调用,调用,数据源源不断从数据源流入系统,向下游传播。在同一时刻,系统内各个operator不可能在处理同一条数据。上游和下游operator正在处理的数据有时间差。因此无法对整个系统同时进行快照操作。只有当上游operator生成快照结束之后,下游operator再生成快照才有意义。
为了满足这个要求,协调上下游各个operator生成快照的时间。Flink引入了快照屏障(checkpoint barrier)。下面讨论下Flink如何根据checkpoint barrier协调快照生成时间。
CheckpointBarrier
下图是一个Flink处理数据流的数据流向图。Flink通过Kafka接收到的数据依次通过Data Source -> Window -> Data Sink,最终写入到Kafka输出。为了协调快照生成的时机,Flink引入了CheckpointBarrier。在数据流中,CheckpointBarrier和普通的数据不同,它是Flink内部事件(event)的一种。JobManager通过配置(env.enableCheckpointing(间隔时间毫秒)
),定期通知data source在数据流中加入CheckpointBarrier。这样,CheckpointBarrier会随着数据流流经下游的各个节点。
当一个节点接收到CheckpointBarrier之后,会立即进行快照操作(先不考虑下文将要说明的barrier对齐的情况)。完毕之后会向所有下游operator广播这个barrier。下游每个节点以此类推。
各个节点的checkpoint过程成功会发送确认消息到JobManager的CheckpointCoordinator。如果JobManager收到了所有operator的确认消息,会向所有operator通知快照操作已完成。Operator接收到此通知可以做一些额外逻辑,例如两阶段提交操作,接收到快照已完成通知之时会向kafka broker提交事务(详细过程请查阅 Flink 源码之两阶段提交)。
CheckpointBarrier类有如下三个成员变量:
- id:和checkpoint id对应。保持严格单调递增。后续代码逻辑会通过比较id值大小来确定checkpoint新旧。ID越大的checkpoint越新。
- timestamp:记录checkpoint barrier产生的时间。
ScheduledTrigger
的run
方法转入了系统当前时间为checkpoint的timestamp。 - checkpointOptions 进行checkpoint操作时的选项。包含checkpoint类型和checkpoint保存位置偏好设置。
什么时候Barrier向下游传播
OperatorChain
中的broadcastCheckpointBarrier
方法将自己接收到的CheckpointBarrier向下游传播。该方法在performCheckpoint(创建快照)方法中调用。
该方法创建一个checkpoint barrier并广播barrier到所有下游。如下所示:
public void broadcastCheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) throws IOException {
CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp, checkpointOptions);
for (RecordWriterOutput<?> streamOutput : streamOutputs) {
streamOutput.broadcastEvent(barrier);
}
}
InputProcessorUtil
该类中有一个重要的方法:createCheckpointBarrierHandler
。作用为根据不同的数据一致性语义,创建对应的checkpoint barrier处理器(handler)。该方法在创建Task的时候调用,调用链如下:
OneInputStreamTask
=>init
=>createInputGate
=>createCheckpointedInputGate
=>createCheckpointBarrierHandler
createCheckpointBarrierHandler
方法判断checkpoint的模式:如果为EXACTLY_ONCE
使用CheckpointBarrierAligner
,AT_LEAST_ONCE
使用CheckpointBarrierTracker
。这两个类均实现了CheckpointBarrierHandler
接口,主要负责checkpoint barrier到来时候的处理逻辑。主要目的为处理下文中提到的barrier对齐问题。代码如下所示:
private static CheckpointBarrierHandler createCheckpointBarrierHandler(
CheckpointingMode checkpointMode,
int numberOfInputChannels,
String taskName,
AbstractInvokable toNotifyOnCheckpoint) {
switch (checkpointMode) {
case EXACTLY_ONCE:
return new CheckpointBarrierAligner(
numberOfInputChannels,
taskName,
toNotifyOnCheckpoint);
case AT_LEAST_ONCE:
return new CheckpointBarrierTracker(numberOfInputChannels, toNotifyOnCheckpoint);
default:
throw new UnsupportedOperationException("Unrecognized Checkpointing Mode: " + checkpointMode);
}
}
Barrier对齐问题
只有operator具有多个数据源的时候才需要处理barrier对齐问题。
Barrier对齐的是保证operator只处理各个输入数据流中id相同的barrier之后的数据。此时进行checkpoint,可以保证operator中没有提前处理属于下一个checkpoint的数据。此时进行checkpoint,不会将下一个checkpoint阶段的数据处理状态记录到本次checkpoint中。因此barrier对齐操作可以保证数据的精准一次投送(exactly once)。
Barrier对齐示例图
如图所示。如果一个operator具有多个输入通道。系统无法保证多个输入端中id相同的barrier同时到来。当接收到一个数据输入通道的barrier之时,记录下该barrier的id。同时会暂停处理该通道的数据处理过程。该通道在暂停处理过程中到来数据会存入缓存。后续收到barrier的通道同样处理。直到所有的输入通道都收到这个id的barrier,这个时候barrier对齐操作完成,触发checkpoint操作。之后继续计算缓存中的数据,放行所有暂停的通道。
由上述分析可知barrier对齐虽然可以确保数据的精准一次投送,但是等待其他通道barrier到来的过程降低了数据的吞吐量。如果不需要精准一次投送,可以不使用barrier对齐,增加数据的吞吐量。在不对齐的场景中,若一个输入通道接收到barrier,记录下barrier id后其他通道不会暂停,数据仍然正常处理。直到各个通道都接收到这个id的barrier,触发operator的snapshot操作。可以想到的是,如果系统出现问题,从这个checkpoint恢复,本应属于下一个checkpoint保存的状态数据会被重新计算,数据会出现重复。只能够满足数据至少一次投送(at least once)。
CheckpointBarrierAligner
CheckpointBarrierAligner类实现了barrier对齐的逻辑。Exactly once模式会使用此handler。对齐的逻辑位于该类processBarrier
方法中。如果之前被阻塞的input channel需要放行,方法返回true。下面是该方法的源代码:
@Override
public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel cases
// 如果输入通仅有一个,无需barrier对齐操作,直接进行checkpoint操作
if (totalNumberOfInputChannels == 1) {
// 如果barrierId 大于currentCheckpointId,意味着需要进行新的checkpoint操作
if (barrierId > currentCheckpointId) {
// new checkpoint
currentCheckpointId = barrierId;
notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos);
}
return false;
}
boolean checkpointAborted = false;
// -- general code path for multiple input channels --
// numBarriersReceived 意思为当前接收到的barrier数量,和被阻塞的input
channel数量一致
if (numBarriersReceived > 0) {
// this is only true if some alignment is already progress and was not canceled
// barrier id相等,说明同一次checkpoint的barrier从另一个数据流到来
if (barrierId == currentCheckpointId) {
// regular case
// 将这个channel 阻塞住,numBarriersReceived 加一
onBarrier(channelIndex);
}
else if (barrierId > currentCheckpointId) {
// 新的barrier到来,但是id比当前正在处理的要新,说明当前处理的checkpoint尚未完成之时,又要开始处理新的checkpoint
// 这种情况需要终止当前正在处理的checkpoint
// we did not complete the current checkpoint, another started before
LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
"Skipping current checkpoint.",
taskName,
barrierId,
currentCheckpointId);
// let the task know we are not completing this
// 通知StreamTask,当前checkpoint操作被终止
notifyAbort(currentCheckpointId,
new CheckpointException(
"Barrier id: " + barrierId,
CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
// abort the current checkpoint
// 恢复被阻塞的channel,重置barrier计数
// 重置对齐操作开始时间
releaseBlocksAndResetBarriers();
// 标记checkpoint已被终止
checkpointAborted = true;
// begin a the new checkpoint
// 开始新的对齐流程
beginNewAlignment(barrierId, channelIndex);
}
else {
// ignore trailing barrier from an earlier checkpoint (obsolete now)
// 忽略比当前正在处理的checkpoint更早的barrier
return false;
}
}
else if (barrierId > currentCheckpointId) {
// first barrier of a new checkpoint
// 如果当前没有已到达的barrier,并且到来的barrier id比当前的新,说明需要开始新的对齐流程
beginNewAlignment(barrierId, channelIndex);
}
else {
// either the current checkpoint was canceled (numBarriers == 0) or
// this barrier is from an old subsumed checkpoint
// 忽略更早的checkpoint barrier
return false;
}
// check if we have all barriers - since canceled checkpoints always have zero barriers
// this can only happen on a non canceled checkpoint
// 如果已阻塞通道数+已关闭通道数=总的输入通道数,说明所有通道的checkpoint barrier已经到齐,对齐操作完成
// 此时可以触发checkpoint操作,并且恢复被阻塞的channel,重置barrier计数
if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
// actually trigger checkpoint
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.",
taskName,
receivedBarrier.getId(),
receivedBarrier.getTimestamp());
}
releaseBlocksAndResetBarriers();
notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos);
return true;
}
return checkpointAborted;
}
整个流程如图所示:
Barrier对齐流程
CheckpointBarrierTracker
CheckpointBarrierTracker类是at least once模式使用的checkpoint handler。
再这之前需要先介绍下CheckpointBarrierCount这个类。它记录了同一个checkpoint barrier到来的次数,还有该checkpoint是否被终止。
CheckpointBarrierTracker中还维护了pendingCheckpoints变量,用于存储已接收到barrier但是仍未触发通知进行checkpoint操作的所有checkpoint。
processBarrier方法如下所示。
@Override
public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception {
final long barrierId = receivedBarrier.getId();
// fast path for single channel trackers
// 如果输入channel只有一个,立即触发通知进行checkpoint
if (totalNumberOfInputChannels == 1) {
notifyCheckpoint(receivedBarrier, 0, 0);
return false;
}
// general path for multiple input channels
if (LOG.isDebugEnabled()) {
LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);
}
// find the checkpoint barrier in the queue of pending barriers
CheckpointBarrierCount barrierCount = null;
int pos = 0;
// 顺序遍历找到pendingCheckpoints中的barrierId为当前接收到的barrier id的CheckpointBarrierCount对象
// 同时记录下它在pendingCheckpoints中的位置到pos
for (CheckpointBarrierCount next : pendingCheckpoints) {
if (next.checkpointId == barrierId) {
barrierCount = next;
break;
}
pos++;
}
// 如果找到了id相同的CheckpointBarrierCount
if (barrierCount != null) {
// add one to the count to that barrier and check for completion
// 记录的barrier数量加1
int numBarriersNew = barrierCount.incrementBarrierCount();
// 如果barrier数量和输入channel数量相等,说明已接收到所有input channel的barrier,可以进行checkpoint操作
if (numBarriersNew == totalNumberOfInputChannels) {
// checkpoint can be triggered (or is aborted and all barriers have been seen)
// first, remove this checkpoint and all all prior pending
// checkpoints (which are now subsumed)
// 移除此barrier之前的所有未完成的checkpoint
for (int i = 0; i <= pos; i++) {
pendingCheckpoints.pollFirst();
}
// notify the listener
// 如果checkpoint没有终止,通知进行checkpoint操作
if (!barrierCount.isAborted()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received all barriers for checkpoint {}", barrierId);
}
notifyCheckpoint(receivedBarrier, 0, 0);
}
}
}
else {
// 在pendingCheckpoints中没有找到id相同的checkpoint,说明这次到来的barrier对应新的checkpoint
// first barrier for that checkpoint ID
// add it only if it is newer than the latest checkpoint.
// if it is not newer than the latest checkpoint ID, then there cannot be a
// successful checkpoint for that ID anyways
// barrier ID是自增的,如果barrierId > latestPendingCheckpointID说明barrier比pending的所有checkpoint都要新
// 反之,说明barrier来迟了,直接忽略
if (barrierId > latestPendingCheckpointID) {
// 更新latestPendingCheckpointID
latestPendingCheckpointID = barrierId;
// 增加当前的checkpoint到pendingCheckpoints
pendingCheckpoints.addLast(new CheckpointBarrierCount(barrierId));
// make sure we do not track too many checkpoints
// 如果pendingCheckpoints中保存的数量大于MAX_CHECKPOINTS_TO_TRACK,删除最早未保存的checkpoint
if (pendingCheckpoints.size() > MAX_CHECKPOINTS_TO_TRACK) {
pendingCheckpoints.pollFirst();
}
}
}
return false;
}
整个过程的流程图如下所示:
非对齐checkpoint barrier操作流程
CheckpointedInputGate
CheckpointBarrierHandler
的processBarrier
方法在CheckpointedInputGate
中调用。CheckpointedInputGate
是InputGate
的一个包装类,除了负责读取上游节点的数据外(InputGate的基本功能)。还会对接收到的checkpoint barrier做出响应。
CheckpointedInputGate
的pollNext
方法如下所示:
@Override
public Optional<BufferOrEvent> pollNext() throws Exception {
while (true) {
// process buffered BufferOrEvents before grabbing new ones
Optional<BufferOrEvent> next;
// 只有当bufferStorage为空的时候才从inputGate读取数据
if (bufferStorage.isEmpty()) {
next = inputGate.pollNext();
}
else {
next = bufferStorage.pollNext();
if (!next.isPresent()) {
return pollNext();
}
}
if (!next.isPresent()) {
return handleEmptyBuffer();
}
BufferOrEvent bufferOrEvent = next.get();
// 如果channel被block,则数据不会发往下游,取而代之的是进入到bufferStorage中缓存起来
if (barrierHandler.isBlocked(offsetChannelIndex(bufferOrEvent.getChannelIndex()))) {
// if the channel is blocked, we just store the BufferOrEvent
bufferStorage.add(bufferOrEvent);
if (bufferStorage.isFull()) {
barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
// 稍后分析
bufferStorage.rollOver();
}
}
else if (bufferOrEvent.isBuffer()) {
return next;
}
else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) {
// 如果获取的数据是一个checkpointBarrier对象
CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
if (!endOfInputGate) {
// process barriers only if there is a chance of the checkpoint completing
// 调用barrierHandler的processBarrier方法
if (barrierHandler.processBarrier(checkpointBarrier, offsetChannelIndex(bufferOrEvent.getChannelIndex()), bufferStorage.getPendingBytes())) {
bufferStorage.rollOver();
}
}
}
else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) {
// 如果是取消Checkpoint的标记,调用processCancellationBarrier方法
if (barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) {
bufferStorage.rollOver();
}
}
else {
if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) {
// 如果是分区结束,调用processEndOfPartition方法
if (barrierHandler.processEndOfPartition()) {
bufferStorage.rollOver();
}
}
return next;
}
}
}
注:bufferStorage.rollOver()方法。以CachedBufferStorage为例,其中维护了两个Buffer队列:rolledOverBuffers
(队列的封装)和cachedBuffers
。buffer加入到bufferStorage(add
方法)是将buffer加入到cachedBuffers
队列,而获取缓存(pollNext
方法)是从rolledOverBuffers
中获取。那么rollOver
方法的作用简单理解就是将buffer从cachedBuffers
移动到rolledOverBuffers
中。
通过以上逻辑的分析可以总结出,如果需要读取数据的channel被barrierHandler
阻塞,这个channel到来的数据会暂存在bufferStorage
中。直到该通道取消阻塞。从该InputGate
读取的时候会优先读取bufferStorage
中的数据。