Kafka-消费者组
一、消费者组原理
Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
- 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
- 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
消费者组原理.png每个消费者都存在一个消费者组中,命令行执行时底层自动生成了groupId。
- 如果向消费组中添加更多的消费者,超过主题分区数量,则有一部分消费者就会闲置,不会接收任何消息。
- 消费者组之间互不影响。 所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
二、消费者组初始化流程
1.coordinator
辅助实现消费者组的初始化和分区的分配。
2.coordinator的选举
groupid的hashcode值 % 50
- 50: _consumer_offsets主题的分区数量
- groupid:用户自己设置的组id
例:groupid的hashcode值 = 1,1% 50 = 1,那么_consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator 作为这个消费者组的leader。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset。
3.消费者组初始化流程
消费者组初始化流程.png
- 1.每个consumer发送joinGroup请求
- 2.coordinator选出consumer leader
- 3.coordinator把消费topic情况发送给leader消费者
- 4.消费者leader制定消费方案(分区分配策略)
- 5.消费者leader把消费方案发送给coordinator
- 6.coordinator把消费方案下发给各个consumer
- 7.消费者会和coordinator保持心跳(默认每3s发送一次心跳),一旦超时(session.timeout.ms=45s),该消费者会被移除,触发再平衡;消费处理消息的时间过长(max.poll.interval.ms=5min),触发再平衡;
三、消费者组消费流程
消费者组消费流程.png1.创建消费者网络连接客户端(ConsumerNetworkClient),用来和kafka集群进行通信
2.消费者发送消费请求sendFetches到ConsumerNetworkClient
3.Fetch.min.bytes
:每批次最小拉取大小(默认一字节);ConsumerNetworkClient默认从broker拉取一个字节数据。
4.Fetch.max.wait.ms
:一批数据最小值未达到的超时时间(默认500ms);如果步骤3配置的每批次最小拉取2字节,生产者只发送了1字节,当达到超时时间后,也会拉取数据。
5.Fetch.max.bytes
:每批次最大拉取数据大小;
6.ConsumerNetworkClient调用send发送拉取数据请求。
7.通过回调方法onSuccess将结果拉取回来completedFetches(queue)。
8.消费者从completedFetches队列中拉取数据,一次默认拉取500条
Max.poll.records
:一次拉取数据返回的最大消息条数(默认500条)
9.将数据反序列化
10.经过拦截器
11.处理数据
四、代码
1.单个消费者-订阅主题
package kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @Title: MyConsumer.java
* @Package kafka.consumer
* @Description: 消费者
* @Author: hongcaixia
* @Date: 2023/1/30 20:18
* @Version V1.0
*/
public class MyConsumer {
public static void main(String[] args) {
// 创建消费者的配置对象
Properties properties = new Properties();
// 给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组(组名任意起名)必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 订阅消费主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("topic1");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true) {
/**
* Duration timeout:批次拉取的间隔时间
* 设置 1s 消费一批数据
*/
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不填写消费者组id会被自动填写随机的消费者组id。
2.单个消费者-订阅分区
package kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @Title: MyConsumer.java
* @Package kafka.consumer
* @Description: 消费者
* @Author: hongcaixia
* @Date: 2023/1/30 20:18
* @Version V1.0
*/
public class MyConsumerPartition {
public static void main(String[] args) {
// 创建消费者的配置对象
Properties properties = new Properties();
// 给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组(组名任意起名)必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("myTopic", 0));
kafkaConsumer.assign(topicPartitions);
//消费数据
while (true) {
/**
* Duration timeout:批次拉取的间隔时间
* 设置 1s 消费一批数据
*/
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
3.消费者组
package kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
/**
* @Title: MyConsumer.java
* @Package kafka.consumer
* @Description: 消费者
* @Author: hongcaixia
* @Date: 2023/1/30 20:18
* @Version V1.0
*/
public class MyConsumer {
public static void main(String[] args) {
// 创建消费者的配置对象
Properties properties = new Properties();
// 给消费者配置对象添加参数
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组(组名任意起名)必须
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
// 创建消费者对象
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 订阅消费主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("topic1");
kafkaConsumer.subscribe(topics);
// 拉取数据打印
while (true) {
/**
* Duration timeout:批次拉取的间隔时间
* 设置 1s 消费一批数据
*/
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
// 打印消费到的数据
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
创建2个消费者,都指定同一个组:myGroup;
- 两个消费者在消费不同分区的数据(如果只发生到一个分区,可以在发送时增加延迟代码 Thread.sleep(2);)。
- 重新发送消息到一个全新的主题中,由于默认创建的主题分区数为 1,可以看到只能有一个消费者消费到数据。
极客时间《Kafka 核心技术与实战》学习笔记Day15 - http://gk.link/a/11UOV