Spring Boot 收发 Kafka 消息

2021-01-04  本文已影响0人  一只菜鸟的笔记
/**
 * Spring component that listens for Kafka records in batches and re-publishes
 * the value of every received record to the {@code send_topic} topic.
 */
@Component
public class Consumer {

    /** Template used to publish the value of each received record. */
    @Autowired
    private KafkaTemplate<String, String> template;

    /**
     * Creates the listener container factory that {@link #receiveMessage(List)}
     * refers to by name through {@code containerFactory = "batchFactory"}.
     *
     * <p>As a side effect, sets the {@code java.security.auth.login.config}
     * system property before the factory is built.
     *
     * @param consumerFactory consumer factory assigned to the returned container factory
     * @return a container factory with batch listening enabled
     */
    @Bean
    KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<String, String> consumerFactory) {
        // Security authentication: register the login configuration file with the JVM.
        // NOTE(review): client_cf_file is not declared in the visible snippet — confirm
        // where it is defined and that it holds the path of the login config file.
        System.setProperty("java.security.auth.login.config", client_cf_file);

        ConcurrentKafkaListenerContainerFactory<String, String> batchContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        batchContainerFactory.setBatchListener(true);
        batchContainerFactory.setConsumerFactory(consumerFactory);
        return batchContainerFactory;
    }

    /**
     * Handles one batch of records from the topics matching the
     * {@code receive_topic} property: prints each record value to standard
     * output and then sends that value to {@code send_topic}.
     *
     * @param records the batch of records delivered by the listener container
     */
    @KafkaListener(topicPattern = "${receive_topic}", containerFactory = "batchFactory")
    public void receiveMessage(List<ConsumerRecord<String, String>> records) {
        records.forEach(received -> {
            // Receive: print the payload of the incoming record.
            String payload = received.value();
            System.out.println(payload);
            // Send: forward the same payload to the outgoing topic.
            template.send("send_topic", payload);
        });
    }
}

application.yml

# Kafka consumer/producer settings for the Consumer component.
spring:
  kafka:
    consumer:
      bootstrap-servers: 100.1.0.1:9092
      group-id: sce
      max-poll-records: 1000
      auto-offset-reset: earliest
      auto-commit-interval: 1000
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        # Added for security authentication. YAML comments start with '#';
        # a trailing '//...' would become part of the property value.
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
    producer:
      bootstrap-servers: 100.1.0.1:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
      properties:
        # Added for security authentication (same settings as the consumer).
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
# Resolved by the ${receive_topic} placeholder on the @KafkaListener.
# The space after the colon is required for YAML to read this as key: value.
receive_topic: receive_topic
上一篇下一篇

猜你喜欢

热点阅读