kafka

2019-06-17  本文已影响0人  蜗牛写java

kafka

生产者组件

kafka生产者组件.png

三个必选属性

  1. bootstarp.server :一般填写两个,防止一个挂掉
  2. key.serializer : 必选实现 ora.apache.kafka.common.serialization.Serializer
  3. value.serializer
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstarp.server", "broker1:9092,broker2.9002");
kafkaProps.put("key.serializer", "ora.apache.kafka.common.serialization.Serializer");
kafkaProps.put("value.serializer", "ora.apache.kafka.common.serialization.Serializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

发送三种方式

  1. 发送并忘记:发送不关系是否到达服务器(会丢数据)
  2. 同步发送:返回一个Future对象,调用get()进行等待
  3. 异步发送:指定一个回调函数

异步发送

ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "keyName", "value");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

同步发送

ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "keyName", "value");
try {
    //返回RecordMatadata对象,可以用它获取消息的偏移量
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}

异步发送

private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetaData recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "keyName", "value");
producer.send(record, new DemoProducerCallback());
上一篇 下一篇

猜你喜欢

热点阅读