Kafka基础
kafka是一个分布式基于发布/订阅模式的消息队列,主要应用于大数据实时处理领域。
- 发布/订阅:消息的发布不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类型,订阅者只接收感兴趣的消息。
- 最新定义:是一个开源的分布式事件流平台,用于高性能的数据管道流分析、数据集成和关键任务应用。
- 传统应用场景:削峰、解耦、异步。
1. 发送消息模式和基础架构
点对点模式
消费者主动拉取数据,消息收到后清除消息,只有一个topic和一个消费者。
发布/订阅模式
可以有多个topic主题,消费者消费数据后,不删除数据,每个消费者相互独立,都可以消费到数据。
1.1 基础架构
- 为方便扩展并提高吞吐量,一个topic分为多个partition(分区)。
- 配合分区设计,提出消费者组的概念,组内每个消费者并行消费,一个分区的数据只能由一个消费者组内的一个消费者进行消费。
- 为提高可用性。为每个partition增加副本,消费者只针对leader副本进行消费,follower的作用就是当leader挂了升为leader。
-
zookeeper中记录谁是leader,那些broker是正常的。
2. 消息发送原理
在消息发送过程中,涉及到两个线程——main线程和sender线程,在main线程中创建一个双端队列RecordAccumulator。main线程将消息发送给双端队列,sender线程不断从队列中拉取消息发送到broker。
2.1 发送流程
- 外部数据由producer发出会由main线程发送消息。
- 在发送数据过程中可能会遇到拦截器(可选)。
- 通过序列化器使用自己自带的序列化器进行序列化。
- 通过分区器指定发到哪一个双端队列中。
双端队列大小默认是32m,内部每个批次请求(RecordBatch)的大小是16kb,双端队列中还有一个内存池,在发送批次数据时,会创建批次大小会从内存池中取出对应的内存,发往kafka集群后就会释放内存归还到内存池
- Sender线程会从双端队列拉取数据,这里有两个条件:
batch.size
只有累计到达该值之后,sender才会发送数据,默认是16kb。
linger.ms
(单位毫秒) 如果数据迟迟未到达 batch.size,sender等待设置的时间到了之后就会发送数据,默认是0ms表示没有延迟。以上两种是或的关系,满足一项就可以发送数据。 - 将分区中的数据以节点的方式发送,默认每个broker节点最多缓存5个请求。
- 通过selector将通道打通开始发送数据,集群收到数据后开始同步到副本,完成之后进行应答:
0
: 生产者发过来的数据,不需要等待数据落盘应答。
1
: 生产者数据发送过来之后,等待leader应答。
-1(all)
: 生产者发送过来的数据leader和ISR队列里面所有的节点收起数据后应答;如果应答成功,清理对应的请求,同时清理分区的数据;没有应答成功,则进行重试,重试的最大值默认是int的最大值。
2.2 发送方式
- 发送并忘记:指的是外部数据到指定分区的过程,不关心消息是否到达,对返回结果不做任何判断。
- 回调异步发送:消息发送失败会自动重试,不需要在回调函数中手动重试,由双端队列返回主题、分区等信息。
- 同步发送:在异步方法后面调用get()就是同步发送,每次发送数据都要等上一次数据走完整个流程才能继续发送。
3. 生产者分区
3.1 优势
- 便于合理利用存储资源,每个partition在broker上存储,可以把海量数据按照分区切割成一块一块的数据存储在多台Broker中,合理控制分区的任务,可以实现负载均衡的效果。
- 提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据
3.2 分区策略
- 指定分区器:直接将指明的值作为partition的值。
- 没有指明分区器的情况:将key的hash值与topic的分区数进行取余。
- 粘性分区器:既没有key也没有指明,则选择粘性分区器,随机选择一个分区,并尽可能一直使用该分区,待该分区的batch的值满了再选择其他分区。kafka默认的分区器:
DefultPartitioner
- 自定义分区器:实现partitioner接口重写partition方法。
生产者如何提高吞吐量
- batch.size:批次大小,默认16kb
- linger.ms:等待时间,修改为5--100ms,时间太长会导致数据延迟。
- compression.type:压缩
- RecordAccumulator:缓冲区大小修改为64m
4. 数据可靠性与数据有序性
4.1 ack应答级别
- ack = 0:用的少,数据可靠性低。
- ack = 1:也存在丢数据的问题,在leader未来得及同步给follower时,leader挂了,但是leader已经ack了这条消息,那么重新选举的新的leader并不知道上一条消息,producer也不会重发,所以数据会丢失,但是相对于0的情况丢失概率低一点。
- ack = -1:可靠性高,但是存在一个follower故障不能收到应答的问题,所以leader维护了一个ISR,意为和leader保持同步的follower+leader集合,如果follower长时间没有向leader发送通信请求或同步数据,则follower将被踢出ISR,该时间由
replica.lag.time.max.ms
参数设置,默认是30s,这样就不用等待长期联系不上或已经故障的节点。当分区副本数设置为1或者ISR设置最小副本数为1,这样就等同于ack=1。
数据完全可靠条件
ack = -1 + 分区副本数 >= 2 + ISR最小副本数 >= 2
4.2 数据重复
- 至少一次:数据完全可靠性条件,不能保证数据不重复。
- 最多一次:ack = 0,不能保证数据可靠。
- 精确一次:kafka 0.11版本后引入了幂等性和事务
幂等性
producer不管向broker发送多少条数据,broker只会持久化一条数据。
重复数据判断标准:具有<PID.partition.seqNumber>
相同全键的消息提交时,broker只会持久化一条,其中PID是kafka每次重启都会分配一个新的;partition表示分区号;seqNumber表示单调自增的数,所以幂等性只能保证单分区单会话的不重复。
4.3 事务
开启事务的前提是开启了幂等性,生产者在使用事务前必须自定义一个唯一的事务id,有了事务id,即使客户端挂了,重启后也能继续处理未完成的事务,Kafka的事务特性就是要确保跨分区的多个写操作的原子性。
- transactionalId 与 PID相对应,为了保证新的 Producer 启动之后,具有相同的 transactionalId 的旧生产者立即失效,每个 Producer 通过 transactionalId 获取 PID 的时候,还会获取一个单调递增的 producer epoch。
- Kafka 的事务主要是针对 Producer 而言的。对于 Consumer,考虑到日志压缩(相同 Key 的日志被新消息覆盖)、可追溯的 seek() 等原因,Consumer 关于事务语义较弱。
- 对于 Kafka Consumer,在实现事务配置时,一定要关闭自动提交的选项,即
props.put("ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false")
事务原理
- producer发送查找事务协调者 (TransactionalCoordinator) 的请求 (FindCoordinatorRequest),Broker 集群根据 Request 中包含的 transactionalId 查找对应的 TransactionalCoordinator 节点并返回给 Producer。
- 事务协调器返回pid给producer,分两种情况:
- 不包含 transactionId:直接生成一个新的 Producer ID,返回给生产者客户端。
- 包含 transactionId:根据 transactionId 获取 PID,这个对应关系保存在事务日志中。
注:如果 TransactionalCoordinator 第一次收到包含该 transactionalId 的消息,则将相关消息存入主题 __transaction_state 中。
- 生产者通过方法
producer.beginTransaction()
启动事务,此时只是生产者内部状态记录为事务开始。对于事务协调者,直到生产者发送第一条消息,才认为已经发起了事务。 - Producer 在向新分区发送数据之前,首先向 TransactionalCoordinator 发送请求,使 TransactionalCoordinator 存储对应关系 (transactionalId, TopicPartition) 到主题 __transaction_state 中。
- 生产者发送事务消息到分区,发送的请求中,包含 pid, epoch, sequence number 字段。
epoch
:生产者用于标识同一个事务 ID 在一次事务中的轮数,每次初始化事务的时候,都会递增,从而让服务端知道生产者请求是否为旧的请求。如果检查当前事务提交者的epoch不是最新的,那么就会拒绝该Producer的请求。从而达成拒绝僵尸实例的目标。 - 生产者通过
producer.sendOffsetsToTransaction()
接口,发送分区的 Offset 信息到事务协调者,协调者将分区信息增加到事务中。 - 在前面生产者调用事务提交 offset 接口后,会发送一个 TxnOffsetCommitRequest 请求到消费组协调者,消费组协调者会把 offset 存储到 Kafka 内部主题 __consumer_offsets 中。协调者会根据请求的 pid 与 epoch 验证生产者是否允许发起这个请求。只有当事务提交之后,offset 才会对外可见。
- 用户调用
producer.commitTransaction()
或abortTransaction()
方法,提交或回滚事务,此时会执行一个两阶段提交:- EndTxnRequest:生产者完成事务之后,客户端需要显式调用结束事务,或者回滚事务。前者使消息对消费者可见,后者使消息标记为 abort 状态,对消费者不可见。无论提交或者回滚,都会发送一个 EndTxnRequest 请求到事务协调者,同时写入 PREPARE_COMMIT 或者 PREPARE_ABORT 信息到事务记录日志中。
- WriteTxnMarkerRequest:事务协调者收到 EndTxnRequest 之后,其中包含消息是否对消费者可见的信息,然后就需要向事务中各分区的 Leader 发送消息,告知消费者当前消息是哪个事务,该消息应该接受还是丢弃。每个 Broker 收到请求之后,会把该消息应该 commit 或 abort 的信息写到数据日志中。
4.4 数据有序
kafka保证了单分区有序,无法保证多分区有序
- kafka 1.x版本之前保证单分区有序条件:max.in.flight.request.per.connection = 1 (不需要考虑幂等性)
- 1.x 版本之后保证单分区有序条件:
- 未开启幂等性:max.in.flight.request.per.connection = 1
- 开启幂等性:max.in.flight.request.per.connection <= 5
在kafka 1.x版本后启用幂等性后,kafka服务端会缓存producer发送过来的5个request的元数据,故无论如何都可以保证最近的5个request是有序的,如果出现乱序问题,那么会在服务端重新排序后才会落盘。
5. Broker
缓存代理,Kafka 集群中的一台或多台服务器统称为 broker
5.1 broker工作流程
zookeeper存储的kafka信息
- /kafka/brokers/ids: [0,1,2] 记录有哪些正常的服务器
- /kafka/brokers/topic/first/partitions/0/state: {"leader":1,"isr":[0,1,2]} 记录谁是leader,有哪些副本可用
- /kafka/consumer:0.9版本前用于存储offset信息,之后offset存储在kafka主题中,这样的方式提高了效率,避免kafka和zookeeper频繁通信。
- /kafka/controller: 辅助选举leader
总体工作流程
- broker启动后在zk中注册信息。
- 每一个broker内部有一个controller会注册到zk中,谁先注册谁就监听brokers节点变化。
- controller决定leader选举,选举规则:在isr存活的前提下,按照AR(kafka分区中的所有副本统称)中排在前面的优先,然后按照轮询选举出新的leader。
- 选举完成后controller将节点信息上传到zk存储topic信息的节点。
- 其他controller从zk中同步信息到自己的controller中。
- 假设一台broker挂了,controller监听到节点变化,就会获取isr选举新的leader更新信息到zk中。
5.2 kafka副本
- 作用:提高数据可靠性。
- kafka默认一个副本,生产一般配置为两个保证数据可靠性,太多副本会增加磁盘存储空间,增加网络上数据和传输,降低效率。
- kafka中的副本分为leader和follower,kafka生产者只会把数据发往leader,然后follower找leader进行数据同步。
- kafka分区中副本的统称为AR,AR=ISR+OSR(OSR表示follower与leader副本同步时延迟过多的副本)
副本故障处理
LEO:
每个副本的最后一个offset,leo其实就是offset+1。
HW:
所有副本中最小的LEO。
leader故障
leader故障后会从ISR中选举出一个新的leader,为保证多个副本之间的数据一致性,其余的follower会将各自的log文件高于HW的部分截掉,然后从新的leader同步数据。这只能保证各个副本之间的数据一致性,并不能保证数据不丢失或不重复。
follower故障
- follower发生故障后被提出ISR
- 期间其他leader和follower继续接收数据
- 等待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截掉,从HW开始向leader进行同步
- 等待该follower的LEO大于等于该partition的HW,即follower追上leader后,就可以重新加入ISR了
5.3 分区副本分配
leader partition自动平衡:一般不建议开启,浪费集群大量的平衡操作,正常情况下kafka本身会自动把leader partition均匀分散到各个机器来保证每台机器的读写吞吐量是均匀的,但是如果某些broker宕机了,会导致leader partition过于集中在其他少部分几台broker上,这会导致少数几台broker的读写请求压力过高,其他宕机的leader重启之后都是follower partition,读写请求很低,造成集群负载不均衡。
-
auto.leader.rebalance.enable:true
:默认是true,自动平衡 -
leader.imbalance.per.broker.percentage:10%
:默认是10%。每个broker允许的不平衡的leader比列,如果每个broker超过这个比例,控制器会触发leader的平衡。 -
leader.imbalance.check.interval.seconds:300s
:默认是300s,检查leader负载时候平衡的间隔时间。
5.4 文件存储
文件存储机制
topic是一个逻辑上的概念,而partition是一个物理概念,每个partition对应于一个log文件,该log文件中存储的就是producer生产的数据。producer生产的数据会不断追加到log文件的末端,为了防止log文件过大导致数据定位效率低下,kafka采用分片和索引机制,将每个partition分为多个segment,每个segment包括一个index文件
,log文件
和time index文件
等,这些文件位于一个文件夹下,该文件夹的命名规则为topic+分区序号
例如:topicA-0。
- index为稀疏索引,大约每往log文件写入4kb数据,就会往index文件写入一条索引,配置参数:
log.index.interval.bytes:4kb
。 - index文件中保存的offset为相对offset,这样能保证offset的值所占空间不会过大,因此能将offset的值控制在固定大小。
查找流程
- 根据目标offset定位segment文件
- 找到大于等目标offset的最大offset对应的索引项
- 定位到log文件
- 向下遍历找到目标记录
文件清除策略
kafka中默认的日志保存时间是7天,可以通过调整如下参数修改保存时间:
-
log.retention.hours
最低优先级小时,默认是7天 -
log.retention.minutes
分钟 -
log.retention.ms
毫秒 -
log.retention.check.interval.ms
负责设置检查周期,默认是5分钟,一旦超过设置的时间,就会执行清除策略
两种日志清除策略
- delete:日志删除
-
log.cleanup.policy=delete
:所有数据启用删除策略。- 基于时间:默认打开,以segment中所有记录中的最大时间戳作为该文件的时间戳
- 基于大小:默认关闭,超过设置的所有日志大小,删除最早的segment。
log.retention.byte
默认为-1,表示无穷大。
-
- compact:压缩日志
-
log.cleanup.policy=compact
:所有数据启用压缩日志。- 对于相同key的不同value值,只保留最后一个版本,压缩后offset可能是不连续的。
-
5.5 高效读写数据
- kafka本身是分布式集群,可以采用分区技术,并行度高。
- 读数据采用稀疏索引,可以快速定位要消费的数据。
- 顺序写磁盘,比随机写磁盘快,因为省去了大量的磁头寻址的时间。
- 依赖页缓存和零拷贝技术。
- 零拷贝:kafka的数据处理操作加油kafka生产者和kafka消费者处理,broker应用层不关心存储的数据,所以传输效率高。
-
页缓存(pageCache):kafka重度依赖操作系统的pageCache功能,当上层有写操作时,操作系统只是将数据写入pageCache,当读操作发生时,先从pageCache中查找,如果找不到,再去磁盘中读取,实际上pageCache是把尽可能多的空间内存都当作了磁盘缓存来使用。
6. 消费者
- pull模式:consumer采用从broker中主动拉取数据,kafka采用这种模式。实时性比较差,如果kafka没有数据,消费者就会陷入循环中,一直返回空数据。
- push模式:kafka集群监听内部是否有新的消息,如果有新的消息就会给消费者推送,由broker决定发送效率,很难适应消费者的消费速率。
6.1 消费者工作流程
一个消费者可以消费多个分区的数据,一个消费者组中的消费者不能够对同一分区的数据进行消费。offset是由每个消费者提交到系统主题中保存。
消费者组:多个消费者组成,所有消费者的groupId相同,那么他们就属于同一个消费者组,消费者组之间互不影响,所有的消费者都属于某个消费者组。
消费者组初始化流程
coordinator
:每一个broker中都有一个coordinator,作用是辅助实现消费者组的初始化和分区的分配。节点的选择= groupId的hashcode值 % 50(_consumer_offsets的分区数量)。
- 所有消费者组中的消费者都会向coordinator发送joinGroup请求。
- coordinator会从消费者中选出一个作为leader
- coordinator把要消费的topic情况发送给leader消费者
- leader指定消费方案并发送给coordinator
- coordinator就把消费方案下发给各个consumer
每个消费者会和coordinator保持心跳(默认3s),一旦超时(
session.timeout.ms=45s
)该消费者会被移除,并触发再平衡,或者消费者处理消息时间过长(max.poll.interval.ms=5分钟
)也会触发再平衡。
触发再平衡的条件:订阅的topic个数发生变化,订阅的topic分区发生变化
消费流程
- consumer发送sendFetches消费请求
- Fetch.min.bytes:每批次最小抓取大小,默认1字节
- Fetch.max.wait.ms:一批次数据未达到的最小超时时间,默认500ms
- Fetch.max.bytes :每批次最大抓取大小,默认50m
- 向kafka发送send,通过回调函数onSuccess将数据放到一个队列中
- FetchRecords 从队列中抓取数据,max.poll.records:一次拉取数据返回信息的最大条数,默认500条。
- 反序列化
- 拦截器
- 处理数据
消费者组内的消费者多于分区数量,那么就一定会有多余的消费者空闲,不会接受任何消息
6.2 分区分配以及再平衡
分区分配策略
可以通过配置参数partition.assignment.strategy
修改分区的分配策略,kafka可以同时支持使用多个分区分配策略。
-
Range
:针对一个topic而言,对每一个topic进行序号排序,对消费者也进行排序,通过过分区个数 / 消费者数来决定每个消费者应该消费几个分区,如果除不尽那么前面的消费者就会多一个分区。此种方式针对一个topic,消费多个分区影响不大,但是如果有多个topic,那么针对每一个topic,排在前面的消费者就会都多消费一个分区,会出现数据倾斜。 -
RoundRobin
:针对所有topic,轮询分区策略,把所有的分区和消费者都列出现,然后按照hashcode进行排序,最后通过轮询算法来分配分区给消费者。 -
Sticky
:粘性分区,可以理解为分配的结果带有粘性,即在执行一次新的分配前考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。
6.3 offset位移
0.9版本后consumer默认将offset保存在kafka一个内置的topic中:_consumer_offsets,该主题采用key,value的方式存储数据,key是groupId+topic+分区号,value就是当前offset的值,每隔一定时间kafka会对这个topic进行compact,也就是新值覆盖旧值。
自动提交offset
该方式可能会导致提前消费,开发人员难以把握offset提交的时机。
- enable.auto.commit:是否开启自动提交,默认是true。
- auto.commit.interval.ms:自动提交offset的时间间隔,默认是5s。
手动提交offset
- 同步提交(commitSync):阻塞当前线程,直到提交成功,并且会失败自动重试(也会出现提交失败)。
- 异步提交(commitAsync):没有失败重试机制,有可能提交失败,发送完提交offset请求后,就开始消费下一批数据。
指定offset消费
auto.offset.reset = earliest / latest(默认) / none
-
earliest
:自动将偏移量重置为最早的偏移量。 -
latest
:自动将偏移量重置为最新的偏移量。 -
none
:如果未找到消费者组的先前偏移量,则向消费者抛异常。
指定时间消费:将时间转换为对应的offset
6.4 漏消费和重复消费
- 重复消费:已经消费了数据,但是offset没有提交,自动提交offset引起。
- 漏消费:先提交offset后消费,有可能造成数据的漏消费,手动提交引起。
消费者事务:如果想要完成消费者一次精确的消费,那么就需要将kafka消费端消费过程和提交offset过程做原子绑定,可以将offset提交到支持事务的其他介质中,比如MySQL。
6.5 消息积压
- 如果是kafka消费能力不足,则可以考虑增加topic的分区数,同时提升消费者则的消费者数量,消费者数=分区数,两者缺一不可。
- 如果是下游处理不及时,那么可以提高每次拉取的数量,批次拉取数据过少使处理的数据小于数据的生产速度也会造成消息积压。
7. kafka的kraft模式
kafka 2.8.0的新特性,由于原来版本重度依赖Zookeeper集群,在做元数据管理、Controller的选举等都需要依赖Zookeeper集群,当Zookeeper集群性能发生抖动时,kafka的性能也会收到很大的影响。因此,在kafka发展的过程当中,为了解决这个问题,提供kraft模式,来取消Kafka对Zookeeper的依赖。在 kraft 中,一部分 broker 被指定为控制器,这些控制器提供过去由 ZooKeeper 提供的共识服务。所有集群元数据都将存储在 kafka 主题中并在内部进行管理。
在 kraft 模式下,kafka 集群可以以专用或共享模式运行。在专用模式下,一些节点将其process.roles配置设置为controller,而其余节点将其设置为broker。对于共享模式,一些节点将process.roles设置为controller, broker并且这些节点将执行双重任务。采用哪种方式取决于集群的大小。
修改config/kraft/server.properties
process.roles=broker,controller
7.1 优势
- 更简单的部署和管理——通过只安装和管理一个应用程序,kafka 现在的运营足迹要小得多。这也使得在边缘的小型设备中更容易利用 Kafka;
- 提高可扩展性——kraft 的恢复时间比 ZooKeeper 快一个数量级。这使我们能够有效地扩展到单个集群中的数百万个分区。ZooKeeper 的有效限制是数万;
- 更有效的元数据传播——基于日志、事件驱动的元数据传播可以提高 Kafka 的许多核心功能的性能。