20171103周报

2017-11-03  本文已影响23人  Bill_Lin

本周工作内容:

下面对FLink进程间通信与数据传输所形成的执行图进行一个分析。
FLink的执行图,分为四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。

FLink执行图

最后涉及到底层代码实现的物理执行图,有两个核心的概念ResultPartition和InputGate,下面分别进行分析。

ResultPartition

在最后演变出来的物理图中可以看到,FLink的Runtime中,使用结果分区(ResultPartition)来表示任务的子任务实例所生产的数据,在执行图中等价于jobmanager的IntermediateResultPartition。IntermediateResultPartition主要用于JobManager组织作业图的一种逻辑数据结构,ResultPartition是运行时的一种逻辑概念,两者处于不同的层面。任何想消费ResultPartition的任务,最终都是请求ResultPartition的某个ResultSubPartition。而请求要么是同一TaskManager中的本地请求,要么是来自另外一个TaskManager中的消费子任务实例发起的远程请求。

其中,结果分区编号(ResultPartitionID)用来标识ResultPartition。ResultPartitionID关联着IntermediateResultPartitionID(也即调度时的分区编号)和ExecutionAttemptID(部署时的生产者子任务实例编号)。在任务失败时,单靠IntermediateResultPartitionID无法鉴别ResultPartition,必须结合ExecutionAttemptID一起鉴别。
结果分区编号(ResultPartitionID)用来标识ResultPartition。在任务失败时,单靠IntermediateResultPartitionID无法鉴别ResultPartition,必须结合ExecutionAttemptID一起鉴别。

InputGate

一个InputGate包含多个输入通道(InputChannel),输入通道用于请求ResultSubpartitionView,并从中消费数据。

所谓的ResultSubpartitionView是由ResultSubpartition所创建的用于供消费者任务消费数据的视图对象。
对于每个InputChannel,消费的生命周期会经历如下的方法调用过程:

requestSubpartition:请求ResultSubpartition;
getNextBuffer:获得下一个Buffer;
releaseAllResources:释放所有的相关资源;
以getNextBufferOrEvent方法为主线来分析SingleInputGate类。SingleInputGate是消费ResultPartition的实体.

public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
    //如果已接收到所有EndOfPartitionEvent事件,则说明每个ResultSubpartition中的数据都被消费完成
    if (hasReceivedAllEndOfPartitionEvents) {
        return null;
    }

    //触发所有的输入通道向ResultSubpartition发起请求
    requestPartitions();

    InputChannel currentChannel = null;
    //阻塞并循环等待有可获取数据的通道可用
    while (currentChannel == null) {
        if (isReleased) {
            throw new IllegalStateException("Released");
        }

        //从阻塞队列中请求队首的输入通道,阻塞两秒钟,如果没有获取到则不断请求,直到获取到一个输入通道位置
        currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
    }

    //从输入通道中获得下一个Buffer
    final Buffer buffer = currentChannel.getNextBuffer();

    if (buffer == null) {
        throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
            "notified by channel about available data, but none was available.");
    }

    //如果该Buffer是用户数据,则构建BufferOrEvent对象并返回
    if (buffer.isBuffer()) {
        return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
    }
    //否则把它当作事件来处理
    else {
        final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());

        //如果获取到的是标识某ResultSubpartition已经生产完数据的事件
        if (event.getClass() == EndOfPartitionEvent.class) {
            //对获取该ResultSubpartition的通道进行标记
            channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
            //如果所有信道都被标记了,置全部通道获取数据完成
            if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
                hasReceivedAllEndOfPartitionEvents = true;
            }
            //对外发出ResultSubpartition已被消费的通知同时释放资源
            currentChannel.notifySubpartitionConsumed();
            currentChannel.releaseAllResources();
        }
        //以事件来构建BufferOrEvent对象
        return new BufferOrEvent(event, currentChannel.getChannelIndex());
    }
}

InputChannel根据ResultPartitionLocation提供了三种实现:

LocalInputChannel:用于请求同实例中生产者任务所生产的ResultSubpartitionView的输入通道;
RemoteInputChannel:用于请求远程生产者任务所生产的ResultSubpartitionView的输入通道;
UnknownInputChannel:一种用于占位目的的输入通道,需要占位通道是因为暂未确定相对于生产者任务位置,但最终要么被替换为RemoteInputChannel,要么被替换为LocalInputChannel。

小结:结果子分区中存有真正的数据,会根据结果分区来被消费端消费,不同位置的消费方式不一样,有LOCAL、REMOTE和UNKOWN三种。管理结果分区的是结果分区管理器ResultPartitionManager,一个NetworkEnvironment下只有一个。

上一篇下一篇

猜你喜欢

热点阅读