Flink Source Code Analysis: Mastering Big Data

How Flink Handles Idle Tasks When Consuming from Kafka

2020-03-25  shengjk1

We all know that when Flink consumes from Kafka, each partition corresponds to one task. But what happens when there are more Flink tasks than Kafka partitions? How does Flink handle the idle tasks?
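To see when a subtask ends up with no partitions, here is a minimal sketch of partition-to-subtask distribution (the class and method names are hypothetical; Flink's real assignment logic in `KafkaTopicPartitionAssigner` uses a hashed start index rather than plain round-robin, but the effect when parallelism exceeds the partition count is the same):

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {
    // Hypothetical round-robin assignment: partition p goes to subtask p % parallelism.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> bySubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            bySubtask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            bySubtask.get(p % parallelism).add(p);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // 3 Kafka partitions, 5 Flink subtasks: subtasks 3 and 4 receive nothing.
        List<List<Integer>> result = assign(3, 5);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("subtask " + i + " -> " + result.get(i)
                    + (result.get(i).isEmpty() ? "  (idle)" : ""));
        }
    }
}
```

The two subtasks with an empty list are exactly the ones whose `subscribedPartitionsToStartOffsets` is empty in the `run()` method below.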

    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        if (subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        // initialize commit metrics and default offset callback method
        this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
        this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);

        // callback for offset commits; invoked when notifyCheckpointComplete fires
        this.offsetCommitCallback = new KafkaCommitCallback() {
            @Override
            public void onSuccess() {
                successfulCommits.inc();
            }

            @Override
            public void onException(Throwable cause) {
                LOG.warn("Async Kafka commit failed.", cause);
                failedCommits.inc();
            }
        };

        // mark the subtask as temporarily idle if there are no initial seed partitions;
        // once this subtask discovers some partitions and starts collecting records, the subtask's
        // status will automatically be triggered back to be active.
        // When is a task marked as idle? When the Flink parallelism is greater than the
        // number of partitions, the subtasks that receive no partitions are marked idle.
        // An idle subtask notifies downstream operators that it will no longer send any
        // records or watermarks, so downstream can effectively treat it as nonexistent.
        if (subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }

        ......
    }

And that's all there is to it: the empty subtask is simply marked as temporarily idle.
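Why does this marking matter? Downstream operators typically advance their event time to the minimum watermark across all input channels. A sketch of that computation (simplified, with hypothetical names; Flink's actual bookkeeping lives in its status/valve machinery) shows that without the idle flag, a partition-less source stuck at `Long.MIN_VALUE` would hold the combined watermark back forever:

```java
import java.util.List;

public class WatermarkMinSketch {
    // Hypothetical per-channel state: latest watermark seen, and whether the
    // upstream subtask has marked itself idle.
    record Channel(long watermark, boolean idle) {}

    // Combined watermark = min over ACTIVE channels only; idle channels are
    // excluded so they cannot hold event time back.
    static long combinedWatermark(List<Channel> channels) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Channel c : channels) {
            if (!c.idle) {
                anyActive = true;
                min = Math.min(min, c.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Channel 2 got no partitions. Treated as active, it pins the
        // combined watermark at Long.MIN_VALUE.
        System.out.println(combinedWatermark(List.of(
                new Channel(100, false),
                new Channel(120, false),
                new Channel(Long.MIN_VALUE, false))));

        // Marked idle, it is ignored and the watermark advances to 100.
        System.out.println(combinedWatermark(List.of(
                new Channel(100, false),
                new Channel(120, false),
                new Channel(Long.MIN_VALUE, true))));
    }
}
```

This is why `markAsTemporarilyIdle()` is enough on its own: downstream simply stops waiting on that channel, and if partition discovery later hands the subtask some partitions, emitting records flips it back to active.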
