
Fetch Offset Range in Kafka with

2017-12-26  即墨灯火

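Sometimes you need to look up the offset range of every partition of a Kafka topic. For example, when you pass fromOffsets to Spark Streaming, the job may fail if the offsets have not been checked against the valid range. Kafka ships a command-line tool for this check; this post shows a way to do it with the Java API.

The code is as follows: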

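KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(config);
consumer.subscribe(topics);
// poll once so that partitions get assigned to this consumer
ConsumerRecords<String, byte[]> records = consumer.poll(1000);

// KafkaConsumer is not thread-safe, so use a sequential stream instead of parallelStream()
return records.partitions().stream().map(topicPartition -> {
        // jump to the earliest available offset of this partition, then read it back
        consumer.seekToBeginning(Collections.singletonList(topicPartition));
        long offset = consumer.position(topicPartition);
        // a (topic, partition, offset) holder class; not org.apache.kafka.common.TopicPartitionInfo
        return new TopicPartitionInfo(topicPartition.topic(), topicPartition.partition(), offset);
}).collect(Collectors.toList());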

Full code: See Here

Dependency
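
The snippet above builds TopicPartitionInfo objects with a (topic, partition, offset) constructor, but the class itself is not shown in this post. A minimal sketch of what such a holder could look like follows; the field names, accessors and toString format are assumptions made for illustration, not the author's actual code.

```java
import java.util.Objects;

/** Hypothetical value class: one partition of a topic plus an offset within it. */
final class TopicPartitionInfo {
    private final String topic;
    private final int partition;
    private final long offset;

    TopicPartitionInfo(String topic, int partition, long offset) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.partition = partition;
        this.offset = offset;
    }

    String topic() { return topic; }

    int partition() { return partition; }

    long offset() { return offset; }

    @Override
    public String toString() {
        // rendered as topic-partition@offset
        return topic + "-" + partition + "@" + offset;
    }
}
```

Keeping the fields final makes instances safe to collect into a list and hand to other threads once the consumer is done.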

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

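Update 2018.01.09

In the code above, if the records returned by poll(1000) do not include a record from every partition, records.partitions() will not return all partitions of the topic.

records.partitions() only returns the partitions that appear in that batch of records.

So you may need something like this instead: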

Map<TopicPartition, Long> fromOffsets = new HashMap<>();
//do fill your fromOffsets with your own local offset-store here
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(kafkaParams);
consumer.subscribe(topics);
// poll once so that partitions get assigned; seekToBeginning()/position() only work on assigned partitions
consumer.poll(100);

for (TopicPartition topicPartition : fromOffsets.keySet()) {
    // earliest offset that is still available for this partition
    consumer.seekToBeginning(Collections.singletonList(topicPartition));
    long offset = consumer.position(topicPartition);
    long consumedOffset = fromOffsets.getOrDefault(topicPartition, 0L);
    if (offset > consumedOffset) {
        log.warn("At partition {}, our system has consumed up to {} but can only start from {} because older data has expired under retention.", topicPartition.partition(), consumedOffset, offset);
        log.warn("At partition {}, start offset has been adjusted to {}", topicPartition.partition(), offset);
        // overwriting an existing key is not a structural change, so it is safe during iteration
        fromOffsets.put(topicPartition, offset);
    }
}
consumer.unsubscribe();
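
The loop above only guards the lower bound. A stored offset can also lie past the end of the log, for example after a topic has been deleted and recreated. The following is a minimal sketch of clamping on both sides, written against plain maps keyed by partition number so that it runs without a broker. The class and method names are made up for illustration. In real code the two bound maps could probably be filled from KafkaConsumer.beginningOffsets(...) and KafkaConsumer.endOffsets(...), which need no poll(), with the partition list taken from partitionsFor(topic).

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative helper (not part of Kafka): forces stored offsets into the available range. */
final class OffsetRangeClamp {

    private OffsetRangeClamp() {}

    /**
     * Returns a copy of stored in which every offset lies within [earliest, latest].
     * Keys are partition numbers; a partition missing from a bound map is not
     * clamped on that side.
     */
    static Map<Integer, Long> clamp(Map<Integer, Long> stored,
                                    Map<Integer, Long> earliest,
                                    Map<Integer, Long> latest) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map.Entry<Integer, Long> entry : stored.entrySet()) {
            Integer partition = entry.getKey();
            long offset = entry.getValue();
            Long low = earliest.get(partition);
            Long high = latest.get(partition);
            if (low != null && offset < low) {
                offset = low;   // older data is gone, start at the earliest offset
            } else if (high != null && offset > high) {
                offset = high;  // stored offset points past the end of the log
            }
            result.put(partition, offset);
        }
        return result;
    }
}
```

Returning a new map instead of mutating the input keeps the original stored offsets available for logging what was adjusted.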