Apache KafkaKafka技术专栏

Kafka分区分配计算(分区器Partitions)

2018-01-06  本文已影响114人  朱小厮

KafkaProducer在调用send方法发送消息至broker的过程中,首先是经过拦截器Inteceptors处理,然后是经过序列化Serializer处理,之后就到了Partitions阶段,即分区分配计算阶段。在某些应用场景下,业务逻辑需要控制每条消息落到合适的分区中,有些情形下则只要根据默认的分配规则即可。在KafkaProducer计算分配时,首先根据的是ProducerRecord中的partition字段指定的序号计算分区。读者有可能刚睡醒,看到这个ProducerRecord似曾相识,没有关系,先看段Kafka生产者的示例片段:

Producer<String,String> producer = new KafkaProducer<String,String>(properties);
String message = "kafka producer demo";
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic,message);
try {
    producer.send(producerRecord).get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

没错,ProducerRecord只是一个封装了消息的对象而已,ProducerRecord一共有5个成员变量,即:

private final String topic;//所要发送的topic
private final Integer partition;//指定的partition序号
private final Headers headers;//一组键值对,与RabbitMQ中的headers类似,kafka0.11.x版本才引入的一个属性
private final K key;//消息的key
private final V value;//消息的value,即消息体
private final Long timestamp;//消息的时间戳,可以分为Create_Time和LogAppend_Time之分,这个以后的文章中再表。

在KafkaProducer的源码(1.0.0)中,计算分区时调用的是下面的partition()方法:

/**
 * computes partition for given record.
 * if the record has partition returns the value otherwise
 * calls configured partitioner class to compute the partition.
 */
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}

可以看出的确是先判断有无指明ProducerRecord的partition字段,如果没有指明,则再进一步计算分区。上面这段代码中的partitioner在默认情况下是指Kafka默认实现的org.apache.kafka.clients.producer.DefaultPartitioner,其partition()方法实现如下:

/**
 * Compute the partition for the given record.
 *
 * @param topic The topic name
 * @param key The key to partition on (or null if no key)
 * @param keyBytes serialized key to partition on (or null if no key)
 * @param value The value to partition on or null
 * @param valueBytes serialized value to partition on or null
 * @param cluster The current cluster metadata
 */
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

由上源码可以看出partition的计算方式:

  1. 如果key为null,则按照一种轮询的方式来计算分区分配
  2. 如果key不为null则使用称之为murmur的Hash算法(非加密型Hash函数,具备高运算性能及低碰撞率)来计算分区分配。

KafkaProducer中还支持自定义分区分配方式,与org.apache.kafka.clients.producer.internals.DefaultPartitioner一样首先实现org.apache.kafka.clients.producer.Partitioner接口,然后在KafkaProducer的配置中指定partitioner.class为对应的自定义分区器(Partitioners)即可,即:

properties.put("partitioner.class","com.hidden.partitioner.DemoPartitioner");

自定义DemoPartitioner主要是实现Partitioner接口的public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)的方法。DemoPartitioner稍微修改了下DefaultPartitioner的计算方式,详细参考如下:

public class DemoPartitioner implements Partitioner {

    private final AtomicInteger atomicInteger = new AtomicInteger(0);

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes || keyBytes.length<1) {
            return atomicInteger.getAndIncrement() % numPartitions;
        }
        //借用String的hashCode的计算方式
        int hash = 0;
        for (byte b : keyBytes) {
            hash = 31 * hash + b;
        }
        return hash % numPartitions;
    }

    @Override
    public void close() {}
}

这个自定义分区器的实现比较简单,读者可以根据自身业务的需求来灵活实现分配分区的计算方式,比如:一般大型电商都有多个仓库,可以将仓库的名称或者ID作为Key来灵活的记录商品信息。


PS:消息中间件(Kafka、RabbitMQ)交流可加微信:hiddenzzh
欢迎支持《RabbitMQ实战指南》以及关注微信公众号:朱小厮的博客。


欢迎关注
上一篇下一篇

猜你喜欢

热点阅读