【kafka 源码】 kafka 生产者初始化过程
2019-07-11 本文已影响0人
logan_wu
Kafka
生产者初始化时,首先会初始化生产者相关配置 ProducerConfig
。然后,根据配置,初始化分区器,序列化器,拦截器,消息累加器并启动 Sender
线程。
配置
谈到 Kafka 生产者初始化之前,先要看一下生产者配置 ProducerConfig
。该类定义了生产者的配置,必填参数需要在定义生产者时配置,非必填参数已定义了默认值,如果需要也可在定义生产者时配置修改。
public class ProducerConfig extends AbstractConfig {
/**
* 定义生产者相关配置
*/
private static final ConfigDef CONFIG;
static {
// 使用 builder 建造者模式,定义生产者配置
CONFIG = new ConfigDef()
.define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(),
// 约束,不能为空
new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(...);
}
}
ConfigDef
定义配置时,会将参数封装为 ConfigKey
对象,并将 name
作为 key
该对象作为 value
存储。同时提供转换方法。
public class ConfigDef {
private final Map<String, ConfigKey> configKeys;
public ConfigDef define(String name, ...) {
// 参数封装为 ConfigKey 对象
return define(new ConfigKey(name, ...));
}
public ConfigDef define(ConfigKey key) {
configKeys.put(key.name, key);
}
public Map<String, Object> parse(Map<?, ?> props) {
// 转换所有已知 key
Map<String, Object> values = new HashMap<>();
for (ConfigKey key : configKeys.values()) {
values.put(key.name, parseValue(key, props.get(key.name), props.containsKey(key.name)));
}
return values;
}
}
编写生产者时会定义一些必须的配置 bootstrap.servers
, key.serializer
和 value.serializer
以及一些其他的非必填参数如 acks
。在 ProducerConfig
初始化时,会与上面的 configkeys
相匹配并放入新 map
中。
public class AbstractConfig {
/**
* 转换后值集
*/
private final Map<String, Object> values;
public AbstractConfig(ConfigDef definition, Map<?, ?> originals, ...) {
// 匹配 key ,覆盖 value。如果没有匹配到 key , 用默认 value
this.value = definition.parse(this.originals);
}
}
实例化
谈完了配置,再来谈一下 Kafka 如何实例化类。在 AbstractConfig
类中提供实例化类的方法,通过生产者配置的值,实例化为对应的对象和集合。
public class AbstractConfig {
/**
* 转换后值集
*/
private final Map<String, Object> values;
/**
* 自定义配置
*/
private final Map<String, ?> originals;
public <T> T getConfiguredInstance(String key, Class<T> t) {
// 通过 key 从生产者配置 values 中获取 value
Class<?> c = getClass(key);
Object o = Utils.newInstance(c);
// 继承 Configurable 的类的对象,传入自定义配置 originals
if (o instanceof Configurable) {
((Configurable) o).configure(originals());
}
return t.cast(o);
}
public <T> List<T> getConfiguredInstances(String key, Class<t> t) {
List<T> objects = new ArrayList<>();
// 通过 key 从生产者配置 values 中获取 value
List<String> classNames = getList(key);
for (Object klass : className) {
Object o = Utils.newInstance(klass);
// 继承 Configurable 的类的对象,传入自定义配置 originals
if (o instanceof Configurable) {
((Configurable) o).configure(originals());
}
objects.add(t.cast(o));
}
return objects;
}
}
初始化
明白了生产者的相关配置以及类的实例化过程,现在就可以看明白 kafka
生产者初始化过程了。
public class KafkaProducer<K, V> implements Producer<K, V> {
KafkaProducer(Map<String, Object> configs, ...) {
// 实例化生产者配置
ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, ...));
// 实例化分区器
this.partitioner = config.getConfiguredInstance("partitioner.class", Partitoner.class);
// 实例化序列化器
this.keySerializer = config.getConfiguredInstance("key.serializer", Serializer.class);
this.valueSerializer = config.getConfiguredInstance("value.serializer", Serializer.class);
// 实例化拦截器
List<ProducerInterceptor<K, V>> interceptorList = (List) configWithClientId.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
this.interceptors = new ProducerInterceptors<>(interceptorList);
// 实例化消息累加器
this.accumulator = new RecordAccumulator(...);
// 启动 Sender 线程
this.sender = newSender(...);
this.ioThread = new KafkaThread(..., this.sender, ...);
this.ioThread.start();
}
}
至此, kafka 初始化过程完成。
注: 篇幅所限,代码片段只列举了必要部分,且稍有修改。具体代码请参考 kafka 源码。