Kafka读书笔记:生产者(Producer)

2020-11-08  本文已影响0人  RealityVibe

生产者

客户端开发

正常的生产逻辑需要具备以下几个步骤:

  1. 构建生产者客户端参数及创建相应的生产者实例
  2. 构建待发送的消息
  3. 发送消息
  4. 关闭生产者实例

消息的发送

发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)

发后即忘:

​ 只管往Kafka发送消息,而不关心消息是否正确到达。在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式性能最高(因为不用做检查处理,牺牲了消息的安全性,达到高吞吐的目的),可靠性最差。

同步:

​ 利用返回Future对象实现如下代码所示,失败时,捕捉异常,并做响应的异常处理。也可以通过注释的代码部分获取RecordMetadata对象,包含当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。

异常

​ 常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复

​ 配置retries 参数设置可重试异常的重试次数。如果重试超过设置次数之后还没有恢复,那么仍会抛出异常,进而发送的外层逻辑就要处理这些异常了。

​ RecordTooLargeException异常,暗示了所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常。

private static void sendData() {
    Properties properties = new Properties();
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());


    KafkaProducer producer = new KafkaProducer<String, String>(properties);


    ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry",
            "Precision Producer", "France2");
    try {
        producer.send(record).get();
       // Future<RecordMetadata> future = producer.send(record);
       // RecordMetadata metaData = future.get();
    } catch (Exception e) {
        System.out.println("send Fail:" + e.getMessage());
    }

    producer.close();
}
异步发送
  1. 一般是在send()方法中指定一个Callback的回调函数,Kafka在返回响应式调用改函数来实现异步的发送请求。
  2. onCompletion()方法的两个参数是互斥的,消息发送成功时,metadata 不为 null 而exception为null;消息发送异常时,metadata为null而exception不为null。
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // recordMetadata和e必有一个为null
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println(recordMetadata);
        }
    }
});
close()方法

close()方法会阻塞等待之前所有的发送请求完成后再关闭KafkaProducer。下面这个带超时时间参数的close()方法,只会等待timeout的时间,如果在timeout时间后仍未完成所有的请求处理,会强行退出,在实际应用中,一般使用无参close()方法。

public void close(long timeout, java.util.concurrent.TimeUnit timeUnit)

序列化

​ 生产者需要用序列化器把对象转换成字节数组才能通过网络发送给Kafka。而在接收方,消费者需要使用反序列化器把Kafka中收到的字节数组转换成对应的对象。

生产者拦截器

​ KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)

public interface ProducerInterceptor<K, V> extends Configurable {
    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
    public void close();
}

在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

onSend()

调用时间点:将消息序列化和计算分区之前

​ KafkaProducer会在将消息序列化和计算分区之前调用生产者拦截器的onSend()方法来对消息进行相应的定制化操作。不建议在这里修改ProducerRecord的topic、key和partition信息(可能会对问题的排查和定位造成影响,以及出现难以预料的bug),比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩的功能。

onAcknowledgement()

调用时间点:在消息被应答(acknowledgement)之前或消息发送失败时,优先于用户设定的CallBack方法。

​ KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close()

​ close()方法主要用于在关闭拦截器时执行一些资源的清理工作。

生产者客户端整体架构

生产者客户端的整体架构

分区

如果key为null,会被随机分配到主题内各个可用的分区。如果key不为null,会对key进行计算决定分到哪个分区。

优点

​ 使用Kafka自己的hash算法对key进行运算,并不会因为java版本的升级导致分区结果不同。

缺点

​ 由于使用了hash算法,对横向扩展不友好。一旦主题增加了分区的个数,可能会造成旧的数据还在老的分区,新的数据被分配到了新的分区。如果要使用键来映射分区,最好在创建主题的时候就把分区规划好。

自定义分区策略

​ 如果出现某个key的数据量特别大,导致按默认key分区后,对应的分区数据量明显过大,从而导致存储和性能上的问题。这时候就需要使用自定义分区策略实现(通过实现Partitioner)。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

import java.io.InvalidClassException;
import java.util.Map;

/**
 * @author by yze on 2020/11/8
 * @since 202011
 */
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null || !(key instanceof String)) {
            throw new InvalidRecordException("不支持的数据类型");
        }
        if (((String) key).equals("special")) {
            // 专门定制到一个分区
            return numPartitions;
        }
        // 其它的key会计算后被分配到除了最后一个分区以外的分区
        return Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1);
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
上一篇下一篇

猜你喜欢

热点阅读