读书笔记:Kafka生产者客户端入门

2020-11-19  本文已影响0人  东南枝下

Kafka生产者客户端入门

[TOC]

demo

/**
 * 生产者
 *
 * @author Jenson
 */
public class ProducerFastStart {

    private static final String BROKER_LIST = "localhost:9092";
    private static final String TOPIC = "topic-demo";

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_LIST);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        // 重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // 生产者拦截器
        properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        // 配置生产者客户端参数,并创建生产者实例
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        // 构造所需要发送的消息
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "hello kafka! again!");

        // 发送消息
        producer.send(record, new Callback() {

            /**
             * onCompletion()方法的两个参数是互斥的,
             * 消息发送成功时,metadata 不为 null 而exception为null;
             * 消息发送异常时,metadata为null而exception不为null。
             *
             * @param recordMetadata
             * @param e
             */
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e != null) {
                    e.printStackTrace();
                } else {
                    System.out.println("topic : " + recordMetadata.topic() +
                            " ,hasOffset : " + recordMetadata.hasOffset() +
                            " ,offset : " + recordMetadata.offset() +
                            " ,partition : " + recordMetadata.partition());
                }
            }
        });

        // 关闭生产者客户端
        producer.close();
    }
}
/**
 * 客户化生产者拦截器
 *
 * @author Jenson
 */
public class MyProducerInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord producerRecord) {
        ProducerRecord record = new ProducerRecord(producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                "prefix-" + producerRecord.value(),
                producerRecord.headers());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            System.out.println("----> 发送成功");
        }
    }

    @Override
    public void close() {
        System.out.println("----> 结束,回收资源");
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

生产者对象

org.apache.kafka.clients.producer.KafkaProducer

三个必填参数:

bootstrap.servers:连接Kafka集群所需要的broker地址清单,可以为多个地址,逗号分开

key.serializer和value.serializer:broker端接收的消息必须以字节数组(byte[])的形式存在

其他参数:
client.id : KafkaProducer对应的客户端id,默认值为“”,如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,“producer-”与数字的拼接。

配置参数常量类:org.apache.kafka.clients.producer.ProducerConfig

KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。

发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)

消息发送

序列化

可以自定义序列化,实现 org.apache.kafka.common.serialization.Serializer 接口

分区器

生产者拦截器

上一篇 下一篇

猜你喜欢

热点阅读