kafka

1. 三天掌握kafka初章—揭开kafka面纱

2020-06-07  本文已影响0人  火山_6c7b

1 kafka简介

  Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

1.1 kafka特性

1.2 kafka应用场景

1.3 kafka核心组件

kafka结构:


kafka结构.png

1.4 kafka基本概念

kafka拓扑结构:


image

  一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处
为了实现扩展性,一个非常大的topic可以分布到多个 broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体 (多个partition间)的顺序。
可为每个Consumer指定group name,若不指定group name则属于默认的group

2. kafka设计思想

  kafka包括producer、broker、consumer和zookeeper四部分,为了实现了消息的生产消费以及大吞吐量、高性能等特点,kafka设计了一系列关于消息的生产、备份、持久化、负载均衡、消费者组、消费、集群管理的策略,这些就是kafka的设计思想。下面我们给大家一一介绍

2.1 ConsumerGroup

  消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组 每个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属的消费组的名称,可以在消费者客户端通过 group.id 参数来配置,默认为空字符串。


consumer group.png

  各个consumer(consumer 线程)可以组成一个组(Consumer group ),同一partition中的每个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,如果一个message可以被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不同的组。
  如果同一组内的consumer需要消费同一partition,则需要对组内消费者重新划分,将其分配到不同的consumer group内。同一partition被多个consumer消费,这些consumer都必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。
  当启动一个consumer group去消费一个topic的时候,无论topic里面有多个少个partition,无论我们consumer group里面配置了多少个consumer thread,这个consumer group下面的所有consumer thread一定会消费全部的partition;即便这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费所有的partition。因此,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。

2.2 kafka支持的消费模式

push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。

  对于消息中间件而已,一般有两种消息投递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,生产者将消息发送到队列,消费者从队列接收消息。发布/订阅模式定义了如何向一个内容节点发送和订阅消息,这个内容节点在Kafka中为主题(Topic)相当于一个中介,生产者发布消息到主题,而消费者消费所订阅的主题。主题使消息的订阅者和发布者互相保持独立,不需要接触就可以进行消息的传递,发布/订阅模式在消息一对多广播时使用。Kafka同时支持这两种模式,正是得益于消费者和消费组模型的契合:

2.3 kafka broker集群

  Kakfa Broker集群受Zookeeper管理,broker之间没有主从关系,各个broker在集群中地位一样,我们可以随意的增加或删除任何一个broker节点,但broker之中会选举出Controller,其他的叫Kafka Broker follower,这个过程叫做Controller在Zookeeper注册Watch。
  所有的Kafka Broker节点一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败,所以这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。这个Controller会监听其他的Kafka Broker的所有信息,如果这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时所有的kafka broker又会一起去Zookeeper上注册一个临时节点,因为只有一个Kafka Broker会注册成功,其他的都会失败。

Controller对宕机broker及其partition处理过程:

3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR
3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica(选举算法的实现类似于微软的PacificA)。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1,等待恢复。
3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其version在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1

所有Replica都不工作即Partition的Leader为-1的处理策略:
  上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

1.等待ISR中的任一个Replica“活”过来,并且选它作为Leader

2.选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader

  这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。 unclean.leader.election.enable 参数决定使用哪种方案,默认是true,采用第二种方案。

2.4 kafka partition及replica(备份)

  partion可以看作一个有序的队列,里面的数据是储存在硬盘中的,追加式的。partition的作用就是提供分布式的扩展,一个topic可以有许多partions,多个partition可以并行处理数据,所以可以处理相当量的数据。只有partition的leader才会进行读写操作,folower仅进行复制,客户端是感知不到的。下图把kafka集群看成一个kakfa服务,仅显示leader。

topic.png
offset:
  每一条数据都有一个offset,是每一条数据在该partition中的唯一标识。各个consumer控制和设置其在该partition下消费到offset位置,这样下次可以以该offset位置开始进行消费。
offset.png
  各个consumer的offset位置默认是在某一个broker当中的topic中保存的(为防止该broker宕掉无法获取offset信息,可以配置在每个broker中都进行保存,配置文件中配置)
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3

2.5 replicas(备份)

  replica数其实就是partition的副本总数,其中包括一个leader,其他的就是copy副本。如果有N个replicas,其中一个replica为leader,其他都为follower,leader处理partition的所有读写请求,于此同时,follower会被动定期的去复制leader上的数据。

replica.png

Partition leader与follower:
  partition也有leader和follower之分。leader(图中标红)是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其他的partition follower(图中标绿)。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其他的broker的partition follower上选择follower变为parition leader。

Topic分配partition和partition replica的算法:

消息投递有关partition的机制:

2.6 ISR和AR

ISR的伸缩:

  Kafka在启动的时候会开启两个与ISR相关的定时任务,名称分别为“isr-expiration"和”isr-change-propagation".。isr-expiration任务会周期性的检测每个分区是否需要缩减其ISR集合。这个周期和“replica.lag.time.max.ms”参数有关。大小是这个参数一半。默认值为5000ms,当检测到ISR中有是失效的副本的时候,就会缩减ISR集合。如果某个分区的ISR集合发生变更, 则会将变更后的数据记录到ZooKerper对应/brokers/topics//partition//state节点中。节点中数据示例如下:

{“controller_cpoch":26,“leader”:0,“version”:1,“leader_epoch”:2,“isr”:{0,1}}

​   其中controller_epoch表示的是当前的kafka控制器epoch.leader表示当前分区的leader副本所在的broker的id编号,version表示版本号,(当前半本固定位1),leader_epoch表示当前分区的leader纪元,isr表示变更后的isr列表。

kafka扩充ISR的机制:

  随着follower副本不断进行消息同步,follower副本LEO也会逐渐后移,并且最终赶上leader副本,此时follower副本就有资格进入ISR集合,追赶上leader副本的判定准侧是此副本的LEO是否小于leader副本HW,这里并不是和leader副本LEO相比。ISR扩充之后同样会更新ZooKeeper中的/broker/topics//partition//state节点和isrChangeSet,之后的步骤就和ISR收缩的时的相同。

​  当ISR集合发生增减时,或者ISR集合中任一副本LEO发生变化时,都会影响整个分区的HW。

​  如下图所示,leader副本的LEO为9,follower副本的LEO为7,而follower2副本的LEO为6,如果判定这三个副本都处于ISR集合中,那么分区的HW为6,如果follower3已经判定失效副本被剥离出ISR集合,那么此时分区HW为leader副本和follower副本中LEO的最小值,即为7.


isr扩充机制.png

HW 是 High Watermark 的缩写,俗称高水位,水印,它标识了一个特定的消息偏移量(Offset ),消费者只能拉取到这个 Offset 之前的消息。

LEO 是 Log End Offset 的缩写, 它标识了当前日志文件中 下一条待写入消息的 Offset .
注意 :LEO 标识的是下一条待写入消息的 Offset

参考资料:

2.7 消息投递可靠性

Kafka消息发送三种方式:

Kafka消息投递提供了三种确认模式:

2.8 消息消费

​  Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,两者对消息的生产和消费是异步的。

  consumer如何知道自己应该拉取哪一个partition。cordinator(某一个Kafka的broker)在分配consumer的时候,会选举consumer leader,后者分配每一个consumer要连接的broker,topic,partition,然后上报cordinator。然后consumer会根据自己被分配的partion去拉取数据。批量读取和单数据读取,ack机制。如果poll()时间超时,那么broker会认为consumer挂掉了,会踢掉该consumer。cordinator重新分配consumer。有时超时会抛异常,不过也会重新分配consumer。consumer的groupId机制。对于一个groupId中的consumer来说,一个partition只能由一个consumer来消费。即不可能多个consumer消费1个partition。

  消息传输一致性语义:

设置enable.auto.commit为ture
设置 auto.commit.interval.ms为一个较小的时间间隔.
client不要调用commitSync(),kafka在特定的时间间隔内自动提交。

public void mostOnce(){
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-1");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic", "bar"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            process(record);
        }
    }
}

设置enable.auto.commit为false
client调用commitSync(),增加消息偏移;

如果要实现这种方式,必须自己控制消息的offset,自己记录一下当前的offset,对消息的处理和offset的移动必须保持在同一个事务中,例如在同一个事务中,把消息处理的结果存到mysql数据库同时更新此时的消息的偏移。
设置enable.auto.commit为false
保存ConsumerRecord中的offset到数据库
当partition分区发生变化的时候需要rebalance,有以下几个事件会触发分区变化

1 consumer订阅的topic中的分区大小发生变化
2 topic被创建或者被删除
3 consuer所在group中有个成员挂了
4 新的consumer通过调用join加入了group

此时 consumer通过实现ConsumerRebalanceListener接口,捕捉这些事件,对偏移量进行处理。
consumer通过调用seek(TopicPartition, long)方法,移动到指定的分区的偏移位置。

顺序保证性:
由于kafka的producer的写message与consumer去读message都是顺序的读写,保证了高效的性能。

参考:Kafka client 消息接收的三种模式

上一篇下一篇

猜你喜欢

热点阅读