kafka Stream

Kafka生产者

2018-05-31  本文已影响1人  周一不上班

目录

Kafka提供了用户端API,供使用者与Kafka集群通信。其中两个最基本的组件:生产者(Producer)和消费者(Consumer)。其他更高级的API,如Kafka Stream等都是在生产者和消费者API之上做的封装,以实现更丰富的功能。

producer

生产者配置了Kafka集群地址,序列化方法。

下述代码阐述了3种生产消息的方式:

只发送,不关心是否成功

public class ProducerExample {
    private static Logger log = LoggerFactory.getLogger(ProducerExample.class);
    public static void main(String... args){
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serdes.String().serializer().getClass().getName());
        KafkaProducer producer = new KafkaProducer(prop);

        // fire and forget
        ProducerRecord<String, String> mayLostRecord = new ProducerRecord("mytopic", "key-fire-forget","value-00");
        try {
            Future future = producer.send(mayLostRecord);
        }catch (Exception ex){
            ex.printStackTrace();
        }
    }
}

send方法返回一个Future<RecordMetadata>对象,因为我们没有使用这个返回值,所以我们无法得知消息是否发送成功或者失败。如果可以忍受消息的丢失,可以使用此方法,生产环境很少使用这种方法。

虽然我们忽略了发送时可能产生的错误,依然可以捕获到发送之前的异常,比如序列化,缓存不够用以及发送线程意外停止等等异常。

同步发送

        // sync
        ProducerRecord<String, String> record = new ProducerRecord("mytopic", "key-sync","value-11");
        try {
            RecordMetadata metadata = (RecordMetadata) producer.send(record).get();
            log.info("metadata returned: {}", metadata);
        }catch (Exception ex){
            log.error("sync send failed: {}", record);
            ex.printStackTrace();
        }

get方法阻塞主线程,一旦发送过程出错就会有异常抛出,如果成功返回一个RecordMetadata对象,包含offset,partition等信息。

异步发送

假设需要发送100条消息,发送每条消息花费10毫秒(应用与Kafka集群之间的数据传输延迟),那么每次发送都要阻塞以等待返回结果,总共将花费时间是1秒。如果不等待每条消息的返回结果,主线程将在接近瞬时的时间里结束。我们不关心返回结果,因为返回的RecordMetadata里边包含的offset,写入的哪个partition对于生产者来说都没什么用,然而我们确实要关心是否成功这个状态信息,仅仅当发送失败时,我们可以捕获到异常。

为了同时实现异步发送消息和追踪发送失败,我们使用回调函数。

        // async
        ProducerRecord<String, String> recordAsync = new ProducerRecord("mytopic", "key-async","value-22");
        producer.send(recordAsync, (recordMetadata, ex) -> {
            if (ex != null){
                log.error("async send message failed: {}", recordAsync);
                ex.printStackTrace();
            }else {
                log.info("async sent ok and metadata returned: {}", recordMetadata);
            }
        });

当发送失败并有异常抛出时,回调函数的第二个参数ex不为null,这里我们记录log。在生产环境中,可以将发送失败的消息持久化到文件或者数据库中。

序列化方法

前述中我们使用了String作为消息key和value的类型,如果消息中含有复合类型或者基于性能的考虑,同样可以使用基本类型如整型和字节数组来序列化消息。Kafka支持自定义的序列化方法,但是由于Kafka多生产者和多消费者的特性,应用代码会分散到很多服务中,数据类型改变导致的序列化方法重写需要同步到每一个生产者和消费者,维护成本很大。所以官方推荐使用JSON字符串或者其他第三方序列化框架如avro,thrift,protobuf等来做这件事,即使使用JSON字符串会有部分性能损失。

消费者代码

public class ConsumerExample {
    private static Logger log = LoggerFactory.getLogger(ConsumerExample.class);
    public static void main(String... args) throws InterruptedException {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // random group id for consuming from beginning when restart application
        // just for debug convenience
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, String.valueOf(new Random().nextDouble()));
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass().getName());
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass().getName());
        KafkaConsumer consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList("mytopic"));
        
        while(true) {
            ConsumerRecords records = consumer.poll(2000);

            records.forEach(record -> {
                log.info("message from kafka: {}", record.toString());
            });
            Thread.sleep(3000);
        }
    }
}

代码运行前提

完整代码

包名kafka

上一篇下一篇

猜你喜欢

热点阅读