Kafka消费者总结
-
提交偏移量的方式,如何保证消息不丢失,不重复消费
- 自动提交偏移量,会导致消息重复消费和消息丢失。重复消费是因为,当一个消费者提交偏移量之后拉取消息到本地之后消费者崩溃了,消息就没法提交,等他恢复之后又从上次的偏移量开始消费,造成消费重复消费。消息丢失的情况:一个线程A拉取消息到缓存之后提交偏移量,另一个线程B从缓存中读取消息,如果线程B发生异常
- 手动提交偏移量:coomitSync、commitAsync
-
多线程消费,KafkaConsumer是非线程安全的
-
线程封闭。每个Partiton新建一个线程,每个线程实例化一个KafkaConsumer,一个线程处理一个或多个分区。缺点是并发度受限与分区个数,比如说只有一个分区,那么只有一个消费者线程在处理。而且这样每个线程都要占用一个TCP连接,系统开销比较大
image.png
-
每个分区多个消费线程,但是这种需要处理偏移量,比较麻烦。一般不用这种方式处理
-
处理消息模块使用多线程
image.png
-
-
重要的参数
- bookstrap.servers:Host:port,Host1:port1的形式
- group.id:一般设置为有业务意义的名称
- key.deserializer和value.deserializer和生产者的key.serializer和value.serializer相对应
- client.id,客户端的id,如果没设定,会由kafka生成
- enable.auto.commit,是否自动移交偏移量
- fetch.min.bytes,Kafka收到消费者的拉取请求后,如果需要返回的数据量少于这个参数大小,那么需要等待,这个参数默认1B,是吞吐量和延迟的平衡
- fetch.max.bytes,默认50B,Consumer再一次拉取请求从Kafka获取的最大数据量
- fetch.max.wait.ms。默认500Ms,如果Kafka的消息不满足fetch.min.bytes,最多等待这个参数的配置时间
- request.timeout.ms,Consumer等待的最长时间,默认30000ms
-
@KafkaListener、ConcurrentKafkaListenerContainerFactory、ConsumerFactory、KafkaProperties、poll-timeout
a. @KafkaListener
解析:
由KafkaListenerAnnotationBeanPostProcessor完成,实现了BeanPostProcessor,在Spring初始化之前会遍历BeanPostProcessor的实现类,执行PostProcessorAfterInitialization方法。
解析出注解了@KafkaListener的所有方法
ConfluenceInstalledFont, monospace;">Collection<KafkaListener> classLevelListeners = this.findListenerAnnotations(targetClass);
之后把KafkaListener的信息封装到MethodKafkaListenerEndpoint,再调用this.registrar.registerEndpoint(endpoint, factory);注册:KafkaListenerEndpointRegistrar实现了BeanFactoryAware, InitializingBean两个接口,因此在Spring初始化Bean的时候会遍历InitializingBean的所有实现类,并执行afterPropertiesSet方法
public void afterPropertiesSet() {
this.registerAllEndpoints();
}
protected void registerAllEndpoints() {
synchronized(this.endpointDescriptors) {
Iterator var2 = this.endpointDescriptors.iterator();
while(var2.hasNext()) {
KafkaListenerEndpointRegistrar.KafkaListenerEndpointDescriptor descriptor = (KafkaListenerEndpointRegistrar.KafkaListenerEndpointDescriptor)var2.next();
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, this.resolveContainerFactory(descriptor));
}
this.startImmediately = true;
}
}
registerAllEndpoints方法将解析的KafkaListener封装到KafkaListenerEndpointDescriptor,然后注册到list里。registerListenerContainer为每一个KafkaListenerEndpointDescriptor生成一个MessageListenerContainer
MessageListenerContainer container = this.createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
KafkaMessageListenerContainer最终继承了Lifecycle,Spring在遍历所有的LifeStyle,执行start方法时KafkaMessageListenerContainer的dostart方法会被调用,实例化了KafkaListenerConsumer对象,ListenerConsumer实现了
Runnable,所以可以放入线程池中,这样可以并发执行,但是这里有个问题,就是getConsumerTaskExecutor如果没有配置线程池,默认的线程池是什么?
GenericMessageListener<?> listener = (GenericMessageListener)messageListener;
ListenerType listenerType = this.deteremineListenerType(listener);
this.listenerConsumer = new KafkaMessageListenerContainer.ListenerConsumer(listener, listenerType);
this.setRunning(true);
this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);
执行:
ListenerConsumer实现了Runnable,所以最终由run方法调用的poll()来拉取消息。
public void run() {
this.consumerThread = Thread.currentThread();
if (this.genericListener instanceof ConsumerSeekAware) {
((ConsumerSeekAware)this.genericListener).registerSeekCallback(this);
}
if (this.transactionManager != null) {
ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
}
this.count = 0;
this.last = System.currentTimeMillis();
this.initAsignedPartitions();
while(KafkaMessageListenerContainer.this.isRunning()) {
try {
this.pollAndInvoke();
} catch (WakeupException var3) {
} catch (NoOffsetForPartitionException var4) {
this.fatalError = true;
this.logger.error("No offset and no reset policy", var4);
break;
} catch (Exception var5) {
this.handleConsumerException(var5);
} catch (Error var6) {
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
if (runnable != null) {
runnable.run();
}
this.logger.error("Stopping container due to an Error", var6);
this.wrapUp();
throw var6;
}
}
this.wrapUp();
}
总结:
KafkaListener内部也是多线程消费,并且是多线程消费的第一种,一个线程实例化一个KafkaConsumer实例
this.consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(this.consumerGroupId, this.containerProperties.getClientId(), KafkaMessageListenerContainer.this.clientIdSuffix);

b. ConcurrentKafkaListenerContainerFactory指定concurrent为4,那么就会有4个KafkaMessageListenerContainer->实例化4个LinstenerConsumer->4个KafkaConsumer
5. topic,partition,group,customer之间的联系
主题可以以业务维度区分,一个topic有多个分区,具体怎么有多少个分区合适?一个group的消费者消费一个topic。每个分区只能由一个消费者组中的消费者消费,如果想一个分区多次消费,可以另外新建消费者组。如果想消费指定的分区,可以指定key