
Getting Kafka consumer offsets

2019-01-09  岳过山丘

1. What it does

Consume the records in the internal __consumer_offsets topic and parse them, printing one line per offset-commit record in the form group,topic,partition,offset.

2. Code

 
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// The parsing helpers come from the Kafka core jar (kafka_2.11), not from kafka-clients.
import kafka.common.OffsetAndMetadata;
import kafka.coordinator.group.BaseKey;
import kafka.coordinator.group.GroupMetadataManager;
import kafka.coordinator.group.OffsetKey;

public class ConsumerOffsetReader {

    public static void main(String[] args) {
        Consumer<byte[], byte[]> consumer = createKafkaConsumer();
        consumer.subscribe(Collections.singletonList("__consumer_offsets"));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // Skip tombstones (null value) and records without a key; they carry no offset data.
                if (record.key() == null || record.value() == null) {
                    continue;
                }
                BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                // Only offset-commit records are of interest; group-metadata records are ignored.
                if (baseKey instanceof OffsetKey) {
                    OffsetKey offsetKey = (OffsetKey) baseKey;
                    String group = offsetKey.key().group();
                    TopicPartition tp = offsetKey.key().topicPartition();
                    OffsetAndMetadata offset =
                            GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(record.value()));
                    System.out.println(group + "," + tp.topic() + "," + tp.partition() + ","
                            + offset.offsetMetadata().offset());
                }
            }
        }
    }

    static Consumer<byte[], byte[]> createKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test2");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "latest");
        // Read raw bytes; the key/value formats of __consumer_offsets are decoded manually above.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return new KafkaConsumer<>(props);
    }
}
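
__consumer_offsets is a compacted topic, so while catching up the reader still sees many historical commit records for the same group and partition. If only the latest committed offset per (group, topic, partition) is wanted rather than one printed line per commit, the values can be collected into a map instead. A minimal sketch; the LatestOffsetTracker class and its update/snapshot methods are illustrative names, not part of the original code:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Illustrative helper (not from the original post): keeps only the newest committed
// offset seen for each (group, topic, partition) combination.
public class LatestOffsetTracker {

    private final Map<String, Long> latest = new HashMap<>();

    // Called from the record loop instead of System.out.println(...);
    // later commits for the same key simply overwrite earlier ones.
    public void update(String group, TopicPartition tp, long committedOffset) {
        latest.put(group + "," + tp.topic() + "," + tp.partition(), committedOffset);
    }

    // Returns a copy of the current view of latest committed offsets.
    public Map<String, Long> snapshot() {
        return new HashMap<>(latest);
    }
}

In the loop above, tracker.update(group, tp, offset.offsetMetadata().offset()) would replace the println call, leaving one map entry per live group/topic/partition combination.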

3. pom

kafka-clients supplies the consumer API, while the Kafka core artifact (kafka_2.11) supplies GroupMetadataManager and the OffsetKey/OffsetAndMetadata classes used to parse the record keys and values, so both dependencies are needed.

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>