kafkakafka技术总结

Kafka 系统学习

2021-04-11  本文已影响0人  向梦而来

Apache Kafka 是什么?

Kafka 是基于发布与订阅消息系统。它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分。Kafka 是一个分布式的,可分区的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。

在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能、低延迟的不停流转。传统的企业消息系统并不是非常适合大规模的数据处理。为了同时搞定在线应用(消息)和离线应用(数据文件、日志),Kafka 就出现了。Kafka 可以起到两个作用:

Kafka 的主要特点?

聊聊 Kafka 的设计要点?

1)吞吐量

高吞吐是 Kafka 需要实现的核心目标之一,为此 kafka 做了以下一些设计:

2)负载均衡

3)拉取系统

由于 Kafka Broker 会持久化数据,Broker 没有内存压力,因此, Consumer 非常适合采取 pull 的方式消费数据,具有以下几点好处:

4)可扩展性

通过 Zookeeper 管理 Broker 与 Consumer 的动态加入与离开。

Kafka 的架构是怎么样的?

Kafka 架构图
Kafka 的整体架构非常简单,是分布式架构,Producer、Broker 和Consumer 都可以有多个。

几个重要的基本概念:

单纯角色来说,Kafka 和 RocketMQ 是基本一致的。比较明显的差异是:

RocketMQ 从 Kafka 演化而来。

Kafka 为什么要将 Topic 进行分区?

为了负载均衡,从而能够水平拓展。

所以,Partiton 机制可以极大的提高吞吐量,并且使得系统具备良好的水平扩展能力。

Kafka 的应用场景有哪些?

Kafka 的应用场景

1)消息队列

比起大多数的消息系统来说,Kafka 有更好的吞吐量,内置的分区,冗余及容错性,这让 Kafka 成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并常常依赖于 Kafka 提供的强大的持久性保障。在这个领域,Kafka 足以媲美传统消息系统,如 ActiveMQ 或 RabbitMQ 。

2)行为跟踪

Kafka 的另一个应用场景,是跟踪用户浏览页面、搜索及其他行为,以发布订阅的模式实时记录到对应的 Topic 里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到 Hadoop / 离线数据仓库里处理。

3)元信息监控

作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。

4)日志收集

日志收集方面,其实开源产品有很多,包括 Scribe、Apache Flume 。很多人使用 Kafka 代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或 HDFS)进行处理。

然而, Kafka 忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让 Kafka 处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如 Scribe 或者 Flume 来说,Kafka 提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

5)流处理

这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的 Storm 或其他流式计算框架进行处理。很多用户会将那些从原始 Topic 来的数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的 Topic 下再继续后面的处理。

例如一个文章推荐的处理流程,可能是先从 RSS 数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的 Topic 中。后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的 Topic 之外,产生了一系列的实时数据处理的流程。Strom 和 Samza 是非常著名的实现这种类型数据转换的框架。

6)事件源

事件源,是一种应用程序设计的方式。该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka 可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。

7)持久性日志(Commit Log)

Kafka 可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka 中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka 类似于 Apache BookKeeper 项目。

Kafka 消息发送和消费的简化流程是什么?

image.png

1)Producer 发送消息

Producer 采用 push 模式将消息发布到 Broker,每条消息都被 append 到 Patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 Kafka 吞吐率)。Producer 发送消息到 Broker 时,会根据分区算法选择将其存储到哪一个 Partition 。

其路由机制为:

写入流程:

2)Broker 存储消息

物理上把 Topic 分成一个或多个 Patition,每个 Patition 物理上对应一个文件夹(该文件夹存储该 Patition 的所有消息和索引文件)。

3)Consumer 消费消息

high-level Consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 Consumer 所消费,且 Consumer 消费消息时不关注 offset ,最后一个 offset 由 ZooKeeper 保存(下次消费时,该 group 中的 Consumer 将从 offset 记录的位置开始消费)。

注意:

Consumer 采用 pull 模式从 Broker 中读取数据。

Kafka Producer 有哪些发送模式?

Kafka 的发送模式由 Producer 端的配置参数 producer.type来设置。

对于异步模式,还有 4 个配套的参数,如下:

image.png

Kafka Consumer 是否可以消费指定的分区消息?

Consumer 消费消息时,向 Broker 发出“fetch”请求去消费特定分区的消息,Consumer 指定消息在日志中的偏移量(offset),就可以消费从这个位置开始的消息,Consumer 拥有了 offset 的控制权,可以向后回滚去重新消费之前的消息,这是很有意义的。

Kafka 的 high-level API 和 low-level API 的区别?

High Level API

Low Level API

Low-level API 也就是 Simple Consumer API ,实际上非常复杂。

Kafka 的NIO网络通信模型

Kafka的网络通信模型是基于NIO的Reactor多线程模型来设计的。这里先引用Kafka源码中注释的一段话:

An NIO socket server. The threading model is
1 Acceptor thread that handles new connections.
Acceptor has N Processor threads that each have their own selector and read requests from sockets.
M Handler threads that handle requests and produce responses back to the processor threads for writing.

Kafka的网络通信层模型,主要采用了1(1个Acceptor线程)+N(N个Processor线程)+M(M个业务处理线程)。下面的表格简要的列举了下

线程数 线程名 线程具体说明
1 kafka-socket-acceptor_%x Acceptor线程,负责监听Client端发起的请求
N kafka-network-thread_%d Processor线程,负责对Socket进行读写
M kafka-request-handler-_%d Worker线程,处理具体的业务逻辑并生成Response返回

Kafka网络通信层的完整框架图如下图所示:

image

Kafka的网络通信层框架结构有几个重要概念:
(1)Acceptor:1个接收线程,负责监听新的连接请求,同时注册OP_ACCEPT 事件,将新的连接按照"round robin"方式交给对应的 Processor 线程处理;
(2)Processor:N个处理器线程,其中每个 Processor 都有自己的 selector,它会向 Acceptor 分配的 SocketChannel 注册相应的 OP_READ 事件,N 的大小由“num.networker.threads”决定;
(3)KafkaRequestHandler:M个请求处理线程,包含在线程池—KafkaRequestHandlerPool内部,从RequestChannel的全局请求队列—requestQueue中获取请求数据并交给KafkaApis处理,M的大小由“num.io.threads”决定;
(4)RequestChannel:其为Kafka服务端的请求通道,该数据结构中包含了一个全局的请求队列 requestQueue和多个与Processor处理器相对应的响应队列responseQueue,提供给Processor与请求处理线程KafkaRequestHandler和KafkaApis交换数据的地方。
(5)NetworkClient:其底层是对 Java NIO 进行相应的封装,位于Kafka的网络接口层。Kafka消息生产者对象—KafkaProducer的send方法主要调用NetworkClient完成消息发送;
(6)SocketServer:其是一个NIO的服务,它同时启动一个Acceptor接收线程和多个Processor处理器线程。提供了一种典型的Reactor多线程模式,将接收客户端请求和处理请求相分离;
(7)KafkaServer:代表了一个Kafka Broker的实例;其startup方法为实例启动的入口;
(8)KafkaApis:Kafka的业务逻辑处理Api,负责处理不同类型的请求;比如“发送消息”“获取消息偏移量—offset”“处理心跳请求”等;

Kafka网络通信层的设计与具体实现

结合Kafka网络通信层的源码来分析其设计与实现,这里主要详细介绍网络通信层的几个重要元素—SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的源码部分均基于Kafka的0.11.0版本。

1、SocketServer

SocketServer是接收客户端Socket请求连接、处理请求并返回处理结果的核心类,Acceptor及Processor的初始化、处理逻辑都是在这里实现的。在KafkaServer实例启动时会调用其startup的初始化方法,会初始化1个 Acceptor和N个Processor线程(每个EndPoint都会初始化,一般来说一个Server只会设置一个端口),其实现如下:

def startup() {
    this.synchronized {

      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)

      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      val brokerId = config.brokerId

      var processorBeginIndex = 0
      // 一个broker一般只设置一个端口
      config.listeners.foreach { endpoint =>
        val listenerName = endpoint.listenerName
        val securityProtocol = endpoint.securityProtocol
        val processorEndIndex = processorBeginIndex + numProcessorThreads
        //N 个 processor
        for (i <- processorBeginIndex until processorEndIndex)
          processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
        //1个 Acceptor
        val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
        acceptors.put(endpoint, acceptor)
        KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }
    }

2、Acceptor

Acceptor是一个继承自抽象类AbstractServerThread的线程类。Acceptor的主要任务是监听并且接收客户端的请求,同时建立数据传输通道—SocketChannel,然后以轮询的方式交给一个后端的Processor线程处理(具体的方式是添加socketChannel至并发队列并唤醒Processor线程处理)。
在该线程类中主要可以关注以下两个重要的变量:
(1),nioSelector:通过NSelector.open()方法创建的变量,封装了JAVA NIO Selector的相关操作;
(2),serverChannel:用于监听端口的服务端Socket套接字对象;
下面来看下Acceptor主要的run方法的源码:

def run() {
    //首先注册OP_ACCEPT事件
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      //以轮询方式查询并等待关注的事件发生
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable)
                  //如果事件发生则调用accept方法对OP_ACCEPT事件处理
                  accept(key, processors(currentProcessor))
                else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")
                //轮询算法
                // round robin to the next processor thread
                currentProcessor = (currentProcessor + 1) % processors.length
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }
       //代码省略
  }

  def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)

      processor.accept(socketChannel)
    } catch {
        //省略部分代码
    }
  }

  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

在上面源码中可以看到,Acceptor线程启动后,首先会向用于监听端口的服务端套接字对象—ServerSocketChannel上注册OP_ACCEPT 事件。然后以轮询的方式等待所关注的事件发生。如果该事件发生,则调用accept()方法对OP_ACCEPT事件进行处理。这里,Processor是通过round robin方法选择的,这样可以保证后面多个Processor线程的负载基本均匀。
Acceptor的accept()方法的作用主要如下:
(1)通过SelectionKey取得与之对应的serverSocketChannel实例,并调用它的accept()方法与客户端建立连接;
(2)调用connectionQuotas.inc()方法增加连接统计计数;并同时设置第(1)步中创建返回的socketChannel属性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)
(3)将socketChannel交给processor.accept()方法进行处理。这里主要是将socketChannel加入Processor处理器的并发队列newConnections队列中,然后唤醒Processor线程从队列中获取socketChannel并处理。其中,newConnections会被Acceptor线程和Processor线程并发访问操作,所以newConnections是ConcurrentLinkedQueue队列(一个基于链接节点的无界线程安全队列)

3、Processor

Processor同Acceptor一样,也是一个线程类,继承了抽象类AbstractServerThread。其主要是从客户端的请求中读取数据和将KafkaRequestHandler处理完响应结果返回给客户端。在该线程类中主要关注以下几个重要的变量:
(1)newConnections:在上面的Acceptor一节中已经提到过,它是一种ConcurrentLinkedQueue[SocketChannel]类型的队列,用于保存新连接交由Processor处理的socketChannel;
(2)inflightResponses:是一个Map[String, RequestChannel.Response]类型的集合,用于记录尚未发送的响应;
(3)selector:是一个类型为KSelector变量,用于管理网络连接;
下面先给出Processor处理器线程run方法执行的流程图:

image

从上面的流程图中能够可以看出Processor处理器线程在其主流程中主要完成了这样子几步操作:
(1)处理newConnections队列中的socketChannel。遍历取出队列中的每个socketChannel并将其在selector上注册OP_READ事件;
(2)处理RequestChannel中与当前Processor对应响应队列中的Response。在这一步中会根据responseAction的类型(NoOpAction/SendAction/CloseConnectionAction)进行判断,若为“NoOpAction”,表示该连接对应的请求无需响应;若为“SendAction”,表示该Response需要发送给客户端,则会通过“selector.send”注册OP_WRITE事件,并且将该Response从responseQueue响应队列中移至inflightResponses集合中;“CloseConnectionAction”,表示该连接是要关闭的;
(3)调用selector.poll()方法进行处理。该方法底层即为调用nioSelector.select()方法进行处理。
(4)处理已接受完成的数据包队列—completedReceives。在processCompletedReceives方法中调用“requestChannel.sendRequest”方法将请求Request添加至requestChannel的全局请求队列—requestQueue中,等待KafkaRequestHandler来处理。同时,调用“selector.mute”方法取消与该请求对应的连接通道上的OP_READ事件;
(5)处理已发送完的队列—completedSends。当已经完成将response发送给客户端,则将其从inflightResponses移除,同时通过调用“selector.unmute”方法为对应的连接通道重新注册OP_READ事件;
(6)处理断开连接的队列。将该response从inflightResponses集合中移除,同时将connectionQuotas统计计数减1;

4、RequestChannel

在Kafka的网络通信层中,RequestChannel为Processor处理器线程与KafkaRequestHandler线程之间的数据交换提供了一个数据缓冲区,是通信过程中Request和Response缓存的地方。因此,其作用就是在通信中起到了一个数据缓冲队列的作用。Processor线程将读取到的请求添加至RequestChannel的全局请求队列—requestQueue中;KafkaRequestHandler线程从请求队列中获取并处理,处理完以后将Response添加至RequestChannel的响应队列—responseQueue中,并通过responseListeners唤醒对应的Processor线程,最后Processor线程从响应队列中取出后发送至客户端。

5、KafkaRequestHandler

KafkaRequestHandler也是一种线程类,在KafkaServer实例启动时候会实例化一个线程池—KafkaRequestHandlerPool对象(包含了若干个KafkaRequestHandler线程),这些线程以守护线程的方式在后台运行。在KafkaRequestHandler的run方法中会循环地从RequestChannel中阻塞式读取request,读取后再交由KafkaApis来具体处理。

6、KafkaApis

KafkaApis是用于处理对通信网络传输过来的业务消息请求的中心转发组件。该组件反映出Kafka Broker Server可以提供哪些服务。

Kafka 的数据存储模型是怎么样的?

Kafka 每个 Topic 下面的所有消息都是以 Partition 的方式分布式的存储在多个节点上。同时在 Kafka 的机器上,每个 Partition 其实都会对应一个日志目录,在目录下面会对应多个日志分段(LogSegment)。

下面先介绍一下partition中的segment file的组成:

segment

关于segment file中index与data file对应关系图,这里我们选用网上的一个图片,如下所示:

index

segment的索引文件中存储着大量的元数据,数据文件中存储着大量消息,索引文件中的元数据指向对应数据文件中的message的物理偏移地址。以索引文件中的3,497为例,在数据文件中表示第3个message(在全局partition表示第368772个message),以及该消息的物理偏移地址为497。

注:Partition中的每条message由offset来表示它在这个partition中的偏移量,这个offset并不是该Message在partition中实际存储位置,而是逻辑上的一个值(如上面的3),但它却唯一确定了partition中的一条Message(可以认为offset是partition中Message的id)。

Kafka 的消息格式是怎么样的?

message中的物理结构为:

message

参数说明:

关键字 解释说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic” 表示本次发布Kafka服务程序协议版本号
1 byte “attributes” 表示为独立版本、或标识压缩类型、或编码类型
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
K byte key 可选
value bytes payload 表示实际消息数据

3.3.通过offset查找message

假如我们想要读取offset=368776的message,需要通过下面2个步骤查找。

1). 查找segment file
00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。
当offset=368776时定位到00000000000000368769.index|log

2). 通过segment file查找message
通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。
segment index file并没有为数据文件中的每条message建立索引,而是采取稀疏索引存储方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,通过map可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

Kafka高效文件存储设计特点:

为什么不能以 Partition 作为存储单位?

如果就以 Partition 为最小存储单位,可以想象,当 Kafka Producer 不断发送消息,必然会引起 Partition 文件的无限扩张,将对消息文件的维护以及已消费的消息的清理带来严重的影响,因此,需以 segment 为单位将 Partition 进一步细分。

每个 Partition(目录)相当于一个巨型文件,被平均分配到多个大小相等的 segment(段)数据文件中(每个 segment 文件中消息数量不一定相等),这种特性也方便 old segment 的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个 Partition 只需要支持顺序读写就行,segment 的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours} 等若干参数)决定。

Kafka 的副本机制是怎么样的?

Kafka 的副本机制,是多个 Broker 节点对其他节点的 Topic 分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫 Reblance),Kafka 每个主题的每个分区都有一个主副本以及 0 个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。

副本机制

注意哈,下面说的 Leader 指的是每个 Topic 的某个分区的 Leader ,而不是 Broker 集群中的【集群控制器】。

在 Kafka 中并不是所有的副本都能被拿来替代主副本,所以在 Kafka 的Leader 节点中维护着一个 ISR(In sync Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:

另外还有个 AR(Assigned Replicas)用来标识副本的全集,OSR 用来表示由于落后被剔除的副本集合,所以公式如下:

这里先要说下两个名词:HW 和 LEO 。

当 Producer 向 Leader 发送数据时,可以通过request.required.acks 参数来设置数据可靠性的级别:

ZooKeeper 在 Kafka 中起到什么作用?

在基于 Kafka 的分布式消息队列中,ZooKeeper 的作用有:

Kafka 如何实现高可用?

总的来说,Kafka 和 RocketMQ 的高可用方式是比较类似的,主要的差异在 Kafka Broker 的副本机制,和 RocketMQ Broker 的主从复制,两者的差异,以及差异带来的生产和消费不同。当然,实际上,都是和“主” Broker 做消息的发送和读取不是!

Kafka 是否会弄丢数据?

消费端弄丢了数据

唯一可能导致消费者弄丢数据的情况,就是说,你消费到了这个消息,然后消费者那边自动提交了 offset,让 Kafka 以为你已经消费好了这个消息,但其实你才刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

这不是跟 RabbitMQ 差不多吗,大家都知道 Kafka 会自动提交 offset,那么只要关闭自动提交 offset,在处理完之后自己手动提交 offset,就可以保证数据不会丢。但是此时确实还是可能会有重复消费,比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

生产环境碰到的一个问题,就是说我们的 Kafka 消费者消费到了数据之后是写到一个内存的 queue 里先缓冲一下,结果有的时候,你刚把消息写入内存 queue,然后消费者会自动提交 offset。然后此时我们重启了系统,就会导致内存 queue 里还没来得及处理的数据就丢失了。

Kafka 弄丢了数据

这块比较常见的一个场景,就是 Kafka 某个 broker 宕机,然后重新选举 partition 的 leader。大家想想,要是此时其他的 follower 刚好还有些数据没有同步,结果此时 leader 挂了,然后选举某个 follower 成 leader 之后,不就少了一些数据?这就丢了一些数据啊。

生产环境也遇到过,我们也是,之前 Kafka 的 leader 机器宕机了,将 follower 切换为 leader 之后,就会发现说这个数据就丢了。

所以此时一般是要求起码设置如下 4 个参数:

我们生产环境就是按照上述要求配置的,这样配置之后,至少在 Kafka broker 端就可以保证在 leader 所在 broker 发生故障,进行 leader 切换时,数据不会丢失。

生产者会不会弄丢数据?

如果按照上述的思路设置了 acks=all ,一定不会丢,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

Kafka 如何保证消息的顺序性?

Kafka 本身,并不像 RocketMQ 一样,提供顺序性的消息。所以,提供的方案,都是相对有损的。如下:

这里的顺序消息,我们更多指的是,单个 Partition 的消息,被顺序消费。

方式一,Consumer ,对每个 Partition 内部单线程消费,单线程吞吐量太低,一般不会用这个。
方式二,Consumer ,拉取到消息后,写到 N 个内存 queue,具有相同 key 的数据都到同一个内存 queue 。然后,对于 N 个线程,每个线程分别消费一个内存 queue 即可,这样就能保证顺序性。

这种方式,相当于对【方式一】的改进,将相同 Partition 的消息进一步拆分,保证相同 key 的数据消费是顺序的。

不过这种方式,消费进度的更新会比较麻烦。

当然,实际情况也不太需要考虑消息的顺序性,基本没有业务需要。

Leader 选举

为了保证可靠性,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给 producer。如此,可以避免因部分数据被写进 leader,而尚未被任何 follower 复制就宕机的情况下而造成数据丢失。对于 producer 而言,它可以选择是否等待消息 commit,这可以通过参数 request.required.acks 来设置。这种机制可以确保:只要 ISR 中有一个或者以上的 follower,一条被 commit 的消息就不会丢失。

问题 1:如何在保证可靠性的前提下避免吞吐量下降?

有一个很重要的问题是当 leader 宕机了,怎样在 follower 中选举出新的 leader,因为 follower 可能落后很多或者直接 crash 了,所以必须确保选择 “最新” 的 follower 作为新的 leader。一个基本的原则就是,如果 leader 挂掉,新的 leader 必须拥有原来的 leader 已经 commit 的所有消息,这不就是 ISR 中副本的特征吗?

但是,存在一个问题,ISR 列表维持多大的规模合适呢?换言之,leader 在一个消息被 commit 前需要等待多少个 follower 确认呢?等待 follower 的数量越多,与 leader 保持同步的 follower 就越多,可靠性就越高,但这也会造成吞吐率的下降。

少数服从多数的选举原则

一种常用的选举 leader 的策略是 “少数服从多数” ,不过,Kafka 并不是采用这种方式。这种模式下,如果有 2f+1 个副本,那么在 commit 之前必须保证有 f+1 个 replica 复制完消息,同时为了保证能正确选举出新的 leader,失败的副本数不能超过 f 个。这种方式有个很大的优势,系统的延迟取决于最快的几台机器,也就是说比如副本数为 3,那么延迟就取决于最快的那个 follower 而不是最慢的那个。

“少数服从多数” 的策略也有一些劣势,为了保证 leader 选举的正常进行,它所能容忍的失败的 follower 数比较少,如果要容忍 1 个 follower 挂掉,那么至少要 3 个以上的副本,如果要容忍 2 个 follower 挂掉,必须要有 5 个以上的副本。也就是说,在生产环境下为了保证较高的容错率,必须要有大量的副本,而大量的副本又会在大数据量下导致性能的急剧下降。这种算法更多用在 ZooKeeper 这种共享集群配置的系统中,而很少在需要大量数据的系统中使用。

Kafka 选举 leader 的策略是怎样的?

实际上,leader 选举的算法非常多,比如 ZooKeeper 的 Zab、Raft 以及 Viewstamped Replication。而 Kafka 所使用的 leader 选举算法更像是微软的 PacificA 算法。

Kafka 在 ZooKeeper 中为每一个 partition 动态的维护了一个 ISR,这个 ISR 里的所有 replica 都与 leader 保持同步,只有 ISR 里的成员才能有被选为 leader 的可能(通过参数配置:unclean.leader.election.enable=false)。在这种模式下,对于 f+1 个副本,一个 Kafka topic 能在保证不丢失已经 commit 消息的前提下容忍 f 个副本的失败,在大多数使用场景下,这种模式是十分有利的。事实上,对于任意一条消息,只有它被 ISR 中的所有 follower 都从 leader 复制过去才会被认为已提交,并返回信息给 producer,从而保证可靠性。但与 “少数服从多数” 策略不同的是,Kafka ISR 列表中副本的数量不需要超过副本总数的一半,即不需要满足 “多数派” 原则,通常,ISR 列表副本数大于等于 2 即可,如此,便在可靠性和吞吐量方面取得平衡。

极端情况下的 leader 选举策略

前已述及,当 ISR 中至少有一个 follower 时(ISR 包括 leader),Kafka 可以确保已经 commit 的消息不丢失,但如果某一个 partition 的所有 replica 都挂了,自然就无法保证数据不丢失了。这种情况下如何进行 leader 选举呢?通常有两种方案:

如何选择呢?这就需要在可用性和一致性当中作出抉择。如果一定要等待 ISR 中的 replica 恢复过来,不可用的时间就可能会相对较长。而且如果 ISR 中所有的 replica 都无法恢复了,或者数据丢失了,这个 partition 将永远不可用。

选择第一个恢复过来的 replica 作为 leader,如果这个 replica 不是 ISR 中的 replica,那么,它可能并不具备所有已经 commit 的消息,从而造成消息丢失。默认情况下,Kafka 采用第二种策略,即 unclean.leader.election.enable=true,也可以将此参数设置为 false 来启用第一种策略。

unclean.leader.election.enable 这个参数对于 leader 的选举、系统的可用性以及数据的可靠性都有至关重要的影响。生产环境中应慎重权衡。

上一篇下一篇

猜你喜欢

热点阅读