kafka

Kafka-消费者组

2023-01-29  本文已影响0人  我可能是个假开发

一、消费者组原理

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

每个消费者都存在一个消费者组中,命令行执行时底层自动生成了groupId。

消费者组原理.png

二、消费者组初始化流程

1.coordinator

辅助实现消费者组的初始化和分区的分配。

2.coordinator的选举

groupid的hashcode值 % 50

例:groupid的hashcode值 = 1,1% 50 = 1,那么_consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator 作为这个消费者组的leader。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。

3.消费者组初始化流程


消费者组初始化流程.png

三、消费者组消费流程

消费者组消费流程.png

1.创建消费者网络连接客户端(ConsumerNetworkClient),用来和kafka集群进行通信
2.消费者发送消费请求sendFetches到ConsumerNetworkClient
3.Fetch.min.bytes:每批次最小拉取大小(默认一字节);ConsumerNetworkClient默认从broker拉取一个字节数据。
4.Fetch.max.wait.ms:一批数据最小值未达到的超时时间(默认500ms);如果步骤3配置的每批次最小拉取2字节,生产者只发送了1字节,当达到超时时间后,也会拉取数据。
5.Fetch.max.bytes:每批次最大拉取数据大小;
6.ConsumerNetworkClient调用send发送拉取数据请求。
7.通过回调方法onSuccess将结果拉取回来completedFetches(queue)。
8.消费者从completedFetches队列中拉取数据,一次默认拉取500条
Max.poll.records:一次拉取数据返回的最大消息条数(默认500条)
9.将数据反序列化
10.经过拦截器
11.处理数据

四、代码

1.单个消费者-订阅主题

package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @Title: MyConsumer.java
 * @Package kafka.consumer
 * @Description: 消费者
 * @Author: hongcaixia
 * @Date: 2023/1/30 20:18
 * @Version V1.0
 */
public class MyConsumer {

    public static void main(String[] args) {
        // 创建消费者的配置对象
        Properties properties = new Properties();
        // 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");

        // 创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 订阅消费主题(可以消费多个主题)
        ArrayList<String> topics = new ArrayList<>();
        topics.add("topic1");
        kafkaConsumer.subscribe(topics);
        // 拉取数据打印
        while (true) {
            /**
             * Duration timeout:批次拉取的间隔时间
             * 设置 1s 消费一批数据
             */
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。

2.单个消费者-订阅分区

package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @Title: MyConsumer.java
 * @Package kafka.consumer
 * @Description: 消费者
 * @Author: hongcaixia
 * @Date: 2023/1/30 20:18
 * @Version V1.0
 */
public class MyConsumerPartition {

    public static void main(String[] args) {
        // 创建消费者的配置对象
        Properties properties = new Properties();
        // 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");

        // 创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 消费某个主题的某个分区数据
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("myTopic", 0));
        kafkaConsumer.assign(topicPartitions);

        //消费数据
        while (true) {
            /**
             * Duration timeout:批次拉取的间隔时间
             * 设置 1s 消费一批数据
             */
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

3.消费者组

package kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

/**
 * @Title: MyConsumer.java
 * @Package kafka.consumer
 * @Description: 消费者
 * @Author: hongcaixia
 * @Date: 2023/1/30 20:18
 * @Version V1.0
 */
public class MyConsumer {

    public static void main(String[] args) {
        // 创建消费者的配置对象
        Properties properties = new Properties();
        // 给消费者配置对象添加参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组(组名任意起名)必须
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");

        // 创建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        // 订阅消费主题(可以消费多个主题)
        ArrayList<String> topics = new ArrayList<>();
        topics.add("topic1");
        kafkaConsumer.subscribe(topics);
        // 拉取数据打印
        while (true) {
            /**
             * Duration timeout:批次拉取的间隔时间
             * 设置 1s 消费一批数据
             */
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            // 打印消费到的数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

创建2个消费者,都指定同一个组:myGroup;

极客时间《Kafka 核心技术与实战》学习笔记Day15 - http://gk.link/a/11UOV

上一篇下一篇

猜你喜欢

热点阅读