kafka 同步、异步发送

2020-05-12  本文已影响0人  陆阳226

kafka producer默认是异步发送:

同步发送

如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。

kafkaTemplate.send("testJson", message).get();

异步发送回调

可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息

ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("testJson", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
    @Override
    public void onFailure(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onSuccess(SendResult<String, Message> result) {
        System.out.println(result.getProducerRecord());
        System.out.println(result.getRecordMetadata());
    }
});
上一篇 下一篇

猜你喜欢

热点阅读