Kafka Consumer Code Example

2018-01-10  GreanTea

Kafka consumer code:

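import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// The class name is illustrative; the original snippet omitted the class declaration.
public class ConsumerExample {
    public static void main(String[] args) {
        String recordStrFormat = "offset = %d, key = %s, value = %s\n";
        Properties props = new Properties();
        props.put("bootstrap.servers", "spidercdh-01:9092");
        props.put("group.id", "default");
        // Commit offsets automatically, once per second.
        props.put("enable.auto.commit", "true");
        // Values are written as strings so that Properties.getProperty() can read them back.
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // "test" and "test2" are the topic names.
        consumer.subscribe(Arrays.asList("test", "test2"));
        try {
            while (true) {
                // poll(long) is deprecated since Kafka 2.0; newer clients take poll(Duration.ofMillis(100)).
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // The format string already ends with a newline, so printf avoids a blank line after each record.
                    System.out.printf(recordStrFormat, record.offset(), record.key(), record.value());
                }
            }
        } finally {
            // Release network resources and leave the consumer group cleanly.
            consumer.close();
        }
    }
}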
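
Numeric settings such as auto.commit.interval.ms and session.timeout.ms can be stored in a java.util.Properties object either as integers or as strings. The sketch below uses only the JDK and shows one reason string values are usually the safer habit: Properties.getProperty() returns null for any value that is not a String. The class and method names (PropertiesValueSketch, withIntegerValue, withStringValue) are hypothetical and exist only for illustration. The sketch says nothing about how the Kafka client itself parses these values.

```java
import java.util.Properties;

// Hypothetical demo class; not part of the original post and not a Kafka API.
final class PropertiesValueSketch {

    // Stores the interval as an Integer, i.e. props.put(key, 1000).
    static Properties withIntegerValue() {
        Properties props = new Properties();
        props.put("auto.commit.interval.ms", 1000);
        return props;
    }

    // Stores the interval as a String, i.e. props.setProperty(key, "1000").
    static Properties withStringValue() {
        Properties props = new Properties();
        props.setProperty("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // getProperty() only returns String values, so the Integer entry reads back as null.
        System.out.println(withIntegerValue().getProperty("auto.commit.interval.ms")); // prints null
        System.out.println(withStringValue().getProperty("auto.commit.interval.ms"));  // prints 1000
    }
}
```

The Integer entry is still present in the underlying table and can be read with get(); it is only invisible to code that reads the configuration through getProperty().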