kafka

2020-04-09  本文已影响0人  viankoo

概念

消息通信通常有两种模式:
1、队列模式,一组消费者可能从一个服务器读取消息,每个消息被发送给了其中一个消费者,在kafka中,如果所有的消费者都处于同一个组,则这个结构就是队列模式
2、订阅模式,消息被广播给了所有的消费者,在kafka中,如果所有消费者都处于不同的组,则这个结构就是订阅模式

kafka提供的消息顺序保证机制:
传统的消息队列在服务器上有序的保存消息,当有多个消费者的时候消息也是按序发送消息。但是因为消息投递到消费者的过程是异步的,所以消息到达消费者的顺序可能是乱序的。这就意味着在并行计算的场景下,消息的有序性已经丧失了。消息系统通常采用一个“排他消费者”的概念来规避这个问题,但这样就意味着失去了并行处理的能力。
Kafka 在这一点上做的更优秀。Kafka 有一个 Topic 中按照 partition 并行的概念,这使它即可以提供消息的有序性担保,又可以提供消费者之间的负载均衡。这是通过将 Topic 中的partition 绑定到消费者组中的具体消费者实现的。通过这种方案我们可以保证消费者是某个partition 唯一消费者,从而完成消息的有序消费。因为 Topic有多个 partition所以在消费者实例之间还是负载均衡的。注意,虽然有以上方案,但是如果想担保消息的有序性那么我们就不能为一个partition 注册多个消费者了。

Kafka设计思想

持久化

kafka的消息是存储在硬盘上的,因为“磁盘慢”这个普遍性的认知,常常使人们怀疑一个这样的持久化结构是否能提供所需的性能。但实际上磁盘因为使用的方式不同,它可能比人们预想的慢很多也可能比人们预想的快很多;而且一个合理设计的磁盘文件结构常常可以使磁盘运行得和网络一样快。

kafka的设计是将所有的数据被直接写入文件系统上一个可暂不执行磁盘 flush 操作的持久化日志文件中。实际上这意味着这些数据是被传送到了内核的页缓存上。

我们在上一节讨论了磁盘性能。 一旦消除了磁盘访问模式不佳的情况,该类系统性能低下的主要原因就剩下了两个:

大量的小型 I/O 操作(小包问题),以及过多的字节拷贝(ZeroCopy)

小型的 I/O 操作发生在客户端和服务端之间以及服务端自身的持久化操作中。

为了避免这种情况,我们的协议是建立在一个 “消息块” 的抽象基础上,合理将消息分组。 这使得网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络中往返的开销。服务器一次性的将多个消息快依次追加到日志文件中, Consumer 也是每次获取多个大型有序的消息块。这个简单的优化对速度有着数量级的提升。批处理允许更大的网络数据包,更大的顺序读写磁盘操作,连续的内存块等等,所有这些都使 KafKa 能将随机性突发性的消息写操作变成顺序性的写操作最终流向消费者。

另一个低效率的操作是字节拷贝,在消息量少时,这不是什么问题。但是在高负载的情况下,影响就不容忽视。为了避免这种情况,我们让 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。

broker 维护的消息日志本身就是一个文件目录,每个文件都由一系列以相同格式写入到磁盘的消息集合组成,这种写入格式被 producer 和 consumer 共用。保持这种通用格式可以对一些很重要的操作进行优化:持久化日志块的网络传输。 现代的 unix 操作系统提供了高度优化的数据路径,用于将数据从 pagecache 转移到 socket 网络连接中;在 Linux 中系统调用sendfile 做到这一点。
为了理解 sendfile 的意义,首先要了解数据从文件到套接字的一般数据传输路径:

操作系统从磁盘读取数据到内核空间的 pagecache
应用程序读取内核空间的数据到用户空间的缓冲区
应用程序将数据(用户空间的缓冲区)写回内核空间的套接字缓冲区(内核空间)
操作系统将数据从套接字缓冲区(内核空间)复制到通过网络发送的 NIC 缓冲区
这显然是低效的,有四次 copy 操作和两次系统调用。使用 sendfile 方法,可以允许操作系统将数据从 pagecache 直接发送到网络,这样避免重复数据复制。所以这种优化方式,只需要最后一步的 copy 操作,将数据复制到 NIC 缓冲区。我们预期的使用场景是一个 topic 被多个消费者消费。使用 zero-copy (零拷贝)优化,数据仅仅会被复制到 pagecache 一次,在后续的消费过程中都可以复用,而不是保存在内存中在每次消费时再复制到内核空间。这使得消息能够以接近网络连接的速度被消费。
pagecache 和 sendfile 的组合使用意味着,在一个 Kafka 集群中,大多数的(紧跟生产者的)consumer 消费时,将看不到磁盘上的读取活动,因为数据完全由缓存提供。

端到端的批量压缩

Kafka 以高效的批处理格式支持一批消息可以压缩在一起发送到服务器。这批消息将以压缩格式写入,并且在日志中保持压缩,只会在 consumer 消费时解压缩。

Kafka 支持 GZIP,Snappy 和 LZ4 压缩协议。

The Producer

生产者直接发送数据到主分区的服务器上,不需要经过任何中间路由。

客户端控制消息发送数据到哪个分区,这个可以实现随机的负载均衡方式,或者使用一些特定语义的分区函数。我们有提供特定分区的接口让用于根据指定的键值进行 hash 分区(当然也有选项可以重写分区函数),例如,如果使用用户 ID 作为 key,则用户相关的所有数据都会被分发到同一个分区上。这允许消费者在消费数据时做一些特定的本地化处理。这样的分区风格经常被设计用于一些对本地处理比较敏感的消费者。

批处理是提升性能的一个主要驱动,为了允许批量处理,kafka 生产者会尝试在内存中汇总数据,并用一次请求批次提交信息。 批处理,不仅仅可以配置指定的消息数量,也可以指定等待特定的延迟时间(如 64k 或 10ms),这允许汇总更多的数据后再发送,在服务器端也会减少更多的 IO 操作。 该缓冲是可配置的,并给出了一个机制,通过权衡少量额外的延迟时间获取更好的吞吐量。

The Consumer

Kafka consumer 通过向 broker 发出一个“fetch”请求来获取它想要消费的 partition。consumer 的每个请求都在 log 中指定了对应的 offset,并接收从该位置开始的一大块数据。并且可以在需要的时候通过回退到该位置再次消费对应的数据。

持续追踪已经被消费的内容是消息系统的关键性能点之一。

Kafka 使用完全不同的方式解决消息丢失问题。Kafka 的 topic 被分割成了一组完全有序的partition,其中每一个 partition 在任意给定的时间内只能被每个订阅了这个 topic 的consumer 组中的一个 consumer 消费。这意味着 partition 中 每一个consumer 的位置仅仅是一个数字,即下一条要消费的消息的 offset。这使得被消费的消息的状态信息相当少,每partition只需要一个数字。这个状态信息还可以作为周期性的 checkpoint。这以非常低的代价实现了和消息确认机制等同的效果。

这种方式还有一个附加的好处。consumer 可以回退到之前的 offset 来再次消费之前的数据,这个操作违反了队列的基本原则,但事实证明对大多数 consumer 来说这是一个必不可少的特性。 例如,如果 consumer 的代码有 bug,并且在 bug 被发现前已经有一部分数据被消费了,那么 consumer 可以在 bug 修复后通过回退到之前的 offset 来再次消费这些数据。

离线数据加载

可伸缩的持久化特性允许 consumer 只进行周期性的消费,例如批量数据加载,周期性将数据加载到诸如 Hadoop 和关系型数据库之类的离线系统中。

在 Hadoop 的应用场景中,我们通过将数据加载分配到多个独立的 map 任务来实现并行化,每一个 map 任务负责一个 node/topic/partition,从而达到充分并行化。Hadoop 提供了任务管理机制,失败的任务可以重新启动而不会有重复数据的风险,只需要简单的从原来的位置重启即可。

消息交付语义

看Kafka 社区中经常有吐槽丢消息等,其实通常来说不是Kafka 丢消息,而是用户用的不是那么明白,没有选择实现合适的交付语义,没有按照Kafka 规范来使用交付策略,下面具体来看看这几种交付语义:

Replication

Kafka 允许 topic 的 partition 拥有若干副本,你可以在 server 端配置 partition 的副本数量。
当集群中的节点出现故障时,能自动进行故障转移,保证数据的可用性。

创建副本的单位是 topic 的 partition ,正常情况下,每个分区都有一个 leader 和零或多个followers 。总的副本数是包含 leader 的总和。所有的读写操作都由 leader 处理,一般partition 的数量都比 broker 的数量多的多,各分区的 leader 均匀的分布在 brokers 中。所有的 followers 节点都同步 leader 节点的日志,日志中的消息和偏移量都和 leader 中的一致。(当然,在任何给定时间,leader 节点的日志末尾时可能有几个消息尚未被备份完成)。

Followers 节点就像普通的 consumer 那样从 leader 节点那里拉取消息并保存在自己的日志文件中。Followers 节点可以从 leader 节点那里批量拉取消息日志到自己的日志文件中。

与大多数分布式系统一样,自动处理故障需要精确定义节点 “alive” 的概念。Kafka 判断节点是否存活有两种方式:

节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接。
如果节点是个 follower ,它必须能及时的同步 leader 的写操作,并且延时不能太久。
我们认为满足这两个条件的节点处于 “in sync” 状态,区别于 “alive” 和 “failed” 。Leader 会追踪所有 “in sync” 的节点。如果有节点挂掉了,或是写超时,或是心跳超时,leader 就会把它从同步副本列表中移除。 同步超时和写超时的时间由 replica.lag.time.max.ms 配置确定。

分布式系统中,我们只尝试处理 “fail/recover” 模式的故障,即节点突然停止工作,然后又恢复(节点可能不知道自己曾经挂掉)的状况。Kafka 没有处理所谓的 “Byzantine” 故障,即一个节点出现了随意响应和恶意响应(可能由于 bug 或 非法操作导致)。

当所有的分区上 in sync repicas 都应用到 log 上时,消息可以认为是 “committed”,只有committed 消息才会给 consumer。这意味着 consumer 不需要担心潜在因为 leader 失败而丢失消息。而对于 producer 来说,可以依据 latency 和 durability 来权衡选择是否等待消息被committed ,这个行动由 producer 使用的 acks 设置来决定。

在所有时间里,Kafka 保证只要有至少一个同步中的节点存活,提交的消息就不会丢失。

节点挂掉后,经过短暂的故障转移后,Kafka 将仍然保持可用性,但在网络分区( networkpartitions )的情况下可能不能保持可用性

可靠性和持久性的保证

向 Kafka 写数据时,producers 设置 ack 是否提交完成,

0:不等待 broker 返回确认消息,

1: leader 保存成功返回或,

-1(all): 所有备份都保存成功返回。

请注意。设置 “ack = all” 并不能保证所有的副本都写入了消息。默认情况下,当 acks = all 时,只要 ISR 副本同步完成,就会返回消息已经写入。例如,一个 topic 仅仅设置了两个副本,那么只有一个 ISR 副本,那么当设置 acks = all 时返回写入成功时,剩下了的那个副本数据也可能数据没有写入。尽管这确保了分区的最大可用性,但是对于偏好数据持久性而不是可用性的一些用户,可能不想用这种策略,因此,我们提供了两个 topic 配置,可用于优先配置消息数据持久性:

禁用 unclean leader 选举机制 - 如果所有的备份节点都挂了,分区数据就会不可用,直到最近的 leader 恢复正常。这种策略优先于数据丢失的风险,参看上一节的 uncleanleader 选举机制。
指定最小的 ISR 集合大小,只有当 ISR 的大小大于最小值,分区才能接受写入操作,以防止仅写入单个备份的消息丢失造成消息不可用的情况,这个设置只有在生产者使acks = all 的情况下才会生效,这至少保证消息被 ISR 副本写入。此设置是一致性和可用性 之间的折衷,对于设置更大的最小 ISR 大小保证了更好的一致性,因为它保证将消息被写入了更多的备份,减少了消息丢失的可能性。但是,这会降低可用性,因为如果ISR 副本的数量低于最小阈值,那么分区将无法写入。

备份管理

上面关于 replicated logs 的讨论仅仅局限于单一 log ,比如一个 topic 分区。但是 Kafka 集群需要管理成百上千个这样的分区。我们尝试轮流的方式来在集群中平衡分区来避免在小节点上处理大容量的 topic。

同样关于 leadership 选举的过程也同样的重要,这段时间可能是无法服务的间隔。一个原始的 leader 选举实现是当一个节点失败时会在所有的分区节点中选主。相反,我们选用 broker之一作为 “controller”, 这个 controller 检测 broker 失败,并且为所有受到影响的分区改变leader。这个结果是我们能够将许多需要变更 leadership 的通知整合到一起,让选举过程变得更加容易和快速。如果 controller 失败了,存活的 broker 之一会变成新的 controller。

日志压缩

日志压缩可确保 Kafka 始终至少为单个 topic partition 的数据日志中的每个 message key 保留最新的已知值。这样的设计解决了应用程序崩溃、系统故障后恢复或者应用在运行维护过程中重启后重新加载缓存的场景。接下来让我们深入讨论这些在使用过程中的更多细节,阐述在这个过程中它是如何进行日志压缩的。

迄今为止,我们只介绍了简单的日志保留方法(当旧的数据保留时间超过指定时间、日志文件大小达到设置大小后就丢弃)。这样的策略非常适用于处理那些暂存的数据,例如记录每条消息之间相互独立的日志。然而在实际使用过程中还有一种非常重要的场景 – 根据 key 进行数据变更(例如更改数据库表内容),使用以上的方式显然不行。

让我们来讨论一个关于处理这样流式数据的具体的例子。假设我们有一个 topic,里面的内容包含用户的 email 地址;每次用户更新他们的 email 地址时,我们发送一条消息到这个topic,这里使用用户 Id 作为消息的 key 值。现在,我们在一段时间内为 id 为 123 的用户发送一些消息,每个消息对应 email 地址的改变。

日志压缩为我们提供了更精细的保留机制,所以我们至少保留每个 key 的最后一次更新(例如:bill@gmail.com)。这样我们保证日志包含每一个 key 的最终值而不只是最近变更的完整快照。这意味着下游的消费者可以获得最终的状态而无需拿到所有的变化的消息信息。

数据库更改订阅。通常需要在多个数据系统设置拥有一个数据集,这些系统中通常有一个是某种类型的数据库(无论是 RDBMS 或者新流行的 key-value 数据库)。 例如,你可能有一个数据库,缓存,搜索引擎集群或者 Hadoop 集群。每次变更数据库,也同时需要变更缓存、搜索引擎以及 hadoop 集群。 在只需处理最新日志的实时更新的情况下,你只需要最近的日志。但是,如果你希望能够重新加载缓存或恢复搜索失败的节点,你可能需要一个完整的数据集。
事件源。 这是一种应用程序设计风格,它将查询处理与应用程序设计相结合,并使用变更的日志作为应用程序的主要存储。
日志高可用。 执行本地计算的进程可以通过注销对其本地状态所做的更改来实现容错,以便另一个进程可以重新加载这些更改并在出现故障时继续进行。 一个具体的例子就是在流查询系统中进行计数,聚合和其他类似“group by”的操作。实时流处理框架Samza,使用这个特性 正是出于这一原因。
日志压缩机制是更细粒度的、每个记录都保留的机制,而不是基于时间的粗粒度。这个理念是选择性删除那些有更新的变更的记录的日志。这样最终日志至少包含每个 key 的记录的最后一个状态。

这种保留策略可以针对每一个 topci 进行设置,遮掩一个集群中,可以让部分 topic 通过时间和大小保留日志,另一些可以通过压缩策略保留。

遇到过的问题

1、解决Kafka重复消费问题?

上一篇下一篇

猜你喜欢

热点阅读