kafka 消费者详解
前言
读完本文,你将了解到如下知识点:
- kafka 的消费者 和 消费者组
- 如何正确使用kafka consumer
- 常用的 kafka consumer 配置
消费者 和 消费者组
-
什么是消费者?
顾名思义,消费者就是从kafka集群消费数据的客户端,如下图,展示了一个消费者从一个topic中消费数据的模型
图1
-
单个消费者模型存在的问题?
如果这个时候 kafka 上游生产的数据很快,超过了这个消费者1
的消费速度,那么就会导致数据堆积,产生一些大家都知道的蛋疼事情了,那么我们只能加强消费者
的消费能力,所以也就有了我们下面来说的消费者组
-
什么是消费者组?
所谓消费者组
,其实就是一组消费者
的集合,当我们看到下面这张图是不是就特别舒服了,我们采用了一个消费组
来消费这个topic
,众人拾柴火焰高,其消费能力那是按倍数递增的,所以这里我们一般来说都是采用消费者组
来消费数据,而不会是单消费者
来消费数据的。这里值得我们注意的是:- 一个
topic
可以被 多个消费者组
消费,但是每个消费者组
消费的数据是 互不干扰 的,也就是说,每个消费组
消费的都是 完整的数据 。 - 一个分区只能被 同一个消费组内 的一个
消费者
消费,而 不能拆给多个消费者 消费
- 一个
-
是不是一个 消费组 的 消费者 越多其消费能力就越强呢?
从图3
我们就很好的可以回答这个问题了,我们可以看到消费者4
是完全没有消费任何的数据的,所以如果你想要加强消费者组
的能力,除了添加消费者,分区的数量也是需要跟着增加的,只有这样他们的并行度才能上的去,消费能力才会强。
-
为了提高 消费组 的 消费能力,我是不是可以随便添加 分区 和 消费者 呢?
答案当然是否定的啦。。。嘿嘿
我们看到图2
,一般来说我们建议消费者
数量 和分区
数量是一致的,当我们的消费能力不够时,就必须通过调整分区的数量来提高并行度,但是,我们应该尽量来避免这种情况发生,比如:
现在我们需要在图2
的基础上增加一个分区4
,那么这个分区4
该由谁来消费呢?这个时候kafka会进行分区再均衡
,来为这个分区分配消费者,分区再均衡
期间该消费组
是不可用的,并且作为一个被消费者
,分区数的改动将影响到每一个消费者组
,所以在创建topic
的时候,我们就应该考虑好分区数,来尽量避免这种情况发生 -
分区分配过程
上面我们提到了为 分区分配消费者,那么我们现在就来看看分配过程是怎么样的。- 确定 群组协调器
每当我们创建一个消费组,kafka 会为我们分配一个 broker 作为该消费组的 coordinator(协调器) - 注册消费者 并选出 leader consumer
当我们的有了 coordinator 之后,消费者将会开始往该 coordinator上进行注册,第一个注册的 消费者将成为该消费组的 leader,后续的 作为 follower, - 当 leader 选出来后,他会从coordinator那里实时获取分区 和 consumer 信息,并根据分区策略给每个consumer 分配 分区,并将分配结果告诉 coordinator。
- follower 消费者将从 coordinator 那里获取到自己相关的分区信息进行消费,对于所有的 follower 消费者而言,他们只知道自己消费的分区,并不知道其他消费者的存在。
- 至此,消费者都知道自己的消费的分区,分区过程结束,当发送分区再均衡的时候,leader 将会重复分配过程
- 确定 群组协调器
实践——kafka 消费者的使用
咱们以 java api
为例,下面是一个简单的 kafka consumer
public static void main(String[] args) {
//consumer 的配置属性
Properties props = new Properties();
///brokers 地址
props.put("bootstrap.servers", "localhost:9092");
//指定该 consumer 将加入的消费组
props.put("group.id", "test");
// 开启自动提交 offset,关于offset提交,我们后续再来详细说说
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
//指定序列化类
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//创建 consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅消费主题,这里一个消费者可以同时消费 foo 和 bar 两个主题的数据
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
使用起来还是很简单的,不过如果想要用好 consumer,可能你还需要了解以下这些东西:
- 分区控制策略
- consumer 的一些常用配置
- offset 的控制
ok,那么我们接下来一个个来看吧。。。
分区控制策略
- 手动控制分区
咱们先来说下最简单的手动分区控制,代码如下:
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
看起来只是把普通的订阅方式修改成了订阅知道 topic
的分区,其余的还是照常使用,不过这里也需要注意一下的是:
- 一般只作为独立消费者,也就是不能加入消费组,或者说他本身就是作为一个消费组存在,要保证这一点,我们只需要保证其
group id
是唯一的就可以了。 - 对于
topic
的分区变动不敏感,也就是说当topic
新增了分区,分区的数据将会发生改变,但该消费组对此确是不感知的,依然照常运行,所以很多时候需要你手动consumer.partitionsFor()
去查看topic
的分区情况 - 不要和
subscription
混合使用
- 使用
partition.assignment.strategy
进行分区策略配置
这里的话 kafka 是自带两种分区策略的,为了方便理解,我们以如下场景为例来进行解释:
已知:
TopicA 有 3 个 partition(分区):A-1,A-2,A-3;
TopicB 有 3 个 partition(分区):B-1,B-2,B-3;
ConsumerA 和 ConsumerB 作为一个消费组 ConsumerGroup 同时消费 TopicA 和 TopicB
-
Range
该方式最大的特点就是会将连续的分区分配给一个消费者,根据示例,我们可以得出如下结论:ConsumerGroup 消费 TopicA 的时候:
ConsumerA 会分配到 A-1,A-2
ConsumerB 会分配到 A-3ConsumerGroup 消费 TopicB 的时候:
ConsumerA 会分配到 B-1,B-2
ConsumerB 会分配到 B-3所以:
ConsumerA 分配到了4个分区: A-1,A-2,B-1,B-2
ConsumerB 分配到了2个分区:A-3,B-3 -
RoundRobin
该方式最大的特点就是会以轮询的方式将分区分配给一个个消费者,根据示例,我们可以得出如下结论:ConsumerGroup 消费 TopicA 的时候:
ConsumerA 分配到 A-1
ConsumerB 分配到 A-2
ConsumerA 分配到 A-3ConsumerGroup 消费 TopicB 的时候,因为上次分配到了 ConsumerA,那么这次轮到 ConsumerB了 所以:
ConsumerB 分配到 B-1
ConsumerA 分配到 B-2
ConsumerB 分配到 B-3所以:
ConsumerA 分配到了4个分区: A-1,A-3,B-2
ConsumerB 分配到了2个分区:A-2,B-1,B-3
从上面我们也是可以看出这两种策略的异同,RoundRobin 相比较 Range 会使得分区分配的更加的均衡,但如果是消费单个 topic ,那么其均衡是差不多的,而 Range 会比 RoundRobin 更具优势一点,至于这个优势,还得看你的具体业务了。
- 自定义的分区策略
上面两种分区策略是 kafka 默认自带的策略,虽然大多数情况下够用了,但是可能针对一些特殊需求,我们也可以定义自己的分区策略- Range分区策略源码
如何自定义呢?最好的方式莫过于看源码是怎么实现的,然后自己依葫芦画瓢来一个,所以我们先来看看 Range分区策略源码,如下,我只做了简单的注释,因为它本身也很简单,重点看下assign
的参数以及返回注释就 ok了
public class RangeAssignor extends AbstractPartitionAssignor{ //省略部分代码。。。。 /** * 根据订阅者 和 分区数量来进行分区 * @param partitionsPerTopic: topic->分区数量 * @param subscriptions: memberId 消费者id -> subscription 消费者信息 * @return: memberId ->list<topic名称 和 分区序号(id)> */ @Override public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions) { //topic -> list<消费者> Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions); //初始化 返回结果 Map<String, List<TopicPartition>> assignment = new HashMap<>(); for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList<TopicPartition>()); for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) { //topic String topic = topicEntry.getKey(); // 消费该topic的 consumer-id List<String> consumersForTopic = topicEntry.getValue(); //topic 的分区数量 Integer numPartitionsForTopic = partitionsPerTopic.get(topic); if (numPartitionsForTopic == null) continue; Collections.sort(consumersForTopic); //平均每个消费者分配的 分区数量 int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size(); //平均之后剩下的 分区数 int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size(); //这里就是将连续分区切开然后分配给每个消费者 List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic); for (int i = 0, n = consumersForTopic.size(); i < n; i++) { int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition); int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length)); } } return assignment; } }
- 自定义一个 分区策略
这里先缓缓把,太简单把,没什么用,太复杂把,一时也想不出好的场景,如果你有需求,欢迎留言,我们一起来实现
- Range分区策略源码
Consumer 常用配置
首先,我们都应该知道,最全最全的文档应该是来自官网(虽然有时候可能官网找不到):
http://kafka.apachecn.org/documentation.html#newconsumerconfigs
嗯,以下内容来自 kafka权威指南 ,请原谅我的小懒惰。。。后续有时间会把工作中的遇到的补充上
-
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,
如果可用的数据量小于fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低broker 的工作负载。 -
fetch.max.wait.ms
我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而feth.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。
如果 fetch.max.wait.ms 被设为 100ms,并且fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在100ms 后返回所有可用的数据,就看哪个条件先得到满足。 -
max.partition.fetch.bytes
该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。
如果出现这种情况,可以把max.partition.fetch.bytes 值改小,或者延长会话过期时间。 -
session.timeout.ms
该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。
该属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms则指定了消费者可以多久不发送心跳。
所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。把session.timeout.ms 值设得比默认值小,可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。 -
auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。
它的默认值是 latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。 -
enable.auto.commit
我们稍后将介绍几种不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。 -
partition.assignment.strategy
(这部分好像重复了 ~~~)
我们知道,分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。
Range
该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时订阅了主题T1 和 主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。RoundRobin
该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。可以通过设置 partition.assignment.strategy 来选择分区策略。
默认使用的是org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略,不过也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定义策略,在这种情况下,partition.assignment.strategy 属性的值就是自定义类的名字。 -
client.id
该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。 -
max.poll.records
该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。 -
receive.buffer.bytes 和 send.buffer.bytes
socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操作系统的默认值。
如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽
offset 的控制
这篇文章有点太长了,所以准备另起一篇来专门讲 offset 的控制。
预计在周末更新吧,如果你有兴趣,可以点击关注一下,以便及时收到提醒噢!!!弱弱的,也是求一波关注,哈哈哈!!!