基础知识IT-资料大数据面试

Kafka基础知识扫盲

2019-08-14  本文已影响118人  刘一一同学

1. 简介

Kafka 是由 LinkedIn 开发的一个基于发布/订阅的消息系统,具有高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。一般在架构设计中起到解耦、削峰、异步处理的作用。

Kafka 对外使用 topic 的概念,一个 topic 可以由一个或多个 partition(分区),生产者往 topic 里写消息,消费者从 topic 拉取消息。如果一个主题有多个分区,Kafka只能保证一个分区内消息的有序性,在不同的分区之间无法保证。对于时序数据而言,如果将某个主题下的数据集合分成了多个分区,可能会造成读取数据的无序。

系统遇到瓶颈时,可以通过增加 partition 的数量来进行横向扩容。

2. 相关术语

3. 使用场景

4. 消息投递语义

设置 enable.auto.commit 为 ture。
设置 auto.commit.interval.ms 为一个较小的时间间隔。
client 不要调用 commitSync(),Kafka 在特定的时间间隔内自动提交。

方法一:
设置 enable.auto.commit 为 false。
client 调用 commitSync(),增加消息偏移。

方法二:
设置 enable.auto.commit 为 ture。
设置 auto.commit.interval.ms 为一个较大的时间间隔。
client 调用 commitSync(),增加消息偏移。

设置 enable.auto.commit为 false。
保存 ConsumerRecord 中的 offset 到数据库。
当 partition 发生变化的时候需要 rebalance,有以下几个事件会触发分区变化:

  1. consumer 订阅的 topic 中的分区大小发生变化。
  2. topic 被创建或者被删除。
  3. consuer 所在 group 中有个成员挂了。
  4. 新的 consumer 通过调用 join 加入了 group。此时 consumer 通过实现 ConsumerRebalanceListener 接口,捕捉这些事件,对偏移量进行处理。consumer 通过调用 seek(TopicPartition, long) 方法,移动到指定的分区的偏移位置。

在业务中,常常都是使用 At least once的模型,整体的消息投递语义需要 Producer 端和 Consumer 端两者来保证。

5. 生产者分区选择配策略

生产者在将消息发送到某个 topic ,需要经过拦截器、序列化器和分区器(Partitioner)的一系列作用之后才能发送到对应的 broker,在发往 broker 之前是需要确定它所发往的分区。

public class ProducerRecord<K, V> {
    // 该消息需要发往的主题
    private final String topic;
    // 该消息需要发往的主题中的某个分区,如果该字段有值,则分区器不起作用,直接发往指定的分区
    // 如果该值为null,则利用分区器进行分区的选择 
    private final Integer partition;
    private final Headers headers;
    // 如果partition字段为null,则使用分区器进行分区选择时会用到该key字段,该值可为空 
    private final K key;
    private final V value;
    private final Long timestamp;

Kafka 中提供的默认分区器是 DefaultPartitioner,它实现了 Partitioner 接口(用户可以实现这个接口来自定义分区器),其中的 partition 方法就是用来实现具体的分区分配逻辑:

public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 首先通过cluster从元数据中获取topic所有的分区信息
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        // 拿到该topic的分区数
        int numPartitions = partitions.size();
        // 如果消息记录中没有指定key
        if (keyBytes == null) {
            // 则获取一个自增的值
            int nextValue = nextValue(topic);
            // 通过cluster拿到所有可用的分区(可用的分区这里指的是该分区存在首领副本)
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            // 如果该topic存在可用的分区
            if (availablePartitions.size() > 0) {
                // 那么将nextValue转成正数之后对可用分区数进行取余
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                // 然后从可用分区中返回一个分区
                return availablePartitions.get(part).partition();
            } else { // 如果不存在可用的分区
                // 那么就从所有不可用的分区中通过取余的方式返回一个不可用的分区
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else { // 如果消息记录中指定了key
            // 则使用该key进行hash操作,然后对所有的分区数进行取余操作,这里的hash算法采用的是murmur2算法,然后再转成正数
            //toPositive方法很简单,直接将给定的参数与0X7FFFFFFF进行逻辑与操作。
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    // nextValue方法可以理解为是在消息记录中没有指定key的情况下,需要生成一个数用来代替key的hash值
    // 方法就是最开始先生成一个随机数,之后在这个随机数的基础上每次请求时均进行+1的操作
    private int nextValue(String topic) {
        // 每个topic都对应着一个计数
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) { // 如果是第一次,该topic还没有对应的计数
            // 那么先生成一个随机数
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            // 然后将该随机数与topic对应起来存入map中
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                // 之后把这个随机数返回
                counter = currentCounter;
            }
        }
        // 一旦存入了随机数之后,后续的请求均在该随机数的基础上+1之后进行返回
        return counter.getAndIncrement();
    }

6. 消费者分区分配策略

消费者以组的名义订阅主题,主题有多个分区,消费者组中有多个消费者实例,同一时刻,一条消息只能被组中的一个消费者实例消费。

如果多个消费者负责同一个分区,那么就意味着两个消费者同时读取分区的消息,由于消费者自己可以控制读取消息的offset,就有可能C1才读到2,而C1读到1,C1还没处理完,C2已经读到3了,这就相当于多线程读取同一个消息,会造成消息处理的重复,且不能保证消息的顺序。

6.1 Range策略

range (默认分配策略)对应的实现类是org.apache.kafka.clients.consumer.RangeAssignor

  1. 假设,有1个主题、10个分区、3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者C1将会多消费一个分区,分配结果是:

    • C1将消费T1主题的0、1、2、3分区。
    • C2将消费T1主题的4、5、6分区。
    • C3将消费T1主题的7、8、9分区
  2. 假设,有11个分区,分配结果是:

    • C1将消费T1主题的0、1、2、3分区。
    • C2将消费T1主题的4、5、 6、7分区。
    • C2将消费T1主题的8、9、10分区。
  3. 假如,有2个主题(T0和T1),分别有3个分区,分配结果是:

    • C1将消费T1主题的 0、1 分区,以及 T1 主题的 0、1 分区。
    • C2将消费T1主题的 2、3 分区,以及 T2 主题的 2、3 分区。
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                    Map<String, Subscription> subscriptions) {
    // 主题与消费者的映射                                                            
    Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
    Map<String, List<TopicPartition>> assignment = new HashMap<>();
    for (String memberId : subscriptions.keySet())
        assignment.put(memberId, new ArrayList<TopicPartition>());

    for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {
        String topic = topicEntry.getKey();    //  主题
        List<String> consumersForTopic = topicEntry.getValue();    //  消费者列表

        // partitionsPerTopic表示主题和分区数的映射
        // 获取主题下有多少个分区
        Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
        if (numPartitionsForTopic == null)
            continue;

        // 消费者按字典序排序
        Collections.sort(consumersForTopic);

        // 分区数量除以消费者数量
        int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
        // 取模,余数就是额外的分区
        int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();

        List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
        for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
            int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
            int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
            // 分配分区
            assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
        }
    }
    return assignment;
}

6.2 RoundRobin策略

RoundRobin 基于轮询算法,对应的实现类是 org.apache.kafka.clients.consumer.RoundRobinAssignor

假设,有两个消费者C0和C1,两个主题T0和T1,每个主题有3个分区,分配结果是:

上一篇 下一篇

猜你喜欢

热点阅读