Spring集成kafka态订阅消费消息

2019-08-19  本文已影响0人  和平菌

1、消息处理的类
定义一个类,继承MessageListener接口来处理消息

public class KakaMessageListener implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String content = record.value();
    }
}

2、配置Kafka的Consumer

@Data
@Component
@ConfigurationProperties(prefix = "kafkaconfig")
@EnableKafka
public class KafkaConfig {

    private String servers;
    private String groupid;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(6);
        factory.getContainerProperties().setPollTimeout(1500);

        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
        return propsMap;
    }
}

3、动态的进行订阅和取消订阅

@Autowired
    KakaMessageListener kakaMessageListener;

    @Autowired
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> factory;

    public static ConcurrentHashMap<String, ConcurrentMessageListenerContainer<String, String>> cache = new ConcurrentHashMap<>();

    @Override
    public void subscribe(String topic) {
        ConcurrentMessageListenerContainer<String, String> container = null;
        if(cache.containsKey(topic)){
            container = cache.get(topic);
        }

        if(container == null){
            container = factory.createContainer(topic);
            cache.put(topic, container);
        }

        container.setupMessageListener(kakaMessageListener);
        container.start();

        log.info("订阅kafka消息:" + topic);
    }

    @Override
    public void unSubscribe(String topic) {
        if(cache.containsKey(topic)){
            ConcurrentMessageListenerContainer<String, String> container = cache.get(topic);
            if(container != null){
                container.stop();
                log.info("取消订阅kafka消息:" + topic);
            }
        }
    }
上一篇下一篇

猜你喜欢

热点阅读