kafka08 消费指定offset的消息

2018-09-14  本文已影响375人  6c0fe9142f09

从头开始消费消息seekToBeginning()

Properties props = new Properties();
props.put("bootstrap.servers","132.232.14.247:9092");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id","group1");
KafkaConsumer<String,byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
TopicPartition seekToEndPartition = new TopicPartition("mySecondTopic",3);
consumer.assign(Arrays.asList(seekToEndPartition));
consumer.seekToBeginning(Arrays.asList(seekToEndPartition));
ConsumerRecords<String, byte[]> records = consumer.poll(5000);
for(ConsumerRecord<String,byte[]> record:records){
System.out.println("s consumption message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
}
public class MySeekToBeginningConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers","132.232.14.247:9092");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id","group1");

        KafkaConsumer<String,byte[]> consumer = new KafkaConsumer<String, byte[]>(props);

        try {
            TopicPartition seekToEndPartition = new TopicPartition("mySecondTopic",3);
            consumer.assign(Arrays.asList(seekToEndPartition));
            consumer.seekToBeginning(Arrays.asList(seekToEndPartition));
            ConsumerRecords<String, byte[]> records = consumer.poll(5000);
            for(ConsumerRecord<String,byte[]> record:records){
                System.out.println("s consumption message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
            }
        }catch (java.lang.Exception e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }
}

从尾开始消费消息seekToEnd()

···
consumer.seekToEnd(Arrays.asList(seekToEndPartition ));
···
public class MySeekToEndConsumer {

    public static void main(String[] args) {
        Properties props=new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String,byte[]> consumer=new KafkaConsumer<String,byte[]>(props);
        try {
            TopicPartition seekToEndPartition = new TopicPartition("mySecondTopic", 1);
            consumer.assign(Arrays.asList(seekToEndPartition));
            consumer.seekToEnd(Arrays.asList(seekToEndPartition ));
            ConsumerRecords<String, byte[]> records=consumer.poll(1000);
            for(ConsumerRecord<String, byte[]> record : records){
                System.out.println("MySeekToEndConsumer consumer message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            consumer.close();
        }
    }
}

消费指定offset消息seek()

//指定seekPartition和offset
consumer.seek(seekPartition,10);

public class MySeekConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "132.232.14.247:9092");
        props.put("group.id", "group1");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        try {
            TopicPartition seekPartition = new TopicPartition("mySecondTopic", 1);
            consumer.assign(Arrays.asList(seekPartition));
            consumer.seek(seekPartition,10);
            ConsumerRecords<String, String> records = consumer.poll(5000);
            for (ConsumerRecord record:records){
                System.out.println("s consumption message:partition="+record.partition()+",offset="+record.offset()+",key="+record.key()+",value="+record.value());
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            consumer.close();
        }
    }
}
上一篇 下一篇

猜你喜欢

热点阅读