Data transfer between Flink tasks and network flow control

2020-05-13  邵红晓

The JobGraph is generated during the compilation phase


The ExecutionGraph used for scheduling is generated at runtime


Data transfer between tasks


Analysis of the data exchange mechanism

Data exchange is essentially a classic producer-consumer model: the upstream operator produces data into a ResultPartition, and the downstream operator consumes that data through an InputGate. Different Tasks may run in the same TaskManager or in different TaskManagers. In the first case, the Tasks are simply different threads within the same TaskManager process, so data exchange happens locally between threads; in the second case, the data must travel over the network. Through careful design and abstraction, Flink ensures that local data exchange and network data exchange share the same code path.
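As a minimal illustration of this handoff (plain Java with hypothetical names, not Flink's actual classes), a bounded queue can play the role of the ResultPartition's buffers: when all buffers are in use, the producer blocks, which is exactly the backpressure effect discussed below.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy producer-consumer handoff: the bounded queue stands in for the
// ResultPartition's buffers; a full queue blocks the producer.
public class HandoffSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> buffers = new ArrayBlockingQueue<>(4);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    buffers.put("record-" + i); // blocks when all buffers are in use
                }
            } catch (InterruptedException ignored) { }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    System.out.println("consumed " + buffers.take());
                }
            } catch (InterruptedException ignored) { }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
    }
}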

Backpressure across TaskManagers


Task output

Each ResultPartition produced by a Task has an associated ResultPartitionWriter, as well as its own LocalBufferPool that provides the buffers needed for writing data. ResultPartition implements the ResultPartitionWriter interface:

    // ResultPartition#setup
    @Override
    public void setup() throws IOException {
        checkState(this.bufferPool == null, "Bug in result partition setup logic: Already registered buffer pool.");

        BufferPool bufferPool = checkNotNull(bufferPoolFactory.apply(this));
        checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >= getNumberOfSubpartitions(),
            "Bug in result partition setup logic: Buffer pool has not enough guaranteed buffers for this result partition.");

        this.bufferPool = bufferPool;
        // register this partition with the ResultPartitionManager
        partitionManager.registerResultPartition(this);
    }

    // Task#setupPartitionsAndGates
    public static void setupPartitionsAndGates(
        ResultPartitionWriter[] producedPartitions, InputGate[] inputGates) throws IOException, InterruptedException {
        for (ResultPartitionWriter partition : producedPartitions) {
            partition.setup();
        }
        // InputGates must be initialized after the partitions, since during InputGate#setup
        // we are requesting partitions
        for (InputGate gate : inputGates) {
            gate.setup();
        }
    }
// ResultPartitionFactory#createSubpartitions
private void createSubpartitions(
            ResultPartition partition,
            ResultPartitionType type,
            BoundedBlockingSubpartitionType blockingSubpartitionType,
            ResultSubpartition[] subpartitions) {
        // Create the subpartitions.
        if (type.isBlocking()) {
            initializeBoundedBlockingPartitions(
                subpartitions,
                partition,
                blockingSubpartitionType,
                networkBufferSize,
                channelManager);
        } else {
            for (int i = 0; i < subpartitions.length; i++) {
                subpartitions[i] = new PipelinedSubpartition(i, partition);
            }
        }
    }
private static void initializeBoundedBlockingPartitions(
            ResultSubpartition[] subpartitions,
            ResultPartition parent,
            BoundedBlockingSubpartitionType blockingSubpartitionType,
            int networkBufferSize,
            FileChannelManager channelManager) {
        int i = 0;
        try {
            for (i = 0; i < subpartitions.length; i++) {
                final File spillFile = channelManager.createChannel().getPathFile();
                subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize);
            }
        }
        catch (IOException e) {
            throw new FlinkRuntimeException(e);
        }
    }

RecordWriter

A Task writes its results into the ResultPartition through the RecordWriter. The main steps are:
1. Determine the target channel to write to via the ChannelSelector (see the sketch after this list)
2. Serialize the record using the RecordSerializer
3. Request a BufferBuilder from the ResultPartition, used to write the serialized result
4. Add a BufferConsumer to the ResultPartition, used to read the data written into the Buffer
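To make step 1 concrete, below is a sketch of a simple hash-based selector, assuming the three-method ChannelSelector interface (setup, selectChannel, isBroadcast) of this Flink version; the class name and routing logic are illustrative, not a built-in selector:

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;

// Illustrative selector: route each record to a sub-partition by hash code,
// similar in spirit to keyed partitioning.
public class HashChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {

    private int numberOfChannels;

    @Override
    public void setup(int numberOfChannels) {
        this.numberOfChannels = numberOfChannels;
    }

    @Override
    public int selectChannel(T record) {
        // non-negative hash modulo the number of channels (sub-partitions)
        return (record.hashCode() & Integer.MAX_VALUE) % numberOfChannels;
    }

    @Override
    public boolean isBroadcast() {
        return false; // every record goes to exactly one channel
    }
}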

public abstract class RecordWriter<T extends IOReadableWritable> implements AvailabilityProvider {
    protected final ResultPartitionWriter targetPartition;
    // number of channels, i.e. the number of sub-partitions
    protected final int numberOfChannels;
    protected final RecordSerializer<T> serializer;
    protected final Random rng = new XORShiftRandom();
    private Counter numBytesOut = new SimpleCounter();
    private Counter numBuffersOut = new SimpleCounter();
    private final boolean flushAlways;
    /** The thread that periodically flushes the output, to give an upper latency bound. */
    @Nullable
    private final OutputFlusher outputFlusher;
}

// ChannelSelectorRecordWriter extends RecordWriter
public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
    // decides which channel, i.e. which sub-partition, a record is written to
    private final ChannelSelector<T> channelSelector;
    // one BufferBuilder per channel for writing data
    private final BufferBuilder[] bufferBuilders;
}

ChannelSelectorRecordWriter#emit
public void emit(T record) throws IOException, InterruptedException {
        emit(record, channelSelector.selectChannel(record));
    }
protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
        checkErroneous();
        serializer.serializeRecord(record);
        // Make sure we don't hold onto the large intermediate serialization buffer for too long
        if (copyFromSerializerToTargetChannel(targetChannel)) {
            serializer.prune();
        }
    }

protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
        // We should reset the initial position of the intermediate serialization buffer before
        // copying, so the serialization results can be copied to multiple target buffers.
        serializer.reset();
        boolean pruneTriggered = false;
        BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
        SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
        // the buffer is full, call finishBufferBuilder
        while (result.isFullBuffer()) {
            finishBufferBuilder(bufferBuilder);
            // If this was a full record, we are done. Not breaking out of the loop at this point
            // will lead to another buffer request before breaking out (that would not be a
            // problem per se, but it can lead to stalls in the pipeline).
            if (result.isFullRecord()) {
                pruneTriggered = true;
                emptyCurrentBufferBuilder(targetChannel);
                break;
            }
            bufferBuilder = requestNewBufferBuilder(targetChannel);
            result = serializer.copyToBufferBuilder(bufferBuilder);
        }
        checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            flushTargetPartition(targetChannel);
        }
        return pruneTriggered;
    }

public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());
        // request a BufferBuilder from the LocalBufferPool, i.e. the ResultPartition's bufferPool mentioned above
        BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
        // add a BufferConsumer, used to read the data written into the MemorySegment
        targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
        bufferBuilders[targetChannel] = bufferBuilder;
        return bufferBuilder;
    }

Adding a BufferConsumer to the ResultPartition hands it over to the corresponding ResultSubpartition, where it is used to consume that ResultSubpartition's data. ResultPartition implements ResultPartitionWriter and BufferPoolOwner:

public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
        checkNotNull(bufferConsumer);
        ResultSubpartition subpartition;
        try {
            checkInProduceState();
            subpartition = subpartitions[subpartitionIndex];
        }
        catch (Exception ex) {
            bufferConsumer.close();
            throw ex;
        }
        return subpartition.add(bufferConsumer);
    }

In streaming mode, the PipelinedSubpartition#add implementation notifies the TaskManager that data is available for consumption. The same data-availability notification is also sent on a forced flush. The reason is that if only a few records are produced and a MemorySegment cannot be completely filled, the ResultSubpartition could otherwise remain unconsumable indefinitely. RecordWriter therefore runs an OutputFlusher that triggers a flush periodically; the interval can be controlled via DataStream.setBufferTimeout() (a usage sketch follows the OutputFlusher code below).

    private boolean add(BufferConsumer bufferConsumer, boolean finish) {
        checkNotNull(bufferConsumer);
        final boolean notifyDataAvailable;
        synchronized (buffers) {
            if (isFinished || isReleased) {
                bufferConsumer.close();
                return false;
            }
            // Add the bufferConsumer and update the stats
            buffers.add(bufferConsumer);
            updateStatistics(bufferConsumer);
            increaseBuffersInBacklog(bufferConsumer);
            notifyDataAvailable = shouldNotifyDataAvailable() || finish;
            isFinished |= finish;
        }
        if (notifyDataAvailable) {
            notifyDataAvailable();
        }
        return true;
    }

private class OutputFlusher extends Thread {
        private final long timeout;
        private volatile boolean running = true;
        OutputFlusher(String name, long timeout) {
            super(name);
            setDaemon(true);
            this.timeout = timeout;
        }
        public void terminate() {
            running = false;
            interrupt();
        }
        @Override
        public void run() {
            try {
                while (running) {
                    try {
                        Thread.sleep(timeout);
                    } catch (InterruptedException e) {
                        // propagate this if we are still running, because it should not happen
                        // in that case
                        if (running) {
                            throw new Exception(e);
                        }
                    }
                    // any errors here should let the thread come to a halt and be
                    // recognized by the writer
                    flushAll();
                }
            } catch (Throwable t) {
                notifyFlusherException(t);
            }
        }
    }
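A minimal usage sketch for the flush interval referenced above (the 0 and -1 semantics are per the Flink documentation):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Flush partially filled output buffers at most every 100 ms,
        // bounding the extra latency introduced by output buffering.
        env.setBufferTimeout(100);
        //  0 -> flush after every record (lowest latency, most overhead)
        // -1 -> flush only when a buffer is full (highest throughput)

        env.fromElements("a", "b", "c").print();
        env.execute("buffer timeout example");
    }
}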

Task input

As introduced earlier, a Task's input is abstracted as an InputGate, and an InputGate is composed of InputChannels; each InputChannel corresponds one-to-one with a ResultSubpartition that the Task consumes, as shown in the physical execution graph.

    /** Buffer pool for incoming buffers. Incoming data from remote channels is copied to buffers from this pool. */
    private BufferPool bufferPool;

    /** Queue of InputChannels that have notified this input gate about available data. */
    private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque<>();
/** The number of input channels (equivalent to the number of consumed partitions). */
    private final int numberOfInputChannels;

private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
        if (hasReceivedAllEndOfPartitionEvents) {
            return Optional.empty();
        }

        if (closeFuture.isDone()) {
            throw new CancelTaskException("Input gate is already closed.");
        }

        Optional<InputWithData<InputChannel, BufferAndAvailability>> next = waitAndGetNextData(blocking);
        if (!next.isPresent()) {
            return Optional.empty();
        }

        InputWithData<InputChannel, BufferAndAvailability> inputWithData = next.get();
        return Optional.of(transformToBufferOrEvent(
            inputWithData.data.buffer(),
            inputWithData.moreAvailable,
            inputWithData.input));
    }
private Optional<InputWithData<InputChannel, BufferAndAvailability>> waitAndGetNextData(boolean blocking)
            throws IOException, InterruptedException {
        while (true) {
            Optional<InputChannel> inputChannel = getChannel(blocking);
            if (!inputChannel.isPresent()) {
                return Optional.empty();
            }

            // Do not query inputChannel under the lock, to avoid potential deadlocks coming from
            // notifications.
            Optional<BufferAndAvailability> result = inputChannel.get().getNextBuffer();

            synchronized (inputChannelsWithData) {
                if (result.isPresent() && result.get().moreAvailable()) {
                    // enqueue the inputChannel at the end to avoid starvation
                    inputChannelsWithData.add(inputChannel.get());
                    enqueuedInputChannelsWithData.set(inputChannel.get().getChannelIndex());
                }

                if (inputChannelsWithData.isEmpty()) {
                    availabilityHelper.resetUnavailable();
                }

                if (result.isPresent()) {
                    return Optional.of(new InputWithData<>(
                        inputChannel.get(),
                        result.get(),
                        !inputChannelsWithData.isEmpty()));
                }
            }
        }
    }
A channel with available data is fetched from the inputChannelsWithData ArrayDeque:
private Optional<InputChannel> getChannel(boolean blocking) throws InterruptedException {
        synchronized (inputChannelsWithData) {
            while (inputChannelsWithData.size() == 0) {
                if (closeFuture.isDone()) {
                    throw new IllegalStateException("Released");
                }

                if (blocking) {
                    // no channel currently has data: the current thread waits (blocks) on the monitor
                    inputChannelsWithData.wait();
                }
                else {
                    availabilityHelper.resetUnavailable();
                    return Optional.empty();
                }
            }

            InputChannel inputChannel = inputChannelsWithData.remove();
            enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
            return Optional.of(inputChannel);
        }
    }

    // callback invoked when an InputChannel has data; this is what the
    // ResultSubpartition's data-availability notification ends up calling
    void notifyChannelNonEmpty(InputChannel channel) {
        queueChannel(checkNotNull(channel));
    }

private void queueChannel(InputChannel channel) {
        int availableChannels;
        CompletableFuture<?> toNotify = null;
        synchronized (inputChannelsWithData) {
            if (enqueuedInputChannelsWithData.get(channel.getChannelIndex())) {
                return;
            }
            availableChannels = inputChannelsWithData.size();
            // enqueue the channel that now has data
            inputChannelsWithData.add(channel);
            enqueuedInputChannelsWithData.set(channel.getChannelIndex());
            if (availableChannels == 0) {
                // wake up the thread blocked in getChannel so it can consume the channel
                inputChannelsWithData.notifyAll();
                toNotify = availabilityHelper.getUnavailableToResetAvailable();
            }
        }
        if (toNotify != null) {
            toNotify.complete(null);
        }
    }
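Taken together, getChannel and queueChannel form a classic guarded-wait handoff on the deque's monitor, notifying waiters only on the empty-to-non-empty transition. A standalone sketch of just this pattern (illustrative names, not Flink classes):

import java.util.ArrayDeque;

// Minimal guarded-wait handoff: consumers block on the deque's monitor
// until a producer enqueues an element and notifies on the 0 -> 1 transition.
public class ChannelQueue<T> {

    private final ArrayDeque<T> queue = new ArrayDeque<>();

    public T take() throws InterruptedException {
        synchronized (queue) {
            while (queue.isEmpty()) {
                queue.wait(); // releases the monitor and blocks, like getChannel
            }
            return queue.remove();
        }
    }

    public void put(T element) {
        synchronized (queue) {
            boolean wasEmpty = queue.isEmpty();
            queue.add(element);
            if (wasEmpty) {
                // like queueChannel: notify only when the queue goes from
                // empty to non-empty, avoiding redundant wakeups
                queue.notifyAll();
            }
        }
    }
}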
Setting up the InputGate:
public void setup() throws IOException, InterruptedException {
        checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
        // assign exclusive buffers to input channels directly and use the rest for floating buffers
        assignExclusiveSegments();

        BufferPool bufferPool = bufferPoolFactory.get();
        setBufferPool(bufferPool);
        // request the partitions
        requestPartitions();
    }

void requestPartitions() throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (!requestedPartitionsFlag) {
                if (closeFuture.isDone()) {
                    throw new IllegalStateException("Already released.");
                }

                // Sanity checks
                if (numberOfInputChannels != inputChannels.size()) {
                    throw new IllegalStateException(String.format(
                        "Bug in input gate setup logic: mismatch between " +
                        "number of total input channels [%s] and the currently set number of input " +
                        "channels [%s].",
                        inputChannels.size(),
                        numberOfInputChannels));
                }
                // iterate over the inputChannels and request each subpartition
                // through the NettyConnectionManager (explained below)
                for (InputChannel inputChannel : inputChannels.values()) {
                    inputChannel.requestSubpartition(consumedSubpartitionIndex);
                }
            }

            requestedPartitionsFlag = true;
        }
    }

RemoteInputChannel#requestSubpartition:
    public void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
        if (partitionRequestClient == null) {
            // Create a client and request the partition
            try {
                // the connectionManager is Netty-based (NettyConnectionManager)
                partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
            } catch (IOException e) {
                // IOExceptions indicate that we could not open a connection to the remote TaskExecutor
                throw new PartitionConnectionException(partitionId, e);
            }

            partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
        }
    }

The InputChannel's basic logic is simple: its lifecycle proceeds in the order requestSubpartition(int subpartitionIndex), getNextBuffer(), and releaseAllResources().

Data exchange over the network

NettyConnectionManager.java
@Override
    public int start() throws IOException {
        client.init(nettyProtocol, bufferPool);

        return server.init(nettyProtocol, bufferPool);
    }
NettyServer.java#init
When a RemoteInputChannel requests a remote ResultSubpartition, the NettyClient initiates a connection to the NettyServer of the Task that owns that ResultSubpartition, and all subsequent data exchange happens on this connection. Only one connection is established between any two Tasks, and it is multiplexed across the different RemoteInputChannels and ResultSubpartitions (see the caching sketch after the createPartitionRequestClient snippet below).
private void initNioBootstrap() {
        // Add the server port number to the name in order to distinguish
        // multiple servers running on the same host.
        String name = NettyConfig.SERVER_THREAD_GROUP_NAME + " (" + config.getServerPort() + ")";

        NioEventLoopGroup nioGroup = new NioEventLoopGroup(config.getServerNumThreads(), getNamedThreadFactory(name));
        bootstrap.group(nioGroup).channel(NioServerSocketChannel.class);
    }
Netty-level watermarks provide a flow-control (backpressure) mechanism: the watermarks are configured so that not too much data is written into the network.
1. When the number of bytes in the output buffer exceeds the high watermark, Channel.isWritable() returns false.
2. When the number of bytes in the output buffer drops below the low watermark, Channel.isWritable() returns true again.
A sketch of a writability-aware writer follows the configuration snippet below.
final int defaultHighWaterMark = 64 * 1024; // from DefaultChannelConfig (not exposed)
        final int newLowWaterMark = config.getMemorySegmentSize() + 1;
        final int newHighWaterMark = 2 * config.getMemorySegmentSize();
        if (newLowWaterMark > defaultHighWaterMark) {
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
        } else { // including (newHighWaterMark < defaultLowWaterMark)
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, newLowWaterMark);
            bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, newHighWaterMark);
        }
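A sender that honors these watermarks typically checks Channel.isWritable() before writing and drains its backlog from channelWritabilityChanged(); both are standard Netty APIs. The handler below is an illustrative sketch, not Flink's actual writer:

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;

import java.util.ArrayDeque;

// Illustrative watermark-aware writer: while the output buffer is above the
// high watermark, isWritable() is false and messages are queued locally; once
// it drains below the low watermark, Netty fires channelWritabilityChanged()
// and the backlog is flushed. Assumes all calls happen on the event loop thread.
public class WatermarkAwareWriter extends ChannelDuplexHandler {

    private final ArrayDeque<Object> backlog = new ArrayDeque<>();

    public void send(ChannelHandlerContext ctx, Object msg) {
        if (ctx.channel().isWritable()) {
            ctx.writeAndFlush(msg);
        } else {
            backlog.add(msg); // above the high watermark: queue instead of writing
        }
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        while (ctx.channel().isWritable() && !backlog.isEmpty()) {
            ctx.writeAndFlush(backlog.poll());
        }
        ctx.fireChannelWritabilityChanged();
    }
}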

The call partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId) made in RemoteInputChannel#requestSubpartition is implemented as:

public class NettyConnectionManager implements ConnectionManager {
@Override
    public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
            throws IOException, InterruptedException {
        return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
    }
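The factory keeps at most one client per ConnectionID, so every channel talking to the same remote TaskManager reuses one physical connection. A minimal sketch of that caching pattern (illustrative names; the real PartitionRequestClientFactory also handles concurrent connection attempts and failures):

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative one-client-per-ConnectionID cache: the first caller connects,
// later callers for the same id reuse the cached client.
public class ClientCache<K, C> {

    private final ConcurrentHashMap<K, C> clients = new ConcurrentHashMap<>();
    private final Function<K, C> connector;

    public ClientCache(Function<K, C> connector) {
        this.connector = connector;
    }

    public C get(K connectionId) {
        // connect at most once per id; computeIfAbsent is atomic per key
        return clients.computeIfAbsent(connectionId, connector);
    }
}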

Backpressure inside a TaskManager


Flink's dynamic backpressure implementation

The credit-based flow control mechanism works as follows:

1. The receiver announces its available credit to the sender (one free buffer corresponds to one credit).
2. When the sender holds X credits, it may send X buffers into the network; when the receiver grants X credits, it has X free buffers available to receive data.
3. The sender only sends buffers while credit > 0, and each buffer sent decrements the credit by one (a toy sketch follows this list). Events such as CheckpointBarrier and EndOfPartitionEvent can be processed immediately, so they are sent without consuming credit.
4. Whenever the sender sends a buffer, it also tells the receiver how many buffers are currently piled up on its side (the backlog size); the receiver requests floating buffers based on the sender's backlog.
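A toy credit-accounting sketch of the rules above (illustrative, not Flink's classes):

// Toy credit accounting: the sender may only transmit data buffers while
// credit > 0; events bypass the credit check entirely.
public class CreditedSender {

    private int credit; // number of free buffers announced by the receiver

    public synchronized void addCredit(int grant) {
        credit += grant; // receiver announces newly freed buffers
    }

    public synchronized boolean trySendBuffer() {
        if (credit == 0) {
            return false; // no credit: the buffer stays in the backlog
        }
        credit--; // each buffer sent consumes one credit
        return true;
    }

    public boolean sendEvent() {
        return true; // e.g. CheckpointBarrier, EndOfPartitionEvent: no credit needed
    }
}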

SingleInputGate#setup()
@Override
    public void setup() throws IOException, InterruptedException {
        checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
        // request exclusive buffers: assign them to input channels directly and use the rest for floating buffers
        assignExclusiveSegments();

        BufferPool bufferPool = bufferPoolFactory.get();
        // allocate the LocalBufferPool, which is shared by all channels of this gate
        setBufferPool(bufferPool);

        requestPartitions();
    }

    /** Assign the exclusive buffers to all remote input channels directly for credit-based mode. */
    @VisibleForTesting
    public void assignExclusiveSegments() throws IOException {
        synchronized (requestLock) {
            for (InputChannel inputChannel : inputChannels.values()) {
                if (inputChannel instanceof RemoteInputChannel) {
                    ((RemoteInputChannel) inputChannel).assignExclusiveSegments();
                }
            }
        }
    }
public class RemoteInputChannel extends InputChannel implements BufferRecycler, BufferListener {
public void onBuffer(Buffer buffer, int sequenceNumber, int backlog) throws IOException {
        boolean recycleBuffer = true;

        try {

            final boolean wasEmpty;
            synchronized (receivedBuffers) {
                // Similar to notifyBufferAvailable(), make sure that we never add a buffer
                // after releaseAllResources() released all buffers from receivedBuffers
                // (see above for details).
                if (isReleased.get()) {
                    return;
                }

                if (expectedSequenceNumber != sequenceNumber) {
                    onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                    return;
                }

                wasEmpty = receivedBuffers.isEmpty();
                receivedBuffers.add(buffer);
                recycleBuffer = false;
            }

            ++expectedSequenceNumber;

            if (wasEmpty) {
                // notify the input gate that this channel is no longer empty
                notifyChannelNonEmpty();
            }

            if (backlog >= 0) {
                // request floating buffers according to the sender's backlog
                onSenderBacklog(backlog);
            }
        } finally {
            if (recycleBuffer) {
                buffer.recycleBuffer();
            }
        }
    }

The corresponding SingleInputGate#notifyChannelNonEmpty:
void notifyChannelNonEmpty(InputChannel channel) {
        queueChannel(checkNotNull(channel));
    }

    /**
     * Receives the backlog from the producer's buffer response. If the number of available
     * buffers is less than backlog + initialCredit, it will request floating buffers from the
     * buffer pool, and then notify unannounced credits to the producer.
     * The backlog is the number of buffers piled up on the sender side. If bufferQueue does
     * not hold enough buffers, floating buffers are requested from the LocalBufferPool;
     * once new buffers have been obtained, the producer is notified that credit is available.
     *
     * @param backlog The number of unsent buffers in the producer's sub partition.
     */
    void onSenderBacklog(int backlog) throws IOException {
        int numRequestedBuffers = 0;

        synchronized (bufferQueue) {
            // Similar to notifyBufferAvailable(), make sure that we never add a buffer
            // after releaseAllResources() released all buffers (see above for details).
            if (isReleased.get()) {
                return;
            }

            numRequiredBuffers = backlog + initialCredit;
            while (bufferQueue.getAvailableBufferSize() < numRequiredBuffers && !isWaitingForFloatingBuffers) {
                Buffer buffer = inputGate.getBufferPool().requestBuffer();
                if (buffer != null) {
                    bufferQueue.addFloatingBuffer(buffer);
                    numRequestedBuffers++;
                } else if (inputGate.getBufferProvider().addBufferListener(this)) {
                    // If the channel has not got enough buffers, register it as listener to wait for more floating buffers.
                    isWaitingForFloatingBuffers = true;
                    break;
                }
            }
        }

        if (numRequestedBuffers > 0 && unannouncedCredit.getAndAdd(numRequestedBuffers) == 0) {
            notifyCreditAvailable();
        }
    }
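A quick worked example: with initialCredit = 2 exclusive buffers per channel and a reported backlog of 5, numRequiredBuffers is 7; if bufferQueue currently holds 4 buffers, up to 3 floating buffers are requested from the LocalBufferPool, and the producer is then notified of 3 additional credits.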

Buffer management inside RemoteInputChannel.java:
    private static class AvailableBufferQueue {

        /** The current available floating buffers from the fixed buffer pool. */
        private final ArrayDeque<Buffer> floatingBuffers;

        /** The current available exclusive buffers from the global buffer pool. */
        private final ArrayDeque<Buffer> exclusiveBuffers;
        // ...
    }
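The queue hands out floating buffers before exclusive ones, so memory borrowed from the shared pool is recycled first. A minimal sketch of that two-tier idea (illustrative, not the actual AvailableBufferQueue):

import java.util.ArrayDeque;

// Illustrative two-tier buffer queue: floating buffers (borrowed from the
// shared pool) are handed out before the channel's own exclusive buffers.
public class TwoTierBufferQueue<B> {

    private final ArrayDeque<B> floatingBuffers = new ArrayDeque<>();
    private final ArrayDeque<B> exclusiveBuffers = new ArrayDeque<>();

    public synchronized void addFloatingBuffer(B buffer) {
        floatingBuffers.add(buffer);
    }

    public synchronized void addExclusiveBuffer(B buffer) {
        exclusiveBuffers.add(buffer);
    }

    public synchronized B takeBuffer() {
        // prefer floating buffers so shared-pool memory is returned sooner
        return !floatingBuffers.isEmpty() ? floatingBuffers.poll() : exclusiveBuffers.poll();
    }

    public synchronized int getAvailableBufferSize() {
        return floatingBuffers.size() + exclusiveBuffers.size();
    }
}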

References

Flink wiki, Data exchange between tasks:
https://cwiki.apache.org/confluence/display/FLINK/Data+exchange+between+tasks
https://blog.jrwang.me/2019/flink-source-code-data-exchange/#%E6%A6%82%E8%A7%88
https://blog.csdn.net/yidan7063/article/details/90260434
https://ververica.cn/developers/flink-network-protocol/
Task and OperatorChain:
https://blog.jrwang.me/2019/flink-source-code-task-lifecycle/#task-%E5%92%8C-operatorchain
