spring+kafka 异常处理

2021-03-29  本文已影响0人  NazgulSun

至少一次消费

我们的场景是保证至少一次消费,不能丢失数据。
出现的问题:
我们在一个 kafkaListener 里面 batch 监听数据,调用 一个API 进行消费,手动提交offset。
由于流量爆发性的问题或者网络问题,api 有时候会超时。

假设有三个批次, A, B,C。
B批次处理的时候,超时抛出了异常。
这时候,如果没有指定error handler的话,B 批次的数据就会丢失。

kafka 再 poll C的时候,不会因为 B 的 offset 没有commit 就从 offset A来poll。

因为kafka的offset下标的记录实际会有两份,服务端会自己记录一份,本地的消费者客户端也会记录一份,提交的offset会告诉服务端已经消费到这了,但是本地的并不会因此而改变offset进行再次消费。

所以本地的offset,记录的是B 的offset。 poll C的时候会冲 B的offset 开始。也就是程序会从本地内存维护的这个offset作为基准。
而不是服务端。 只有当consumer 重启的时候,才会参照 服务器端的Offset。

这个主要是给予客户端更灵活的控制机制,实现自定义的offset 拉取。

那么在我们的场景中,需要做的就是 下游异常的时候,我们重试,从新消费。所以指定了一个 error handler。
在下游消费异常的时候,就 从 上一个 提交的Offset开始拉取,达到重试的效果。

    @Bean
    public ConsumerAwareListenerErrorHandler indicConsumerErrorHandler() {
        return (m, e, c) -> {
            log.error("Indic consumerError, retry in kafka", e);
            List<ConsumerRecord> recs = (List<ConsumerRecord>)m.getPayload();
            List<String> topics = recs.stream().map(r->r.topic()).collect(Collectors.toList());
            List<Integer> partitions = recs.stream().map(r->r.partition()).collect(Collectors.toList());
            List<Long> offsets = recs.stream().map(r->r.offset()).collect(Collectors.toList());
            Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
            for (int i = 0; i < topics.size(); i++) {
                int index = i;
                offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
                        (k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
            }
            offsetsToReset.forEach((k, v) -> c.seek(k, v));
            return null;
        };
    }
上一篇下一篇

猜你喜欢

热点阅读