DevOPS

Kafka 进阶:Producer & Broker &

2018-07-13  本文已影响4人  63e29c663713

1. Producer

Producers 负责生产消息。
消息成功写到 topic 后,broker 会返回 producer 消息的 topic, partition & the offset of the record within the partition。

1.1 Send

发送方式有两种:

1.2 Retry

消息可能因为一些异常原因写失败,异常分为两类:

1.3 Acks

acks 参数控制 producer 认为 message 写成功之前必须接收到 partition 成功写入的副本数(针对 replicas)。可以把acks 理解为用来控制数据备份时的一致性强弱的

当配置 acks 为:

2. Brokers and Clusters

一个 Kafka 的服务器叫做一个 broker。broker 接受 producer 传递过来的 messages,store messages 到指定的 partition 中,并分配 offsets。它还接受 consumer 发过来的 poll messages request & heartbeats request

broker 的 metadata 在 zookeeper 中维护。每一个 broker 都配置有自身唯一的 id,当 broker start 时,broker 将自身的 id 注册到 zk 中(通过写一个 ephemeral node),如果已经存在一个相同 ID 的 ephemeral node,则会报错。

当 broker 和 zk 断掉连接后(broker stop / network partition / long garbage-collection pause),broker 启动时创建的 ephemeral node 将自动被 zk 删除。

当完全丢失 broker & 删除对应的 ephemeral node 后,重启一个具有相同 id 的新 broker,该 broker 将替代丢失的旧 broker,接受原 broker 相同的 partitions & topics。

2.1 The Controller

一组 brokers 可以搭建成一个 cluster。在 cluster 中有一个 broker 担任 cluster controller,一般是第一个加入 cluster 的 broker 担任 controller,它会在 zk 中创建一个名叫 /controller 的 ephemeral node。

当 controller broker stop or loses connectivity to zk 时,它创建的 /controller node 会被 zk 删除。cluster 中别的 brokers 将被通知 controller 丢失,剩下的 brokers 继续抢占 /controller node,第一个写成功的成为新的 controller。

controller broker 除了承担普通的 broker 功能外,还负责 partition leaders 的选举。如果 controller broker 发现有别的 broker 离开 cluster(通过监听 zk 的相关路径 node)时,那么所有存在于丢失 broker 上的 leader partitions 需要新的 leader,controller 负责选择一个 partition 作为 leader,并通知给各个 brokers partitions。新的 partition leaders 明确自己的职责,followers 则明确自己需要同步的 new leader。

The controller uses the epoch number to prevent a “split brain” scenario where two nodes believe each is the current controller.

2.2 Multiple Clusters

The replication mechanisms within the Kafka clusters are designed only to work within a single cluster, not between multiple clusters.
The Kafka project includes a tool called MirrorMaker, used for this purpose.

2.3 Zookeeper

kafka with zookeeper.jpg

Kafka 用 Zookeeper 来维护 broker cluster,存储 brokers, topics, partitions 的 metadata。
Consumer 的 metadata 在 kafka v0.9 之前的版本中,是通过 Zookeeper 维护。但是在 v0.9 之后,可以选择通过 zookeeper 管理,也可以选择通过 kafka brokers 管理,因为频繁的读写 offsets 对 zk 的压力较大,所以推荐通过 Kafka broker 管理。

3. Replication

前段时间一直在系统性的学习分布式存储的知识,某种程度上来说,Kafka, Redis, MySQL Cluster, Zookeeper 等都可以理解为分布式存储的实现。而分布式存储中的核心实现方法就是 partition & replication。前面我们介绍了 Kafka 的 topic partition,现在来了解下 replication。

为了实现系统的可靠性(availability) & 耐用性(durability),我们可以对 partition 做 replicated。kafka 中的 data 按 topics 组织,每个 topic 可以做 partition,每个 partition 可以有多个 replicas。

replicas 有两种角色:

consumer.poll / replica fetch messages 时,会把自身已有的最大 offset 带给 leader 来获得准确的 messages。当 leader crashes 时,其中一个拥有最多消息(最大 offsets)的 followers 将变成 leader。

4. Consumer

consumers 消费消息。同时 consumer 会跟踪上报自己已消费消息的 offset(kafka 的每个消息在 topic 的 partition 中都有一个唯一的 offset)。
consumers 是以 consumer group 的形式工作的。group 保证每个 partition 只能被一个 consumer 消费,换言之,group 中的 consumer 消费互不相同的 partition。
一个 consumer group/ consumers 可以(通过正则表达式)订阅多个 topics,当新增满足正则表达式的 topic 时,能自动读取到该 topic 的 msg 。

4.1 Consumer Groups

consumer groups 的可能组织结构有:


consumer group.jpg consumer group2.jpg consumer group3.jpg consumer group4.jpg

由于 consumer 经常会做一些高延迟的操作,例如写数据库、分析数据等,consumer 的消费能力可能会小于 producer 的生成能力。

分析以下场景:

上面我们提到的都是一个 consumer group 对 topic 的消费。很多情况下,同一个 topic 的消息会有多个不同的应用(user cases)感兴趣(每一个 use case 都能拿到该 topic 的所有 messages),这时就需要为不同的 user case 创建不同的 consumer group,即有多个 consumer groups 消费同一个 topic。


consumer group5.jpg

4.2 Partition Rebalance

发生一下情况时,需要对 consumer group 进行 partition rebalance:

通过对 partition rebalance 的支持,Kafka 具备了 high availability & scalability。但是正常情况下,尽量避免 partition rebalance。因为:

设计阶段,就要把由于 rebalance 引发的潜在的消息重复处理的情况考虑进去。

4.3 Group coordinator & Group leader

关于 consumers 的维护,有两个重要的概念:group coordinator & group leader:

  1. group coordinator: 是特殊的 broker。consumers poll 消息 & commit 消费消息记录时,会发送 heartbeats 到 group coordinator 来同时告知自己的健康状况。
  2. group leader: 第一个加入 consumer group 的 consumer 就是该 group 的 group leader。group coordinator 会把 consumers 列表发给 group leader 来维护。group leader 负责 assign & reassign partitions。

如果 consumer crashed/network failure,长时间没有发送 heartbeats 到 group coordinator 时,group coordinator 会认为该 consumer 失联,并通知 group leader rebalance partition,group leader 将 rebalance 的结果通知 group coordinator,由 coordinator 来通知 consumers 新的 partitions 关系。

4.4 Poll loop

consumer 中的核心功能几乎都在 consumer.poll() 方法中。poll(timeout) 通过 timeout 参数控制 poll 的阻塞等待数据时间。如果 timeout = 0,则立即返回,无论是否有新消息。timeout 的值是需要根据自身业务设置的。但是它不仅仅是从 broker 中读取消息:

  1. 初始调用 poll() 时,会去找 groupCoordinator & 加入 consumer group & 接收 partition assignment
  2. poll 内部负责处理 partition rebalance
  3. 发送 heartbeat: 当 consumer 停止 poll() 时,会停止发 heartbeat,被 group coordinator 认为 fail,把分配给它的 partitions 分配给 consumer group 中别的 consumer。

所以:

4.5 Commits and Offsets

Kafka 不像大多数 JMS queue 那样,broker 不主动跟踪 consumer 的 ack,而是通过 consumer 发起 commit 来更新最新的 offset 到 _consumer_offsets topic 中

consumer.poll()时, broker 会返回还没有消费的消息记录,消息中带有自身的 offset。
consumer 通过 commit 动作发送一个带有 partition offset 的 message 到 kafka broker 的特殊 topic(__consumer_offsets topic) 来更新 offset。

consumer 什么时候 commit 该消息呢?

Automatic Commit

配置 enable.auto.commit=true,consumer 会每隔一个 interval (默认每隔5s)自动提交 consumer 通过 poll() 收到的最大的 offset。
automatic commits 也是通过 poll loop 来实现的。每次 poll, consumer 都会自动检查是否到时间执行一次 commit 来提交最近一次 poll() 获得的最大的 offsets。

当 consumer crashes / new consumer 加入 consumer group,会触发 rebalance。在 rebalance 后,每个 consumer 被分配一组新的 partitions,并获取到最新的 committed offset of each partition 来继续工作。
但是考虑以下情况:假设配置每隔 5s commit the latest offset,上次提交 2s 后,发生了 rebalance,所有 consumers 获取到之前最近的 offsets,但这个 offset 其实是 2s 前的,这 2s 间到达 consumers 的消息将会被处理两次。
可以配置较小的 interval 来减少重复消费的消息,但是本质上无法完全避免。

可以看出,操作 committed offset 的位置,是可能发生以下情况:

Commit Current Offset

如果想对 offset 的控制更准确,配置 auto.commit.offset=false,手动 commit offset。

需要开发人员手动调用 commitSync(),将把 poll() 返回的最新的 offset,建议:

Asynchronous Commit

commitSync() 会阻塞应用,直到收到 broker 的明确响应。这将很大的影响系统吞吐。可以考虑使用异步 consumer.commitAsync()

commitSync() 内部有 retry,如果遇到 retriable failure,会持续重试,影响性能,如果遇到 nonretriable failure,则会直接 commit fail。// todo retry count
commitAsync() 没有 retry,之所以不支持 retry,是因为它本身是 async 的、非阻塞的。如果失败了又重试,可能会把这段时间发生的更新的 commit 的数据修改回去。当然了,如果真想重试,是可以找到解决方案的,如记录一个全局单调递增的 sequence number,重试前检查如果 offset 小于该 number,则取消 retry。

Combining Synchronous and Asynchronous Commits

对于手动控制 offset 的情况,commitAsync() & commitSync() 可以结合使用。正常情况下使用 async,提高性能,并且偶尔由于网络原因发生的失败也不需要 retry,一般都会在接下来的 commit 中成功,等待服务停止消费时,调用 sync,确保最终正确提交 offset。

Commit Specified Offset

commitSync() & commitAsync() 存在一个问题,只能在对 batch messages 全部处理完后,将最大的 offset 提交,无法做到更细粒度的控制。当处理 batch messages 的耗时很长,或者 batch 的消息个数很多时,如果在消费过程中发生了 rebalance,这次 poll 获取的所有 messages 都需要重新处理一次。

commitSync() & commitAsync() 都提供了带参数的方法,允许我们根据业务在消费 batch messages 的过程中按需要提交 offset。

4.6 Rebalance Listeners

在发生 partition rebalance 时(可能处在 processing batch messages 间隙),consumer 需要做一些 cleanup work,包括对正在处理的消息的收尾工作,对文件、数据库连接等的管理。我们可以通过在调用 consumer.subscribe() 方法中传入自定义的 ConsumerRebalanceListener 来实现。

ConsumerRebalanceListener 有两个方法需要实现:

Consuming Records with Specific Offsets

seek(TopicPartition partition, long offset)
seekToBeginning(TopicPartition tp)
seekToEnd(TopicPartition tp)

Standalone Consumer: Use a Consumer Without a Group

有些情况下,consumer 会需要指定消费某些具体的 partitions, 而不是 join consumer group,由 consumer group 分配 partition & rebalance。可以调用 consumer.assign() 来实现该需求。

上一篇 下一篇

猜你喜欢

热点阅读