Kafka(4)消费者客户端
2020-08-22 本文已影响0人
正义的杰克船长
一、前言
Kafka不仅提供了生产者客户端,同时也提供了消费者客户端(Cosumer API)。应用程序通过消费者客户端来订阅主题,然后向broker发送拉取请求,获取想要消费的主题分区消息进行消费。本文接下来将对Kafka Java版消费者客户端相关知识进行讲解。
二、重要概念
2.1 消费模式
消费模式- 消费模式分为两种:推送(push)模式和拉取(pull)模式。
- 推送模式是由broker将数据推送到消费者,实时性好,可以让消费者能够以可能的最大速率消费;不过,由于broker控制着数据传输速率,不同的消费者消费速率可能相差很大,当消费速率低于生产速率时,消费者系统消费不及时,会出现服务拒绝, 所以推送模式系统很难处理不同消费速率的消费者。一般采用推送模式的系统,需要设置一定的流控规则来避免大流量压垮消费者。
- 拉取模式是消费者主动从broker那里拉取数据,消费速率由消费者控制,可以避免出现推送模式那种超载压垮消费者的情况。不足的地方是,如果broker中没有数据,消费者可能会在一个紧密的循环中结束轮询,然后处于busy-waiting状态,直到数据到来。
- Kafka采用pull模式。消费者主动从broker那里拉取数据进行消费。为了避免busy-waiting,Kafka在pull请求中加入参数,使得消费者在一个“long pull”中阻塞等待,直到数据到来。
- Kafka 消费者可以通过pull模式在需要的时候通过回退位置再次消费对应的数据。
2.2 消费组
消费组与消费者、分区关系区- Kafka每个消费者都有一个对应的消费组。消费组是一个逻辑概念,用来对消费者进行归类。
- 如果多个消费者属于同一个消费组,那么每条消息只会被其中一个消费者处理,相当于点对点模式应用。
- 如果多个消费者不属于同一个消费组,那么每条消息会被其中每个消费者都处理,相当于订阅/发布模式应用。
- 同一个消费组内的消费者与分区是对应的,每一个分区只能被一个消费组中的一个消费者所消费。如果同一个消费组的消费者个数大于主题分区数,则会存在有消费者不能拉取处理这个主题消息的情况,如图消费者C4无法消费主题Topic1的任何消息。
- 举个例子,假如主题Topic1有消息P0M1、P1M1、P2M2,消费组A的消费者C0会处理P0M1,消费者C1会处理P1M1,消费者C2会处理P2M2,消费组A其他消费者不能处理这几条消息,但消费组B内的消费者可以消费处理这三条消息。
- 消费组消费者模型可以让整体的消费能力具有横向伸缩性,大大提高了系统消费的灵活性。
2.3 分区自动再均衡
- 在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。
三、消费消息过程分析
3.1 核心类KafkaConsumer
- KafkaConsumer类是整个消费者API的核心类。应用程序消费消息的主要操作都需要通过它来完成。它是一个非线程安全的类。
3.2 消费消息主要步骤
消费者消费消息主要步骤- 消费者消费消息主要步骤如上图所示,包括设置消费端配置参数、创建KafkaConsumer实例、订阅主题、拉取主题分区消息数据、提交位移给broker、关闭KafkaConsumer资源等。代码逻辑演示如下:
public static void main(String[] args) {
//1 设置配置参数
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "messageGroup");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2 创建KafkaConsumer实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//3 订阅主题
consumer.subscribe(Arrays.asList("topic_test_1"));
try {
while(true) {
//4 拉取主题分区消息数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
// 业务处理。。。
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//5 提交消费位移(最后一个offset+1)
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
//6 关闭KafkaConsumer实例
consumer.close();
}
}
3.2 必要的配置参数
- bootstrap.servers:与生产者客户端此参数作用类似。用于建立初始连接到Kafka集群(主机/端口对)的配置列表。这个配置不需要包含整个集群的服务器。不论这个参数配置了哪些服务器来初始化连接,客户端都是会均衡地与集群中的所有服务器建立连接。如果集群变化,元数据会动态更新。为了避免单节点风险,最好配置多台主机。
- group.id:消费组id,用来标识消费者所属的消费者组的唯一字符串。
- enable.auto.commit:是否自动提交位移,默认为true,如果为true,表示消费者的位移将定期在后台提交。可设置为手动提交消费位移。
- key.deserializer:key的反序列化器类,需要与生产端的key序列化器类对应。从broker拉取的消息格式是字节数组类型的,所以需要相应的反序列操作将消息还原成原有的数据类型。
- value.deserializer:value的反序列化器类,需要与生产端的value序列化器类对应。
3.3 订阅主题
- 消费者可以一次订阅若干个主题。既可以集合形式订阅多个主题,也可以以正则表达式形式订阅特定模式的主题,还可以订阅主题的特定分区。KafkaConsumer提供的订阅主题方法如下:
// 集合形式订阅主题
void subscribe(Collection<String> topics);
// 集合形式订阅主题,增加再均衡监听
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
// 正则表达式形式订阅主题
void subscribe(Pattern pattern);
// 正则表达式形式订阅主题,增加再均衡监听
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
// 手工分配特定分区,没有自动再均衡功能
void assign(Collection<TopicPartition> partitions);
- 订阅主题后,也可取消主题订阅。KafkaConsumer提供的方法如下:
void unsubscribe();
3.4 拉取数据
- 订阅阅主题分区后,才可以拉取数据,Kafka采用拉取(pull)模式,主动从broker那里拉取数据进行消费。应用程序需要不断轮询调用poll()方法,在每次轮询中,消费者将尝试使用最后消费位移+1作为开始偏移量,并按顺序获取。最后消费位移可以调用seek()方法手工覆盖。
- poll()方法有个timeout参数,用来表示超时阻塞的最大时间。KafkaConsumer提供的方法如下:
// 从broker拉取数据
ConsumerRecords<K, V> poll(Duration timeout);
// 覆盖消费者将在下一次轮询中使用的最后消费位移
void seek(TopicPartition partition, long offset);
3.5 提交位移
- 拉取数据进行消费处理后,需要将所有主题分区列表的消费位移提交给broker,然后将消费位移存储在Kafka内部的主题__consumer_offsets中。
- 提交的位移应该是应用程序将要使用的下一条消息,即最后消费位移 + 1;
- Kafka提交位移的方式分为自动提交和手工提交。默认为自动提交方式。在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动提交间隔时间可通过参数(auto.commit.interval.ms)可修改。
- 自动提交位移在一些非正常情况下会出现重复消费和丢失消息的现象;假设在拉取一批消息消费后,还未自动提交位移之前,消费者奔溃了,那么又会从上一次消费位移处拉取数据,这样便出现了重复消费的现象。
虽然不能完全避免重复消费现象,但是我们可以通过缩小位移提交时间间隔来减少重复消费的消息区间。 - 手动提交分为同步提交和异步提交。KafkaConsumer提供的方法如下:
void commitSync();
void commitSync(Duration timeout);
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
void commitAsync();
void commitAsync(OffsetCommitCallback callback);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);
3.6 优雅结束消费
- 在消费完成后,如何优雅的跳出轮询,结束消费呢,KafkaConsumer提供了wakeup()方法,这个方法是线程安全的。调用wakeup()方法的线程将抛出WakeupException异常,终止长轮询。跳出轮询后,需要显式调用close()方法关闭动作来释放系统占用资源,包括内存资源,socket网络连接资源。
3.7 指定位移消费
- 在消费者遇到关闭、崩溃以及再均衡时,可以让其他接替的消费者通过seek()方法设置指定位移来保证消费的连续。
- 当Kafka中没有初始消费位移或者服务器上的当前消费位移不再存在时(例如数据已经被删除),默认会采用自动重置位移为最后的位移进行消费。可以通过参数auto.offset.reset修改重置方式。
四、重要配置参数
消费者客户端有一些重要的参数,掌握这些参数有助于我们在实际应用中的故障排查和性能调优。
fetch.min.bytes
- 描述:服务器应该为消费者拉取请求返回的最小数据量。如果可用数据不足,请求将等待大量数据累积,然后再响应请求。默认1字节,意味着拉取请求很快会被响应。将这个参数适当调大可以提高服务器的吞吐量,但会导致服务器等待更大数量的数据积累,增加一些响应延迟。
- 类型:int。
- 默认值:1。
fetch.max.bytes
- 描述:服务器应该为消费者拉取请求返回的最大数据量。消息记录是由消费者分批拉取的,如果拉取的第一个非空分区中的第一个消息记录Batch字节数大于此值,那么仍将返回该消息记录Batch,以确保消费者能够继续执行。因此,这不是一个绝对的最大值。
- 类型:int。
- 默认值:52428800。
max.partition.fetch.bytes
- 描述:服务器将返回的每个分区的最大数据量。记录由消费者分批拉取。如果拉取的第一个非空分区中的第一个消息记录Batch字节数大于此值,那么仍将返回该消息记录Batch,以确保消费者能够继续执行。注意,该配置需结合 fetch.max.bytes以及两个主题配置message.max.bytes 和 max.message.bytes使用。
- 类型:int。
- 默认值:1048576。
auto.offset.reset
- 描述:位移重置方式。当Kafka中没有初始消费位移或者服务器上的当前消费位移不再存在时(例如数据已经被删除)场景下使用,有下列几种方式:
(1)earliest: 自动重置位移到最早位移位置
(2)latest: 自动重置位移为最后位移位置
(3)none: 如果没有找到消费者之前位移,那么向消费者直接抛出异常
(4)其他: 向消费者抛出异常。 - 类型:string。
- 默认值:latest。
isolation.level
- 描述:控制如何读取以事务方式编写的消息。如果设置为read_committed, consumer.poll()将只返回已提交的事务性消息。如果设置为read_uncommitted(默认值),consumer.poll()将返回所有消息,甚至是已经中止的事务性消息。如果是非事务性消息,在这两种模式都会无条件返回。
- 类型:string。
- 默认值:read_uncommitted。
session.timeout.ms
- 描述:在使用Kafka的组管理工具时,用于检测用户故障的超时时间。消费者定期向broker发送心跳,以表明其活动状态。如果在此会话超时过期之前broker没有接收到心跳,则broker将从消费组中删除此消费者并启动重新平衡。注意,该值必须在broker配置中按group.min.session.timeout和group.max.session.timeout.ms配置的允许范围内。
- 类型:int。
- 默认值:10000。
heartbeat.interval.ms
- 描述:在使用Kafka的组管理工具时,心跳到消费者协调器之间的间隔时间。心跳用于确保消费者的会话保持活动状态,并在新消费者加入或离开消费组时帮助重新平衡。该值必须设置低于session.timeout.ms,通常设置不超过1/3。它可以调整更低,用来控制正常重新平衡的间隔时间。
- 类型:int。
- 默认值:3000。