Flink玩转大数据

Flink 源码之节点间通信

2020-01-09  本文已影响0人  AlienPaul

Flink 系列博客

Flink QuickStart
Flink双流操作
Flink on Yarn Kerberos的配置
Flink on Yarn部署和任务提交操作
Flink配置Prometheus监控
Flink in docker 部署
Flink HA 部署
Flink 常见调优参数总结
Flink 源码之任务提交流程分析
Flink 源码之基本算子
Flink 源码之Trigger
Flink 源码之Evictor
Flink 源码之Window
Flink 源码之WindowOperator
Flink 源码之StreamGraph生成
Flink 源码之JobGraph生成
Flink 源码之两阶段提交
Flink 源码之分布式快照
Flink 源码之时间处理

从collector到buffer

下面我们从数据源出开始分析数据是如何写入到Flink缓存中的。

NoTimestampContext.collect方法。该方法位于数据源(SourceFunction)中。

@Override
public void collect(T element) {
    synchronized (lock) {
        output.collect(reuse.replace(element));
    }
}

这里调用的是output对象的collect方法。Output对象是Output<StreamRecord<T>>类型。经过debug我们发现这里的output真实类型为CountingOutput类型。
CountingOutput仅仅是一个包装类型,包装了一个Output。相比于其他Output而言多出了收集元素数量的监控。CountingOutput维护了一个计数器类型监控变量:

private final Counter numRecordsOut;

在collect元素的时候调用了numRecordsOut.inc()方法,实现了对收集元素数量的监控。
NoTimestampContext的CountingOuput封装的output是什么类型的呢?我们通过debug查看发现内层的类型为RecordWriterOutput

RecordWriterOutputcollect方法如下所示:

@Override
public void collect(StreamRecord<OUT> record) {
    // outputTag使用旁路输出的时候会用到,这里只支持输出到main input
    if (this.outputTag != null) {
        // we are only responsible for emitting to the main input
        return;
    }

    pushToRecordWriter(record);
}

pushToRecordWriter方法使用序列化代理,将record传递给recordWriter。代码如下:

private <X> void pushToRecordWriter(StreamRecord<X> record) {
    serializationDelegate.setInstance(record);

    try {
        recordWriter.emit(serializationDelegate);
    }
    catch (Exception e) {
        throw new RuntimeException(e.getMessage(), e);
    }
}

RecordWriter负责把数据序列化,然后写入到缓存中。它有两个实现类:

这里我们分析下ChannelSelectorRecordWriteremit方法:

@Override
public void emit(T record) throws IOException, InterruptedException {
    emit(record, channelSelector.selectChannel(record));
}

很明显这里使用了channelSelector.selectChannel方法。该方法为record和对应下游channel id的函数关系。

接下来我们又回到了父类RecordWriter

protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
    checkErroneous();

    serializer.serializeRecord(record);

    // Make sure we don't hold onto the large intermediate serialization buffer for too long
    if (copyFromSerializerToTargetChannel(targetChannel)) {
        // 压缩序列化器中间数据缓存大小
        serializer.prune();
    }
}

关键的逻辑在于copyFromSerializerToTargetChannel。此方法从序列化器中复制数据到目标channel。

protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
    // We should reset the initial position of the intermediate serialization buffer before
    // copying, so the serialization results can be copied to multiple target buffers.
    // 此处Serializer为SpanningRecordSerializer
    // reset方法将serializer内部的databuffer position重置为0
    serializer.reset();

    boolean pruneTriggered = false;
    // 获取目标channel的bufferBuilder
    // bufferBuilder内维护了MemorySegment,即内存片段
    // Flink的内存管理依赖MemorySegment,可实现堆内堆外内存的管理
    // RecordWriter内有一个bufferBuilder数组,长度和下游channel数目相同
    // 该数组以channel ID为下标,存储和channel对应的bufferBuilder
    // 如果对应channel的bufferBuilder尚未创建,调用requestNewBufferBuilder申请一个新的bufferBuilder
    BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
    // 复制serializer的数据到bufferBuilder中
    SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);

    // 循环直到result完全被写入到buffer
    // 一条数据可能会被写入到多个缓存中
    // 如果缓存不够用,会申请新的缓存
    // 数据完全写入完毕之时,当前正在操作的缓存是没有写满的
    // 因此返回true,表明需要压缩该buffer的空间
    while (result.isFullBuffer()) {
        finishBufferBuilder(bufferBuilder);

        // If this was a full record, we are done. Not breaking out of the loop at this point
        // will lead to another buffer request before breaking out (that would not be a
        // problem per se, but it can lead to stalls in the pipeline).
        if (result.isFullRecord()) {
            pruneTriggered = true;
            emptyCurrentBufferBuilder(targetChannel);
            break;
        }

        bufferBuilder = requestNewBufferBuilder(targetChannel);
        result = serializer.copyToBufferBuilder(bufferBuilder);
    }
    checkState(!serializer.hasSerializedData(), "All data should be written at once");

    // 如果buffer超时时间为0,需要flush目标channel的数据
    if (flushAlways) {
        flushTargetPartition(targetChannel);
    }
    return pruneTriggered;
}

接下来分析下getBufferBuilder方法。以ChannelSelectorRecordWriter的此方法为例说明。

@Override
public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    if (bufferBuilders[targetChannel] != null) {
        return bufferBuilders[targetChannel];
    } else {
        return requestNewBufferBuilder(targetChannel);
    }
}

如果bufferBuilders数组中targetChannel下标不存在,申请一个新的BufferBuilder。此处我们发现各个channel对应的bufferBuilder是懒加载的,只有第一次用到的时候才创建。

我们跟踪到requestNewBufferBuilder方法。

@Override
public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    // 首先需要检查targetChannel对应的buffer必须为null或数据已写入完毕
    checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());

    // 获取目标分区的bufferBuilder
    BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
    // 创建一个bufferConsumer,bufferConsumer保存了bufferBuilder的memorySegment,当前写入指针和当前读取指针
    // BufferBuilder用于写入数据,BufferConsumer用于读取数据
    targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
    bufferBuilders[targetChannel] = bufferBuilder;
    return bufferBuilder;
}

addBufferConsumer方法

@Override
public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
    checkNotNull(bufferConsumer);

    ResultSubpartition subpartition;
    try {
        checkInProduceState();
        // 获取subPartition
        // 此处subpartitionIndex为targetChannel
        subpartition = subpartitions[subpartitionIndex];
    }
    catch (Exception ex) {
        bufferConsumer.close();
        throw ex;
    }

    // 将bufferConsumer添加入subpartition
    return subpartition.add(bufferConsumer);
}

ReultSubPartition有两个实现类,PipelinedSubpartitionBoundedBlockingSubpartition
其中PipelinedSubpartition用于流处理场景下的数据消费。它内部维护了一个BufferBuilder队列。消费者通过调用createReadView创建一个PipelinedSubpartitionView来消费数据。创建view的时候需要提供一个BufferAvailabilityListener对象,用于作为buffer中有数据可用时候的回调。因此PipelinedSubpartition可以做到一旦有数据就及时提醒下游去消费。

BoundedBlockingSubpartition适合批处理场景下的数据消费。和PipelinedSubpartition不同的是,BoundedBlockingSubpartition数据是先写入后消费的,可以一次写入,多次消费。它的数据写入到BoundedData中。数据落地的方式随着BoundedData实现的不同而不同。数据可以保存在文件系统(FileChannelBoundedData),内存(MemoryMappedBoundedData)或者同时在文件系统和内存(FileChannelMemoryMappedBoundedData)。

下游请求SubPartition

上一段分析过数据的消费是通过ResultSubPartition调用createReadView方法。
PipelinedSubpartitioncreateReadView代码如下:

@Override
public PipelinedSubpartitionView createReadView(BufferAvailabilityListener availabilityListener) throws IOException {
    final boolean notifyDataAvailable;
    synchronized (buffers) {
        // 检查该SubPartition的缓存不能被释放
        checkState(!isReleased);
        // 检查之前不能创建过read view
        checkState(readView == null,
                "Subpartition %s of is being (or already has been) consumed, " +
                "but pipelined subpartitions can only be consumed once.", index, parent.getPartitionId());

        LOG.debug("{}: Creating read view for subpartition {} of partition {}.",
            parent.getOwningTaskName(), index, parent.getPartitionId());

        // 创建view,同时转入availabilityListener
        readView = new PipelinedSubpartitionView(this, availabilityListener);
        // 如果buffer不为空,需要调用listener通知数据已准备好,可供消费
        notifyDataAvailable = !buffers.isEmpty();
    }
    if (notifyDataAvailable) {
        notifyDataAvailable();
    }

    return readView;
}

上述方法在ResultPartitionManager中调用。
ResultPartitionManager负责维护当前创建和消费的分区。
ResultPartitionManagercreateSubpartitionView方法:

@Override
public ResultSubpartitionView createSubpartitionView(
        ResultPartitionID partitionId,
        int subpartitionIndex,
        BufferAvailabilityListener availabilityListener) throws IOException {

    synchronized (registeredPartitions) {
        final ResultPartition partition = registeredPartitions.get(partitionId);

        if (partition == null) {
            throw new PartitionNotFoundException(partitionId);
        }

        LOG.debug("Requesting subpartition {} of {}.", subpartitionIndex, partition);

        return partition.createSubpartitionView(subpartitionIndex, availabilityListener);
    }
}

该方法逻辑比较简单,ResultPartitionsetup的时候会将该分区注册到ResultPartitionManager中。创建view的时候会根据partition id从已注册的分区列表中获取到指定的ResultPartition,然后创建一个subpartition view。

继续跟踪该方法的调用链,我们可以发现该方法在两个类中调用:LocalInputChannelCreditBasedSequenceNumberingViewReader

LocalInputChannel负责从本地请求一个subPartition view。
CreditBasedSequenceNumberingViewReader负责通过网络从其他节点获取subPartition view。同时提供了credit based反压机制的支持。

我们跟踪下CreditBasedSequenceNumberingViewReaderrequestSubpartitionView方法:

@Override
public void requestSubpartitionView(
    ResultPartitionProvider partitionProvider,
    ResultPartitionID resultPartitionId,
    int subPartitionIndex) throws IOException {

    synchronized (requestLock) {
        if (subpartitionView == null) {
            // This this call can trigger a notification we have to
            // schedule a separate task at the event loop that will
            // start consuming this. Otherwise the reference to the
            // view cannot be available in getNextBuffer().
            this.subpartitionView = partitionProvider.createSubpartitionView(
                resultPartitionId,
                subPartitionIndex,
                this);
        } else {
            throw new IllegalStateException("Subpartition already requested");
        }
    }
}

方法逻辑仅为创建一个subpartitionView。继续向上跟踪该方法的调用位置,我们找到了PartitionRequestServerHandlerchannelRead0方法:

@Override
protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {
    try {
        // 获取接收到消息的类型
        Class<?> msgClazz = msg.getClass();

        // ----------------------------------------------------------------
        // Intermediate result partition requests
        // ----------------------------------------------------------------
        // 如果是分区请求消息
        if (msgClazz == PartitionRequest.class) {
            PartitionRequest request = (PartitionRequest) msg;

            LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);

            try {
                NetworkSequenceViewReader reader;
                // 创建一个reader
                reader = new CreditBasedSequenceNumberingViewReader(
                    request.receiverId,
                    request.credit,
                    outboundQueue);

                // 为该reader分配一个subpartitionView
                reader.requestSubpartitionView(
                    partitionProvider,
                    request.partitionId,
                    request.queueIndex);

                // 注册reader到outboundQueue中
                // outboundQueue中存放了多个reader,这些reader在队列中排队,等待数据发送
                outboundQueue.notifyReaderCreated(reader);
            } catch (PartitionNotFoundException notFound) {
                respondWithError(ctx, notFound, request.receiverId);
            }
        }
        // ----------------------------------------------------------------
        // Task events
        // ----------------------------------------------------------------
        else if (msgClazz == TaskEventRequest.class) {
            TaskEventRequest request = (TaskEventRequest) msg;

            if (!taskEventPublisher.publish(request.partitionId, request.event)) {
                respondWithError(ctx, new IllegalArgumentException("Task event receiver not found."), request.receiverId);
            }
        } else if (msgClazz == CancelPartitionRequest.class) {
            CancelPartitionRequest request = (CancelPartitionRequest) msg;

            outboundQueue.cancel(request.receiverId);
        } else if (msgClazz == CloseRequest.class) {
            outboundQueue.close();
        } else if (msgClazz == AddCredit.class) {
            AddCredit request = (AddCredit) msg;

            outboundQueue.addCredit(request.receiverId, request.credit);
        } else {
            LOG.warn("Received unexpected client request: {}", msg);
        }
    } catch (Throwable t) {
        respondWithError(ctx, t);
    }
}

此方法在上游数据发送端执行,数据发送端对应的netty角色为server。
这里我们根据netty接收到的消息的类型,来做出对应的响应。如果接收到的消息类型为PartitionRequest,需要创建一个CreditBasedSequenceNumberingViewReader并将该reader加入到outboundQueue中。

outboundQueue是一个PartitionRequestQueue类型对象。该对象负责处理partition request。每次partition request会在PartitionRequestServerHandler中创建一个NetworkSequenceViewReader对象。然后给每个reader分配SubPartitionView(调用requestSubpartitionView)。最后调用notifyReaderCreated把reader加入到PartitionRequestQueueallReaders中。PartitionRequestQueue监听下游的channel是否可写(writability)。下游channel变为可写的时候会调用channelWritabilityChanged方法,将allReaders中排队的reader逐个取出,发往下游。

channelWritabilityChanged方法代码如下:

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    writeAndFlushNextMessageIfPossible(ctx.channel());
}

writeAndFlushNextMessageIfPossible方法代码如下:

private void writeAndFlushNextMessageIfPossible(final Channel channel) throws IOException {
    // 如果channel不可写,返回
    if (fatalError || !channel.isWritable()) {
        return;
    }

    // The logic here is very similar to the combined input gate and local
    // input channel logic. You can think of this class acting as the input
    // gate and the consumed views as the local input channels.

    BufferAndAvailability next = null;
    try {
        while (true) {
            // 队列中取出一个reader
            NetworkSequenceViewReader reader = pollAvailableReader();

            // No queue with available data. We allow this here, because
            // of the write callbacks that are executed after each write.
            if (reader == null) {
                return;
            }

            // 获取buffer
            next = reader.getNextBuffer();
            if (next == null) {
                if (!reader.isReleased()) {
                    continue;
                }

                Throwable cause = reader.getFailureCause();
                if (cause != null) {
                    ErrorResponse msg = new ErrorResponse(
                        new ProducerFailedException(cause),
                        reader.getReceiverId());

                    ctx.writeAndFlush(msg);
                }
            } else {
                // This channel was now removed from the available reader queue.
                // We re-add it into the queue if it is still available
                if (next.moreAvailable()) {
                    registerAvailableReader(reader);
                }

                // 包装buffer
                BufferResponse msg = new BufferResponse(
                    next.buffer(),
                    reader.getSequenceNumber(),
                    reader.getReceiverId(),
                    next.buffersInBacklog());

                // Write and flush and wait until this is done before
                // trying to continue with the next buffer.
                // 将msg发送到下游
                channel.writeAndFlush(msg).addListener(writeListener);

                return;
            }
        }
    } catch (Throwable t) {
        if (next != null) {
            next.buffer().recycleBuffer();
        }

        throw new IOException(t.getMessage(), t);
    }
}

接下来我们回到PartitionRequest这个请求。PartitionRequest是在哪里发送的呢?我们跟踪到NettyPartitionRequestClientrequestSubpartition方法:

@Override
public void requestSubpartition(
        final ResultPartitionID partitionId,
        final int subpartitionIndex,
        final RemoteInputChannel inputChannel,
        int delayMs) throws IOException {

    checkNotClosed();

    LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.",
            subpartitionIndex, partitionId, delayMs);

    // clientHandler为CreditBasedPartitionRequestClientHandler
    // 它内部维护了input channel ID和channel的对应关系,是一个map类型变量
    // 在读取消息的时候,需要依赖该map从channel ID获取到channel对象本身
    clientHandler.addInputChannel(inputChannel);

    // 创建PartitionRequest对象
    final PartitionRequest request = new PartitionRequest(
            partitionId, subpartitionIndex, inputChannel.getInputChannelId(), inputChannel.getInitialCredit());

    // 发送PartitionRequest请求发送成功之后的回调函数
    final ChannelFutureListener listener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // 如果遇到了错误
            if (!future.isSuccess()) {
                // map中移除这个channel
                clientHandler.removeInputChannel(inputChannel);
                SocketAddress remoteAddr = future.channel().remoteAddress();
                // 为inputChannel内部的cause变量赋值,设置一个error
                inputChannel.onError(
                        new LocalTransportException(
                            String.format("Sending the partition request to '%s' failed.", remoteAddr),
                            future.channel().localAddress(), future.cause()
                        ));
            }
        }
    };

    // 如果不需要延迟发送
    if (delayMs == 0) {
        ChannelFuture f = tcpChannel.writeAndFlush(request);
        f.addListener(listener);
    } else {
    // 如果需要延迟发送,调用eventLoop的schedule方法
        final ChannelFuture[] f = new ChannelFuture[1];
        tcpChannel.eventLoop().schedule(new Runnable() {
            @Override
            public void run() {
                f[0] = tcpChannel.writeAndFlush(request);
                f[0].addListener(listener);
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}

继续跟踪调用链,到RemoteInputChannelrequestSubpartition方法。代码如下所示:

@VisibleForTesting
@Override
public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
    if (partitionRequestClient == null) {
        // Create a client and request the partition
        try {
            partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
        } catch (IOException e) {
            // IOExceptions indicate that we could not open a connection to the remote TaskExecutor
            throw new PartitionConnectionException(partitionId, e);
        }

        partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
    }
}

RemoteInputChannelrequestSubpartition方法中,如果partitionRequestClient,会预先通过connectionManager创建一个client,再调用requestSubpartition方法。

继续跟踪,我们找到SingleInputGaterequestPartitions方法。代码如下:

@VisibleForTesting
void requestPartitions() throws IOException, InterruptedException {
    synchronized (requestLock) {
        // 只能请求一次partition,第一次调用该方法后此flag会被设置为true
        if (!requestedPartitionsFlag) {
            if (closeFuture.isDone()) {
                throw new IllegalStateException("Already released.");
            }

            // Sanity checks
            if (numberOfInputChannels != inputChannels.size()) {
                throw new IllegalStateException(String.format(
                    "Bug in input gate setup logic: mismatch between " +
                    "number of total input channels [%s] and the currently set number of input " +
                    "channels [%s].",
                    inputChannels.size(),
                    numberOfInputChannels));
            }

            // 循环所有的inputChannels,请求他们对应的subPartition
            for (InputChannel inputChannel : inputChannels.values()) {
                inputChannel.requestSubpartition(consumedSubpartitionIndex);
            }
        }
        // 方法调用完毕设置flag为true,防止重复调用
        requestedPartitionsFlag = true;
    }
}

SingleInputGate继承了InputGate接口。InputGate的作用为从intermediate result读取数据到task中。
根据JobGraph(参见Flink 源码之JobGraph生成
)的分析我们可以得知operatorChain之间使用的intermediate result来作为中间结果缓存。Intermediate result在执行时的真实数据承载对象为ResultPartition(一个或多个,视分区条件而定)。ResultPartition分为一个或多个ResultSubPartition。每一个ResultSubPartition和下游的某一个InputGate有对应关系。下游的InputGate读取上游所有对应ResultSubPartition的内容。

读取数据

我们分析下StreamTaskprocessInput方法。代码如下所示:

protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
    // 调用inputProcessor的processInput方法
    InputStatus status = inputProcessor.processInput();
    if (status == InputStatus.MORE_AVAILABLE && recordWriter.isAvailable()) {
        return;
    }
    if (status == InputStatus.END_OF_INPUT) {
        // 如果输入结束,将mailboxLoopRunning设置为false,停止运行
        controller.allActionsCompleted();
        return;
    }
    // 在inputGate recordWriter或inputProcessor恢复可用之后异步调用default action的恢复操作
    CompletableFuture<?> jointFuture = getInputOutputJointFuture(status);
    MailboxDefaultAction.Suspension suspendedDefaultAction = controller.suspendDefaultAction();
    jointFuture.thenRun(suspendedDefaultAction::resume);
}

这里我们重点关注下inputProcessor.processInput()调用。
inputProcessor有两个实现类:StreamOneInputProcessorStreamTwoInputProcessor。我们看一下StreamOneInputProcessorprocessInput方法。代码如下:

@Override
public InputStatus processInput() throws Exception {
    InputStatus status = input.emitNext(output);

    if (status == InputStatus.END_OF_INPUT) {
        synchronized (lock) {
            operatorChain.endHeadOperatorInput(1);
        }
    }

    return status;
}

这里的input具有两个实现类StreamTaskSourceInputStreamTaskNetworkInput。如果该StreamTask运行的是数据源,则实现类为StreamTaskSourceInput。其他情况使用的实现类为StreamTaskNetworkInput,需要通过网络读取数据。

我们分析下StreamTaskNetworkInputemitNext方法。代码如下:

@Override
public InputStatus emitNext(DataOutput<T> output) throws Exception {

    while (true) {
        // get the stream element from the deserializer
        if (currentRecordDeserializer != null) {
            // 从buffer的memorySegment中反序列化数据
            DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
            // 如果buffer已经消费了,可以回收buffer
            if (result.isBufferConsumed()) {
                currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                currentRecordDeserializer = null;
            }

            // 如果已经读取到完整记录
            if (result.isFullRecord()) {
                // 处理从buffer中反序列化出的数据,在后续博客中分析
                processElement(deserializationDelegate.getInstance(), output);
                return InputStatus.MORE_AVAILABLE;
            }
        }

        // 从CheckpointInputGate读取数据
        Optional<BufferOrEvent> bufferOrEvent = checkpointedInputGate.pollNext();
        if (bufferOrEvent.isPresent()) {
            // 处理获取到的缓存,并将缓存中的memory segment提供给currentRecordDeserializer,供反序列化出消息,代码稍后分析
            processBufferOrEvent(bufferOrEvent.get());
        } else {
            // 如果checkpointedInputGate 输入流结束,返回END_OF_INPUT
            if (checkpointedInputGate.isFinished()) {
                checkState(checkpointedInputGate.getAvailableFuture().isDone(), "Finished BarrierHandler should be available");
                if (!checkpointedInputGate.isEmpty()) {
                    throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                }
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
    }
}

我们再看下processBufferOrEvent方法的源代码:

private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOException {
    // 如果是buffer的话
    if (bufferOrEvent.isBuffer()) {
        // 读取buffer对应的channel id
        lastChannel = bufferOrEvent.getChannelIndex();
        checkState(lastChannel != StreamTaskInput.UNSPECIFIED);
        // 获取channel对应的record反序列化器
        currentRecordDeserializer = recordDeserializers[lastChannel];
        checkState(currentRecordDeserializer != null,
            "currentRecordDeserializer has already been released");

        // 此处是关键,设置反序列化器要读取的buffer为inputGate获取到的buffer
        currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
    }
    else {
        // Event received
        // 如果接收到的是event
        final AbstractEvent event = bufferOrEvent.getEvent();
        
        if (event.getClass() != EndOfPartitionEvent.class) {
            throw new IOException("Unexpected event: " + event);
        }

        // release the record deserializer immediately,
        // which is very valuable in case of bounded stream
        // 清除channel对应的反序列化器
        // 并将recordDeserializers[channelIndex] 引用置空
        releaseDeserializer(bufferOrEvent.getChannelIndex());
    }
}

我们继续跟踪buffer是如何从inputGate中获取的。经debug我们发现这个inputGate使用的2层包装。CheckpointedInputGate包装了InputGateWithMetrics,又包装了SingleInputGate。其中CheckpointedInputGate负责检查数据流中的checkpoint barrier,调用对应的barrierHandler决定是否触发checkpoint操作。参见Flink 源码之分布式快照
InputGateWithMetrics负责监控接收的数据,统计所有流入数据的总字节数。
经历2层包装之后,程序逻辑进行到SingleInputGatepollNext方法
SingleInputGatepollNextgetNext两个方法。这两个方法基本相同,唯一的区别是pollNext为非阻塞方式,getNext为阻塞方式。

@Override
public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
    return getNextBufferOrEvent(false);
}

getNextBufferOrEvent方法:

private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
    // 如果接收到所有分区终止的事件,则返回空
    if (hasReceivedAllEndOfPartitionEvents) {
        return Optional.empty();
    }

    // 如果input gate被关闭
    if (closeFuture.isDone()) {
        throw new CancelTaskException("Input gate is already closed.");
    }

    // 以阻塞方式读取数据
    Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
    if (!next.isPresent()) {
        return Optional.empty();
    }

    InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
    return Optional.of(transformToBufferOrEvent(
        inputWithData.data.buffer(),
        inputWithData.moreAvailable,
        inputWithData.input));
}

waitAndGetNextData方法:

private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
        throws IOException, InterruptedException {
    while (true) {
        // 获取channel,根据blocking参数决定是否是阻塞方式
        Optional<InputChannel> inputChannel = getChannel(blocking);
        if (!inputChannel.isPresent()) {
            return Optional.empty();
        }

        // Do not query inputChannel under the lock, to avoid potential deadlocks coming from
        // notifications.
        // 获取input channel的缓存数据
        Optional<BufferAndAvailability> result = inputChannel.get().getNextBuffer();

        synchronized (inputChannelsWithData) {
            // 能获取到数据,并且还有更多数据
            if (result.isPresent() && result.get().moreAvailable()) {
                // enqueue the inputChannel at the end to avoid starvation
                // channel加入到inputChannelsWithData队列中
                inputChannelsWithData.add(inputChannel.get());

                // 下面这个BitSet负责记录哪些channel已经加入到了inputChannelsWithData队列
                enqueuedInputChannelsWithData.set(inputChannel.get().getChannelIndex());
            }

            // 如果inputChannelsWithData为空,设置为不可用状态
            if (inputChannelsWithData.isEmpty()) {
                availabilityHelper.resetUnavailable();
            }

            // 返回包装后的结果
            if (result.isPresent()) {
                return Optional.of(new InputWithData<>(
                    inputChannel.get(),
                    result.get(),
                    !inputChannelsWithData.isEmpty()));
            }
        }
    }
}

每一个InputGate包含一个或多个InputChannel。其中InputChannel分为2种。LocalInputChannel负责从本地的SubPartition读取数据,RemoteInputChannel负责从远程(其他节点)的Subpartition读取数据。

LocalInputChannelgetNextBuffer方法:

@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException, InterruptedException {
    checkError();

    // 获取requestSubpartition方法得到的subpartitionView 
    ResultSubpartitionView subpartitionView = this.subpartitionView;
    // 如果没有获取到subpartitionView,需要再次检查subpartitionView
    // 如果此时另一线程正在调用requestSubpartition方法,checkAndWaitForSubpartitionView方法会被阻塞
    // 等待requestSubpartition执行完毕
    if (subpartitionView == null) {
        // There is a possible race condition between writing a EndOfPartitionEvent (1) and flushing (3) the Local
        // channel on the sender side, and reading EndOfPartitionEvent (2) and processing flush notification (4). When
        // they happen in that order (1 - 2 - 3 - 4), flush notification can re-enqueue LocalInputChannel after (or
        // during) it was released during reading the EndOfPartitionEvent (2).
        if (isReleased) {
            return Optional.empty();
        }

        // this can happen if the request for the partition was triggered asynchronously
        // by the time trigger
        // would be good to avoid that, by guaranteeing that the requestPartition() and
        // getNextBuffer() always come from the same thread
        // we could do that by letting the timer insert a special "requesting channel" into the input gate's queue
        subpartitionView = checkAndWaitForSubpartitionView();
    }

    // 获取缓存数据
    BufferAndBacklog next = subpartitionView.getNextBuffer();

    if (next == null) {
        if (subpartitionView.isReleased()) {
            throw new CancelTaskException("Consumed partition " + subpartitionView + " has been released.");
        } else {
            return Optional.empty();
        }
    }

    // 更新已读取字节数
    numBytesIn.inc(next.buffer().getSize());
    // 更新以读取缓存数
    numBuffersIn.inc();
    return Optional.of(new BufferAndAvailability(next.buffer(), next.isMoreAvailable(), next.buffersInBacklog()));
}

以上是LocalInputChannelgetNextBuffer方法。下面我们分析下RemoteInputChannelgetNextBuffer方法。该方法和LocalInputChannel不同的是它从receivedBuffers队列中获取buffer,而不是直接从subpartitionView获取。

@Override
Optional<BufferAndAvailability> getNextBuffer() throws IOException {
    checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
    checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");

    checkError();

    final Buffer next;
    final boolean moreAvailable;

    // 从receivedBuffers队列中获取buffer
    synchronized (receivedBuffers) {
        next = receivedBuffers.poll();
        moreAvailable = !receivedBuffers.isEmpty();
    }

    numBytesIn.inc(next.getSize());
    numBuffersIn.inc();
    return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
}

接下来大家会问,receivedBuffers中的缓存数据是什么时候被加入的呢?答案在onBuffer方法。代码如下:

public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
    // 是否需要回收此buffer
    boolean recycleBuffer = true;

    try {

        final boolean wasEmpty;
        synchronized (receivedBuffers) {
            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
            // after releaseAllResources() released all buffers from receivedBuffers
            // (see above for details).
            // 在releaseAllResources()调用之后无法在接收新的buffer
            if (isReleased.get()) {
                return;
            }

            // 检查sequenceNumber
            if (expectedSequenceNumber != sequenceNumber) {
                onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                return;
            }

            // 判断添加buffer之前的队列是否为空
            wasEmpty = receivedBuffers.isEmpty();
            // 添加缓存数据到队列中
            receivedBuffers.add(buffer);
            // 已接收到数据,缓存不需要回收
            recycleBuffer = false;
        }

         // 增加SequenceNumber
        ++expectedSequenceNumber;

        // 如果添加buffer之前的队列为空,需要通知对应的inputGate,现在已经有数据了(不为空)
        if (wasEmpty) {
            notifyChannelNonEmpty();
        }

        if (backlog >= 0) {
            // 负责提前分配buffer
            onSenderBacklog(backlog);
        }
    } finally {
        // 回收buffer
        if (recycleBuffer) {
            buffer.recycleBuffer();
        }
    }
}

这里我们先看一下如何提前分配buffer的逻辑。代码如下:

void onSenderBacklog(int backlog) throws IOException {
    int numRequestedBuffers = 0;

    // 锁定bufferQueue
    synchronized (bufferQueue) {
        // Similar to notifyBufferAvailable(), make sure that we never add a buffer
        // after releaseAllResources() released all buffers (see above for details).
        // 避免在releaseAllResources()之后执行
        if (isReleased.get()) {
            return;
        }

        // backlog为后续所需的buffer(积压数量)
        // initialCredit为初始预留的buffer数量
        numRequiredBuffers = backlog + initialCredit;
        // 如果可用buffer数小于numRequiredBuffers 
        // 并且不在等待请求浮动Buffers的状态
        // 需要为bufferQueue增加浮动buffer
        while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
            // 申请一个buffer
            Buffer buffer = inputGate.getBufferPool().requestBuffer();
            if (buffer != null) {
                // 加入浮动buffer队列
                bufferQueue.addFloatingBuffer(buffer);
                numRequestedBuffers++;
            } else if (inputGate.getBufferProvider().addBufferListener(this)) {
                // If the channel has not got enough buffers, register it as listener to wait for more floating buffers.
                // 如果请求不到buffer(channel没有足够的buffer)
                // 注册一个监听器,并且标记等待请求浮动Buffers的状态为true
                isWaitingForFloatingBuffers = true;
                break;
            }
        }
    }

    // 如果本次操作请求的buffer数量大于0
    // unannouncedCredit为未告知上游生产者的credit,用于数据反压
    // 如果unannouncedCredit在增加numRequestedBuffers之前的值为0
    // 需要通知上游这里有credit,可以接收数据
    if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
        notifyCreditAvailable();
    }
}

上面分析到如果请求buffer失败,会注册一个监听器。那么当监听器执行到buffer创建成功的时候执行什么方法呢?我们分析下notifyBufferAvailable方法。

@Override
public NotificationResult notifyBufferAvailable(Buffer buffer) {
    NotificationResult notificationResult = NotificationResult.BUFFER_NOT_USED;
    try {
        synchronized (bufferQueue) {
            // 保证必须在等待浮动buffer状态
            checkState(isWaitingForFloatingBuffers,
                "This channel should be waiting for floating buffers.");

            // Important: make sure that we never add a buffer after releaseAllResources()
            // released all buffers. Following scenarios exist:
            // 1) releaseAllResources() already released buffers inside bufferQueue
            // -> then isReleased is set correctly
            // 2) releaseAllResources() did not yet release buffers from bufferQueue
            // -> we may or may not have set isReleased yet but will always wait for the
            // lock on bufferQueue to release buffers
            // 确保没有release,并且可用buffer数量小于所需的buffer才执行后续流程
            if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
                isWaitingForFloatingBuffers = false;
                return notificationResult;
            }
            // 添加浮动buffer
            bufferQueue.addFloatingBuffer(buffer);

            // 如果可用buffer数量和所需buffer数量一致,返回不再需要新的buffer
            if (bufferQueue.getAvailableBufferSize() == numRequiredBuffers) {
                isWaitingForFloatingBuffers = false;
                notificationResult = NotificationResult.BUFFER_USED_NO_NEED_MORE;
            } else {
                // 否则返回仍需新的buffer
                notificationResult = NotificationResult.BUFFER_USED_NEED_MORE;
            }
        }

        // 如果unannouncedCredit在加1之前为0,通知上游,下游可以接收数据
        if (unannouncedCredit.getAndAdd(1) == 0) {
            notifyCreditAvailable();
        }
    } catch (Throwable t) {
        setError(t);
    }
    return notificationResult;
}

下面我们回到onBuffer方法。继续跟踪,我们发现onBuffer方法在CreditBasedPartitionRequestClientHandlerdecodeBufferOrEvent方法中调用。这个方法负责处理接收到的数据。数据的类型可能为buffer或者event。代码如下:

private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
    try {
        ByteBuf nettyBuffer = bufferOrEvent.getNettyBuffer();
        final int receivedSize = nettyBuffer.readableBytes();
        // 如果是buffer
        if (bufferOrEvent.isBuffer()) {
            // ---- Buffer ------------------------------------------------

            // Early return for empty buffers. Otherwise Netty's readBytes() throws an
            // IndexOutOfBoundsException.
            // 如果收到字节数为0,调用RemoteInputChannel的onEmptyBuffer方法
            if (receivedSize == 0) {
                inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                return;
            }

            // 请求一个空的buffer
            Buffer buffer = inputChannel.requestBuffer();
            if (buffer != null) {
                // 写入网络读取到的数据至buffer中
                nettyBuffer.readBytes(buffer.asByteBuf(), receivedSize);
                // 设置压缩
                buffer.setCompressed(bufferOrEvent.isCompressed);

                // 调用onBuffer处理方法
                inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
            } else if (inputChannel.isReleased()) {
                // 如果channel已经release,调用取消请求方法
                cancelRequestFor(bufferOrEvent.receiverId);
            } else {
                throw new IllegalStateException("No buffer available in credit-based input channel.");
            }
        } else {
            // 如果是事件(event),创建一个memSeg ,数据为event内容
            // 再把它包裹进networkBuffer对象,通过onBuffer方法教给RemoteInputChannel处理
            // ---- Event -------------------------------------------------
            // TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
            byte[] byteArray = new byte[receivedSize];
            nettyBuffer.readBytes(byteArray);

            MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
            Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize);

            inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
        }
    } finally {
        // 释放netty的buffer
        bufferOrEvent.releaseBuffer();
    }
}

继续追踪此方法的调用链到decodeMsg方法。该方法的部分源代码如下:

private void decodeMsg(Object msg) throws Throwable {
    final Class<?> msgClazz = msg.getClass();

    // ---- Buffer --------------------------------------------------------
    if (msgClazz == NettyMessage.BufferResponse.class) {
        NettyMessage.BufferResponse bufferOrEvent = (NettyMessage.BufferResponse) msg;

        // 获取接收此buffer的input channel
        RemoteInputChannel inputChannel = inputChannels.get(bufferOrEvent.receiverId);
        if (inputChannel == null) {
            bufferOrEvent.releaseBuffer();

            cancelRequestFor(bufferOrEvent.receiverId);

            return;
        }

        // 调用decodeBufferOrEvent方法
        decodeBufferOrEvent(inputChannel, bufferOrEvent);

    } else if (msgClazz == NettyMessage.ErrorResponse.class) {
        // ---- Error ---------------------------------------------------------
        // 剩余代码省略
    } else {
        throw new IllegalStateException("Received unknown message from producer: " + msg.getClass());
    }
}

继续追踪,我们到netty框架的channelRead方法。代码如下:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        decodeMsg(msg);
    } catch (Throwable t) {
        notifyAllChannelsOfErrorAndClose(t);
    }
}

到此,整个数据的读取流程分析完毕。

本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

上一篇下一篇

猜你喜欢

热点阅读