互联网科技Java成长之路Java 杂谈

Kafka 分区分配计算(分区器 Partitions )

2019-05-15  本文已影响10人  Java_老男孩

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。在某些应用场景下,业务逻辑需要控制每条消息落到合适的分区中,有些情形下则只要根据默认的分配规则即可。在KafkaProducer计算分配时,首先根据的是ProducerRecord中的partition字段指定的序号计算分区。读者有可能刚睡醒,看到这个ProducerRecord似曾相识,没有关系,先看段Kafka生产者的示例片段:

Producer<String,String> producer = new KafkaProducer<String,String>(properties);
String message = "kafka producer demo";
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
try {
    producer.send(producerRecord).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

没错,ProducerRecord只是一个封装了消息的对象而已,ProducerRecord一共有5个成员变量,即:

private final String topic;//所要发送的topic
private final Integer partition;//指定的partition序号
private final Headers headers;//一组键值对,与RabbitMQ中的headers类似,kafka0.11.x版本才引入的一个属性
private final K key;//消息的key
private final V value;//消息的value,即消息体
private final Long timestamp;//消息的时间戳,可以分为Create_Time和LogAppend_Time之分,这个以后的文章中再表。123456

在KafkaProducer的源码(1.0.0)中,计算分区时调用的是下面的partition()方法:

/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

可以看出的确是先判断有无指明ProducerRecord的partition字段,如果没有指明,则再进一步计算分区。上面这段代码中的partitioner在默认情况下是指Kafka默认实现的org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法实现如下:

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

由上源码可以看出partition的计算方式:

KafkaProducer中还支持自定义分区分配方式,与org.apache.kafka.clients.producer.internals.DefaultPartitioner一样首先实现org.apache.kafka.clients.producer.Partitioner接口,然后在KafkaProducer的配置中指定partitioner.class为对应的自定义分区器(Partitioners)即可,即:

properties.put("partitioner.class","com.hidden.partitioner.DemoPartitioner");

自定义DemoPartitioner主要是实现Partitioner接口的public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)的方法。DemoPartitioner稍微修改了下DefaultPartitioner的计算方式,详细参考如下:

public class DemoPartitioner implements Partitioner {

    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes || keyBytes.length<1) {
            return atomicInteger.getAndIncrement() % numPartitions;
        }
        //借用String的hashCode的计算方式
        int hash = 0;
        for (byte b : keyBytes) {
            hash = 31 * hash + b;
        }
        return hash % numPartitions;
    }

    @Override
    public void close() {}
}

这个自定义分区器的实现比较简单,读者可以根据自身业务的需求来灵活实现分配分区的计算方式,比如:一般大型电商都有多个仓库,可以将仓库的名称或者ID作为Key来灵活的记录商品信息。


本文的重点是你有没有收获与成长,其余的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点高级进阶干货,希望对想成为架构师的朋友有一定的参考和帮助

喜欢这篇文章的朋友可以点个喜欢,也可以关注一下我的个人专题:Java成长之路

需要更详细架构师技能思维导图和以下资料的可以加一下技术交流分享群:“708 701 457”免费获取




上一篇下一篇

猜你喜欢

热点阅读