Kafka多环境隔离

2025-05-09  本文已影响0人  陈柴盐

使用场景

  1. 环境隔离需求

    • 当需要在同一个Kafka集群上同时运行生产环境和灰度环境的消息队列
    • 避免不同环境的消息互相干扰
    • 方便进行灰度测试和验证
  2. 动态Topic路由

    • 无需修改业务代码
    • 通过配置实现消息的自动路由
    • 支持灵活切换环境

实现原理

  1. 生产者拦截器(KafkaProducerInterceptor)

    • 实现了 ProducerInterceptor 接口
    • 在消息发送前通过 onSend 方法拦截消息
    • 根据配置的前缀( AppConst.KAFKA_PREFIX_KEY )动态修改目标Topic
    • 例如:原始topic为"order",配置前缀为"grey_",最终发送到"grey_order"队列

2.监听器配置

实现代码

import cn.hutool.core.util.StrUtil;
import cn.example.common.AppConst;
import com.google.common.collect.Lists;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.reflections.Reflections;
import org.reflections.scanners.FieldAnnotationsScanner;
import org.reflections.scanners.MethodAnnotationsScanner;
import org.reflections.scanners.MethodParameterScanner;
import org.reflections.scanners.SubTypesScanner;
import org.reflections.util.ConfigurationBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigurationPackages;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;
import java.util.Map;
import java.util.Set;


/**
 * 需要配合cn.example.interceptor.KafkaProducerInterceptor
 * KafkaListenerFactoryBeanPostProcessor:更改监听队列
 * KafkaProducerInterceptor:更改消息发送队列
 */
@Component
@Slf4j
public class KafkaListenerFactoryBeanPostProcessor implements BeanFactoryPostProcessor, EnvironmentAware {


    private Environment env;



    @SneakyThrows
    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {

        if (StrUtil.isNotBlank(env.getProperty(AppConst.KAFKA_PREFIX_KEY))) {
            List<String> packageNames = AutoConfigurationPackages.get(beanFactory);

            for (String packageName : packageNames) {
                Reflections reflections = new Reflections(new ConfigurationBuilder()
                        // 指定路径URL
                        .forPackages(packageName)
                        // 添加子类扫描工具
                        .addScanners(new SubTypesScanner())
                        // 添加 属性注解扫描工具
                        .addScanners(new FieldAnnotationsScanner())
                        // 添加 方法注解扫描工具
                        .addScanners(new MethodAnnotationsScanner())
                        // 添加方法参数扫描工具
                        .addScanners(new MethodParameterScanner())
                );

                Set<Method> methodSet = reflections.getMethodsAnnotatedWith(KafkaListener.class);
                if (!CollectionUtils.isEmpty(methodSet)) {
                    for (Method method : methodSet) {
                        KafkaListener kafkaListener = method.getAnnotation(KafkaListener.class);
                        changeTopics(kafkaListener);
                    }
                }
            }
        }

    }


    private void changeTopics(KafkaListener kafkaListener) throws Exception {
        InvocationHandler invocationHandler = Proxy.getInvocationHandler(kafkaListener);
        Field memberValuesField = invocationHandler.getClass().getDeclaredField("memberValues");
        memberValuesField.setAccessible(true);
        Map<String, Object> memberValues = (Map<String, Object>) memberValuesField.get(invocationHandler);
        String[] topics = (String[]) memberValues.get("topics");
        log.info("修改前topics:{}", Lists.newArrayList(topics));
        for (int i = 0; i < topics.length; i++) {
            topics[i] = env.getProperty(AppConst.KAFKA_PREFIX_KEY) + topics[i];
        }
        memberValues.put("topics", topics);
        log.info("修改后topics:{}", Lists.newArrayList(kafkaListener.topics()));

    }

    @Override
    public void setEnvironment(Environment environment) {
        env = environment;
    }
}
import cn.hutool.core.util.StrUtil;
import cn.example.common.AppConst;
import cn.example.common.utils.ApplicationContextUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;

import java.util.Map;
import java.util.Objects;

/**
 * 需要配合cn.example.beanPostProcesser.KafkaListenerFactoryBeanPostProcessor
 * KafkaListenerFactoryBeanPostProcessor:更改监听队列
 * KafkaProducerInterceptor:更改消息发送队列
 */
@Slf4j
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {


    /**
     * 运行在用户主线程中,在消息被序列化之前调用
     *
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        Environment environment = ApplicationContextUtil.getEnvironment();
        String prefixKey = environment.getProperty(AppConst.KAFKA_PREFIX_KEY);
        if (StrUtil.isNotBlank(prefixKey)) {
            log.info("原始topic:{}", record.topic());
            String targetTopic = prefixKey + record.topic();
            log.info("修改后的topic:{}",prefixKey+record.topic());
            return new ProducerRecord<String, String>( targetTopic,
                    record.partition(), record.timestamp(), record.key(), record.value());
        }
        return record;
    }


    /**
     * 在消息被应答之前或者消息发送失败时调用,通常在producer回调逻辑触发之前,运行在produer的io线程中
     *
     * @param metadata
     * @param exception
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        log.info("实际topic:{}", metadata.topic());
    }


    /**
     * 清理工作
     */
    @Override
    public void close() {
    }


    /**
     * 初始化工作
     *
     * @param configs
     */
    @Override
    public void configure(Map<String, ?> configs) {

    }
}

配置方式

  1. 在配置文件中设置前缀
# 生产环境不设置前缀
kafka.prefix.key=

# 灰度环境设置前缀
kafka.prefix.key=grey_
上一篇 下一篇

猜你喜欢

热点阅读