Kafka(4)消费者客户端

2020-08-22  本文已影响0人  正义的杰克船长

一、前言

Kafka不仅提供了生产者客户端,同时也提供了消费者客户端(Cosumer API)。应用程序通过消费者客户端来订阅主题,然后向broker发送拉取请求,获取想要消费的主题分区消息进行消费。本文接下来将对Kafka Java版消费者客户端相关知识进行讲解。

二、重要概念

2.1 消费模式

消费模式

2.2 消费组

消费组与消费者、分区关系区

2.3 分区自动再均衡

三、消费消息过程分析

3.1 核心类KafkaConsumer

3.2 消费消息主要步骤

消费者消费消息主要步骤
public static void main(String[] args) {
        //1 设置配置参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "messageGroup");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //2 创建KafkaConsumer实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //3 订阅主题
        consumer.subscribe(Arrays.asList("topic_test_1"));
        try {
            while(true) {
                //4 拉取主题分区消息数据
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        // 业务处理。。。
                        System.out.println(record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    //5 提交消费位移(最后一个offset+1)
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            //6 关闭KafkaConsumer实例
            consumer.close();
        }
    }

3.2 必要的配置参数

3.3 订阅主题

// 集合形式订阅主题
void subscribe(Collection<String> topics);
// 集合形式订阅主题,增加再均衡监听
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
// 正则表达式形式订阅主题
void subscribe(Pattern pattern);
// 正则表达式形式订阅主题,增加再均衡监听
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
// 手工分配特定分区,没有自动再均衡功能
void assign(Collection<TopicPartition> partitions);
void unsubscribe();

3.4 拉取数据

// 从broker拉取数据
ConsumerRecords<K, V> poll(Duration timeout);
// 覆盖消费者将在下一次轮询中使用的最后消费位移
void seek(TopicPartition partition, long offset);

3.5 提交位移

void commitSync();
void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

3.6 优雅结束消费

3.7 指定位移消费

四、重要配置参数

消费者客户端有一些重要的参数,掌握这些参数有助于我们在实际应用中的故障排查和性能调优。

fetch.min.bytes
fetch.max.bytes
max.partition.fetch.bytes
auto.offset.reset
isolation.level
session.timeout.ms
heartbeat.interval.ms
上一篇下一篇

猜你喜欢

热点阅读