kafka
2019-06-17 本文已影响0人
蜗牛写java
kafka
生产者组件

三个必选属性
- bootstarp.server :一般填写两个,防止一个挂掉
- key.serializer : 必选实现 ora.apache.kafka.common.serialization.Serializer
- value.serializer
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstarp.server", "broker1:9092,broker2.9002");
kafkaProps.put("key.serializer", "ora.apache.kafka.common.serialization.Serializer");
kafkaProps.put("value.serializer", "ora.apache.kafka.common.serialization.Serializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
发送三种方式
- 发送并忘记:发送不关系是否到达服务器(会丢数据)
- 同步发送:返回一个Future对象,调用get()进行等待
- 异步发送:指定一个回调函数
异步发送
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "keyName", "value");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
同步发送
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "keyName", "value");
try {
//返回RecordMatadata对象,可以用它获取消息的偏移量
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
异步发送
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetaData recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<>("topicName", "keyName", "value");
producer.send(record, new DemoProducerCallback());