Springboot

kafka——Producer API

2020-11-29  本文已影响0人  小波同学

一、Kafka 核心 API

下图是官方文档中的一个图,形象的描述了能与 Kafka集成的客户端类型


Kafka的五类客户端API类型如下:

本文中,我们将主要介绍 Producer API。

二、生产者客户端的基本架构图

由上图可以看出:KafkaProducer有两个基本线程。

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了
两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。

main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

相关参数:

三、Producer API

3.1、导入相关依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

3.2、Producer异步发送演示

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步发送演示
 */
public static void producerSend(){
    Properties properties = new Properties();
    //kafka集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //ack应答级别
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    //重试次数
    properties.put(ProducerConfig.RETRIES_CONFIG,"3");
    //批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
    //等待时间
    properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
    //RecoderAccumulator缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    //key,value的序列化类
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    //创建生产者对象
    Producer<String,String> producer = new KafkaProducer<>(properties);
    //消息对象 ProducerRecoder
    for (int i = 0; i < 10; i++) {
        ProducerRecord<String,String> record =
                new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
        producer.send(record);
    }
    producer.close();
}

3.3、Producer异步发送演示

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是
RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果
Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步发送带回调函数演示
 */
public static void producerSendWithCallback(){
    Properties properties = new Properties();
    //kafka集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //ack应答级别
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    //重试次数
    properties.put(ProducerConfig.RETRIES_CONFIG,"0");
    //批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
    //等待时间
    properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
    //RecoderAccumulator缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    //key,value的序列化类
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    //创建生产者对象
    Producer<String,String> producer = new KafkaProducer<>(properties);
    //消息对象 ProducerRecoder
    ProducerRecord<String,String> record =
            new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
    producer.send(record, new Callback() {
        //回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
            } else {
                log.error("exception",e);
            }
        }
    });
    producer.close();
}

3.4、Producer异步发送带回调函数和Partition负载均衡

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步发送带回调函数和Partition负载均衡
 */
public static void producerSendWithCallbackAndPartition(){
    Properties properties = new Properties();
    //kafka集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //ack应答级别
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    //重试次数
    properties.put(ProducerConfig.RETRIES_CONFIG,"0");
    //批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
    //等待时间
    properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
    //RecoderAccumulator缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    //key,value的序列化类
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    //Partition负载均衡
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.yibo.kafka.producer.SamplePartition");
    //创建生产者对象
    Producer<String,String> producer = new KafkaProducer<>(properties);
    //消息对象 ProducerRecoder
    ProducerRecord<String,String> record =
            new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
    producer.send(record, new Callback() {
        //回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
            } else {
                log.error("exception",e);
            }
        }
    });
    producer.close();
}

/**
 * @Description: Partitioner分区接口,以实现自定义的消息分区
 *
 * 默认分区器DefaultPartitioner org.apache.kafka.clients.producer.internals.DefaultPartitioner
 *
 * 如果消息的key为null,此时producer会使用默认的partitioner分区器将消息随机分布到topic的可用partition中。
 * 如果key不为null,并且使用了默认的分区器,kafka会使用自己的hash算法对key取hash值,
 * 使用hash值与partition数量取模,从而确定发送到哪个分区。
 * 注意:此时key相同的消息会发送到相同的分区(只要partition的数量不变化)
 */
public class SamplePartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        /**
         *由于我们按key分区,在这里我们规定:key值不允许为null。在实际项目中,key为null的消息*,可以发送到同一个分区。
         */
        if(keyBytes == null) {
            throw new InvalidRecordException("key cannot be null");
        }
        if(((String)key).equals("1")) {
            return 1;
        }
        System.out.println("key: " + key);
        //如果消息的key值不为1,那么使用hash值取模,确定分区。
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

3.5、Producer异步阻塞发送演示

由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同
步发送的效果,只需在调用 Future 对象的 get 方发即可。

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步阻塞发送演示
 */
public static void producerSyncSend() throws Exception {
    Properties properties = new Properties();
    //kafka集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //ack应答级别
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    //重试次数
    properties.put(ProducerConfig.RETRIES_CONFIG,"0");
    //批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
    //等待时间
    properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
    //RecoderAccumulator缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    //key,value的序列化类
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    //创建生产者对象
    Producer<String,String> producer = new KafkaProducer<>(properties);
    //消息对象 ProducerRecoder
    ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"hello","hello world");
    Future<RecordMetadata> send = producer.send(record);
    RecordMetadata recordMetadata = send.get();
    log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
    producer.close();
}

四、自定义 Interceptor

4.1、拦截器原理

Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。

对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保
线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

4.2、 拦截器案例

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> map) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // 创建一个新的 record,把时间戳写入消息体的最前部
        return new ProducerRecord(producerRecord.topic(),
                producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(),
                System.currentTimeMillis() + "," + producerRecord.value().toString());

    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    @Override
    public void close() {

    }
}
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> map) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return null;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // 统计成功和失败的次数
        if (e == null) {
            successCounter++;
        } else {
            errorCounter++;
        }

    }

    @Override
    public void close() {
        // 保存结果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}
public class InterceptorProducer {

    private static final String TOPIC_NAME = "yibo_topic";

    public static void main(String[] args) {
        // 1 设置配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2 构建拦截链
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.yibo.kafka.producer.TimeInterceptor");
        interceptors.add("com.yibo.kafka.producer.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        Producer<String, String> producer = new KafkaProducer<>(props);

        // 3 发送消息
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "message" + i);
            producer.send(record);
        }

        // 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法
        producer.close();
    }
}

五、SpringBoot 集成 Kafka

5.1、添加maven依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.1</version>
</dependency>

5.2、配置 application.properties

# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=192.168.174.128:9092

#=============== provider  =======================
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432

#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=testGroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=false
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#=============== listener  =======================
# 在侦听器容器中运行的线程数。
spring.kafka.listener.concurrency=5
#listner负责ack,每调用一次,就立即commit
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false

5.3、新建Producer

@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    private static final String TOPIC_NAME = "yibo_topic";

    public void send(Object obj) {
        String obj2String = JSONObject.toJSONString(obj);
        log.info("准备发送消息为:{}", obj2String);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                log.info(TOPIC_NAME + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                log.info(TOPIC_NAME + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }
}

参考:
https://www.cnblogs.com/L-Test/p/13443178.html

上一篇 下一篇

猜你喜欢

热点阅读