kakfa如何查询指定消费组lag
2019-01-20 本文已影响0人
不1见2不3散4
问题来源
实际项目中,我们会遇到kafka消费不及时,系统发现最新的数据一致无法出现,这时候通过其他kafka工具发现原来指定的消费组lag太大,也就是我们系统要么出问题,要么需要启动更多的实例加快消费消息。每次通过kafka工具去查询lag基本都是手工而且耗时且慢,能否在自己的系统中集成查询指定消费组lag的功能,然后出现问题是可以管理界面中迅速查看lag呢?答案是当然可以。 kafka提供了这些API。
备注:这里的kafka工具是指kafka自带的命令行工具,或者其他第三方提供的kafka工具。
实现思路
首先我们需要引入的依赖如下。 其中kafka-clients是作为kafka客户端访问kakfa需要的依赖包,kafka_2.12是管理端需要的依赖包。
备注:我的kafka版本是kafka_2.12-1.0.0。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
kafka管理端的AdminClient可以查询消费组的详情。
AdminClient.ConsumerGroupSummary consumerGroupSummary = adminClient.describeConsumerGroup(GROUP_ID, 5000);
scala.collection.immutable.Map<TopicPartition, Object> maps = adminClient.listGroupOffsets(GROUP_ID);
listGroupOffsets可以查询消费组的在指定topic的指定分区中的offset. 然后我们可以通过查询该TopicPartition最新的postion得到endOffset。
两者之差就是指定消费组在指定topic的某个分区上的lag信息。
KafkaConsumer<String, String> consumer = getNewConsumer();
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToEnd(Arrays.asList(topicPartition));
long endOffset = consumer.position(topicPartition);
具体代码
完整的代码在这里,欢迎加星和fork。 谢谢!
package com.yq;
import kafka.admin.AdminClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import scala.Option;
import scala.collection.immutable.List;
import java.util.Arrays;
import java.util.Properties;
/**
* Simple to Introduction
* className: SendMessageMain
*
* @author EricYang
* @version 2019/01/10 11:30
*/
@Slf4j
public class AdminMain {
private static final String SERVERS = "ubuntu:9092";
private static final String GROUP_ID = "yq-consumer09";
public static long getLogEndOffset(TopicPartition topicPartition){
KafkaConsumer<String, String> consumer = getNewConsumer();
consumer.assign(Arrays.asList(topicPartition));
consumer.seekToEnd(Arrays.asList(topicPartition));
long endOffset = consumer.position(topicPartition);
consumer.close();
return endOffset;
}
public static KafkaConsumer getNewConsumer(){
Properties props = new Properties();
props.put("bootstrap.servers", SERVERS);
props.put("group.id", "test001");
props.put("enable.auto.commit", "true");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
return consumer;
}
public static void main(String[] args) throws InstantiationException, IllegalAccessException {
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
AdminClient adminClient = AdminClient.create(props);
AdminClient.ConsumerGroupSummary consumerGroupSummary = adminClient.describeConsumerGroup(GROUP_ID, 5000);
if(consumerGroupSummary.state().equals("Empty")){
System.out.println("No grp summary");
}
System.out.println("consumerGrpSummary State " + consumerGroupSummary.state());
Option<List<AdminClient.ConsumerSummary>> consumerSummaryOption = consumerGroupSummary.consumers();
scala.collection.immutable.Map<TopicPartition, Object> maps = adminClient.listGroupOffsets(GROUP_ID);
scala.collection.Set<TopicPartition> topicPartitions = maps.keySet();
scala.collection.immutable.List<TopicPartition> topicPartitionList = topicPartitions.reversed();
for(int j =0; j< topicPartitionList.size(); j++){
TopicPartition topicPartition = topicPartitionList.apply(j);
String currentOffset = maps.get(topicPartition).get().toString();
long groupLastEndOffset = getLogEndOffset(topicPartition);
long lag = groupLastEndOffset -Long.valueOf(currentOffset);
System.out.println("topic:"+topicPartition.topic()+", partition:" + topicPartition.partition() + ", offset:"
+ currentOffset + ", groupLastEndOffset:"+ groupLastEndOffset + ", lag:"+ lag);
}
adminClient.close();
}
}