20171103周报
本周工作内容:
- 继续研究下FLink transfer latency,主要是学习相关的生产者消费者通信机制和数据传输方式,了解相关接口来编写测试topology。
- 按项目的要求重新部署了集群环境,期间浪费了不少时间,不熟练
- 了解项目功能模块,尝试编写测试样例
- 整理下掌握的FLink总体架构与知识点
下面对FLink进程间通信与数据传输所形成的执行图进行一个分析。
FLink的执行图,分为四层:StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图。
- StreamGraph:是根据用户通过 Stream API 编写的代码生成的最初的图。用来表示程序的拓扑结构。
- JobGraph:StreamGraph经过优化后生成了 JobGraph,提交给 JobManager 的数据结构。主要的优化为,将多个符合条件的节点 chain 在一起作为一个节点,这样可以减少数据在节点之间流动所需要的序列化/反序列化/传输消耗。
- ExecutionGraph:JobManager 根据 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。
- 物理执行图:JobManager 根据 ExecutionGraph 对 Job 进行调度后,在各个TaskManager 上部署 Task 后形成的“图”,并不是一个具体的数据结构。用SocketTextStreamWordCount举例,四层执行图的演变过程如下图所示。
最后涉及到底层代码实现的物理执行图,有两个核心的概念ResultPartition和InputGate,下面分别进行分析。
ResultPartition
在最后演变出来的物理图中可以看到,FLink的Runtime中,使用结果分区(ResultPartition)来表示任务的子任务实例所生产的数据,在执行图中等价于jobmanager的IntermediateResultPartition。IntermediateResultPartition主要用于JobManager组织作业图的一种逻辑数据结构,ResultPartition是运行时的一种逻辑概念,两者处于不同的层面。任何想消费ResultPartition的任务,最终都是请求ResultPartition的某个ResultSubPartition。而请求要么是同一TaskManager中的本地请求,要么是来自另外一个TaskManager中的消费子任务实例发起的远程请求。
其中,结果分区编号(ResultPartitionID)用来标识ResultPartition。ResultPartitionID关联着IntermediateResultPartitionID(也即调度时的分区编号)和ExecutionAttemptID(部署时的生产者子任务实例编号)。在任务失败时,单靠IntermediateResultPartitionID无法鉴别ResultPartition,必须结合ExecutionAttemptID一起鉴别。
结果分区编号(ResultPartitionID)用来标识ResultPartition。在任务失败时,单靠IntermediateResultPartitionID无法鉴别ResultPartition,必须结合ExecutionAttemptID一起鉴别。
InputGate
一个InputGate包含多个输入通道(InputChannel),输入通道用于请求ResultSubpartitionView,并从中消费数据。
所谓的ResultSubpartitionView是由ResultSubpartition所创建的用于供消费者任务消费数据的视图对象。
对于每个InputChannel,消费的生命周期会经历如下的方法调用过程:
requestSubpartition:请求ResultSubpartition;
getNextBuffer:获得下一个Buffer;
releaseAllResources:释放所有的相关资源;
以getNextBufferOrEvent方法为主线来分析SingleInputGate类。SingleInputGate是消费ResultPartition的实体.
public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
//如果已接收到所有EndOfPartitionEvent事件,则说明每个ResultSubpartition中的数据都被消费完成
if (hasReceivedAllEndOfPartitionEvents) {
return null;
}
//触发所有的输入通道向ResultSubpartition发起请求
requestPartitions();
InputChannel currentChannel = null;
//阻塞并循环等待有可获取数据的通道可用
while (currentChannel == null) {
if (isReleased) {
throw new IllegalStateException("Released");
}
//从阻塞队列中请求队首的输入通道,阻塞两秒钟,如果没有获取到则不断请求,直到获取到一个输入通道位置
currentChannel = inputChannelsWithData.poll(2, TimeUnit.SECONDS);
}
//从输入通道中获得下一个Buffer
final Buffer buffer = currentChannel.getNextBuffer();
if (buffer == null) {
throw new IllegalStateException("Bug in input gate/channel logic: input gate got " +
"notified by channel about available data, but none was available.");
}
//如果该Buffer是用户数据,则构建BufferOrEvent对象并返回
if (buffer.isBuffer()) {
return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
}
//否则把它当作事件来处理
else {
final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
//如果获取到的是标识某ResultSubpartition已经生产完数据的事件
if (event.getClass() == EndOfPartitionEvent.class) {
//对获取该ResultSubpartition的通道进行标记
channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
//如果所有信道都被标记了,置全部通道获取数据完成
if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
hasReceivedAllEndOfPartitionEvents = true;
}
//对外发出ResultSubpartition已被消费的通知同时释放资源
currentChannel.notifySubpartitionConsumed();
currentChannel.releaseAllResources();
}
//以事件来构建BufferOrEvent对象
return new BufferOrEvent(event, currentChannel.getChannelIndex());
}
}
InputChannel根据ResultPartitionLocation提供了三种实现:
LocalInputChannel:用于请求同实例中生产者任务所生产的ResultSubpartitionView的输入通道;
RemoteInputChannel:用于请求远程生产者任务所生产的ResultSubpartitionView的输入通道;
UnknownInputChannel:一种用于占位目的的输入通道,需要占位通道是因为暂未确定相对于生产者任务位置,但最终要么被替换为RemoteInputChannel,要么被替换为LocalInputChannel。
小结:结果子分区中存有真正的数据,会根据结果分区来被消费端消费,不同位置的消费方式不一样,有LOCAL、REMOTE和UNKOWN三种。管理结果分区的是结果分区管理器ResultPartitionManager,一个NetworkEnvironment下只有一个。