关于FlinkKafkaConsumer中增加消费延时的metr
1、简介及目的
Flink Kafka Consumer是一款流式数据源,可以从Apache Kafka 0.8.x中获取并行数据流。 消费者可以在多个并行实例中运行,每个实例都将从一个或多个Kafka分区中提取数据。既然消费kafka我们最关心的就是每个分区的消费延时,延时多大等。我们可以使用这个指标在监控作业的运行情况。如果出现延时及时作出报警。下面介绍如何实现。
2、Flink原来的metric实现
在flinkkafkaconsumer中提供了了两个重要的指标。入下图:
[if !vml]
[endif]
这两个指标currentOffsets:表示当前消费到的offset,尚未提交到kafka或者zk。committedOffsets:表示已经提交到kafka或者zk的offset。我们其实主要关心这个这个指标。这个指标是在检查点完成的时候设置的。在FlinkKafkaConsumerBase的方法notifyCheckpointComplete中调用了AbstractFetcher的方法commitInternalOffsetsToKafka。
2.1、Kafka08Fetcher设置offset的实现
AbstractFetcher的实现类Kafka08Fetcher关于这个方法commitInternalOffsetsToKafka具体实现代码如下:
[if !vml]
[endif]
2.2、自带的metrics的注册实现
那么这个metrics是在哪里注册的呢?其实在AbstractFetcher的构造方法里面注册了这个metric。代码如下:[if !vml]
[endif]
[if !vml]
[endif]
其中注意这个类OffsetGauge,它实现了flink的metric体系中的接口:Gauge,通过实现这个接口我们就可以自定义metrics。[if !vml]
[endif]
说到这里只是说了我们主要关系的这个metric的来源。其实这个metric只能反应当前作业的消费情况,说白也就是只能反应作业消费到哪里了,无法反应是否消费延时,延时了多少。
3、加入消费延时metric的实现
我们在使用storm-kafka的时候,她有一个指标spoutlag,使用的当前分区的最新offset减去当前消费的offset。这个指标就能反应消费的延时情况,只要lag变大,肯定是延时了。那么我们时候也可以在flink-kafka中实现这个metric呢?当然是可以的。下面我们看看怎么加入这个指标.
首先要加入这个指标,必须获取当前分区的最新offset,也即是OffsetRequest.LatestTime()。当前消费到的offset 我们已经有了committedOffsets。那么怎么获取最新的offset呢?获取的时机在哪里比较好?既然committedOffsets是在检查点提交的时候设置的,那么我们也可以在这个方法理做文章。
要获取分区的offset就需要知道当前分区的leader是哪个。幸运的是在源码里面AbstractPartitionDiscoverer(分区发现者)的实现类Kafka08PartitionDiscoverer提供了一个获取每个分区对应的leader的方法:
[if !vml]
[endif]
在FlinkKafkaConsumerBase的open方法里面一起初始化好了partitionDiscoverer。我们可以直接使用。
[if !vml]
[endif]
AbstractPartitionDiscoverer这个类的作用就是用于发现新的分区,并给新的分区设置相关的消费者。那么她是怎么发现新的分区的呢?在FlinkKafkaConsumerBase的ru n方法里其实是使用了一个线程定期扫描的方式。那么我们可以把发现分区leader的逻辑也可以加在这段代码里面:
[if !vml]
[endif]
这样在动态分区的发现过程中还能定期的获取分区对应的leader。再把这个结果赋值给kafkaFetcher。这样kafkaFetcher就可以在其方法里面commitInternalOffsetsToKafka根据拿到的leader 进行通讯,获取最新的offset了。但这样会有一个问题。难道每次都需要构建一次通讯,能复用现成的SimpleConsumer?为了实现这个,进行了如下的改造:
[if !vml]
[endif]
将kafkaFetcher的方法里面的brokerToThread有局部变量提到方法外部,这个属性保存的就是kafka分区的leader对应的消费者线程SimpleConsumerThread。这个SimpleConsumerThread对象里面持有一个leader对应的SimpleConsumer。我们看下它的实现:
[if !vml]
[endif]
[if !vml]
[endif]
原来的代码里面的SimpleConsumer对象是没有get方法的,我们给她增加一个get方法使外部可以访问到这个consumer。这样的话我们在通过分区的leader获取其最新的offset的时候就不用重新new一个SimpleConsumer了。
[if !vml]
[endif]
这样的话我们需要的东西都准备好了,而且都是复用flink已经提供好的对象。在代码的侵入性和性能上都做了权衡。下面我们来看看最后的Kafka08Fetcher的方法doCommitInternalOffsetsToKafka是怎么实现的吧。如图:
[if !vml]
[endif]
[if !vml]
[endif]
首先获取分区对应的leader,然后根据leader做key从属性brokerToThread里找其对应的SimpleConsumerThread,如果SimpleConsumerThread不为null,就获取其持有的属性SimpleConsumer。获取到SimpleConsumer就很容易获取分区最新的offset了。使用kafk提供的相关的api就好。相关lag的计算方式如上。这里需要隆重介绍以下这个对象
[if !vml]
[endif]
这个list保存了kafka所有的分区信息。在作业启动后如果能从state恢复,就使用state里面的数据填充,如果不能就从kafka获取相关的meta信息。这个list在flink-kafka很多类里面传递。本质上都是在FlinkKafkaConsumerBase初始化的还是构建的。而且这个对象在传递的过程中可以被修改。她里面保存的对象是这个KafkaTopicPartitionState对象。这个对象是封装了kafka的分区信息、topic信息、消费的offset信息等等。我在这里面增加了一个属性delayLag。
[if !vml]
[endif]
并提供了get、set方法方便外部访问。
对KafkaConsumerMetricConstants的改造,增加静态常量:
[if !vml]
[endif]
我们前面讲过注册metric是在AbstractFetcher的方法addOffsetStateGauge方法了。我们在这个方法里面增加我们新的metric。
[if !vml]
[endif]
同时改造OffsetGauge这个类:在其方法getValue中该表如下:
[if !vml]
[endif]
至此我们所有的改造基本完成。但是肯定还会有瑕疵,会在后面的测试后进行修补。