Kafka详解
Kafka 是一个java开发的mq中间件,依赖于zookeper,有高可用,高吞吐量等特点。
优势
- 可靠性:partition机制和replication机制,使消息的传递有着很高的可靠性
- 稳定性,支持集群
- 高性能,高吞吐量,即使在TB的数据存储情况下,仍然表现出很好的稳定性
- 支持消息广播和单播,可以根据重设offset实现消息的重复消费
角色
-
Broker:kafka集群由一个或多个kafka server组成,每个server即Broker。
-
Topic:逻辑概念。kafka对消息保存时根据Topic进行归类,一个Topic可以认为是一类消息。
-
Partition:物理概念。每个topic将被分成一到多个partition(分区),每个partition在存储层面就是一个append log文件。一个非常大的topic可以分成多个partition,分布到多个broker上。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
-
offset:任何发布到Partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),offset为一个long型数字,它是唯一标记一条消息。kafka并没有提供其他额外的索引机制来存储offset,因此在kafka中几乎不允许对消息进行“随机读写”。
-
Producer:生成者。Producer将消息发布到指定的Topic,也可以指定Partition。
-
Consumer:消费者。Consumer采用pull的形式从Producer拉取消息
-
Consumer Group:每个 consumer 属于一个特定的 consumer group(若不指定 group name 则属于默认的 group)。一个 topic可以有多个CG,topic的消息会分发到所有的CG,但每个CG只会把消息发给该CG中的一个 consumer。如果所有的consumer都具有相同的group, 即单播,消息将会在consumers之间负载均衡;如果所有的consumer都具有不同的group,那这就是"发布-订阅",每条消息将会广播给所有的consumer。
分区机制和文件存储机制
如图,kafka中的消息是以topic进行分类的,生产者通过topic向kafka broker发送消息,消费者通过topic读取消息。然而topic在物理层面上又能够以partition进行分组,同一个topic下有多个不同的partition,每个partiton在物理上对应一个目录(文件夹),以topic名称+有序序号的形式命名(序号从0开始计,最大为partition数-1)。partition是实际物理上的概念,而topic是逻辑上的概念。Patition 的设计使得Kafka的吞吐率可以水平扩展。
每个分区文件夹下存储这个分区的所有消息(.log)和索引文件(.index)。“.index”索引文件存储大量的元数据,“.log”数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中以“.index”索引文件中的元数据[3, 348]为例,在“.log”数据文件表示第3个消息,即在全局partition中表示170410+3=170413个消息,该消息的物理偏移地址为348。
image.png那么如何从partition中通过offset查找message呢?以上图为例,读取offset=170418的消息,首先查找segment文件,其中 00000000000000000000.index为最开始的文件,第二个文件为00000000000000170410.index(起始偏移为170410+1=170411),而第 三个文件为00000000000000239430.index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其他 后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。其次根据 00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置进行读取。
Kafka中topic的每个partition有一个预写式的日志文件,虽然partition可以继续细分为若干个segment文件,但是对于上层应用来说可以将 partition看成最小的存储单元(一个有多个segment文件拼接的“巨型”文件),每个partition都由一些列有序的、不可变的消息组成,这些消息被连续的追加到partition中。
那如何保证消息均匀的分布到不同的partition中?
生产者在生产数据的时候,可以为每条消息指定Key,这样消息被发送到broker时,会根据分区规则选择被存储到哪一个分区中,如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。分区规则可以自定义,比如将消息的key做了hashcode,然后和分区数(numPartitions)做模运算,使得每一个key都可以分布到一个分区中。
高可用(High availability)
kafka的高可用就是依赖于上面的文件存储结构的,kafka能保证HA的策略有 data replication和leader election。
leader 机制
为了提高消息的可靠性,Kafka每个topic的partition有N个副本(replicas),其中N(大于等于1)是topic的复制因子(replica fator)的个数。这个时候每个 partition下面就有可能有多个 replica(replication机制,相当于是partition的副本但是有可能存储在其他的broker上),但是这多个replica并不一定分布在一个broker上,而这时候为了更好的在replica之间复制数据,此时会选出一个leader,这个时候 producer会push消息到这个leader(leader机制),consumer也会从这个leader pull 消息,其他的 replica只是作为follower从leader复制数据,leader负责所有的读写;如果没有一个leader的话,所有的follower都去进行读写 那么NxN(N+1个replica之间复制消息)的互相同步数据就变得很复杂而且数据的一致性和有序性不能够保证。
如何将所有Replica均匀分布到整个集群
为了实现更高的可用性,推荐在部署kafka的时候,能够保证一个topic的partition数量大于broker的数量,而且还需要把follower均匀的分布在所有的broker上,而不是只分布在一个 broker上。zookeeper 会对partition的leader follower等进行管理。
Kafka分配Replica的算法如下:
将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上
将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broke
leader election
当Leader宕机了,怎样在Follower中选举出新的Leader?
一种非常常用的Leader Election的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。
那么如何选取出leader:
最简单最直观的方案是(谁写进去谁就是leader),所有Follower都在Zookeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(Zookeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。
Data Replication
消息commit
kafka在处理传播消息的时候,Producer会发布消息到某个partition上,先通知找到这个partition的leader replica,无论这个partition的 Replica factor是多少,Producer 先把消息发送给replica的leader,然后Leader在接受到消息后会写入到Log,这时候这个leader的其余follower都会去leader pull数据,这样可保证follower的replica的数据顺序和leader是一致的,follower在接受到消息之后写入到Log里面(同步),然后向leader发送ack确认,一旦Leader接收到了所有的ISR(与leader保持同步的Replica列表)中的follower的ack消息,这个消息就被认为是 commit了,然后leader增加HW并且向producer发送ack消息,表示消息已经发送完成。但是为了提高性能,每个follower在接受到消息之后就会直接返回给leader ack消息,而并非等数据写入到log里(异步),所以,可以认为对于已经commit的数据,只可以保证消息已经存在与所有的replica的内存中,但是不保证已经被持久化到磁盘中,所以进而也就不能保证完全发生异常的时候,该消息能够被consumer消费掉,如果异常发生,leader 宕机,而且内存数据消失,此时重新选举leader就会出现这样的情况,但是由于考虑大这样的情况实属少见,所以这种方式在性能和数据持久化上做了一个相对的平衡,consumer读取消息也是从 leader,并且只有已经commit之后的消息(offset小于HW)才会暴露给consumer。
消息确认
kafka的存活条件包括两个条件:
- kafka必须维持着与zookeeper的session(这个通过zookeeper的heartbeat机制来实现)
- follower必须能够及时的将数据从leader复制过去 ,不能够“落后太多”。leader会跟踪与其保持着同步的replica列表简称ISR,(in-sync replica),如果一个follower宕机或是落后太多,leader就会把它从ISR中移除掉。这里指的落后太多是说 follower复制的消息落后的超过了预设值,(该值可在KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)没有向leader发起fetch请求。
一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。
0---表示不进行消息接收是否成功的确认;
1---表示当Leader接收成功时确认;
-1---表示Leader和Follower都接收成功时确认;
持久性
kafka使用文件存储消息,这就直接决定kafka在性能上严重依赖文件系统的本身特性。且无论任何 OS 下,对文件系统本身的优化几乎没有可能。文件缓存/直接内存映射等是常用的手段。 因为 kafka 是对日志文件进行 append 操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。
producer
指定partition
producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,有producer决定.比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.
异步发送
producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。
对于异步模式,还有4个配套的参数,如下:
- queue.buffering.max.ms 5000 启用异步模式时,producer缓存消息的时间。比如我们设置成1000时,它会缓存1s的数据再一次发送出去,这样可以极大的增加broker吞吐量,但也会造成时效性的降低。
- queue.buffering.max.messages 10000 启用异步模式时,producer缓存队列里最大缓存的消息数量,如果超过这个值,producer就会阻塞或者丢掉消息。
- queue.enqueue.timeout.ms -1 当达到上面参数时producer会阻塞等待的时间。如果设置为0,buffer队列满时producer不会阻塞,消息直接被丢掉;若设置为-1,producer会被阻塞,不会丢消息。
- batch.num.messages 200 启用异步模式时,一个batch缓存的消息数量。达到这个数值时,producer才会发送消息。(每次批量发送的数量)
以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。
consumer
-
consumer 采用pull的方式 从broker拉取数据。采用pull方式的优点有consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);此外,消费者可以良好的控制消息消费的数量,batch fetch.
-
consumer端向broker发送fetch请求,并告知其获取消息的offset;此后consumer将会获得一定条数的消息;consumer端也可以重置offset来重新消费消息.
-
kafka和JMS(Java Message Service)实现(activeMQ)不同的是:即使消息被消费,消息仍然不会被立即删除.日志文件将会根据broker中的配置要求,保留一定的时间之后删除;比如log文件保留2天,那么两天后,文件会被清除,无论其中的消息是否被消费.kafka通过这种简单的手段,来释放磁盘空间,以及减少消息消费之后对文件内容改动的磁盘IO开支。
对于consumer而言,它需要保存消费消息的offset,对于offset的保存和使用,有consumer来控制;当consumer正常消费消息时,offset将会"线性"的向前驱动,即消息将依次顺序被消费.事实上consumer可以使用任意顺序消费消息,它只需要将offset重置为任意值,offset将会保存在zookeeper中。
kafka集群几乎不需要维护任何consumer和producer状态信息,这些信息有zookeeper保存;因此producer和consumer的客户端实现非常轻量级,它们可以随意离开,而不会对集群造成额外的影响。
-
at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理.那么此后"未处理"的消息将不能被fetch到,这就是"at most once".
-
at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once",原因offset没有及时的提交给zookeeper,zookeeper恢复正常还是之前offset状态.
消息的顺序性
Kafka分布式的单位是partition,同一个partition用一个log文件(追加写、offset读),所以可以保证FIFO的顺序。但是在多个Partition时,不能保证Topic级别的数据有序性,除非创建Topic只指定1个partition,但这样做就磨灭kafka高吞吐量的优秀特性。
kafka为了提高Topic的并发吞吐能力,可以提高Topic的partition数,并通过设置partition的replica来保证数据高可靠。
Kafka 中发送1条消息的时候,可以指定(topic, partition, key) 3个参数,业务放使用producer插入数据时,可以控制同一Key发到同一Partition,从而保证消息有序性。一个partition的消息只能被一个consumer消费。
安装
详情参见官网http://kafka.apache.org/
安装会依赖java、zookeeper。
brew install kafka
//安装的配置文件位置
/usr/local/etc/kafka/server.properties
/usr/local/etc/kafka/zookeeper.properties
//启动zookeeper -daemon 守护模式
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties &
//启动kafka
kafka-server-start /usr/local/etc/kafka/server.properties &
//创建topic 创建单分区单副本的 topic test:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
//查看创建的topic
kafka-topics --list --zookeeper localhost:2181
//发送消息客户端
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
//消费消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
参考文章:https://blog.csdn.net/gongzhiyao3739124/article/details/79688813