9. Interview-Kafka
0 消息队列使用场景
- 消息通讯
- 异步处理
- 应用解耦
- 流量削峰
- 日志处理
1 消息中间件怎么保证消息幂等性/一致性?
- producer发送幂等性
- 单会话幂等性(幂等型producer)
- PID
- sequence number
- 缺陷是应用重启,新的producer没有老的producer的状态数据,可能重复
- 多会话幂等性(事务型producer,kafka事务隔离机制)
- transactionID
- epoch
- 单会话幂等性(幂等型producer)
- consumer消费幂等性
- 消息落库
- 生产者,发送消息前判断库中是否有记录(有记录说明已发送),没有记录,先入库,状态为待消费,然后发送消息并把主键id带上。
- 消费者,接收消息,通过主键ID查询记录表,判断消息状态是否已消费。若没消费过,则处理消息,处理完后,更新消息记录的状态为已消费。
- 将消息的offset存放到Redis中,消费消息时,先去查找
- 消息落库
2 Kafka架构
kafka架构 kafka架构1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Consumer Group (CG):消费者组,由多个consumer组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
4)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个topic;
6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列;
7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且kafka仍然能够继续工作,kafka提供了副本机制,一个topic的每个分区都有若干个副本,一个leader和若干个follower。
8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是leader。
9)follower:每个分区多个副本中的“从”,实时从leader中同步数据,保持和leader数据的同步。leader发生故障时,某个follower会成为新的follower。
3 零拷贝(Zero Copy)技术原理
- 传统的I/O操作会将同一份数据拷贝2次,并且涉及4次的内核态和用户态的上下文切换,CPU的开销非常大。
- 零拷贝是依赖操作系统sendfile()支持的,它只需要一次数据拷贝操作,2次内核态和用户态的上下文切换,并且利用DMA直接存储器进行数据的传输,减少了CPU的消耗。
- Java的NIO的零拷贝技术是通过FileChannel.transferTo()方法实现的。
4 Kafka中的ISR、OSR、AR又代表什么?
- ISR,in-sync replication,与leader replication保持同步的follower replication集合
- OSR,失效副本,如果与leader通信后,会尝试与leader同步,同步的策略是首先将当前记录的hw之后的消息删除,然后与leader同步,当与leader基本同步之后(存储的消息的offset大于当前isr中的hw),就重新回到ISR之中。
- AR,all replication,分区的所有副本
replica.lag.time.max.ms : 这个参数的含义是 Follower 副本能够落后 Leader 副本的最长时间间隔,当前默认值是 10 秒。
5 Kafka中的HW、LEO、LW、LSO等分别代表什么?
image.png- LSO是LogStartOffset,一般情况下,日志文件的起始偏移量 logStartOffset 等于第一个日志分段的 baseOffset,但这并不是绝对的,logStartOffset 的值可以通过 DeleteRecordsRequest 请求(比如使用 KafkaAdminClient 的 deleteRecords()方法、使用 kafka-delete-records.sh 脚本、日志的清理和截断等操作进行修改。
- LEO,log end offset,日志末端位移,副本日志中下一条待写入消息的offset,就是每个副本最大的offset
- HW,high watermark,水印/高水印/高水位,副本最新一条已提交消息的位移,HW<=LEO,分区HW实际上就是ISR中所有副本LEO的最小值。
- LW 是 Low Watermark 的缩写,俗称“低水位”,代表 AR 集合中最小的 logStartOffset 值。副本的拉取请求(FetchRequest,它有可能触发新建日志分段而旧的被清理,进而导致 logStartOffset 的增加)和删除消息请求(DeleteRecordRequest)都有可能促使 LW 的增长。
6 Kafka中是怎么体现消息顺序性的?怎么保证消息全局有序?
- 可以通过分区策略体现消息顺序性。分区有序,全局无序。
- 分区策略有轮询策略、随机策略、按消息键保序策略。
- 按消息键保序策略
一旦消息被定义了 Key,那么你就可以保证同一个 Key 的所有消息都进入到相同的分区里面,由于每个分区下的消息处理都是有顺序的,故这个策略被称为按消息键保序策略。
- 按消息键保序策略
kafka怎么保证消息全局有序?
- 一个topic一个分区,降低性能,不适用高并发场景;
- 同一特征的数据发往同一个分区,通过业务系统确定特征值,比如说把数据库名+表名+主键作为特征值,同一个库同一个表同一个主键的数据发到同一个分区。
topic 只设置一个partition,这样所有的数据都往这一个partition发,能保证有序
但是对性能可就大打折扣了,一旦数据量提升,且有隐患。
下游消费者对topic做分组时间排序,性能也差。
采用特征数据处理
producer.send(new ProducerRecord<>(topic,messageNo,messageStr))
对这个 方法中的messageNo做文章。
messageNo = database.table.key
比如上面那条操作的key为 id = 100
messageNo = bigdata.ruozedata.100
计算partition
假设 hash(bigdata.ruozedata.100) = 99
99 % 3 = 0
数据分发到 p0 分区上
p0:insert u1 u2 u3 u4 delete
p1:
p2:
可能 bigdata.ruozedata.666 发送到了p1 分区上
p0:
p1: insert u1 u2 u3 u4 delete
p2:
这样就能保证对某一条数据的操作分发到单个partition中去,从而保证全局的有序性。
7 Kafka中的分区器、序列化器、拦截器是否了解?它们之间的处理顺序是什么?
-
分区器
根据键值确定消息应该处于哪个分区中,默认情况下使用轮询分区,可以自行实现分区器接口自定义分区逻辑 -
序列化器
键序列化器和值序列化器,将键和值都转为二进制流 还有反序列化器 将二进制流转为指定类型数据 -
拦截器
- doSend()方法会在序列化之前完成
- onAcknowledgement()方法在消息确认或失败时调用 可以添加多个拦截器按顺序执行
-
调用顺序:
拦截器doSend() -> 序列化器 -> 分区器
8 Kafka生产者客户端的整体结构是什么样子的?使用了几个线程来处理?分别是什么?
Producer架构 producer完整架构整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender 线程(发送线程)。在主线程中由 KafkaProducer 创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从 RecordAccumulator 中获取消息并将其发送到 Kafka 中。RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。
9 “消费组中的消费者个数如果超过topic的分区,那么就会有消费者消费不到数据”这句话是否正确?如果正确,那么有没有什么hack的手段?
一般来说如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。
开发者可以继承AbstractPartitionAssignor实现自定义消费策略,从而实现同一消费组内的任意消费者都可以消费订阅主题的所有分区。
10 消费者提交消费位移时提交的是当前消费到的最新消息的offset还是offset+1?
在旧消费者客户端中,消费位移是存储在 ZooKeeper 中的。而在新消费者客户端中,消费位移存储在 Kafka 内部的主题__consumer_offsets 中。当前消费者需要提交的消费位移是offset+1。
11 有哪些情形会造成重复消费?
- Rebalance
一个consumer正在消费一个分区的一条消息,还没有消费完,发生了rebalance(加入了一个consumer),从而导致这条消息没有消费成功,rebalance后,另一个consumer又把这条消息消费一遍。 - 消费者端手动提交
如果先消费消息,再更新offset位置,导致消息重复消费。 - 消费者端自动提交
设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。 - 生产者端
生产者因为业务问题导致的宕机,在重启之后可能数据会重发
11 哪些情景会造成消息漏消费?
- 自动提交
设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。 - 生产者发送消息
发送消息设置的是fire-and-forget(发后即忘),它只管往 Kafka 中发送消息而并不关心消息是否正确到达。不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。 - 消费者端
先提交位移,但是消息还没消费完就宕机了,造成了消息没有被消费。自动位移提交同理 - acks没有设置为all
如果在broker还没把消息同步到其他broker的时候宕机了,那么消息将会丢失
12 当你使用 kafka-topics.sh 创建(删除)了一个 topic 之后,Kafka 背后会执行什么逻辑?
-
会在 zookeeper 中的/brokers/topics 节点下创建一个新的 topic 节点,如:/brokers/topics/first 该节点中记录了该主题的分区副本分配方案
-
触发 Controller 的监听程序 ,kafka Controller 负责 topic 的创建工作,并更新元数据信息。
13 topic的分区数可不可以增加?如果可以怎么增加?如果不可以,那又是为什么?
可以增加,当分区数增加时,就会触发订阅该主题的所有 Group 开启 Rebalance。首先,Rebalance过程中,所有 Consumer 实例都会停止消费,等待 Rebalance 完成。然后所有 Consumer 实例共同参与,全部重新分配所有分区。其实更高效的做法是尽量减少分配方案的变动。
bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic topic-config --partitions 3
14 topic的分区数可不可以减少?如果可以怎么减少?如果不可以,那又是为什么?
不支持,因为删除的分区中的消息不好处理。如果直接存储到现有分区的尾部,消息的时间戳就不会递增,如此对于 Spark、Flink 这类需要消息时间戳(事件时间)的组件将会受到影响;如果分散插入现有的分区,那么在消息量很大的时候,内部的数据复制会占用很大的资源,而且在复制期间,此主题的可用性又如何得到保障?与此同时,顺序性问题、事务性问题,以及分区和副本的状态机切换问题都是不得不面对的。
15 Kafka有内部的topic吗?如果有是什么?有什么所用?
__consumer_offsets,保存消费者offset
16 kafka分区分配策略
kafka中每个主题一般都会有很多个分区,为了及时消费到数据,我们可能会启动很多个消费者去一个消费topic中的数据。每个分区只能由消费组内的一个消费者去消费。那么,同一个消费组内的消费者是如何确定消费哪些分区的数据呢?
kafka内部中存在两种分配策略:Range和RoundRobin。
kafka分配分区的条件:1)同一个消费组内消费者的新增、关闭或崩溃,2)订阅的主题新增分区。
- Range:
是对每个主题而言的。首先按照分区序号排序,然后将消费者排序。分区数/消费者数=m,如果m!=0,前m个消费者多消费一个分区(每个主题)
- RoundRobin:
使用RoundRobin策略有两个前提条件必须满足:1)同一个Consumer Group里面的所有消费者的num.streams必须相等;2)每个消费者订阅的主题必须相同。
所以这里假设前面提到的2个消费者的num.streams = 2。RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,分发给每一个消费者。(其实就是按分区名hash排序后平均分配给每一个消费者的线程)
总结:目前我们还不能自定义分区分配策略,只能通过partition.assignment.strategy参数选择 range 或 roundrobin。partition.assignment.strategy参数默认的值是range。
17 简述Kafka的日志目录结构?
每个分区对应一个文件夹,文件夹的命名为topic-0,topic-1,内部为.log和.index文件。
生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制,将每个 partition 分为多个 segment。每个 segment对应两个文件——“.index”文件和“.log”文件。这些文件位于一个文件夹下,该文件夹的命名规则为:topic 名称+分区序号。例如,first 这个 topic 有三个分区,则其对应的文件夹为 first-0,first-1,first-2。index 和 log 文件以当前 segment 的第一条消息的 offset 命名。下图为 index 文件和 log 文件的结构示意图。其中 “.index”文件存储大量的索引信息,“.log”文件存储大量的数据,索引文件中的元 数据指向对应数据文件中 message 的物理偏移地址。
.index和.log关系18 Kafka Controller 的作用?
它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由controller负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由controller负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
19 Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
- partition leader(ISR)
- controller(先到先得)
20 失效副本是指什么?有那些应对措施?
osr中的副本,如果与leader通信后,会尝试与leader同步,同步的策略是首先将当前记录的hw之后的消息删除,然后与leader同步,当与leader基本同步之后(存储的消息的offset大于当前isr中的hw),就重新回到isr之中。
21 Kafka的那些设计让它有如此高的性能?
- 直接利用Linux操作系统的page cache缓存数据,而不是堆内存
- 分区
- 顺序写磁盘
- 零拷贝技术,提高60%数据发送性能
22 如果我指定了一个offset,Kafka Controller怎么查找到对应的消息?
例如,读取offset=368776的Message,需要通过如下两个步骤。
- 第一步:查找Segment File.
00000000000000000000.index表示最开始的文件,其实偏移量(offset)为0;第二个文件00000000000000368769.index的其实偏移量为368770(368769+1),依次类推。以其实偏移量命名并排序这些文件,只要根据offset二分查找文件列表,就可以快速定位到具体文件。
当offset=368776时,定位到00000000000000368769.index|log。
- 第二步:通过Segment File 查找Message。
通过第一步定位到Segment File,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找,知道offset=368776为止。
Segment Index File采取稀疏索引存储方式,可以减少索引文件大小,通过Linux mmap接口可以直接进行内存操作。稀疏索引为数据文件的每个对应Message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。
23 如果我指定了一个timestamp,Kafka怎么查找到对应的消息?
原理同上 但是时间的因为消息体中不带有时间戳 所以不精确
24 kafka的选举机制
-
控制器选举:控制器负责所有 topic 的分区副本分配和 leader 选举等工作。
- 所谓控制器就是一个Borker,在一个kafka集群中,有多个broker节点,但是它们之间需要选举出一个leader,其他的broker充当follower角色。集群中第一个启动的broker会通过在zookeeper中创建临时节点/controller来让自己成为控制器,其他broker启动时也会在zookeeper中创建临时节点,但是发现节点已经存在,所以它们会收到一个异常,意识到控制器已经存在,那么就会在zookeeper中创建watch对象,便于它们收到控制器变更的通知。
- 如果控制器由于网络原因与zookeeper断开连接或者异常退出,那么其他broker通过watch收到控制器变更的通知,就会去尝试创建临时节点/controller,如果有一个broker创建成功,那么其他broker就会收到创建异常通知,也就意味着集群中已经有了控制器,其他broker只需创建watch对象即可。
- 如果集群中有一个broker发生异常退出了,那么控制器就会检查这个broker是否有分区的副本leader,如果有那么这个分区就需要一个新的leader,此时控制器就会去遍历其他副本,决定哪一个成为新的leader,同时更新分区的ISR集合。
- 如果有一个broker加入集群中,那么控制器就会通过Broker ID去判断新加入的broker中是否含有现有分区的副本,如果有,就会从分区副本中去同步数据。
- 集群中每选举一次控制器,就会通过zookeeper创建一个controller epoch,每一个选举都会创建一个更大,包含最新信息的epoch,如果有broker收到比这个epoch旧的数据,就会忽略它们,kafka也通过这个epoch来防止集群产生“脑裂”。
- Zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。
-
分区副本选取leader
- 如果某个分区的Leader挂了,那么其它跟随者follower将会进行选举产生一个新的leader,之后所有的读写就会转移到这个新的Leader上,在kafka中,其不是采用常见的多数选举的方式进行副本的Leader选举,而是会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,显然还有一些副本没有来得及同步。只有这个ISR列表里面的才有资格成为leader(先使用ISR里面的第一个,如果不行依次类推,因为ISR里面的是同步副本,消息是最完整且各个节点都是一样的)。
25 KafkaConsumer是非线程安全的,那么怎么样实现多线程消费?
- 线程封闭,即为每个线程实例化一个 KafkaConsumer 对象一个线程对应一个 KafkaConsumer 实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。
- 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。
26 简述消费者与消费组之间的关系?
- Consumer Group 下可以有一个或多个 Consumer 实例。这里的实例可以是一个单独的进程,也可以是同一进程下的线程。在实际场景中,使用进程更为常见一些。
- Group ID 是一个字符串,在一个 Kafka 集群中,它标识唯一的一个 Consumer Group。
- Consumer Group 下所有实例订阅的主题的单个分区,只能分配给组内的某个 Consumer 实例消费。这个分区当然也可以被其他的 Group 消费。
27 创建topic时如何选择合适的分区数?
根据集群的机器数量和需要的吞吐量来决定适合的分区数.
在 Kafka 中,性能与分区数有着必然的关系,在设定分区数时一般也需要考虑性能的因素。对不同的硬件而言,其对应的性能也会不太一样。
可以使用Kafka 本身提供的用于生产者性能测试的 kafka-producer- perf-test.sh 和用于消费者性能测试的 kafka-consumer-perf-test.sh来进行测试。
增加合适的分区数可以在一定程度上提升整体吞吐量,但超过对应的阈值之后吞吐量不升反降。如果应用对吞吐量有一定程度上的要求,则建议在投入生产环境之前对同款硬件资源做一个完备的吞吐量相关的测试,以找到合适的分区数阈值区间。
分区数的多少还会影响系统的可用性。如果分区数非常多,如果集群中的某个 broker 节点宕机,那么就会有大量的分区需要同时进行 leader 角色切换,这个切换的过程会耗费一笔可观的时间,并且在这个时间窗口内这些分区也会变得不可用。
分区数越多也会让 Kafka 的正常启动和关闭的耗时变得越长,与此同时,主题的分区数越多不仅会增加日志清理的耗时,而且在被删除时也会耗费更多的时间。
28 优先副本是什么?它有什么特殊的作用?
优先副本 会是默认的leader副本 发生leader变化时重选举会优先选择优先副本作为leader
29 Kafka有哪几处地方有分区分配的概念?简述大致的过程及原理
创建主题时
如果不手动指定分配方式 有两种分配方式
消费组内分配
30 聊一聊你对Kafka的Log Retention的理解?
kafka留存策略包括 删除和压缩两种
删除: 根据时间和大小两个方式进行删除 大小是整个partition日志文件的大小
超过的会从老到新依次删除 时间指日志文件中的最大时间戳而非文件的最后修改时间
压缩: 相同key的value只保存一个 压缩过的是clean 未压缩的dirty 压缩之后的偏移量不连续 未压缩时连续
31 聊一聊你对Kafka的Log Compaction的理解?
32 Kafka的旧版Scala的消费者客户端的设计有什么缺陷?
老版本的 Consumer Group 把位移保存在 ZooKeeper 中。Apache ZooKeeper 是一个分布式的协调服务框架,Kafka 重度依赖它实现各种各样的协调管理。将位移保存在 ZooKeeper 外部系统的做法,最显而易见的好处就是减少了 Kafka Broker 端的状态保存开销。
ZooKeeper 这类元框架其实并不适合进行频繁的写更新,而 Consumer Group 的位移更新却是一个非常频繁的操作。这种大吞吐量的写操作会极大地拖慢 ZooKeeper 集群的性能。
33 聊一聊你对Kafka底层存储的理解(页缓存、内核层、块层、设备层)
34 聊一聊Kafka的延时操作的原理
35 聊一聊Kafka控制器的作用
36 消费再均衡的原理是什么?(提示:消费者协调器和消费组协调器)
37 Kafka中的幂等是怎么实现的?
38 Kafka中的事务是怎么实现的?
39 Kafka中有那些地方需要选举?这些地方的选举策略又有哪些?
40 失效副本是指什么?有那些应对措施?
41 多副本下,各个副本中的HW和LEO的演变过程
42 为什么Kafka不支持读写分离?
43 Kafka在可靠性方面做了哪些改进?(HW, LeaderEpoch)
44 Kafka中怎么实现死信队列和重试队列?
45 Kafka中的延迟队列怎么实现
46 Kafka中怎么做消息审计?
47 Kafka中怎么做消息轨迹?
48 Kafka中有那些配置参数比较有意思?聊一聊你的看法
49 Kafka中有那些命名比较有意思?聊一聊你的看法
50 Kafka有哪些指标需要着重关注?
51 怎么计算Lag?(注意read_uncommitted和read_committed状态下的不同)
52 Kafka有什么优缺点?
参考58和59
53 还用过什么同质类的其它产品,与Kafka相比有什么优缺点?
参考59
54 为什么选择Kafka?
参考58和59
55 在使用Kafka的过程中遇到过什么困难?怎么解决的?
消费者使用过程中一定要注意不能阻塞poll方法逻辑,一旦阻塞很造成消费业务的不健康现象。
• 问题一
现象:消费到消息后去进行消费进度提交时提交失败。
可能原因:消费消息和处理消息同步进行,整体流程为“消费->处理->提交”。其中处理过程耗时过久,导致消费者和服务端超时断开,此时进行消费提交会报失败,默认超时时间为5分钟。
• 问题二
现象:多消费者并发消费,但是消费速度极慢,导致消费堆积。从服务端日志可以看到大量的消费组rebalance相关的日志。
可能原因:消费逻辑与问题一相同,需要注意的是消费者在poll方法被阻塞时无法对服务端请求做出回应。消费者断开后执行重连,从而使消费组进入Rebalancing状态并通知所有相关消费者,此时若存在消费者阻塞,无法及时响应,则会导致Rebalance动作无法及时完成,此过程中所有消费者都无法消费。
优化建议:优化处理逻辑,快速处理数据;消费和处理进行解耦,使用不同的线程处理;创建消费者时适当减小max.poll.records的配置,默认为500,减少单次消息处理时间。
56 怎么样才能确保Kafka极大程度上的可靠性?
-
生产者发消息不遗漏
- 本地消息表记录发送状态,或者存放在Redis等中
- 记录失败的消息,然后定时任务批量处理失败的消息。
- 定时任务有可能重发,因为有时候,失败也不是真的失败,也许是消息发了,但是没有返回成功的值。不过,这个没关系,因为消费端已经做了处理,不会重复消费的。
-
生产者不重复发消息
- 消费者保证不重复消费
-
生产者发消息不乱序
- producer ID + sequence number
- 如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
- 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber
- Sender发送失败后会重试,这样可以保证每个消息都被发送到broker
-
消费者消费消息不遗漏
- 关闭自动提交offset, 消息消费成功后,手动提交offset.
-
消费者不重复消费
- 可以根据业务指定唯一性的key, 存放到一个地方(数据库、redis等),推荐redis或者mongodb.每次消费的时候,如果发现已经消费过,则直接跳过。
57 聊一聊你对Kafka生态的理解
- confluent全家桶(connect/kafka stream/ksql/center/rest proxy等)
- 开源监控管理工具kafka-manager,kmanager等
58 Kafka跟RocketMQ吞吐量如何?其他方面呢?
- 吞吐量
- Kafka单机写入TPS达到百万条/秒,消息大小10个字节
- rocketmq单机写入TPS约7万条/秒,单机部署3个broker可以达12万条/秒,消息大小10个字节
- 数据可靠性
- Kafka支持异步刷盘、异步replication
- rocketmq支持异步实时刷盘、同步刷盘、异步replication、同步replication
- 消息投递实时性
- Kafka使用短轮询方式,实时性取决于轮询间隔时间
- RocketMQ使用长轮询,同Push方式实时性一致,消息的投递延时通常在几个毫秒。
- 消息失败重试
- Kafka消费不支持失败重试
- rocketmq消费失败支持定时重试,每次重试时间间隔顺延
- 分布式事务消息
- Kafka不支持事务消息
- rocketmq非开源版支持事务消息
- 消息回溯
- Kafka理论上可以根据offset来回溯消息
- rocketmq支持按照时间回溯消息,精度毫秒级
- 消息轨迹
- Kafka不支持消息轨迹
- rocketmq非开源版支持消息轨迹
- 消息查询
- Kafka不支持消息查询
- rocketmq支持根据message ID查询,也支持按照消息内容查询
- 定时消息
- Kafka不支持定时消息
- rocketmq支持定时level和指定毫秒级的延时时间
- 单机支持的队列数
- Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
- RocketMQ单机支持最高5万个队列,Load不会发生明显变化
- 单机可以创建更多Topic,因为每个Topic都是由一批队列组成
- Consumer的集群规模和队列数成正比,队列越多,Consumer集群可以越大
59 消息吞吐量为什么Kafka>rocketmq>rabbitmq?rocketmq哪些方面导致吞吐量比Kafka小?
-
Kafka单机写入TPS达到百万,rocketmq单机写入TPS达到7万,单机部署3个broker可达12万,rocketmq差不多5万
-
Kafka和rocketmq都采用顺序IO和零拷贝;rabbitmq采用AMQP协议,实现非常重量级,起源于金融系统,为了保证可靠性在吞吐量上做了取舍。
-
Kafka采用异步发送机制,producer将小消息合并,批量发向broker
- producer发送一条消息并没有直接立即发送到broker而是缓存起来,立即返回消息发送成功,当缓存的消息达到一定数量时再批量发送到broker,减少了网络IO,提高了消息发送性能。
- 如果producer宕机,会导致消息丢失,Kafka利用此机制提高了性能降低了可靠性
-
rocketmq没有采用Kafka这种机制,原因是:
- rocketmq采用java语言开发,缓存过多消息,GC是个严重问题
- producer发送消息,消息未发给broker,向业务返回成功,若producer宕机,消息丢失
- producer通常为分布式系统,每台机器都是多线程发送,rocketmq作者认为线上系统单个producer每秒产生的数据量有限,不可能上万
- 消息缓存的功能可以由上层应用完成
-
当broker里面topic的partition数量过多,Kafka的性能远远不如rocketmq,因为两者在存储机制上的不同。
- kafka和rocketMq都使用文件存储,但是,kafka是一个分区一个文件,当topic过多,分区的总量也会增加,kafka中存在过多的文件,当对消息刷盘时,就会出现文件竞争磁盘,出现性能的下降。
- 一个partition(分区)一个文件,顺序读写。这样带来的影响是,一个分区只能被一个消费组中的一个 消费线程进行消费,因此可以同时消费的消费端也比较少。
- rocketMq中,所有的队列都存储在一个文件中,每个队列的存储的消息量也比较小,因此topic的增加对rocketMq的性能的影响较小。也从而rocketMq可以存在的topic比较多,可以适应比较复杂的业务。
- 所有的队列存储一个文件(commitlog)中,所以rocketmq是顺序写io,随机读。每次读消息时先读逻辑队列consumQue中的元数据,再从commitlog中找到消息体。增加了开销。
60 kafka跟rabbitmq区别?
kafka&rabbitmq61 Kafka跟MQ对比?
-
ActiveMQ是Apache的老牌消息队列,单机吞吐量万级,主从架构实现HA,有较低概率丢失数据,官方社区对5.x维护越来越小,缺乏大规模吞吐的场景使用。
-
Rabbitmq采用AMQP协议,实现非常重量级,单机吞吐量万级,源于金融系统,为了可靠性降低了吞吐量,天生高并发的语言erlang开发,二次开发困难。
-
rocketmq阿里开发,单机写入TPS达7万条/秒,单机部署3个broker达到12万,阿里大量用于金融系统,java语言开发,对C++支持不成熟。
-
Kafka单机写入TPS百万级,起源于日志系统,被大数据领域广泛采用于数据采集、传输、存储等,消息异步发送、合并小消息批量发送给broker,提高吞吐量同时降低可靠性。
62 kafka使用建议
生产消息:
• 生产消息消息发送失败需要有重试机制。建议重试3次,通过参数:retries=3 配置。
• 生产的callback函数不能阻塞,否则会阻塞客户端消息的发送对于时延敏感消息,设置发送优化:linger.ms=0。
消费消息:
• 使用长连接pull模式消费消息,不要消费结束就关闭consumer通道,这样会导致频繁rebalance,阻塞消费。
• consumer需周期性poll(建议间隔为200毫秒),维持和server端的心跳,避免因为心跳超时导致consumer频繁加入和退出,阻塞消费。
• consumer拉去消息的逻辑poll方法不能有阻塞。
• consumer数量不能超过topic的分区数,否则会有consumer拉取不到消息。
• 确保处理完消息后再做消息commit,避免业务消息处理失败,无法重新拉取处理失败的消息。
• Kafka不能保证消费重复的消息,业务侧需保证消息处理的幂等性。