
Kafka: Consumer API

2020-11-30  小波同学

1. Kafka Core APIs

The official documentation includes a diagram that illustrates the types of clients that can integrate with Kafka.


Kafka has five categories of client APIs: the Producer API, Consumer API, Streams API, Connect API, and Admin API.

This article focuses on the Consumer API.

2. Consumer API

Reliability when consuming is relatively easy to guarantee, because the data is persisted in Kafka, so there is no need to worry about losing the data itself.

However, a consumer may fail mid-consumption (power loss, a crash, and so on). After recovering it has to resume from where it left off, so the consumer must keep track of the offset it has consumed up to in order to continue after a failure.

Maintaining the offset is therefore an unavoidable concern when consuming data.

2.1 Import the required dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

The main classes involved are KafkaConsumer (the client that connects to the cluster and pulls messages), ConsumerConfig (the configuration keys), ConsumerRecords (the batch of records returned by poll), and ConsumerRecord (a single key/value record).

The configuration entries related to automatic offset commits are enable.auto.commit (whether offsets are committed automatically) and auto.commit.interval.ms (the interval, in milliseconds, between automatic commits).

2.2 Automatic offset commit

private static final String TOPIC_NAME = "yibo_topic";

/**
 * This style does appear in real projects, but it is not recommended
 */
public static void helloWorld(){
    Properties properties = new Properties();
    // Kafka cluster address
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    // Consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    // Enable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
    // Interval between automatic commits
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    // Deserializer classes for key and value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        // Poll for messages at a fixed interval
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                    record.partition(),record.offset(),record.key(),record.value());
        }
    }
}

Although automatic offset commits are simple and convenient, they are purely time-driven, which makes it hard for the developer to control when an offset is actually committed. Kafka therefore also provides APIs for committing offsets manually.

There are two ways to commit offsets manually: commitSync (synchronous, blocking) and commitAsync (asynchronous). Both commit the highest offset of the batch returned by the current poll. The difference is that commitSync blocks the current thread until the commit succeeds and retries automatically on failure (although uncontrollable factors can still make a commit fail), whereas commitAsync has no retry mechanism, so the commit may fail.
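
As a quick sketch of the two styles (reusing a consumer configured as above; the two commit calls below are alternatives, shown together only for comparison, and the callback is optional but useful for logging failed commits):

// Poll a batch, process it, then commit it manually.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
    // business processing ...
}

// commitSync: blocks until the broker acknowledges the commit, retrying internally on transient errors.
consumer.commitSync();

// commitAsync: returns immediately; the OffsetCommitCallback reports whether the commit succeeded.
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        System.err.println("offset commit failed for " + offsets + ": " + exception.getMessage());
    }
});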

2.3 Manual offset commit

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Manual offset commit
 */
public static void commitedOffset(){
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        // Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records) {
                // Process the message

                System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                        record.partition(),record.offset(),record.key(),record.value());

                // If business processing throws, the offset is not committed
                //throw new RuntimeException("business error while processing");
            }
            // Commit the offset manually (asynchronously)
            consumer.commitAsync();
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.4 Manual offset commit with offset reset

auto.offset.reset resets where the consumer starts reading. The setting only takes effect when the group has no committed offset, for example when the consumer is switched to a new consumer group.

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Manual offset commit with offset reset
 */
public static void restCommitedOffset(){
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

    // Reset the consumer's offset; this only takes effect when the group has no committed offset (e.g. a brand-new consumer group)
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        // Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records) {
                // Process the message

                System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                        record.partition(),record.offset(),record.key(),record.value());

                // If business processing throws, the offset is not committed
                //throw new RuntimeException("business error while processing");
            }
            // Commit the offset manually (asynchronously)
            consumer.commitAsync();
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.5 Manual offset commit, handled per partition

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Manual offset commit, handling each partition separately
 */
public static void commitedOffsetWithPartition(){
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
    // Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        // Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {

                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());

                    // If business processing throws, the offset is not committed
                    //throw new RuntimeException("business error while processing");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                // Build the offset for this single partition and commit it
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                // Commit the offset for this partition only
                consumer.commitSync(offset);

                System.out.println("--------------------- partition - " + partition + " - end ---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.6 Manual offset commit with manually assigned partitions

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Manual offset commit with manually assigned partitions (assign instead of subscribe)
 */
public static void commitedOffsetWithPartition2(){
    Properties properties = new Properties();
    // Kafka cluster address
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    // Consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    // Disable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    // Interval for automatic commits (unused when auto commit is disabled)
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    // Deserializer classes for key and value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

    TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);

    // Assign a specific partition of the topic to this consumer
    consumer.assign(Arrays.asList(p0));

    // Alternatively, subscribe to one or more topics instead of assigning partitions:
    // consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        // Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {

                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());

                    // If business processing throws, the offset is not committed
                    //throw new RuntimeException("business error while processing");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                // Build the offset for this single partition and commit it
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                // Commit the offset for this partition only
                consumer.commitSync(offset);

                System.out.println("--------------------- partition - " + partition + " - end ---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.7 Manually specifying the starting offset, with manual commits

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Manually specify the starting offset and commit offsets manually
 */
public static void controlOffset(){
    Properties properties = new Properties();
    // Kafka cluster address
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    // Consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    // Disable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    // Interval for automatic commits (unused when auto commit is disabled)
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    // Deserializer classes for key and value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

    TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);

    // Assign a specific partition of the topic to this consumer
    consumer.assign(Arrays.asList(p0));

    while(true){
        try {

            /**
             * 1. Control the starting offset manually
             * 2. If the program fails, the batch can be consumed again
             */
            /**
             * Typical pattern:
             * 1. The first time, consume from offset 0
             * 2. After consuming e.g. 100 records, store offset 101 in Redis
             * 3. Before each poll, read the latest offset from Redis
             * 4. Seek to that position and consume from there
             */
            // Manually specify the starting offset (hard-coded to 30 here for demonstration;
            // in practice this would be the value read from the external store)
            consumer.seek(p0,30);

            // Poll for messages at a fixed interval
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {

                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());

                    // If business processing throws, the offset is not committed
                    //throw new RuntimeException("business error while processing");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                // Build the offset for this single partition and commit it
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                // Commit the offset for this partition only
                consumer.commitSync(offset);

                System.out.println("--------------------- partition - " + partition + " - end ---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}
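
The comments above describe keeping the offset in an external store such as Redis and seeking to it before every poll. Below is a minimal sketch of that pattern; a plain HashMap stands in for Redis (the actual Redis client calls are an assumption and are not shown in the original article), and the consumer is configured exactly as in controlOffset:

// Stand-in for Redis: maps a partition to the next offset to consume.
Map<TopicPartition, Long> externalStore = new HashMap<>();

TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
consumer.assign(Arrays.asList(p0));

while (true) {
    // Before each poll, read the latest stored position (0 the very first time) and seek to it.
    long startOffset = externalStore.getOrDefault(p0, 0L);
    consumer.seek(p0, startOffset);

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    long nextOffset = startOffset;
    for (ConsumerRecord<String, String> record : records.records(p0)) {
        // business processing ...
        nextOffset = record.offset() + 1;
    }

    // Persist the next offset to consume, then commit it to Kafka as well.
    externalStore.put(p0, nextOffset);
    consumer.commitSync(Collections.singletonMap(p0, new OffsetAndMetadata(nextOffset)));
}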

2.8 Flow control (rate limiting)

Add the Guava dependency:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>

private static final String TOPIC_NAME = "yibo_topic";

/** Token generation rate, in permits per second. */
public static final int permitsPerSecond = 1;

/** Rate limiter. */
private static final RateLimiter LIMITER = RateLimiter.create(permitsPerSecond);

/**
 * Flow control / rate limiting
 */
public static void controlPause(){
    Properties properties = new Properties();
    // Kafka cluster address
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    // Consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    // Disable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    // Interval for automatic commits (unused when auto commit is disabled)
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    // Deserializer classes for key and value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);

    TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);

    // Assign specific partitions of the topic to this consumer
    consumer.assign(Arrays.asList(p0,p1));

    // Alternatively, subscribe to one or more topics instead of assigning partitions:
    // consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        // Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));

            // Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {
                    /**
                     * 1. For each record received, try to acquire a token from the token bucket
                     * 2. If a token is acquired, continue with business processing
                     * 3. If no token is available, pause the partitions and wait for tokens
                     * 4. Once tokens are available again, resume the consumer
                     * Note: pause()/resume() only affect subsequent poll() calls; records
                     * already returned by the current poll are still processed below.
                     */
                    // Rate limiting
                    if (!LIMITER.tryAcquire()) {
                        System.out.println("No token available, pausing consumption");
                        consumer.pause(Arrays.asList(p0, p1));
                    }else {
                        System.out.println("Token acquired, resuming consumption");
                        consumer.resume(Arrays.asList(p0, p1));
                    }


                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());

                    // If business processing throws, the offset is not committed
                    //throw new RuntimeException("business error while processing");


                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                // Build the offset for this single partition and commit it
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                // Commit the offset for this partition only
                consumer.commitSync(offset);

                System.out.println("--------------------- partition - " + partition + " - end ---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.9 Multi-threaded consumption: one KafkaConsumer per thread

public class ConsumerThreadSample {

    private static final String TOPIC_NAME = "yibo_topic";

    /**
     * Classic pattern: each thread creates its own KafkaConsumer, which keeps consumer usage thread-safe
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);

        t1.start();

        Thread.sleep(15000);

        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable{

        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String,String> consumer;

        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.174.128:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

            consumer.assign(Arrays.asList(p0, p1));
        }

        @Override
        public void run() {
            try {
                while(!closed.get()){
                    // Poll for messages
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));

                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // Process the records of each partition
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.printf("partition = %d , offset = %d, key = %s, value = %s%n",
                                    record.partition(),record.offset(), record.key(), record.value());
                        }

                        // Report the new offset for this partition back to Kafka
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        // Note: commit lastOffset + 1, i.e. the next offset to consume
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                }
            } catch (Exception e) {
                if(!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}

2.10 One consumer polling, with multi-threaded business processing

public class ConsumerRecordThreadSample {

    private static final String TOPIC_NAME = "yibo_topic";

    public static void main(String[] args) throws InterruptedException {
        String brokerList = "192.168.174.128:9092";
        String groupId = "test";
        int workerNum = 5;

        CunsumerExecutor consumers = new CunsumerExecutor(brokerList, groupId, TOPIC_NAME);
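        // Note: execute() below loops forever and blocks the calling thread, so the
        // sleep/shutdown that follow are never reached as written; in a real
        // application execute() would run on a separate thread.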
        consumers.execute(workerNum);

        Thread.sleep(1000000);

        consumers.shutdown();

    }

    // Consumer: polls records and dispatches them to a worker thread pool
    public static class CunsumerExecutor{
        private final KafkaConsumer<String, String> consumer;
        private ExecutorService executors;

        public CunsumerExecutor(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                for (final ConsumerRecord<String, String> record : records) {
                    executors.submit(new ConsumerRecordWorker(record));
                }
            }
        }

        public void shutdown() {
            if (consumer != null) {
                consumer.close();
            }
            if (executors != null) {
                executors.shutdown();
            }
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }

    // Worker that processes a single record
    public static class ConsumerRecordWorker implements Runnable {

        private ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
            this.record = record;
        }

        @Override
        public void run() {
            // Actual business logic, e.g. writing the record to a database
            System.out.println("Thread - "+ Thread.currentThread().getName());
            System.err.printf("partition = %d , offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }

    }
}

2.11 Analysis of missed and duplicate consumption

Whether offsets are committed synchronously or asynchronously, both missed consumption and duplicate consumption are possible. Committing the offset before processing the data can cause messages to be missed; processing the data before committing the offset can cause messages to be processed more than once.
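
A minimal sketch of the two orderings (the two loops below are alternatives, not meant to run together; process() is a hypothetical stand-in for the business logic):

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

// At-most-once: commit first, then process. If processing fails after the commit,
// the failed records are never redelivered, so they may be missed.
consumer.commitSync();
for (ConsumerRecord<String, String> record : records) {
    process(record);
}

// At-least-once: process first, then commit. If the consumer crashes before the commit,
// the whole batch is redelivered after restart, so records may be processed twice.
for (ConsumerRecord<String, String> record : records) {
    process(record);
}
consumer.commitSync();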

2.12 Custom offset storage

Before Kafka 0.9, offsets were stored in ZooKeeper. From 0.9 onwards they are stored by default in an internal Kafka topic (__consumer_offsets). Beyond that, Kafka also lets you store offsets in a system of your own choosing.

Maintaining offsets yourself is fairly involved, because you have to account for consumer rebalances.
Whenever a new consumer joins the group, an existing consumer leaves the group, or the partitions of the subscribed topics change, the partitions are reassigned across the group; this reassignment process is called a rebalance.

After a rebalance, the partitions each consumer is responsible for change. Each consumer therefore has to find out which partitions it has been assigned and seek to the most recently committed offset of each partition before it continues consuming.

Implementing custom offset storage relies on a ConsumerRebalanceListener. Example code follows; the methods that commit and fetch offsets have to be implemented against whichever offset store you choose.

public class CustomConsumer {
    
    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();
    
    public static void main(String[] args) {
        // Create the configuration
        Properties props = new Properties();
        // Kafka cluster address
        props.put("bootstrap.servers","192.168.174.128:9092");
        // Consumer group: consumers with the same group.id belong to the same group
        props.put("group.id", "test");
        // Disable automatic offset commits
        props.put("enable.auto.commit", "false");
        // Deserializer classes for key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Create a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topic, registering a rebalance listener
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

                    // Called before the rebalance
                    @Override
                    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                        commitOffset(currentOffset);
                    }

                    // Called after the rebalance
                    @Override
                    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                        currentOffset.clear();
                        for (TopicPartition partition : partitions) {
                            // Seek to the most recently committed offset and continue from there
                            consumer.seek(partition, getOffset(partition));
                        }
                    }
                });
        while (true) {
            // Poll for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            }
            commitOffset(currentOffset); // commit to the custom offset store
        }
    }
    
    // Fetch the most recently committed offset for a partition from the custom store
    private static long getOffset(TopicPartition partition) {
        return 0;
    }
    
    // Commit the offsets of all partitions consumed by this consumer to the custom store
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
        
    }
}
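
As a concrete illustration, here is a minimal sketch of what getOffset/commitOffset could look like backed by a relational table. The table, columns, and dataSource are assumptions for this example (they are not part of the article), and the upsert syntax is MySQL-flavored:

// Assumed schema:
//   CREATE TABLE consumer_offsets (
//       topic_name VARCHAR(255), partition_id INT, committed_offset BIGINT,
//       PRIMARY KEY (topic_name, partition_id));

// Fetch the stored offset for a partition; start from 0 if nothing has been stored yet.
private static long getOffset(TopicPartition partition) {
    String sql = "SELECT committed_offset FROM consumer_offsets WHERE topic_name = ? AND partition_id = ?";
    try (Connection conn = dataSource.getConnection();          // dataSource: assumed to be configured elsewhere
         PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, partition.topic());
        ps.setInt(2, partition.partition());
        try (ResultSet rs = ps.executeQuery()) {
            return rs.next() ? rs.getLong(1) : 0L;
        }
    } catch (SQLException e) {
        throw new RuntimeException("failed to read offset", e);
    }
}

// Persist the next offset to consume for every partition in the map.
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    String sql = "INSERT INTO consumer_offsets (topic_name, partition_id, committed_offset) VALUES (?, ?, ?) "
            + "ON DUPLICATE KEY UPDATE committed_offset = VALUES(committed_offset)";
    try (Connection conn = dataSource.getConnection();
         PreparedStatement ps = conn.prepareStatement(sql)) {
        for (Map.Entry<TopicPartition, Long> entry : currentOffset.entrySet()) {
            ps.setString(1, entry.getKey().topic());
            ps.setInt(2, entry.getKey().partition());
            ps.setLong(3, entry.getValue() + 1);                 // store the next offset to consume
            ps.executeUpdate();
        }
    } catch (SQLException e) {
        throw new RuntimeException("failed to commit offsets", e);
    }
}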

3. Spring Boot Integration with Kafka

3.1 Add the Maven dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.1</version>
</dependency>

3.2 Configure application.properties

# Kafka broker address(es); for a cluster, list multiple addresses separated by commas
spring.kafka.bootstrap-servers=192.168.174.128:9092

#=============== producer  =======================
# Number of retries for a failed send. When the leader fails, a replica takes over as the new leader
# and sends may fail during the transition; with retries=0 the producer does not retry, while a larger
# value lets the producer resend once the new leader is in place, reducing message loss.
spring.kafka.producer.retries=0
# Batch size in bytes; the producer accumulates records and sends them in batches
spring.kafka.producer.batch-size=16384
# Total memory (in bytes) the producer may use to buffer records waiting to be sent
spring.kafka.producer.buffer-memory=33554432

# acks controls how many acknowledgements the leader must receive before a send is considered complete,
# i.e. how durably a record is persisted on the server side:
# acks=0   The producer does not wait for any acknowledgement; the record is added to the socket buffer and
#          treated as sent. There is no guarantee the server received it, retries have no effect, and the
#          offset returned for each record is always -1.
# acks=1   The leader writes the record to its local log and responds without waiting for all replicas.
#          If the leader fails right after acknowledging, before the replicas copy the record, it is lost.
# acks=all The leader waits for the full set of in-sync replicas to acknowledge. As long as at least one
#          in-sync replica is alive, the record is not lost. This is the strongest guarantee, equivalent to acks=-1.
# Valid values: all, -1, 0, 1
spring.kafka.producer.acks=1

# Serializer classes for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# Default consumer group id; consumers in the same group will not read the same message, and the group is identified by group.id
spring.kafka.consumer.group-id=testGroup
# Where to start when there is no committed offset: earliest reads from the beginning of the log, latest from the end; earliest is the usual choice here
spring.kafka.consumer.auto-offset-reset=earliest
# Whether to commit offsets automatically (disabled here so the listener can acknowledge manually)
spring.kafka.consumer.enable-auto-commit=false
# If enable.auto.commit is true, the frequency in milliseconds at which offsets are auto-committed; the default is 5000
spring.kafka.consumer.auto-commit-interval=100

# Deserializer classes for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#=============== listener  =======================
# Number of threads running in the listener container
spring.kafka.listener.concurrency=5
# The listener is responsible for acks; with manual_immediate the offset is committed immediately each time acknowledge() is called
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false

3.3 Create a consumer

@Component
@Slf4j
public class KafkaDemoConsumer {

    private static final String TOPIC_NAME = "yibo_topic";
    private static final String TOPIC_GROUP1 = "topic_group1";
    private static final String TOPIC_GROUP2 = "topic_group2";

    @KafkaListener(topics = TOPIC_NAME, groupId = TOPIC_GROUP1)
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = TOPIC_NAME, groupId = TOPIC_GROUP2)
    public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test1 消费了: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }
}
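
To exercise the listeners above, a simple producer can be wired in through KafkaTemplate. The class below is a minimal sketch for testing (it is not part of the original article) and relies on the producer settings from application.properties:

@Component
public class KafkaDemoProducer {

    private static final String TOPIC_NAME = "yibo_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /** Send a message that the @KafkaListener methods above will receive. */
    public void send(String key, String value) {
        kafkaTemplate.send(TOPIC_NAME, key, value);
    }
}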

Reference:
https://www.cnblogs.com/L-Test/p/13447269.html
