Kafka面试题
1. Kafka与其他MQ的区别
以时间复杂度O(1)的方式提供消息持久化的能力,即使对TB级以上的数据也能保证常数时间的访问;
高吞吐率,即使是在非常廉价的商用机器上也能做到单机每秒100K的消息传输;
分布式系统,生产者,broker及消费者都可以有多个,支持在线扩展。
支持离线数据处理和实时数据处理;
将消息持久化到磁盘,默认保留7天。
Kafka仅支持消息的拉取,没有消息推送。
Kafka的消息传递模式是发布订阅模式,而非点对点模式。
2. Kafka的低阶和高阶API的区别
高阶API:
```
public class Main{
publicstaticvoidmain(String[]args) {
Propertiesprops=newProperties();
props.setProperty("bootstrap.servers","192.168.100.101:9092,192.168.100.102:9092");
props.setProperty("group.id","csmg_01");
props.setProperty("enable.auto.commit","true");
props.setProperty("auto.commit.interval.ms","1000");
props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);
// 订阅要消费的主题
consumer.subscribe(Arrays.asList("tp_demo_01"));
// while循环不断从Kafka的topic中拉取消息
while(true) {
ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(3_000));
for(ConsumerRecord<String,String>record:records)
System.out.printf("offset = %d, key = %s, value = %s%n",record.offset(),record.key(),record.value());
}
}
}
```
高阶API优点:
消费Kafka的消息很容易实现,写起来比较简单
不需要管理offset,直接通过__consumer_offsets和消费者协调器管理;也不需要管理分区、副本,由消费者协调器统一管理
消费组协调器会自动根据上一次在__consumer_offsets 中保存的offset去接着获取数据
在__consumer_offsets 中,不同的消费组(group)同一个topic记录不同的offset,这样不同程序读取同一个topic,不会受offset的影响。
高级API缺点:
不能控制offset,如:从指定的位置读取
不能细化控制分区、副本等
低阶API
通过使用低级API,可以控制offset,想从哪儿读,就可以从哪儿读。而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前offset是自动保存在__consumer_offsets 中,使用低阶API,可以将offset存储在指定的地方,如:文件、MySQL、或者内存。
低阶API,比较复杂,需要执行控制offset,连接到哪个分区,并找到分区的leader。
手动消费分区数据
不使用subscribe方法订阅主题,而使用assign方法指定想要消费的消息
Stringtopic="test";
TopicPartitionpartition0=newTopicPartition(topic,0);
TopicPartitionpartition1=newTopicPartition(topic,1);
consumer.assign(Arrays.asList(partition0,partition1));
注意
当手动管理消费分区时,即使GroupID是一样的,消费组协调器都不再起作用;
如果消费者失败,也将不再自动进行再平衡。
3. Kafka的三种ACK机制
acks = 1:生产者发送的消息,只需要在Leader副本进行确认即可;
acks = 0:生产者发送的消息,不需要确认,实际上,只要生产者将消息放到消息累加器即表示发送成功;
acks = -1:生产者发送的消息,需要Leader和ISR中所有的副本确认才表示发送成功,确保了消息的安全性;一般配合 min.insync.replicas,让发送的消息最少在这么多的副本中确认才表示发送消息成功。一般 min.insync.replicas为分区副本数的N/2+1,既保证了消息的安全性,也保证了比较好的效率。
4. 消费者如何从Kafka中消费数据
对于高阶API:
消费者需要订阅指定的主题,
消费者以消费组的形式存在。
当启动消费组中的消费者的时候,消费组协调器负责再平衡,将所订阅的主题分区按照一个分区之分配给消费组中一个消费者,一个消费者可以分配多个分区的原则进行分配。
消费者调用poll方法从主题分区拉取消息,同时需要更新消费消息的偏移量。
消费消息的偏移量默认自动提交,每5秒提交一次到__consumer_offsets主题的该消费组对应的分区;
自动提交会引起消费的重复,此时可以设置手动提交
手动提交消费偏移量分为同步提交和异步提交。
一般循环中使用异步提交的方式,每隔一定时间或每隔多少个消息就同步提交一次消费偏移量
异步提交偏移量速度快,但是不安全,手动同步提交是为了确保消费偏移量的安全。
消费者的偏移量在__consumer_offsets_对应的分区中存储方式为:
key:消费组id+主题名称+分区编号
value:消息偏移量
对于低阶API
通过使用低级API,可以控制offset,想从哪儿读,就可以从哪儿读。而且,可以自己控制连接分区,对分区自定义负载均衡。而且,之前offset是自动保存在__consumer_offsets 中,使用低阶API,可以将offset存储在指定的地方,如:文件、MySQL、或者内存。
低阶API,比较复杂,需要执行控制offset,连接到哪个分区,并找到分区的leader。
不使用subscribe方法订阅主题,而使用assign方法指定想要消费的消息
当手动管理消费分区时,即使GroupID是一样的,消费组协调器都不再起作用;
如果消费者失败,也将不再自动进行再平衡。
5. Kafka生产消费怎么保证Exactly Once
Kafka通过幂等保证消息的ExactlyOnce。
Kafka在引入幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回Ack信号值。实现流程如下:
生产中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。比如以下这种异常情况的出现:
上图这种情况,当Producer第一次发送消息给Broker时,Broker将消息(x2,y2)追加到了消息流中,但是在返回Ack信号给Producer时失败了(比如网络异常) 。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回Ack信号给Producer。这样下来,消息流中就被重复追加了两条相同的(x2,y2)的消息。
幂等性
保证在消息重发的时候,消费者不会重复处理。即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。
所谓幂等性,数学概念就是:f(f(x)) = f(x)。f函数表示对消息的处理。
比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证最终结果一定是一致的。
幂等性实现
添加唯一ID,类似于数据库的主键,用于唯一标记一个消息。
Kafka为了实现幂等性,它在底层设计架构中引入了ProducerID和SequenceNumber。
ProducerID:在每个新的Producer初始化时,会被分配一个唯一的ProducerID,这个ProducerID对客户端使用者是不可见的。
SequenceNumber:对于每个ProducerID,Producer发送数据的每个Topic和Partition都对应一个从0开始单调递增的SequenceNumber值。
同样,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如Broker在发送Ack信号给Producer时出现网络异常,导致发送失败。异常情况如下图所示:
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回Ack信号给Producer时,发生异常导致Producer接收Ack信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入了幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存过之前发送的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
6. Kafka如何保证消息的顺序性
Kafka消息以主题进行业务分类消息的管理。
每个主题都可以通过分区在Kafka集群中横向扩展;
对于包含多个分区的主题,Kafka不保证消息在各个分区间的顺序性,只保证在分区内部的消息有序性。
如果需要保证全局有序,则需要保证:
一个单线程运行的生产者,一个只包含一个分区的主题,一个单线程的消费者。
一般生产中消息的有序性是局部有序,此时可以通过设置:
生产者发送消息的时候,可以设置消息的key,此时消息的分区是[hash(key) % 主题分区数],只要key相同就可以路由到同一个分区,保证消息的一致性。
生产者发送消息的时候,也可以直接指定消息所属的分区,此时可以自定义分区器,实现消息在主题各个分区之间的负载均衡,保证消息在业务上的有序性。
生产者需要确保发送的消息inflight.request为1,如果发送失败重试。如果inflight.requests大于1,会在消息重发的时候引起消息乱序。
对于生产者,还需要设置 max.in.flight.requests.per.connection 为1,表示当生产者发送了一个请求,则该请求在确认之前,是一个待确认请求。
当 max.in.flight.requests.per.connection 设为1的时候,此时生产者不会发送新的请求,直到待确认请求被确认,或待确认请求重发成功之后,再发送新的请求。
如此可以保证消息在发送时候的顺序性。
7. Kafka Controller的作用
选举Leader和ISR
控制器从zookeeper的/brokers/topics加载一个topic所有分区的所有副本,从分区副本列表中选出一个作为该分区的leader,并将该分区对应所有副本置于ISR列表。
同步元数据信息包括broker和分区的元数据信息
控制器架将zookeeper的/brokers/ids以及/brokers/topics的topic下各分区leader和ISR将这些元数据信息同步到集群每个broker。
通过监控机制,当有broker或者分区发生变更时及时更新到集群保证集群每一台broker缓存的是最新元数据。
broker加入的监听和处理
控制器启动时就起一个监视器监视zookeeper的/brokers/ids/子节点。当存在broker启动加入集群后都会在zookeeper的/brokers/ids/增加一个子节点,名字是<brokerId>,控制器的监视器发现这种变化后,控制器开始执行broker加入的相关流程并更新元数据信息到集群。
broker崩溃的监听与处理
控制器启动时就起一个监视器监视zookeeper的/brokers/ids/子节点。当一个broker崩溃时,该broker与zookeeper的会话失效,zookeeper删除该子节点,控制器的监视器发现后,控制器开始执行broker删除的相关流程并更新元数据信息到集群。
topic创建的监听与处理
控制器启动时,使用监视器监视zookeeper的/brokers/topics/子节点。当通过脚本或者请求创建一个topic后,该topic对应的所有分区及其副本都会写入该目录下的一个子节点。控制器的监视器发现后,控制器开始执行topic创建的相关流程包括leader选举和ISR并同步元数据信息到集群;且新增一个监视器以监视zookeeper的/brokers/topics/<主题节点>,监听该topic内容变化。
topic删除的监听与处理
控制器启动时,使用监视器监视zookeeper的/admin/delete_topics/子节点。当通过脚本或者请求删除一个topic后,该topic会写入该目录下的一个子节点。控制器的监视器发现后,控制器开始执行topic删除的相关流程包括通知该topic所有分区的所有副本停止运行;通知所有分区所有副本删除数据;删除zookeeper的/admin/delete_topics/<待删除topic子节点>。
分区重分配监听与处理
分区重分配通过KafkaAdmin脚本执行完成一个topic下分区的副本重新分配broker。
控制器启动,使用监视器监视zookeeper的/admin/reassign_part/子节点。当通过脚本执行分区重分配后会在该目录增加一个子节点,子节点内容是按照一定格式构建的重分配方案,控制器的监视器发现这种变化后,控制器开始执行分区重分配相关流程如同步元数据信息。
分区扩展监听与处理
当创建一个topic后,控制器会增加一个监视器监视zookeeper的/brokers/topics/<新增topic子节点内容>,监听该topic内容变化。当通过脚本执行分扩展后会在该目录增加新的分区目录。控制器的监视器发现后,控制器开始执行分区扩展相应流程如选举leader和ISR并同步。
broker优雅退出
当通过脚本或kill关闭一个broker时,称为broker优雅退出。即将关闭的broker向控制器发送退出请求后一直阻塞。
控制器接收到请求后,执行leader重选举和ISR后响应broker。broker接收后退出。
该步骤不依赖zookeeper,直接通过broker和控制器RPC通信即可完成。
控制器故障转移
集群中第一个broker通过在zookeeper的/controller注册子节点brokerId使自己成为该集群的控制器,其他broker使用监视器监视zookeeper的/controller以及向/controller_EPOCH注册子节点。
如果控制器所在broker退出、崩溃或与zookeeper会话失效,则zookeeper删除/controller内该子节点,各个broker的监视器发现这种变化后,每个broker开始竞争直到有一个竞争成为新的控制器,并向/controller注册子节点,以及向/controller_EPOCH注册子节点。
8. Kafka的多副本如何选举
当一个主题分区的Leader副本不可用,会触发zk元数据的变化;
集群控制器注册的监听器会收到通知,推动副本状态机状态向前迁移;
副本状态机状态迁移引起选举Leader的操作。
kafka.controller.OfflinePartitionLeaderSelector 类中的selectLeader方法定义了选举Leader的步骤:
找到对应主题分区的在线副本集合;
找出在线的ISR副本集合所在的broker列表;
如果ISR中副本所在的broker有一个是在线的,就从ISR中选择出新Leader,以及新ISR;
如果ISR副本所在的broker没有在线的,且禁用了不干净的领导选举,则抛NoReplicaOnlineException异常;
如果启用了不干净的领导选举,则从在线的broker中选出新的Leader,该ISR只包含Leader一个副本;
如果分区分配的所有副本都不在线,则抛NoReplicaOnlineException异常;
向该主题分区的在线副本发送LeaderAndIsr请求,更新它们的缓存元数据;
一旦Leader在zookeeper注册成功,就马上更新集群控制器allLeaders缓存。
9. Kafka消费者重平衡流程
发生如下情况会引起消费组再平衡:
消费组成员的增加或减少;
消费组订阅的主题分区数发生变化,如增加分区;
消费组通过正则表达式订阅主题,当新建主题的时候如果匹配到该正则表达式,则该消费组会订阅该新建主题。
再均衡分为2步:Join和Sync
Join, 加入组。所有成员都向消费组协调器发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,协调i器从中选择一个消费者担任Leader的角色,并把组成员信息以及订阅信息发给Leader。
Sync,Leader开始分配消费方案,即哪个消费者负责消费哪些主题的哪些分区。一旦完成分配,Leader会将这个方案封装进SyncGroup请求中发给消费组协调器,非Leader也会发SyncGroup请求,只是内容为空。消费组协调器接收到分配方案之后会把方案塞进SyncGroup的response中发给各个消费者。