大数据学习

kafka学习(3) 消费者:消费者组,重平衡,订阅方式,开发步

2020-07-01  本文已影响0人  xiaogp

KafkaConsumer负责订阅主题, 并且从订阅的主题中拉取消息

消费者和消费组

每一个消费者都有一个对应的消费组, 当消息发布到主题之后, 只会被投递给订阅它的每个消费组中的一个消费者。消费组将消费者归为一类, 每一个消费者只隶属于一个消费组, 如果所有消费者都隶属于同一个消费组, 那么就是点对点模式, 如果所有消费者隶属于不同消费组就是发布/订阅模式, 可以通过group.id来配置

消费者和消费者组.png
如图所示某主题有4个分区p0, p1, p2, p3, 有两个消费组A和B, A中有4个消费者, B中有2个消费者, 按照kafka默认的分配规则, 分配结果是消费者组A中每个消费者一个分区, 消费者组B中每个消费者分配到2个分区, 两个消费组互不影响。
消费者组的理解

(1) 不使用消费者组的话, 每一条消息都会被分发到所有消费者(相当于每个消费是一个消费者组), 如果消费者组有10个消费者, 不使用消费者组,每一条消息都会被消费10次,消费者组为一个整体来消费主题的所有分区
(2) 使用消费者组的话, 所有消费者组中的消费者是一个整体, 每条消息只被消费一次
(3) 在消费者组中可以有一个或者多个消费者实例, 这些消费者共享一个公共的groupid, groupid是一个字符串,用来唯一标志一个消费者组,组内的所有消费者协调在一起来消费订阅主题的所有分区。
(4) 同一个topic下的某个分区只能被消费者组中的一个消费者消费(消费者可以少于主题的分区,一个消费者可以消费一个主题的多个分区,但是如果主题分区比消费者组中的消费者少,一个主题也只会发给一个消费者不会多发,此时多出来的消费者消费不到任何消息), 不同消费者组中的消费者可以消费相同的分区
(5) 如果消费者组当中消费者的数量超过了订阅主题分区的数量,那么多余的消费者就会被闲置,不会受到任何消息
(6) 一个消费者组的一个消费者,可以消费一个topic下的多个分区(消费者比分区少)
(7) 同一个topic下的某个分区,可以被多个消费者组,消费者消息

重平衡:

新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为重平衡(rebalance)。重平衡是Kafka一个很重要的性质

消费者组和Kafka两种模式的关系

消息中间件有两种模式电对点(P2P)发布订阅模式(Pub、Sub)模式

为什么需要消费者组

客户端开发

消费逻辑需要以下几个步骤

(1) 配置消费者客户端参数以及创消费者实例
(2) 订阅主题
(3) 拉取消费者并且消费
(4) 提交消费者位移
(5) 关闭消费者实例

代码实现

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

public class KafkaConsumerAnalysis {
    public static final String brokerList = "192.168.61.97:9092";
    public static final String topic = "test_gp";
    public static final String group_id = "group.demo";
    public static final AtomicBoolean isRuning = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("group.id", group_id);
        props.put("client.id", "consumer.client.id.demo");
        return props;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(topic));

        try {
          while (isRuning.get()) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
              for (ConsumerRecord<String, String> record: records) {
                  System.out.println("topic = " + record.topic() + ", partition = " + record.partition() +
                          ", offset = " + record.offset());
                  System.out.println("key = " + record.key() + ", value = " + record.value());
              }
          }
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            consumer.close();
        }
    }
}

必要的参数

消费者客户端有4个参数需要设置
(1) bootstrap.servers: 指定连接的kafka集群所需的broker地址清单, 格式为host1:port1,host2:port2,可以设置一个或者多个地址, 用逗号隔开, 建议设置2个以上地址
(2) group.id: 消费者隶属的消费者组, 不能为空,这个参数需要设置成具有一定业务意义的名称。
(3) key.deserializervalue.deserializer: 与生产者客户端的序列化方式一致,与key.serializervalue.serializer保持一致。消费者从broker端获取的消息格式是字节数组byte[],所以需要响应的反序列化操作才能还原成原有的对象格式。
其他参数有client.id这个参数如果不设置KafkaConsumer会自动生成,比如“consumer-1”。
可以使用ConsumerConfig类防止参数写错

public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.demo");
        return props;
    }

消费者相对于生产者,除了必要的序列化参数之外,多了一个group.id参数.

订阅主题和分区

主要方法有订阅主题集合正则表达式订阅主题定于指定主题的分区,这三种互斥, 只能指定一种。

使用集合和正则表达式订阅主题

一个消费者可以订阅一个或多个主题. 使用subscribe()方法订阅主题,可以使用集合的形式和正则表达式订阅多个主题。以下两种方式都可以订阅clear_data和clear_data_01两个主题,正则表达式.*代表后续0个或者多个任意字符。

consumer.subscribe(Arrays.asList("clear_data", "clear_data_01"));

正则表达式需要设置再平衡监听器ConsumerRebalanceListener

consumer.subscribe(Pattern.compile("clear_data.*"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {

            }
        })

如果主要消费一个主题,可以使用Collections.singletonList()将一个元素转化为一个集合,Collections.singletonList()返回的是不可变的集合,这个长度的集合只有1,可以减少内存空间

consumer.subscribe(Collections.singletonList("clear_data"));

如果在脚本中多次调用了了consumer.subscribe方法,最终订阅的是脚本最下面最新定义的主题。

订阅指定主题的指定分区

KafkaConsumer.assign()方法实现了这一功能, assign接收Collection<TopicPartition>, 其中TopicPartition有2个属性, topic和partition, 分区从0开始编号, 可以通过partitionFor()方法获得主题的分区信息, partitionFor()接受参数topic,partitionFor可以查看的参数包括:

List<PartitionInfo> res = producer.partitionsFor("pira_clear_save_data");
        for (PartitionInfo info : res) {
            System.out.println("topic:" + info.topic());
            System.out.println("partition:" + info.partition());
            System.out.println("leader:" + info.leader());
            System.out.println("replicas Array:" + Arrays.toString(info.replicas()));
            System.out.println("ISR:" + Arrays.toString(info.inSyncReplicas()));;
            System.out.println("--------------------------");
        }

PartitionInfo跟消息无关,与主题和分区本身有关,List<PartitionInfo>显示每个partition的信息,分别是主题名分区号(0,1,2,...)leader副本所在位置,AR集合(所有副本集合)的位置,ISR集合所在位置。

topic:clear_data
partition:2
leader:cloudera02:9092 (id: 77 rack: null)
replicas Array:[cloudera02:9092 (id: 77 rack: null), cloudera01:9092 (id: 78 rack: null), cloudera03:9092 (id: 79 rack: null)]
ISR:[cloudera02:9092 (id: 77 rack: null), cloudera01:9092 (id: 78 rack: null), cloudera03:9092 (id: 79 rack: null)]
--------------------------
topic:clear_data
partition:1
leader:cloudera03:9092 (id: 79 rack: null)
replicas Array:[cloudera03:9092 (id: 79 rack: null), cloudera02:9092 (id: 77 rack: null), cloudera01:9092 (id: 78 rack: null)]
ISR:[cloudera03:9092 (id: 79 rack: null), cloudera02:9092 (id: 77 rack: null), cloudera01:9092 (id: 78 rack: null)]
--------------------------
topic:clear_data
partition:0
leader:cloudera01:9092 (id: 78 rack: null)
replicas Array:[cloudera01:9092 (id: 78 rack: null), cloudera03:9092 (id: 79 rack: null), cloudera02:9092 (id: 77 rack: null)]
ISR:[cloudera01:9092 (id: 78 rack: null), cloudera03:9092 (id: 79 rack: null), cloudera02:9092 (id: 77 rack: null)]
--------------------------

也可以在kafka客户端使用describe得到分区信息

Topic:pira_clear_save_data  PartitionCount:3    ReplicationFactor:3 Configs:
    Topic: pira_clear_save_data Partition: 0    Leader: 78  Replicas: 78,79,77  Isr: 78,79,77
    Topic: pira_clear_save_data Partition: 1    Leader: 79  Replicas: 79,77,78  Isr: 79,77,78
    Topic: pira_clear_save_data Partition: 2    Leader: 77  Replicas: 77,78,79  Isr: 77,78,79

在知道主题有那些分区之后可以使用KafkaConsumer.assign()订阅指定clear_data主题分区0的消息。

consumer.assign(Arrays.asList(new TopicPartition("clear_data", 0)));
取消订阅

取消订阅调用unsubscribe()方法.

consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>());
consumer.assign(new ArrayList<TopicPartition>());
subscribe()和assign()比较

subscribe具有自动在均衡的功能,来实现消费负载均衡故障自动转移,而assign不具备这种功能。

消息消费

ConsumerRecords提供了iterator()方法遍历消息来消费,也可以根据分区来进行消费和根据主题来进行消费。

遍历消费
consumer.subscribe(Collections.singletonList("clear_data"));
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000L);
        for (ConsumerRecord<String, String> record : consumerRecords) {
            JSONObject jsonObject = JSON.parseObject(record.value());
            // TODO
        }
根据分区进行消费
for (TopicPartition tp : consumerRecords.partitions()) {
            for (ConsumerRecord<String, String> record : consumerRecords.records(tp)) {
                JSONObject jsonObject = JSON.parseObject(record.value());
                // TODO
            }
        }
根据主题进行消费
consumer.subscribe(Arrays.asList("clear_data", "clear_data2"));
        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000L);
        for (String topic : Arrays.asList("clear_data", "clear_data2")) {
            for (ConsumerRecord<String, String> record : consumerRecords.records(topic)) {
                JSONObject jsonObject = JSON.parseObject(record.value());
                // TODO
            }
        }

位移提交

对于kafka中的分区而言, 他的每条消息都有一个唯一的offset, 用来表示消息在分区中对应的位置.对于消费者而言它也有一个offset概念, 消费者使用offset来表示消费到分区中某个消息所在的位置.

对于消息在分区中的位置, offset作为偏移量
对于消费者消费到的位置, 将offset称为位移
对于一条消息而言, 它的偏移量和消费者消费他的位移是对等的

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

这个默认提交的自动提交不是每消费一条就提交一次, 而是定时提交, 默认是每个5秒, 但是自动提交是在poll方法逻辑内完成的, 在每次poll请求之前都会检查是否可以进行位移提交, 如果可以就会提交上一次轮询的位移
自动位移提交可能会导致重复消费数据丢失

(1) 重复消费发生在消费者崩了, 位移未提交, 下一次重新拉取, 可以通过减小自动提交位移的时间间隔缩短重新拉去的数据大小
(2) 数据丢失发生在消费者崩了, 数据还没有处理完, 但是这一批的位移已经提交, 重启消费者从下一批数据开始拉取

自动位移提交在正常情况下不会发生重复消费或者数据丢失, 但是异常无法避免, kafka提供了手动位移提交, 很多时候不是消费到信息就算完成, 而是需要将消息写入数据库,写入本地缓存, 或者更加复杂的业务处理才能算消费成功, 此时在进行位移提交, kafka的手动调教方式就是为了给开发人员根据逻辑在合适的地方进行位移提交, 开启手动提交需要修改参数。

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交可以细分为同步提交异步提交, 对应KafkaConsumer中的commitSync()commitAsync()

while (isRuning.get()) {
              ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
              for (ConsumerRecord<String, String> record: records) {
                  System.out.println("topic = " + record.topic() + ", partition = " + record.partition() +
                          ", offset = " + record.offset());
                  System.out.println("key = " + record.key() + ", value = " + record.value());
              }
              consumer.commitSync();
          }

可以将消息存入内存, 进行批量处理和批量提交

final int minBatchSize = 10;
        List<ConsumerRecord> buffer = new ArrayList<>();

        while (isRuning.get()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            for (ConsumerRecord<String, String> record: records) {
                buffer.add(record);
                System.out.println(buffer.size());
            }
            if (buffer.size() >= minBatchSize) {
                System.out.println(buffer);
                consumer.commitSync();
                buffer.clear();
            }
        }

commitSync()方法会阻塞消费者线程直至位移提交完成.

控制或关闭消费

KafkaConsumer提供了对消费速度进行控制的方法, 某些情况下我们可能需要暂停谋陷分区的消费而先消费其他分区, 当达到一定条件是再回复这些分区的消费. KafkaConsumer中使用pause()和resume()来暂停某些分区在拉取时返回数据给消费者客户端 和 恢复某些分区想消费者客户端返回数据

指定位移消费

当一个新的消费组建立的时候, 他根本没有可以找到的消费位移, 或者新订阅了一个新的主题, 也没有可用的位移, 或者当__consumer_offsets主题中有关消费组的位移信息被删除, 也找不到可用的位移
当kafka中消费者找不到位移的时候, 会根据消费者客户端参数 auto.offset.reset 的配置来决定从何处开始消费. 这个参数的默认值是latest, 表示从分区末尾开始消费. 如果设置成earliest, 那么消费者会从起始处开始消费. 如果设置成none则找不到位移直接报错.
kafka中poll无法精确掌控消费的起始位置, auto.offset.reset也只能设置从在开始或者结尾开始消费, 如果要从特定的位移开始消费需要使用KafkaConsumer的seek()方法.
先通过assignment()方法获取消费者所分配到的分区信息

consumer.subscribe(Arrays.asList(topic));

        consumer.poll(Duration.ofMillis(10000));
        Set<TopicPartition> assignment = consumer.assignment();
        System.out.println(assignment);
[test_gp-0]

seek方法接受的参数partition和offset, offset指定从分区的哪个位置开始消费, seek需要获取所分配的分区, 要获取所分配的分区必须先执行一次poll操作, 因为分区分配实在poll()调用过程中实现的.

consumer.subscribe(Arrays.asList(topic));

        consumer.poll(Duration.ofMillis(10000));
        Set<TopicPartition> assignment = consumer.assignment();
        System.out.println(assignment);
        List<TopicPartition> assignment2 = new ArrayList<>(assignment);

        consumer.seek(assignment2.get(0), 140);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord record : records) {
                System.out.println(record);
            }
        }
上一篇 下一篇

猜你喜欢

热点阅读