Kafka-拦截器

2023-01-21  本文已影响0人  我可能是个假开发

一、拦截器定义

基本思想是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链。它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑。

1.SpringMVC拦截器

Spring MVC 拦截器的工作原理:


image.png

拦截器 1 和拦截器 2 分别在请求发送之前、发送之后以及完成之后三个地方插入了对应的处理逻辑。

2. Flume拦截器

Flume 中的拦截器也是同理,它们插入的逻辑可以是修改待发送的消息,也可以是创建新的消息,甚至是丢弃消息。这些功能都是以配置拦截器类的方式动态插入到应用程序中的,故可以快速地切换不同的拦截器而不影响主程序逻辑。

二、Kafka 拦截器

Kafka 拦截器分为生产者拦截器和消费者拦截器。
这两种拦截器都支持链的方式,即可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑。

Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes.

1.生产者拦截器

生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑;
在 Producer 端指定拦截器:

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 拦截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 拦截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……

Producer 端拦截器实现类都要继承org.apache.kafka.clients.producer.ProducerInterceptor接口

onAcknowledgement 的调用要早于 callback 的调用。值得注意的是,这个方法和 onSend 不是在同一个线程中被调用的,因此如果你在这两个方法中调用了某个共享可变对象,一定要保证线程安全。还有一点很重要,这个方法处在 Producer 发送的主路径中,所以最好别放一些太重的逻辑进去,否则你会发现你的 Producer TPS 直线下降。

2.消费者拦截器

消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑。

具体的实现类要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor接口

指定拦截器类时要指定它们的全限定名,即 full qualified name。通俗点说就是要把完整包名也加上,不要只有一个类名在那里,并且还要保证你的 Producer 程序能够正确加载你的拦截器类。

三、Kafka 拦截器应用场景

1.端到端系统性能检测

Kafka 默认提供的监控指标都是针对单个客户端或 Broker 的,很难从具体的消息维度去追踪集群间消息的流转路径。
在客户端程序中增加这样的统计逻辑,在应用代码中编写统一的监控逻辑其实是很难的,毕竟这东西非常灵活,不太可能提前确定好所有的计算逻辑。另外,将监控逻辑与主业务逻辑耦合也是软件工程中不提倡的做法。

通过实现拦截器的逻辑以及可插拔的机制,能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据。

2.消息审计

公司把 Kafka 作为一个私有云消息引擎平台向全公司提供服务,这必然要涉及多租户以及消息审计的功能。
查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。
做法就是编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器。

3.用拦截器实现统计消息端到端处理的延时

某个业务只有一个 Producer 和一个 Consumer,想知道该业务消息从被生产出来到最后被消费的平均总时长是多少。
要计算总延时,那么一定要有个公共的地方来保存它,并且这个公共的地方还是要让生产者和消费者程序都能访问的。假设数据被保存在 Redis 中。
生产者拦截器:

public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
    private Jedis jedis; // 省略Jedis初始化
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalSentMessage");
        return record;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<java.lang.String, ?> configs) {
    }
}

在发送消息前更新总的已发送消息数(不考虑发送失败的情况)。

消费者拦截器:

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    private Jedis jedis; //省略Jedis初始化
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

在真正消费一批消息前首先更新了它们的总延时,方法就是用当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中。之后分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时。

创建好生产者和消费者拦截器后,按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标。

极客时间《Kafka 核心技术与实战》学习笔记Day7 - http://gk.link/a/11UOV

上一篇 下一篇

猜你喜欢

热点阅读