1. 三天掌握kafka初章—揭开kafka面纱
1 kafka简介
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
1.1 kafka特性
- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
1.2 kafka应用场景
- 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
- 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
- 流式处理:比如spark streaming和storm
- 事件源
1.3 kafka核心组件
kafka结构:
kafka结构.png
- Producer:消息生产者,产生的消息将会被发送到某个topic
- Consumer:消息消费者,消费的消息内容来自某个topic
- Topic:消息根据topic进行归类,topic其本质是一个目录,即将同一主题消息归类到同一个目录
- Broker:每一个kafka实例(或者说每台kafka服务器节点)就是一个broker,一个broker可以有多个topic
- Zookeeper:zookeeper集群不属于kafka内的组件,但kafka依赖zookeeper集群保存meta信息,同时对broker分布式集群管理,所以在此做声明其重要性。
1.4 kafka基本概念
kafka拓扑结构:
image
一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
- Broker:一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic,这种服务器被称为broker 代理、中介者
- Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为 Topic 主题
物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处
- Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition 分割、分区
为了实现扩展性,一个非常大的topic可以分布到多个 broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体 (多个partition间)的顺序。
- Producer:负责发布消息到Kafka broker
- Consumer:消息消费者,向Kafka broker读取消息的客户端。
- Consumer Group:每个Consumer属于一个特定的Consumer Group
可为每个Consumer指定group name,若不指定group name则属于默认的group
2. kafka设计思想
kafka包括producer、broker、consumer和zookeeper四部分,为了实现了消息的生产消费以及大吞吐量、高性能等特点,kafka设计了一系列关于消息的生产、备份、持久化、负载均衡、消费者组、消费、集群管理的策略,这些就是kafka的设计思想。下面我们给大家一一介绍
2.1 ConsumerGroup
消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组 每个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属的消费组的名称,可以在消费者客户端通过 group.id 参数来配置,默认为空字符串。
consumer group.png
各个consumer(consumer 线程)可以组成一个组(Consumer group ),同一partition中的每个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,如果一个message可以被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不同的组。
如果同一组内的consumer需要消费同一partition,则需要对组内消费者重新划分,将其分配到不同的consumer group内。同一partition被多个consumer消费,这些consumer都必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。
当启动一个consumer group去消费一个topic的时候,无论topic里面有多个少个partition,无论我们consumer group里面配置了多少个consumer thread,这个consumer group下面的所有consumer thread一定会消费全部的partition;即便这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费所有的partition。因此,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
2.2 kafka支持的消费模式
push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
对于消息中间件而已,一般有两种消息投递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,生产者将消息发送到队列,消费者从队列接收消息。发布/订阅模式定义了如何向一个内容节点发送和订阅消息,这个内容节点在Kafka中为主题(Topic)相当于一个中介,生产者发布消息到主题,而消费者消费所订阅的主题。主题使消息的订阅者和发布者互相保持独立,不需要接触就可以进行消息的传递,发布/订阅模式在消息一对多广播时使用。Kafka同时支持这两种模式,正是得益于消费者和消费组模型的契合:
- 如果所有的消费者都属于同一个消费组,那么消息就会被均衡的投递给每一个消费者,也就是一个消费组内每条消息只会被一个消费者处理,这就相当于点对点模式。
- 如果所有的消费者都不属于一个消费组,那么消息就会投递给订阅该主题的每个消费组中的一个消费者,这就相当于发布/订阅模式。
2.3 kafka broker集群
Kakfa Broker集群受Zookeeper管理,broker之间没有主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点,但broker之中会选举出Controller,其他的叫Kafka Broker follower,这个过程叫做Controller在Zookeeper注册Watch。
所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败。
Controller对宕机broker及其partition处理过程:
-
1.Controller在Zookeeper注册Watch,一旦有Broker宕机(这是用宕机代表任何让系统认为其die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的znode会自动被删除,Zookeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker
-
2.Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition
-
3.对set_p中的每一个Partition
3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR
3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica(选举算法的实现类似于微软的PacificA)。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1,等待恢复。
3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其version在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1
- 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
所有Replica都不工作即Partition的Leader为-1的处理策略:
上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:
1.等待ISR中的任一个Replica“活”过来,并且选它作为Leader
2.选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader
这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。 unclean.leader.election.enable 参数决定使用哪种方案,默认是true,采用第二种方案。
2.4 kafka partition及replica(备份)
partion可以看作一个有序的队列,里面的数据是储存在硬盘中的,追加式的。partition的作用就是提供分布式的扩展,一个topic可以有许多partions,多个partition可以并行处理数据,所以可以处理相当量的数据。只有partition的leader才会进行读写操作,folower仅进行复制,客户端是感知不到的。下图把kafka集群看成一个kakfa服务,仅显示leader。
offset:
每一条数据都有一个offset,是每一条数据在该partition中的唯一标识。各个consumer控制和设置其在该partition下消费到offset位置,这样下次可以以该offset位置开始进行消费。
offset.png
各个consumer的offset位置默认是在某一个broker当中的topic中保存的(为防止该broker宕掉无法获取offset信息,可以配置在每个broker中都进行保存,配置文件中配置)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
2.5 replicas(备份)
replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本。如果有N个replicas,其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,于此同时,follower会被动定期的去复制leader上的数据。
replica.pngPartition leader与follower:
partition也有leader和follower之分。leader(图中标红)是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其他的partition follower(图中标绿)。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其他的broker的partition follower上选择follower变为parition leader。
Topic分配partition和partition replica的算法:
- 1.将Broker(size=n)和待分配的Partition排序。
- 2.将第i个Partition分配到第(i%n)个Broker上。
- 3.将第i个Partition的第j个Replica分配到第((i + j) % n)个Broker上。如图所示,topic1-part0(红色)即第0个partition的第1个replica分配在broker 1上,topic1-part0(broker 2上绿色)即第0个partition第2个replica。
消息投递有关partition的机制:
- producer先把message发送到partition leader,再由leader发送给其他partition follower。
- 向Producer发送ACK前需要保证有多少个Replica已经收到该消息:根据ack配的个数而定。
- 怎样处理某个Replica不工作的情况:如果这个部工作的partition replica不在ack列表中,就是producer在发送消息到partition leader上,partition leader向partition follower发送message没有响应而已,这个不会影响整个系统,也不会有什么问题。如果这个不工作的partition replica在ack列表中的话,producer发送的message的时候会等待这个不工作的partition replca写message成功,但是会等到time out,然后返回失败因为某个ack列表中的partition replica没有响应,此时kafka会自动的把这个部工作的partition replica从ack列表中移除,以后的producer发送message的时候就不会有这个ack列表下的这个部工作的partition replica了。
- 怎样处理Failed Replica恢复回来的情况:如果这个partition replica之前不在ack列表中,那么启动后重新受Zookeeper管理即可,之后producer发送message的时候,partition leader会继续发送message到这个partition follower上。如果这个partition replica之前在ack列表中,此时重启后,需要把这个partition replica再手动加到ack列表中。(ack列表是手动添加的,出现某个部工作的partition replica的时候自动从ack列表中移除的)
2.6 ISR和AR
- 分区中的所有副本统称为AR(Assigned Repllicas)。
- 所有与leader副本保持一定程度同步的副本(包括Leader)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。
- 消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度”是指可以忍受的滞后范围,这个范围可以通过参数进行配置。
- 与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas),由此可见:AR=ISR+OSR。
- Leader副本负责维护和跟踪ISR集合中所有的follower副本的滞后状态,当follower副本落后太多或者失效时,leader副本会吧它从ISR集合中剔除(数量滞后和时间滞后两个维度,replica.lag.time.max.ms和replica.lag.max.message可配置)。如果OSR集合中follower副本“追上”了Leader副本,之后再ISR集合中的副本才有资格被选举为leader,而在OSR集合中的副本则没有机会(这个原则可以通过修改对应的参数配置来改变)
ISR的伸缩:
Kafka在启动的时候会开启两个与ISR相关的定时任务,名称分别为“isr-expiration"和”isr-change-propagation".。isr-expiration任务会周期性的检测每个分区是否需要缩减其ISR集合。这个周期和“replica.lag.time.max.ms”参数有关。大小是这个参数一半。默认值为5000ms,当检测到ISR中有是失效的副本的时候,就会缩减ISR集合。如果某个分区的ISR集合发生变更, 则会将变更后的数据记录到ZooKerper对应/brokers/topics//partition//state节点中。节点中数据示例如下:
{“controller_cpoch":26,“leader”:0,“version”:1,“leader_epoch”:2,“isr”:{0,1}}
其中controller_epoch表示的是当前的kafka控制器epoch.leader表示当前分区的leader副本所在的broker的id编号,version表示版本号,(当前半本固定位1),leader_epoch表示当前分区的leader纪元,isr表示变更后的isr列表。
kafka扩充ISR的机制:
随着follower副本不断进行消息同步,follower副本LEO也会逐渐后移,并且最终赶上leader副本,此时follower副本就有资格进入ISR集合,追赶上leader副本的判定准侧是此副本的LEO是否小于leader副本HW,这里并不是和leader副本LEO相比。ISR扩充之后同样会更新ZooKeeper中的/broker/topics//partition//state节点和isrChangeSet,之后的步骤就和ISR收缩的时的相同。
当ISR集合发生增减时,或者ISR集合中任一副本LEO发生变化时,都会影响整个分区的HW。
如下图所示,leader副本的LEO为9,follower副本的LEO为7,而follower2副本的LEO为6,如果判定这三个副本都处于ISR集合中,那么分区的HW为6,如果follower3已经判定失效副本被剥离出ISR集合,那么此时分区HW为leader副本和follower副本中LEO的最小值,即为7.
isr扩充机制.png
HW 是 High Watermark 的缩写,俗称高水位,水印,它标识了一个特定的消息偏移量(Offset ),消费者只能拉取到这个 Offset 之前的消息。
LEO 是 Log End Offset 的缩写, 它标识了当前日志文件中 下一条待写入消息的 Offset .
注意 :LEO 标识的是下一条待写入消息的 Offset
参考资料:
2.7 消息投递可靠性
Kafka消息发送三种方式:
- 发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理):
发送并忘记的方式本质上也是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法保证消息的可靠性
- 发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理):
- 同步发送(通过get方法等待Kafka的响应,判断消息是否发送成功):
以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断, 可以明确地知道每条消息的发送情况,但是由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送
- 同步发送(通过get方法等待Kafka的响应,判断消息是否发送成功):
- 异步发送+回调函数(消息以异步的方式发送,通过回调函数返回消息发送成功/失败):
在调用send方法发送消息的同时,指定一个回调函数,服务器在返回响应时会调用该回调函数,通过回调函数能够对异常情况进行处理,当调用了回调函数时,只有回调函数执行完毕生产者才会结束,否则一直会阻塞
- 异步发送+回调函数(消息以异步的方式发送,通过回调函数返回消息发送成功/失败):
Kafka消息投递提供了三种确认模式:
- ack=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。
- 当ack=1,表示producer写partition leader成功后,broker就返回成功,无论其他的partition follower是否写成功。
- 当ack=2,表示producer写partition leader和其他一个follower成功的时候,broker就返回成功,无论其他的partition follower是否写成功。
- 当ack=-1或all [parition的数量]的时候,表示只有producer全部写成功的时候,才算成功,kafka broker才返回成功信息。这里需要注意的是,如果ack=1的时候,一旦有个broker宕机导致partition的follower和leader切换,会导致丢数据。
2.8 消息消费
Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。
consumer如何知道自己应该拉取哪一个partition。cordinator(某一个Kafka的broker)在分配consumer的时候,会选举consumer leader,后者分配每一个consumer要连接的broker,topic,partition,然后上报cordinator。然后consumer会根据自己被分配的partion去拉取数据。批量读取和单数据读取,ack机制。如果poll()时间超时,那么broker会认为consumer挂掉了,会踢掉该consumer。cordinator重新分配consumer。有时超时会抛异常,不过也会重新分配consumer。consumer的groupId机制。对于一个groupId中的consumer来说,一个partition只能由一个consumer来消费。即不可能多个consumer消费1个partition。
消息传输一致性语义:
- at most once(至多一次): 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once".
设置enable.auto.commit为ture
设置 auto.commit.interval.ms为一个较小的时间间隔.
client不要调用commitSync(),kafka在特定的时间间隔内自动提交。
public void mostOnce(){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
process(record);
}
}
}
- at least once(至少一次): 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once".
设置enable.auto.commit为false
client调用commitSync(),增加消息偏移;
- exactly once(恰好1次):最少1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况。
如果要实现这种方式,必须自己控制消息的offset,自己记录一下当前的offset,对消息的处理和offset的移动必须保持在同一个事务中,例如在同一个事务中,把消息处理的结果存到mysql数据库同时更新此时的消息的偏移。
设置enable.auto.commit为false
保存ConsumerRecord中的offset到数据库
当partition分区发生变化的时候需要rebalance,有以下几个事件会触发分区变化1 consumer订阅的topic中的分区大小发生变化
2 topic被创建或者被删除
3 consuer所在group中有个成员挂了
4 新的consumer通过调用join加入了group
此时 consumer通过实现ConsumerRebalanceListener接口,捕捉这些事件,对偏移量进行处理。
consumer通过调用seek(TopicPartition, long)方法,移动到指定的分区的偏移位置。
顺序保证性:
由于kafka的producer的写message与consumer去读message都是顺序的读写,保证了高效的性能。