flink

Flink源码5- task执行及state和checkpoin

2020-12-27  本文已影响0人  fat32jin

一 、 task执行入口 0:15

接 上期 回顾
★ ——7 》 TaskExecutor#submitTask()

第一个入口:Task 构造函数
——》Task 构造函数()

* 注释: 当前任务的 Task 信息
*/
this.taskInfo = new TaskInfo()
......
* 注释: 初始化 ResultPartition 和 ResultSubpartition 关于输出的抽象
* ResultSubpartition具体实现为 PipelinedSubpartition
*/
final ResultPartitionWriter[] resultPartitionWriters = shuffleEnvironment
.createResultPartitionWriters()
this.consumableNotifyingPartitionWriters =
ConsumableNotifyingResultPartitionWriterDecorator
......
* 注释: 初始化 InputGate 输入的 对象,
inputchanle 从上游一个task节点拉取数据
*/
final IndexedInputGate[] gates = shuffleEnvironment.createInputGates()
* 注释:创建 Task 的线程 但是不执行 run() 方法
*/
executingThread = new Thread(TASK_THREADS_GROUP,

第二个入口:task.startTaskThread(); 通过一个线程来运行 Task 0:45
—— 》Task #run()
—— ★》Task #dorun()

Task run过程

★ 重要步骤 反射实例化StreamTask实例


image.png

dorun:1 if(transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING))
dorun:2 setupPartitionsAndGates(consumableNotifyingPartitionWriters,
dorun:3 Environment env = new RuntimeEnvironment(jobId, vertexId,
dorun:4 invokable = loadAndInstantiateInvokable(userCodeClassLoader,
dorun: 5 invokable.invoke();

—— dorun:4》Task #loadAndInstantiateInvokable()

反射调用构造函数
statelessCtor = invokableClass.getConstructor(Environment.class);

//#1 SourceStreamTask 带RuntimeEnvvironment的构造函数
//#2 OneInputStreamTask 带RuntimeEnvvironment的构造函数

Constructor<? extends AbstractInvokable> statelessCtor;
try {
statelessCtor = invokableClass.getConstructor(Environment.class);
}

——#1 SourceStreamTask(Environment env) 构造函数
——#2 OneInputStreamTask (Environment env) 构造函数

               //1,2最后都到 父类构造函数 StreamTask()  

    ——★  》 父类构造函数 StreamTask()
           【streamtask 截图或者笔记】
           4件大事

StreamTask:1 this.recordWriter = createRecordWriterDelegate(configuration,
StreamTask:2 this.mailboxProcessor = new MailboxProcessor(this::processInput,
StreamTask:3 this.stateBackend = createStateBackend();
StreamTask:4 this.subtaskCheckpointCoordinator = new

—— StreamTask:1 》 StreamTask. createRecordWriterDelegate 1:04
——》1》StreamTask.createRecordWriters()
——》1》StreamTask.createRecordWriter()
——》1》StreamTask.createRecordWriters

// TODO_MA 注释: 初始化一个 ArrayList 容器用来存放创建出来的 RecordWriter
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>
// TODO_MA 注释: 获取该 StreamTask 的输出 StreamEdge 集合
List<StreamEdge> outEdgesInOrder = configuration.getOutEdgesInOrder
// TODO_MA 注释: 一个 out StreamEdge 来构建 一个 RecordWriter
// TODO_MA 注释: 大概率 createRecordWriter() 方法的返回值是:
ChannelSelectorRecordWriter
recordWriters.add(createRecordWriter(edge, i,
——》1》StreamTask.createRecordWriters()

StreamTask:2——》 this.mailboxProcessor = new
MailboxProcessor(this::processInput,
//#1 SourceStreamTask 带RuntimeEnvvironment的构造函数
//#2 OneInputStreamTask 带RuntimeEnvvironment的构造函数
▲回到 父类构造函数 StreamTask ▲

StreamTask:3 ——》 this.stateBackend = createStateBackend(); 1:26 ~
state简介 部分

——》 StreamTask.createStateBackend() 1:36

* 注释: 根据配置获取 StateBackend
* 一般情况下,我们在生产环境中, 在 flink-conf.yaml 文件中进行配置:
* 1、state.backend: filesystem
* 2、state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
* 一般有三种方式:
* 1、state.backend: filesystem = FsStateBackend
* 2、state.backend: jobmanager = MemoryStateBackend
* 3、state.backend: rocksdb = RocksDBStateBackend
* 也可以在程序中,进行设置 这种方式会覆盖配置文件中的配置:
* StreamExecutionEnvironment.setStateBackend(StateBackend backend)
*/
return StateBackendLoader.fromApplicationOrConfigOrDefault
▲ 回 父类构造函数 StreamTask▲

******创建 CheckpointStorage
* 1、FsStateBackend = FsCheckpointStorage
*/
stateBackend.createCheckpointStorage(getEnvironment()

FsStateBackendcreateCheckpointStorage

****** 注释: 创建 Channel 的 IO 线程池
*/
this.channelIOExecutor = Executors.newSingleThreadExecutor
▲ 回 父类构造函数 StreamTask▲

               ▲回     ★》Task #dorun()下 的  dorun: 5   ▲ 

*** dorun: 5 invokable.invoke();
* 注释: 运行任务, 在流式应用程序中,都是 StreamTask 的子类
* AbstractInvokable 是 Task 执行的主要逻辑,也是所有被执行的任务的基类,包括 Streaming 模式和 Batch 模式。
* 在 Streaming 模式下,所有任务都继承自 StreamTask,
* 包括 StreamTask 的子类包括 SourceStreamTask, OneInputStreamTask, TwoInputStreamTask,
* 以及用于迭代模式下的 StreamIterationHead 和 StreamIterationTail。
* 每一个 StreamNode 在添加到 StreamGraph 的时候都会有一个关联的 jobVertexClass 属性,
* 这个属性就是该 StreamNode 对应的 StreamTask 类型;对于一个 OperatorChain 而言,它所对应的
* StreamTask 就是其 head operator 对应的 StreamTask。
*/
// run the invokable
invokable.invoke();

——》StreamTask#invoke()

inv 1: beforeInvoke()
inv 2: runMailboxLoop();
inv 3: afterInvoke();
inv 4: cleanUpInvoke();

——inv 1 》StreamTask#beforeInvoke()

// TODO_MA 注释: tailOperatorWrapper 在 ArrayList 的最前面
this.tailOperatorWrapper = allOpWrappers.get(0);

// TODO_MA 注释: 获取 OperatorChain 的第一个 Operator
// TODO_MA 注释: 可以认为 接收数据线程中,用到的 headOpeartor 终于被初始化
// TODO_MA 注释: 其实到此为止,可以认为,在当前 OperatorChain 中要用到的各种组件都已经创建好了,可以接收数据,然后开始流式处理了。
headOperator = operatorChain.getHeadOperator();

----------------------至此 所有准备工作完成, 准备真正执行task数据流处理到此为止,Task 初始化和预执行相关的,都基本到位了,然后就开始从我们的 SourceStreamTask 的HeadOperator 的数据接收线程,开始流式处理。-------------

中场休息

▲回到 inv 2: runMailboxLoop(); 开始 2:17

inv 2: runMailboxLoop();

——inv 2》MailboxProcessor.runMailboxLoop()

—— 跳转到父类入口 》SourceStreamTask#LegacySourceFunctionThread.run()
headOperator.run(lock,
——★ 》StreamSource#run()

// TODO_MA 注释: 没有数据,则阻塞在这儿
// TODO_MA 注释: 在 SourceStreamTask 初始化的时候,SourceThread 的代码能执行到这儿
while (isRunning && (bytesRead = reader.read(cbuf)) != -1) {
// TODO_MA 注释: 一直读到有分隔符
while (buffer.length() >= delimiter.length()
.......
***** 把读取到的数据传给ctx 上下文处理****
ctx.collect(record);
↓ 选择NonTimestamp - 时间戳为 processtime
StreamSourceContexts#NonTimestampContext#collect()

* output = CountingOutput
* -
* element 收到的 一条数据
* reuse 装一条数据,待序列化的容器
* collect 执行 collect 会对 reuse 对象执行序列化
*/
output.collect(reuse.replace(element));

—— 》 CountingOutput.collect()
* output = ChainingOutput
*/
output.collect(record);

—— 》OperatorChain#ChainingOutput.collect()
—— 》 pushToOperator(record);

—— 》 ChannelSelectorRecordWriter.copyFromSerializerToTargetChannel()
flushTargetPartition(targetChannel);
↓ 一般不是广播
—— 》ResultPartitionWriter#flushTargetPartition()
* 注释: flush 到对应的 ResultPartition 中
* targetChannel = InputChannel
* targetPartition = ResultPartition
*/
targetPartition.flush(targetChannel);

ResultPartition.flush()

PipelinedSubpartition.flush()
—— 》 PipelinedSubpartition# notifyDataAvailable();

// TODO_MA 注释: readView 是 ResultSubPartition 的消费者视图 对象
// TODO_MA 注释: 下游的一个Task 可能会消费上游的多个Task的某一个分区的数据。
/ TODO_MA 注释: 上游个任意一个Task的任意一个分区叫做: ResultSubPartition,
// TODO_MA 注释: 这个 ResultSubPartition 对应一个消费者:
PipelinedSubpartitionView

             readView.notifyDataAvailable();

—— 》PipelinedSubpartitionView#notifyDataAvailable()

—— 》LocalInputChannel#notifyDataAvailable()
* 注释:
*/
inputGate.notifyChannelNonEmpty(this);
—— 》SingleInputGate.notifyChannelNonEmpty()
* 注释: 某个 channel 有可写入数了,该干活了。
*/
queueChannel(checkNotNull(channel));
—— 》SingleInputGate.queueChannel()

以上线程会接到 steamTask启动时wait的 线程传到这个方法*
—— 》SingleInputGate.getChannel()
——主数据处理执行这个 》 StreamTaskNetworkInput.emitNext()
—— 输出》 OneInputStreamTask#StreamTaskNetworkOutput.emitRecord()

上一篇 下一篇

猜你喜欢

热点阅读