kafka05: Developing a Custom Partitioner

2018-09-11

Developing a Custom Partitioner

In the previous section, we saw that if a message is sent without an explicit partition, the default partitioner decides which partition it goes to. In this section we will write a partitioner of our own.

Reading the default partitioner source

org.apache.kafka.clients.producer.internals.DefaultPartitioner

The partition() method below works as follows:

+ Get the number of partitions for the topic.
+ If key == null: assign the message round-robin, using a per-topic counter (nextValue, randomly seeded) modulo the number of available partitions.
+ If key != null: hash the key bytes with murmur2 and take the result modulo the total partition count.
+ Because of this, messages with the same key are always sent to the same partition.
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
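As a quick sanity check, the keyed branch can be reproduced outside the producer. The sketch below is hypothetical, not part of the original tutorial: DefaultPartitionDemo is an illustrative class name, and numPartitions = 3 is an assumed partition count rather than one read from a real cluster. It only needs the kafka-clients jar on the classpath.

import org.apache.kafka.common.utils.Utils;

// Minimal sketch: reproduce the keyed branch of DefaultPartitioner.
public class DefaultPartitionDemo {
    public static void main(String[] args) {
        int numPartitions = 3; // hypothetical partition count
        for (String key : new String[]{"user-1", "user-2", "user-1"}) {
            byte[] keyBytes = key.getBytes();
            // same formula as DefaultPartitioner's non-null-key branch
            int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            System.out.println(key + " -> partition " + partition);
        }
        // "user-1" prints the same partition both times, confirming that
        // records with the same key always land on the same partition.
    }
}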
Writing a custom partitioner

To customize partitioning, implement the org.apache.kafka.clients.producer.Partitioner interface. The MyPartition class below throws an InvalidRecordException when a record has no key, routes records whose key is "1" straight to partition 1, and hashes every other key across the partitions. To make the producer use it, register the class via the partitioner.class property:
kafkaProps.put("partitioner.class","com.shiyanlou.MyPartition");
package com.shiyanlou;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class MyPartition implements Partitioner {


    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        System.out.println("Customed Partitioner is running...");
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (keyBytes == null) {
            // this partitioner requires every record to carry a key
            throw new InvalidRecordException("key cannot be null..");
        } else {
            if (((String) key).equals("1")) {
                // records with key "1" always go to partition 1
                return 1;
            } else {
                // hash all other keys; toPositive avoids the edge case where
                // Math.abs(Integer.MIN_VALUE) is still negative
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }
    }

    public void close() {

    }

    public void configure(Map<String, ?> configs) {

    }
}
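The partitioner can also be exercised without a running broker by handing it hand-built topic metadata. The following is a minimal sketch, not from the original tutorial: MyPartitionDemo is an illustrative class name, and the public Cluster and PartitionInfo constructors used here vary between kafka-clients versions, so treat the signatures as an assumption to check against your version.

package com.shiyanlou;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

import java.util.Arrays;
import java.util.Collections;

public class MyPartitionDemo {
    public static void main(String[] args) {
        Node node = new Node(0, "localhost", 9092);
        String topic = "mySecondTopic";
        // Fake metadata for a three-partition topic, all partitions led by one node.
        Cluster cluster = new Cluster("test-cluster",
                Collections.singletonList(node),
                Arrays.asList(
                        new PartitionInfo(topic, 0, node, new Node[]{node}, new Node[]{node}),
                        new PartitionInfo(topic, 1, node, new Node[]{node}, new Node[]{node}),
                        new PartitionInfo(topic, 2, node, new Node[]{node}, new Node[]{node})),
                Collections.<String>emptySet(), Collections.<String>emptySet());

        MyPartition partitioner = new MyPartition();
        // key "1" is pinned to partition 1
        System.out.println(partitioner.partition(topic, "1", "1".getBytes(), "v", "v".getBytes(), cluster));
        // any other key is hashed across the three partitions
        System.out.println(partitioner.partition(topic, "42", "42".getBytes(), "v", "v".getBytes(), cluster));
    }
}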
package com.shiyanlou.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.Future;

public class MySecondProducer {
    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "132.232.14.247:9094");
        kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaProps.put("partitioner.class","com.shiyanlou.MyPartition");

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(kafkaProps);
        ProducerRecord<String,String> record = new ProducerRecord<String, String>("mySecondTopic","1","hello kafka");

        long startTime = System.currentTimeMillis();
        for (int i=0;i<10;i++) {
            try {
                // Send the ProducerRecord created above to the Kafka cluster.
                // Sending may fail (e.g. the cluster is unreachable), so the call sits inside a try/catch.
                Future<RecordMetadata> future = producer.send(record);
                // send() returns a Future; calling its get() method makes the send synchronous.
                // get() blocks until the cluster responds, with two possible outcomes:
                // 1. The response carries an error: get() throws, and we can catch the exception and handle it.
                // 2. The response is successful: get() returns a RecordMetadata object holding the offset,
                //    partition, and other details of the message just written to the topic.
                RecordMetadata recordMetadata = future.get();
                long offset=recordMetadata.offset();
                int partition=recordMetadata.partition();
                System.out.println("the message  offset : "+offset+" ,partition:"+partition+"。");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        long endTime = System.currentTimeMillis();
        System.out.println(endTime-startTime);

        producer.close();
    }
}
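Because every record in this demo carries the key "1", MyPartition should route all ten sends to partition 1: each loop iteration should print the "Custom partitioner is running..." line followed by a metadata line reporting partition 1.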