Kafka详解

2019-03-28  本文已影响0人  何笙

Kafka 是一个java开发的mq中间件,依赖于zookeper,有高可用,高吞吐量等特点。

优势

角色

分区机制和文件存储机制

如图,kafka中的消息是以topic进行分类的,生产者通过topic向kafka broker发送消息,消费者通过topic读取消息。然而topic在物理层面上又能够以partition进行分组,同一个topic下有多个不同的partition,每个partiton在物理上对应一个目录(文件夹),以topic名称+有序序号的形式命名(序号从0开始计,最大为partition数-1)。partition是实际物理上的概念,而topic是逻辑上的概念。Patition 的设计使得Kafka的吞吐率可以水平扩展。

每个分区文件夹下存储这个分区的所有消息(.log)和索引文件(.index)。“.index”索引文件存储大量的元数据,“.log”数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中以“.index”索引文件中的元数据[3, 348]为例,在“.log”数据文件表示第3个消息,即在全局partition中表示170410+3=170413个消息,该消息的物理偏移地址为348。

image.png

那么如何从partition中通过offset查找message呢?以上图为例,读取offset=170418的消息,首先查找segment文件,其中 00000000000000000000.index为最开始的文件,第二个文件为00000000000000170410.index(起始偏移为170410+1=170411),而第 三个文件为00000000000000239430.index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其他 后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。其次根据 00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置进行读取。

Kafka中topic的每个partition有一个预写式的日志文件,虽然partition可以继续细分为若干个segment文件,但是对于上层应用来说可以将 partition看成最小的存储单元(一个有多个segment文件拼接的“巨型”文件),每个partition都由一些列有序的、不可变的消息组成,这些消息被连续的追加到partition中。

那如何保证消息均匀的分布到不同的partition中?

生产者在生产数据的时候,可以为每条消息指定Key,这样消息被发送到broker时,会根据分区规则选择被存储到哪一个分区中,如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡和水平扩展。分区规则可以自定义,比如将消息的key做了hashcode,然后和分区数(numPartitions)做模运算,使得每一个key都可以分布到一个分区中。

高可用(High availability)

kafka的高可用就是依赖于上面的文件存储结构的,kafka能保证HA的策略有 data replication和leader election。

leader 机制

为了提高消息的可靠性,Kafka每个topic的partition有N个副本(replicas),其中N(大于等于1)是topic的复制因子(replica fator)的个数。这个时候每个 partition下面就有可能有多个 replica(replication机制,相当于是partition的副本但是有可能存储在其他的broker上),但是这多个replica并不一定分布在一个broker上,而这时候为了更好的在replica之间复制数据,此时会选出一个leader,这个时候 producer会push消息到这个leader(leader机制),consumer也会从这个leader pull 消息,其他的 replica只是作为follower从leader复制数据,leader负责所有的读写;如果没有一个leader的话,所有的follower都去进行读写 那么NxN(N+1个replica之间复制消息)的互相同步数据就变得很复杂而且数据的一致性和有序性不能够保证。

如何将所有Replica均匀分布到整个集群

为了实现更高的可用性,推荐在部署kafka的时候,能够保证一个topic的partition数量大于broker的数量,而且还需要把follower均匀的分布在所有的broker上,而不是只分布在一个 broker上。zookeeper 会对partition的leader follower等进行管理。
Kafka分配Replica的算法如下:

将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上
将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broke

leader election

当Leader宕机了,怎样在Follower中选举出新的Leader?
一种非常常用的Leader Election的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。

那么如何选取出leader:
最简单最直观的方案是(谁写进去谁就是leader),所有Follower都在Zookeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(Zookeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。

Data Replication

消息commit

kafka在处理传播消息的时候,Producer会发布消息到某个partition上,先通知找到这个partition的leader replica,无论这个partition的 Replica factor是多少,Producer 先把消息发送给replica的leader,然后Leader在接受到消息后会写入到Log,这时候这个leader的其余follower都会去leader pull数据,这样可保证follower的replica的数据顺序和leader是一致的,follower在接受到消息之后写入到Log里面(同步),然后向leader发送ack确认,一旦Leader接收到了所有的ISR(与leader保持同步的Replica列表)中的follower的ack消息,这个消息就被认为是 commit了,然后leader增加HW并且向producer发送ack消息,表示消息已经发送完成。但是为了提高性能,每个follower在接受到消息之后就会直接返回给leader ack消息,而并非等数据写入到log里(异步),所以,可以认为对于已经commit的数据,只可以保证消息已经存在与所有的replica的内存中,但是不保证已经被持久化到磁盘中,所以进而也就不能保证完全发生异常的时候,该消息能够被consumer消费掉,如果异常发生,leader 宕机,而且内存数据消失,此时重新选举leader就会出现这样的情况,但是由于考虑大这样的情况实属少见,所以这种方式在性能和数据持久化上做了一个相对的平衡,consumer读取消息也是从 leader,并且只有已经commit之后的消息(offset小于HW)才会暴露给consumer。

消息确认

kafka的存活条件包括两个条件:

  1. kafka必须维持着与zookeeper的session(这个通过zookeeper的heartbeat机制来实现)
  2. follower必须能够及时的将数据从leader复制过去 ,不能够“落后太多”。leader会跟踪与其保持着同步的replica列表简称ISR,(in-sync replica),如果一个follower宕机或是落后太多,leader就会把它从ISR中移除掉。这里指的落后太多是说 follower复制的消息落后的超过了预设值,(该值可在KAFKA_HOME/config/server.properties中通过replica.lag.max.messages配置,其默认值是4000),或者follower超过一定时间(该值可在KAFKA_HOME/config/server.properties中通过replica.lag.time.max.ms来配置,其默认值是10000)没有向leader发起fetch请求。

一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。

0---表示不进行消息接收是否成功的确认;
1---表示当Leader接收成功时确认;
-1---表示Leader和Follower都接收成功时确认;

持久性

kafka使用文件存储消息,这就直接决定kafka在性能上严重依赖文件系统的本身特性。且无论任何 OS 下,对文件系统本身的优化几乎没有可能。文件缓存/直接内存映射等是常用的手段。 因为 kafka 是对日志文件进行 append 操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。

producer

指定partition

producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".事实上,消息被路由到哪个partition上,有producer决定.比如可以采用"random""key-hash""轮询"等,如果一个topic中有多个partitions,那么在producer端实现"消息均衡分发"是必要的.

异步发送

producer.type的默认值是sync,即同步的方式。这个参数指定了在后台线程中消息的发送方式是同步的还是异步的。如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。

对于异步模式,还有4个配套的参数,如下:

以batch的方式推送数据可以极大的提高处理效率,kafka producer可以将消息在内存中累计到一定数量后作为一个batch发送请求。batch的数量大小可以通过producer的参数(batch.num.messages)控制。通过增加batch的大小,可以减少网络请求和磁盘IO的次数,当然具体参数设置需要在效率和时效性方面做一个权衡。在比较新的版本中还有batch.size这个参数。

consumer

消息的顺序性

Kafka分布式的单位是partition,同一个partition用一个log文件(追加写、offset读),所以可以保证FIFO的顺序。但是在多个Partition时,不能保证Topic级别的数据有序性,除非创建Topic只指定1个partition,但这样做就磨灭kafka高吞吐量的优秀特性。

kafka为了提高Topic的并发吞吐能力,可以提高Topic的partition数,并通过设置partition的replica来保证数据高可靠。

Kafka 中发送1条消息的时候,可以指定(topic, partition, key) 3个参数,业务放使用producer插入数据时,可以控制同一Key发到同一Partition,从而保证消息有序性。一个partition的消息只能被一个consumer消费。

安装

详情参见官网http://kafka.apache.org/
安装会依赖java、zookeeper。

brew install kafka

//安装的配置文件位置
/usr/local/etc/kafka/server.properties
/usr/local/etc/kafka/zookeeper.properties

//启动zookeeper -daemon 守护模式
zookeeper-server-start  /usr/local/etc/kafka/zookeeper.properties &

//启动kafka
kafka-server-start /usr/local/etc/kafka/server.properties &

//创建topic  创建单分区单副本的 topic test:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

//查看创建的topic
kafka-topics --list --zookeeper localhost:2181

//发送消息客户端
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

//消费消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

参考文章:https://blog.csdn.net/gongzhiyao3739124/article/details/79688813

上一篇 下一篇

猜你喜欢

热点阅读