Kafka2.0生产者客户端使用

2019-07-13  本文已影响0人  O_Neal

1 初始化配置

  Kafka 通过 KafkaProducer 构造器初始化生产者客户端的配置。
  常用的重要配置,详见官网

// 基础配置
Map<String, Object> configs = new HashMap<>();
// Kafka broker 集群
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
// key 序列化
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// value 序列化
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

2 构造消息

  Kafka 提供了6种构造器来构造消息。

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value);
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers);
public ProducerRecord(String topic, Integer partition, K key, V value);
public ProducerRecord(String topic, K key, V value);
public ProducerRecord(String topic, V value);

3 发送消息

  支持同步发送和异步发送消息。

  同步发送

producer.send(record).get();

  异步发送

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // 回调处理流程
    }
});
上一篇 下一篇

猜你喜欢

热点阅读