【kafka 源码】 kafka 生产者初始化过程

2019-07-11  本文已影响0人  logan_wu

Kafka 生产者初始化时,首先会初始化生产者相关配置 ProducerConfig 。然后,根据配置,初始化分区器,序列化器,拦截器,消息累加器并启动 Sender 线程。

配置

谈到 Kafka 生产者初始化之前,先要看一下生产者配置 ProducerConfig 。该类定义了生产者的配置,必填参数需要在定义生产者时配置,非必填参数已定义了默认值,如果需要也可在定义生产者时配置修改。

public class ProducerConfig extends AbstractConfig {
    
    /**
     * 定义生产者相关配置
     */
    private static final ConfigDef CONFIG;
    
    static {
        // 使用 builder 建造者模式,定义生产者配置
        CONFIG = new ConfigDef()
            .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(),
                    // 约束,不能为空
                    new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
            .define(...);
    }
}

ConfigDef 定义配置时,会将参数封装为 ConfigKey 对象,并将 name 作为 key 该对象作为 value 存储。同时提供转换方法。

public class ConfigDef {
    
    private final Map<String, ConfigKey> configKeys;
    
    public ConfigDef define(String name, ...) {
        // 参数封装为 ConfigKey 对象
        return define(new ConfigKey(name, ...));
    }
    
    public ConfigDef define(ConfigKey key) {
        configKeys.put(key.name, key);
    }
    
    public Map<String, Object> parse(Map<?, ?> props) {
        // 转换所有已知 key
        Map<String, Object> values = new HashMap<>();
        for (ConfigKey key : configKeys.values()) {
            values.put(key.name, parseValue(key, props.get(key.name), props.containsKey(key.name)));
        }
        return values;
    }
}

编写生产者时会定义一些必须的配置 bootstrap.servers , key.serializervalue.serializer 以及一些其他的非必填参数如 acks 。在 ProducerConfig 初始化时,会与上面的 configkeys 相匹配并放入新 map 中。

public class AbstractConfig {
    
    /**
     * 转换后值集
     */
    private final Map<String, Object> values;
    
    public AbstractConfig(ConfigDef definition, Map<?, ?> originals, ...) {
        
        // 匹配 key ,覆盖 value。如果没有匹配到 key , 用默认 value
        this.value = definition.parse(this.originals);
    }
}

实例化

谈完了配置,再来谈一下 Kafka 如何实例化类。在 AbstractConfig 类中提供实例化类的方法,通过生产者配置的值,实例化为对应的对象和集合。

public class AbstractConfig {
    
    /**
     * 转换后值集
     */
    private final Map<String, Object> values;
    
    /**
     * 自定义配置
     */
    private final Map<String, ?> originals;
    
    public <T> T getConfiguredInstance(String key, Class<T> t) {
        // 通过 key 从生产者配置 values 中获取 value
        Class<?> c = getClass(key);
        Object o = Utils.newInstance(c);
        
        // 继承 Configurable 的类的对象,传入自定义配置 originals
        if (o instanceof Configurable) {
            ((Configurable) o).configure(originals());
        }
        return t.cast(o);
    }
    
    public <T> List<T> getConfiguredInstances(String key, Class<t> t) {
        List<T> objects = new ArrayList<>();
        
        // 通过 key 从生产者配置 values 中获取 value
        List<String> classNames = getList(key);
        for (Object klass : className) {
            Object o = Utils.newInstance(klass);
            
            // 继承 Configurable 的类的对象,传入自定义配置 originals
            if (o instanceof Configurable) {
                ((Configurable) o).configure(originals());
            }
            objects.add(t.cast(o));
        }
        
        return objects;
    }
}

初始化

明白了生产者的相关配置以及类的实例化过程,现在就可以看明白 kafka 生产者初始化过程了。

public class KafkaProducer<K, V> implements Producer<K, V> {
    
    KafkaProducer(Map<String, Object> configs, ...) {
        
        // 实例化生产者配置
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, ...));
        
        // 实例化分区器
        this.partitioner = config.getConfiguredInstance("partitioner.class", Partitoner.class);
        
        // 实例化序列化器
        this.keySerializer = config.getConfiguredInstance("key.serializer", Serializer.class);
        this.valueSerializer = config.getConfiguredInstance("value.serializer", Serializer.class);
        
        // 实例化拦截器
        List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
            ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
        this.interceptors = new ProducerInterceptors<>(interceptorList);
        
        // 实例化消息累加器
        this.accumulator = new RecordAccumulator(...);
        
        // 启动 Sender 线程
        this.sender = newSender(...);
        this.ioThread = new KafkaThread(..., this.sender, ...);
        this.ioThread.start();
    }
    
}

至此, kafka 初始化过程完成。

注: 篇幅所限,代码片段只列举了必要部分,且稍有修改。具体代码请参考 kafka 源码。

上一篇下一篇

猜你喜欢

热点阅读