spring-kafka之@KafkaListener单条或批量

2022-08-15  本文已影响0人  小郭子

来源:公众号 作者:方志朋
链接:https://mp.weixin.qq.com/s/LvaeK9u5EaHDF2SmLNecqg

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;

看看其整体代码结构:

KafkaMessageListenerContainer#doStart

protected void doStart() {
 if (isRunning()) {
  return;
 }
 if (this.clientIdSuffix == null) { // stand-alone container
  checkTopics();
 }
 ContainerProperties containerProperties = getContainerProperties();
 checkAckMode(containerProperties);
 
 ......
 
    // 创建ListenerConsumer消费者并放入到线程池中执行
 this.listenerConsumer = new ListenerConsumer(listener, listenerType);
 setRunning(true);
 this.startLatch = new CountDownLatch(1);
 this.listenerConsumerFuture = containerProperties
   .getConsumerTaskExecutor()
   .submitListenable(this.listenerConsumer);
 
 ......
 
}

KafkaMessageListenerContainer.ListenerConsumer#run

public void run() { // NOSONAR complexity
 
 .......
 
 Throwable exitThrowable = null;
 while (isRunning()) {
  try {
      // 拉去消息并处理消息
   pollAndInvoke();
  }
  catch (@SuppressWarnings(UNUSED) WakeupException e) {
  
         ......
  
  }
  
  ......
  
 }
 wrapUp(exitThrowable);
}

2、ConcurrentMessageListenerContainer

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者

ConcurrentMessageListenerContainer#doStart

protected void doStart() {
 if (!isRunning()) {
  checkTopics();
  
  ......
  
  setRunning(true);
 
  for (int i = 0; i < this.concurrency; i++) {
   KafkaMessageListenerContainer<K, V> container =
     constructContainer(containerProperties, topicPartitions, i);
   String beanName = getBeanName();
   container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
   
   ......
   
   if (isPaused()) {
    container.pause();
   }
   // 这里调用KafkaMessageListenerContainer启动相关监听方法
   container.start();
   this.containers.add(container);
  }
 }
}

@KafkaListener底层监听原理

上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?

那么这个桥梁就是@KafkaListener注解

Spring Boot 自动加载kafka相关配置

1、KafkaAutoConfiguration

2、KafkaAnnotationDrivenConfiguration

生产配置

1、单条消息处理

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}
@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}

2、批量处理

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

3、同一个消费组支持单条和批量处理

场景:

生产上最初都采用单条消费模式,随着量的积累,部分topic常常出现消息积压,最开始通过新增消费者实例和分区来提升消费端的能力;一段时间后又开始出现消息积压,由此便从代码层面通过批量消费来提升消费能力。

只对部分topic做批量消费处理

简单的说就是需要配置批量消费和单条记录消费(从单条消费逐步向批量消费演进)

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean(name = [batchListenerContainerFactory])
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // 开启批量处理
    factory.setBatchListener(true); 
    return factory;
}

    @Bean(name = [batchConsumerFactory])
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean(name = [batchConsumerConfig])
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

注意点:

总结

调试及相关源码版本:

org.springframework.boot::2.3.3.RELEASE

spring-kafka:2.5.4.RELEASE

上一篇 下一篇

猜你喜欢

热点阅读