生产者(2019-02-15)

2019-02-15  本文已影响0人  Rondo9

                                                  Kafka生产者

架构图: 

Kafka生产者组件图

必选属性:

    bootstrap.servers: broker的地址清单(host:port)

    key.serializer: 键的序列化器(ByteArraySerializer[这个只做很少的事情], StringSerializer, IntegerSerializer, 自定义序列化器)

    value.serializer: 值的序列化器(同上)

创建Kafka生产者:

    1. 新建一个Properties对象;

    2. 因为我们打算把键和值定义成字符串类型, 所以使用内置的StringSerializer;

    3. 在这里我们创建了一个新的生产者对象, 并为键和值设置了恰当的类型, 然后把Properties对象传给它。

    private Properties kafkaProps = new Properties();

    kafkaProps.put("bootstrap.servers", "broker1:9092, broker2:9092");

    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new kafkaProducer<String, String>(kafkaProps);

发送消息:

    1.同步发送消息

        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

        try {

            producer.send(record).get();

        } catch (Exception e) {

            e.printStackTrace();

        }

    2.异步发送

        private class DemoProducerCallback implements Callback {

            @Override

            public void onCompletion(RecordMetadata recordMetadata, Exception e) {

                if (e != null) {

                    e.printStackTrace();

                }

            }

        }

        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");

        producer.send(record, new DemoProducerCallback());

可配置参数:

    1.acks: 有多少个分区副本收到消息生产者才会认为消息写入是成功的;

    2.buffer.memory: 设置生产者内存缓冲区的大小;

    3.compression.type: 指定消息发送时使用哪一种压缩算法进行压缩(snappy, gzip, lz4);

    4.retries: 生产者可以重发消息的次数;

    5.batch.size: 同一批次发送到同一分区使用的内存大小;

    6.linger.ms: 同批次等待时间;

    7.client.id: 任意字符串, 识别消息的来源;

    8.max.in.flight.requests.per.connection: 生产者在收到服务器的响应之前可以发送多少个消息;

    9.timeout.ms, request.timeout.ms 和 metadata.fetch.timeout.ms: 

        timeout.ms: 等待同步副本返回消息确认的时间;

        request.timeout.ms: 生产者在发送数据时等待服务器返回响应的时间;

        metadata.fetch.timeout.ms: 生产者在获取元数据时等待服务器返回响应的时间;

    10.max.block.ms: 获取元数据时的阻塞时间;

    11.max.request.size: 生产者发送请求的大小;

    12.receive.buffer.bytes 和 send.buffer.bytes: TCP socket 接收和发送数据宝的缓冲区大小;

序列化器:

    主要实现 org.apache.kafka.common.serialization.Serializer 的 byte[] serialize(String topic, Customer data) 方法

分区器:

    主要实现 org.apache.kafka.clients.producer.Partitioner 的 int partition(String topic, Object key,byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster) 方法

上一篇下一篇

猜你喜欢

热点阅读