Kafka核心设计与实践原理总结:进阶篇
kafka作为当前热门的分布式消息队列,具有高性能、持久化、多副本备份、横向扩展能力。我学习了《深入理解Kafka:核心设计与实践原理总结》一书后,对其中主要的知识点进行了总结,便于理解和掌握kafka的原理和应用。在这里分享出来,希望也能帮助到大家。总结的知识点分为两部分:
1,基础篇:基本概念、生产者和消费者的使用和原理,以及主题和分区的管理...
2,进阶篇:深入解析kafka服务端(broker)、客户端的进阶原理(包括重分配、事务等)、kafka的高级应用...
本篇是进阶篇!
五、日志存储
1.文件目录布局
一个分区副本对应一个日志(Log),一个日志会分配成多个日志分段(LogSegment),Log在物理上以文件夹形式存储,而LogSegment对应磁盘上的一个日志文件和2个索引文件及可能的其他文件
向Log追加消息时是顺序写入的,只有最后一个LogSegment才能执行写入,称为activeSegment,满足一定条件时,需要创建新的activeSegment
每个日志及索引的文件名根据基准偏移量(BaseOffset)命名,表示当前LogSegment中第一条消息的offset
broker配置了多个根目录时,会挑选分区数最少的根目录来创建主题 ##### 2.日志格式
消息压缩
Kafka 会将多条消息一起进行压缩,生产者发送的压缩数据在 broker 中也是保持压缩状态进行存储的 ,消费者从服务端获取的也是压缩的消息,消费者在处理消息之前才会解压消息。
压缩方式通过compression.type来配置:producer、gzip、snappy、lz4、uncmpressed
消息压缩时,整个消息集压缩为内层消息,整体作为外层消息的value。外层消息的offset保存了内层消息最后一条记录的offset,而内层消息在压缩时会从0开始分配一个offset,内层消息的offset会在服务端进行转换。
v2版本消息结构
使用了变长整型(Varints)来保存数值 ##### 3.日志索引
偏移量索引(.index)
relativeOffset:相对偏移量,表示消息相对于baseOffset的偏移量,当前索引文件的文件名即为baseOffset值
position:消息在日志分段文件中的物理地址
每个索引占8个字节,分为两个部分:
时间戳索引(.timeindex)
timestamp:当前日志分段最大的时间戳
relativeOffset:时间戳对应的消息的相对偏移量 ##### 4.日志清理
每个索引占12个字节,分为两个部分:
可以通过broker端参数log.cleanup.policy设置日志清理策略(默认delete)
两种清理策略:
设置log.cleanup.policy为compact,并且将log.leaner.enable设置为true(默认true)
设置log.cleanup.policy为delete
日志删除:按照一定保留策略直接删除不符合条件的日志分段
日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本
可同时使用日志删除和日志压缩两种策略
日志删除:
删除任务延迟时间通过file.delete.delay.ms配置(默认60000,1分钟)
基于时间的保留策略:
基于日志大小的保留策略:
基于日志起始偏移量的保留策略:
优先级ms>minutes>hours(默认log.retention.hours=168,7天)
通过log.retention.hours、log.retention.minutes和log.retention.ms来配置超时清理阈值
通过log.retention.bytes配置Log日志总大小阈值(默认-1,无穷大)
通过log.segment.bytes配置日志分段文件大小阈值(默认1G)
某日志分段的下一个日志分段的起始偏移量baseOffset是否小于等于logStartOffset,若是则可以删除此日志分段
logStartOffset 的值可以通过DeleteRecordsRequest请求(比如使用KafkaAdminClient的deleteRecords()方法、使用kafka-delete-records.sh脚本〉、日志的清理和截断等操作进行修改
Kafka的日志管理器中有一个专门的日志删除任务来周期性地检测和删除不符合保留条件的日志分段文件,周期通过 broker 端参数 log.retention.check.interval.ms来配置(默认300000,5分钟)
日志分段保留策略有3种:
删除日志分段时,首先会从Log对象中所维护日志分段的跳表中移出待删除分段,以保证没有现成对其进行读取,然后将对应文件加上.deleted后缀,最后由名为delete-file的延迟任务来删除文件
日志压缩(Log Compaction)
如果一条消息的key不为null,但是其value为null,那么此消息就是墓碑消息。日志清理线程发现墓碑消息时会先进行常规的清理,并保留墓碑消息一段时间。墓碑消息的保留条件是所在的日志分段的最近修改时间 lastModifiedTime大于deleteHorizonMs。deleteHorizonMs为clean部分中最后一个日志分段的最近修改时间减去保留闽值deleteRetionMs(通过 broker 端参数log.cleaner.delete.retention.ms配置,默认86400000,即24小时) ##### 5.磁盘存储
污浊率:dirtyRatio = dirtyBytes / ( cleanBytes + dirtyBytes )
Log Compaction对于有相同key的不同value值,只保留最后一个版本。
每个日志目录下都有一个名为“cleaner-offset-checkpoint”的清理检查点文件,用来记录每个主题的每个分区中己清理的偏移量。通过清理检查点文件可以将 Log 分成两个部分。通过检查点cleaner checkpoint来划分出 一个己经清理过的clean部分和一个还未清理过的dirty部分。
注意Log Compaction是针对key的,所以在使用时应注意每个消息的key值不为null。每个broker会启动log.cleaner.thread(默认1)个日志清理线程负责执行清理任务, 这些线程会选择“污浊率”最高的日志文件进行清理。
为了防止日志不必要的频繁清理操作,使用参数log.cleaner.min.cleanable.ratio(默认0.5)来限定可进行清理操作的最小污浊率。Kafka 中用于保存消费者消费位移的主题_consumer_offsets使用的就是Log Compaction策略
每个日志清理线程会使用一个名为“SkimpyOffsetMap”的对象来构建key与offset的映射关系的哈希表。日志清理需要遍历两次日志文件,第一次遍历把每个 key 的哈希值和最后出现的offset都保存在 SkimpyOffsetMap 中,第二次遍历会检查每个消息是否符合保留条件,符合就保留下来,否则就会被清理.
墓碑消息(tombstone):
页缓存(pagecache)
读取:操作系统会先查看数据所在的页(page)是否在页缓存中,如果存在则直接返回,不存在则向磁盘发起读取请求,并将数据存入页缓存
写入:查看数据所在的页(page)是否在页缓存中,存在则直接修改页缓存,不存在则在页缓存中添加相应的页再写入。被修改过的页变成了脏页,操作系统会在合适的时间把脏页数据写入磁盘以保持数据一致性。
页缓存是操作系统实现的一种主要的磁盘缓存,用来减少对磁盘I/O的操作。具体就是把磁盘中的数据缓存到内存中,把对磁盘的访问变成对内存的访问。
Kafka大量使用了页缓存,这是实现高吞吐的重要因素之一。Kafka提供了同步刷盘及间断性强制刷盘的功能,但并不推荐使用。
磁盘I/O流程
用户调用标准C库进行I/O操作:用户程序buffer->C库标准IObuffer->文件系统页缓存->通过具体文件系统到磁盘
用户调用文件I/O:用户程序buffer->文件系统页缓存->通过具体文件系统到磁盘
用户打开文件时使用O_DIRECT,绕过页缓存直接写磁盘
用户使用类似dd工具,使用direct参数,绕过系统cache与文件系统直接写磁盘
从编程角度而言,一般I/O场景有以下4种,他们的数据流为:
最长链路数据流图示:
针对不同应用场景,I/O调度策略也会影响I/O读写性能,目前Linux提供4中调度策略:NOOP、CFQ(默认)、DEADLINE、ANTICIPATORY。
零拷贝(Zero-Copy)
## 六、深入服务端 ##### 1.协议设计
所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。零拷贝大大提高了应用程序的性能,减少了内核和用户模式之间的上下文切换。对 Linux 操作系统而言,零拷贝技术依赖于底层的sendfile()方法实现。对应于Java 语言,FileChannal.transferTo()的底层实现就是sendfile()。
零拷贝和非零拷贝对比
Kafka自定义了一组基于TCP的二进制协议,用于实现各种消息相关操作
协议基本结构
不同的api(PRODUCE、FETCH等),RequestBody和ResponseBody结构也不同 ##### 2.时间轮(TimingWheel)
Kafka基于时间轮概念自定义实现了一个用于延时功能的定时器(SystemTitmer)
每个时间格代表基本时间跨度(titkMs),时间轮时间格个数(wheelSIze)是固定的。
currentTime将时间轮划分为到期部分和未到期部分,当前指向的表示刚好到期,需要处理此时间格内的TimerTaskList中的任务
wheelSize的扩充有限制,针对不同定时需要,Kafka引入层级时间轮的概念。当任务的到期时间超过了当前时间轮的时间范围,会尝试添加到上层时间轮。当延时任务所在的时间轮不能精准实现到期时间时,也会重新提交到层级时间轮,进行降级。
Kafka中的定时器只持有第一层时间轮引用,每一层时间轮中有一个引用指向更高一层。##### 3.延时操作
延时操作创建后会被加入延时操作管理器(DelayedOperationPurgatory)做专门处理,每个延时操作管理器配别一个定时器。延时操作除了满足时间条件执行,还支持外部事件触发,由一个监听池来监听每个分区的外部事件。##### 4.控制器(KafkaController)
控制器
监听partition变化
监听topic相关变化
监听broker相关变化
从zk中读取当前与topic、partition、broker相关的信息并管理
启动并管理partition状态机和replica状态机
更新集群元数据
维护分区的优先副本均衡
Kafka集群中有一个broker会被选举为控制器,负责管理整个集群中所有分区和副本的状态。其职责有:
控制器的选举及异常恢复
控制器选举依赖zk,在/controller创建临时节点存放brokerid
broker启动时尝试读取/controller节点的brokerid,如果不为-1,则放弃竞选。如果不存在/controller节点或节点数据异常,broker会尝试创建此节点,多个broker只有创建成功的会成为控制器。
broker内存会保存当前控制器brokerid:activeControllerId
Kafka通过zk的controller_epoch来保证控制器的唯一性
优雅关闭
broker通过指令关闭时(ControlledShutdown),会与controller进行多次交互,涉及副本迁移,元数据更新等操作。
分区leader的选举
当某节点被优雅关闭时,AR中找到第一个存活replica,且在ISR中,还要确保不在关闭的节点上 ##### 5.服务端参数
优先副本选举,直接将优先副本设置为leader
当分区重分配时,从重分配的AR中找到第一个存活replica,且满足在ISR中
当创建分区或原leader下线,控制器使用策略选举新的leader:按照AR集合顺序查找第一个存活的replica。
注意AR顺序在分配时就被指定,只要不发生重分配就保持不变
OfflinePartitionLeaderElectionStrategy:
ReassignPartitionLeaderElectionStrategy:
PrefferredReplicaPartitionLeaderElectionStrategy:
ControlledShutdownPartitionLeaderElectionStrategy:
broker.id
broker启动前必须设定的参数,作为broker的唯一id。
broker启动时会在zk的/brokers/ids下创建节点,broker的健康状态检查就依赖此节点,broker下线时此节点会自动删除
在 config/server.properties 或 meta.properties 中配置
可通过 broker.id.generation.enable(默认true) 和 reserved.broker.max.id(默认1000) 来配合自动生成新的brokerId。自动生成的brokerId会大于maxid的配置。
bootstrap.servers
客户端与bootstrap.servers指定的server连接,发送MetadataRequest请求获取集群的元数据信息
server收到请求后,返回MetadataResponse,其中包含集群元数据信息
客户端通过解析元数据信息,与集群各个节点建立连接
这个参数用来配置发现kafka集群元数据信息的服务地址(可以不是broker)
客户端连接kafka集群的过程
服务端参数列表
引:kafka brokers配置参数详解 ## 七、深入客户端 ##### 1.分区分配策略
消费者客户端可配置partition.assignment.strategy来设置消费者与topic之间的partition分配策略,默认为org.apache.kafka.clients.consumer.RangeAssignor,可配置多个策略,逗号分隔。
RangeAssignor:
按照consumer总数和partition总数进行整除获得一个跨度,将partition按跨度进行平均分配,保证尽可能的均匀分配给所有consumer。分配时consumerGroup内的consumer按名称字典排序,依次分配partition范围。
RoundRobinAssignor:
将group内所有consumer和被订阅的所有topic的partition按字典序排序,通过轮询方式,逐个分配partition
如果同一个group内的consumer订阅信息是不相同的,可能导致partition分配不均匀。
StickyAssignor:
主要目的:partition分配尽可能均匀,分配partition尽可能与上次保持相同
比上述两个策略更加优异
自定义分配策略
实现PartitionAssignor接口 ##### 2.消费者协调器和组协调器
旧版客户端问题
羊群效应:当监听的节点发生变化,大量Watcher通知发送到客户端,导致其他操作延迟,甚至发生死锁
脑裂问题:再均衡操作时,每个consumer与zk进行通信来监听变化情况,由于zk本身特性,可能导致同一时刻各个consumer获取的状态不一致,从而导致异常
旧版消费者客户端使用zk的监听器(Watcher)来实现分区分配。consumer和broker状态发生变化时,相应的节点也会变化,客户端就能够监听到状态。
依赖zk有两个严重问题:
再均衡的原理
新的consumer加入group
consumer宕机(长时间没有发送心跳)
consumer主动退出group(unsubscrible())
group对应的GroupCoorinator节点发生了变更
group内订阅的任一topic或partition数量发生变化
新版将全部消费组分成多个子集,每个子集在服务端对应一个GroupCoordinator对其进行管理,而消费者客户端中使用ConsumerCoordinator组件与GroupCoordinator交互
触发再均衡操作的情形:
再均衡过程
进入此阶段,consumer即处于正常工作状态
正式消费前,consumer还需要确定拉取消息的起始位置:通过OffsetFetchRequest请求获取消费位移
consumer通过向GroupCoordinator发送心跳来维持与消费组的从属关系,及对partition的所有权关系。##### 3.consumeroffsets剖析
consumer leader根据第二阶段中选举出来的策略来实施具体的分区分配,然后通过GroupCoordinator将方案同步给各个consumer。各个consumer会向GroupCoordinator发送SyncGroupRequest来同步分配方案。
consumer会向GroupCoordinator发送JoinGroupRequest
如果是原有consumer重新加入group,发送前还要执行一些准备工作:
选举消费组的leader
选举分区分配策略
enable.auto.commit为true时,需要向GroupCoordinator提交位移
执行再均衡监听器(ConsumerRebalanceListener)的onPartitionsRevoked()方法
暂时禁止心跳检测运作
GroupCoordinator需要为消费组内的consumer选举出一个leader。如果消费组内还没有leader,则第一个加入的consumer即为leader。如果原leader退出消费组,则重新选举leader(近乎随机)
每个consumer都可以设置自己的分区分配策略,而消费组需要从中选出一个来进行整体分区分配。选择被各个consumer上报投票最多的策略。
consumer需要确定所属group对应的GroupCoordinator所在的broker,并创建与该broker通信的连接
如果已经保存了GroupCoordinator节点信息且连接正常,则进入第二阶段。否则,需要向集群中某个节点(leastLoadedNode)发送FindCoordinatorRequest来查找对应的GroupCoordinator
第一阶段(FIND_COORDINATOR)
第二阶段(JOIN_GROUP)
第三阶段(SYNC_GROUP)
第四阶段(HEARTBEAT)
一般情况下,集群中第一次有consuerm消费消息时,会自动创建主题consumeroffsets
它的副本因子还受offsets.topic.replication.factor约束。分区数通过offsets.topic.num.partitions设置(默认50)。
客户端提交消费位移是使用OffsetCommitRequest实现的
删除topic时,会将consumer提交的此topic的offset一并删除 ##### 4.事务
消息传输保障
at most once:至多一次。消息可能丢失,但不会重复
at least once:至少一次。消息不会丢失,但可能重复
exactly once:恰好一次。每条消息肯定且仅传输一次
一般消息中间件的消息传输保障有3个层级
kafka提供的消息传输保障为at least once
从0.11.0.0版本开始引入幂等和事务特性来实现EOS(exactly once semantics)
幂等
retries:必须大于0(默认Integer.MAX_VALUE)
acks:必须为-1(all,默认为1)
max.in.flight.requests.per.connection:不能大于5(默认5)
生产者客户端通过设置enable.idempotence=true(默认false)开启幂等性功能
开启幂等时,客户端会对用户显式设定的一些参数进行校验
对于每个PID(producer id),消息发送到的每一个partition都有对应的序列号,从0开始,每发送一条就+1。broker在内存中为每一对
维护一个序列号,收到消息时,对比其序列号(SNnew)和内存中的序列号(SNold)。如果SNnew<SNold+1,说明消息重复写入则丢弃此消息。如果SNnew>SNold+1,可能有消息丢失,对应producer会抛出异常。
Kafka的幂等只能保证单个生产者会话(session)中单分区的幂等
事务
1.查找TransactionCoordinator
2.获取PID
3.开启事务
4.Consume-Transform-Produce:事务的处理过程
5.提交或中止事务 ## 八、可靠性探究 ##### 1.副本剖析
需要配置enable.auto.commit=false
通过配置isolation.level设置事务消息的隔离级别
KafkaConsumer通过控制消息(ControlBatch)判断事务的提交和中止
read_uncommitted(默认):可消费到未提交的事务
read_committed:消费端应用看不到尚未提交的事务内的消息(会缓存在KafkaConsumer内部直到事务提交或中止)
生产端需要配置开启幂等,和trasactional.id属性
transactionId与PID一一对应,但是transactionId是由用户显式设置,而PID是kafka内部分配。如果使用同一个transactionId开启两个producer,则前一个producer会报错并不再工作。
KafkaProducer提供了5个事务相关方法:
initTransactions():初始化事务(前提是配置了transactionId)
beginTransaction():开启事务
sendOffsetsToTransaction():在事务内的位移提交
commitTransaction():提交事务
abortTransaction():中止事务
生产端
消费端
Kafka引入事务协调器(TransactionCoordinator)负责处理事务
事务实现过程:
失效副本
同步失效或功能失效的副本成为失效副本,失效副本对应的分区成为同步失效分区(under-replicated)
同步失效判定:根据broker参数 replica.lag.time.max.ms 作为标准,当ISR中的follower副本滞后leader副本时间超过此时间则判定同步失败
ISR的伸缩
Kafka 在启动时会开启定时任务,周期型的检测每个分区是否需要缩减其ISR集合
ISR的扩充:当follower副本的LEO不小于leader副本的HW即判定可进入ISR集合
LEO与HW
follower向leader拉取消息时,带有自己的LEO信息(fetch_offset),leader更新HW(取HW和LEO中的最小值),返回follower相应消息,并带有自身的HW
follower收到新消息后,更新LEO和HW
生产者客户端发送消息至leader副本
消息追加到leader副本的本地日志,并更新日志偏移量
follower副本向leader副本请求同步数据
leader副本所在的服务器读取本地日志,并更新对应拉取的follower副本信息
leader副本所在服务器将拉取结果返回follower副本
follower副本收到结果,将消息追加到本地日志,并更新日志的偏移量信息
多副本消息追加过程
LEO和HW更新过程
在一个分区中,leader会记录所有副本的LEO,而follower只会记录自身LEO
Leader Epoch的介入
解决在需要截断数据的场景下,LEO/HW不一致导致数据丢失的问题
为什么不支持读写分离
主写从读的问题:数据一致性问题、延时问题
kafka通过分区副本机制来解决负载均衡问题 ##### 2.日志同步机制
日志同步机制的基本原则
如果告知客户端已经成功提交了某条消息,那么即使leader宕机,也要保证新选举出来的leader中能够包含这条消息
kafka通过维护ISR集合,保证leader切换后的数据完整性 ## 九、Kafka应用 ##### 1.命令行工具
kafka-configs.sh:配置管理
kafka-server-start.sh:启动kafka服务
kafka-server-stop.sh:关闭kafka服务
kafka-topics.sh:管理主题
kafka-preferred-replica-election.sh:优先副本选举
kafka-reassign-partitions.sh:分区重分配
kafka-consumer-groups.sh:消费组管理、重置消费位移
kafka-console-consumer.sh:命令行消费消息
kafka-console-producer.sh:命令行生产消息
kafka-consumer-perf-test.sh:测试消费性能
kafka-dump-log.sh:查看日志内容
kafka-delete-records.sh:删除消息 ##### 2.Kafka Connect
基本概念
Connector把一项工作分割成许多Task,然后分发到各个Worker进程去执行
Task不保存自己的状态信息,而是交给特定kafka topic保存,Connector和Task都是逻辑工作单位,必须安排在进程(Worker)中执行
Kafka Connect是一个用于将数据流输入和输出Kafka的框架,可以简单快捷地将数据从Kafka导入或导出
Source和Sink:Source负责导入数据到Kafka,Sink负责从Kafka导出数据,统称为Connector
Task和Worker:
独立模式
Worker进程运行相关配置:connect-standalone.properties
Source或Sink配置:connect-file-source.properties、connect-file-sink.properties
通过connect-standalone.sh启动,所有操作都是在一个进程中完成
需要制定两个配置文件:
REST API
/(GET):查看Kafka集群版本信息
/connectors (GET/POST):查看Connector列表、创建Connector
/connectors/{name}(GET):查看指定Connector
/connectors/{name}/config(GET/PUT):查看/修改指定Connector配置
/connectors/{name}/status(GET):查看指定Connector配置
……
分布式模式
修改Worker配置文件:connect-distributed.properties
修改Source或Sink配置:如上
运行脚本启动:connect-distributed.sh ##### 3.Kafka Mirror Maker
用于在两个集群之间同步数据的工具,原理是从源集群消费消息,然后生产到目标集群
修改配置文件:consumer.properties,producer.properties
启动脚本:kafka-mirror-maker.sh ##### 4.Kafka Streams
Kafka Streams是一个用于处理和分析数据的客户端库,它先把存储在Kafka中的数据进行处理和分析,然后将数据结果写到Kafka或发送到外部系统
解决问题:
毫秒级延迟的逐个事件处理
有状态的处理,包括分布式连接和聚合
方便的DSL
使用类似DataFlow的模型对无序数据进行窗口化
具有快速故障切换的分布式处理和容错能力
无停机滚动部署
需要引入依赖:org.apache.kafka/kafka-streams ## 十、Kafka监控 ##### 1.监控数据的来源
Kafka自身提供的监控指标(包括broker和主题的指标)都可以通过JMX来获取,需要设置JMX_PORT设置端口并开启JMX功能
开启JMX后会在zk的 /brokers/ids/<brokerId> 节点中有jmx_port值
客户端指标数据可通过ProducerMetrics和ConsumerMetrics获取 ##### 2.消费滞后
Kafka中留存的消息与Consumer的消息之间的差值就是消息滞后量(Lag),对每个分区而言,Lag = HW - ConsumerOffset
如果分区中有未完成的事务,且isolation.level = “read_committed",Lag = LSO - ConsumerOffset
计算Lag
通过FindCoordinatorRequest查找消费组对应的GroupCoordinator
通过AdminClient获取DescribeGroupsRequest,获取当前消费组元数据信息
通过OffsetFetchRequest请求获取消费位移ConsumerOffset
通过KafkaConsumer.endOffsets()方法获取HW(LSO)值
HW与ConsumerOffset相减得到分区Lag ##### 3.监控指标说明
通过jconsole查看所有MXBean ## 十一、高级应用 ##### 1.过期时间
给消息添加timeStamp和超时时间,并在消费时使用拦截器,判断是否超时后进行消费 ##### 2.延时队列
到期才能消费
1.依然采用给消息添加timeStamp和延时时间,消费者拉取一批消息后,如果有未达到延时时间的消息,就重新写入主题
2.延时消息先投递到一个指定的主题,并使用自定义服务拉取、判断,满足条件后再投递到消费者真实消费的主题 ##### 3.其他功能:
死信队列、消息路由、消息轨迹、消息审计等均可以自行设计生产消费结构来实现 ##### 4.消息代理
Kafka REST Proxy可以为Kafka集群提供一些列的REST API接口,通过这些接口可以实现发送消息、消费消息、查看集群状态和管理类操作等功能
源网络,版权归原创者所有。如有侵权烦请告知,我们会立即删除并表示歉意。
更多技术,欢迎关注下方公众号