Q for Kafka

2020-04-09  本文已影响0人  Secret_Sun

总览

Apache Kafka发源于LinkedIn,于2011年成为Apache的孵化项目,随后于2012年成为Apache的主要项目之一。Kafka使用Scala和Java进行编写。Apache Kafka是一个快速、可扩展的、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。

基础信息分享

需要详细阅读基础 SDK (按 Go Sarama 为例)

基础知识点

首先,我们必须理解,partiton 是 kafka 的并行单元。从 producer 和 broker 的视角看,向不同的 partition 写入是完全并行的;而对于 consumer,并发数完全取决于 partition 的数量,即,如果 consumer 数量大于 partition 数量,则必有 consumer 闲置。所以,我们可以认为 kafka 的吞吐与 partition 时线性关系。partition 的数量要根据吞吐来推断,假定 p 代表生产者写入单个 partition 的最大吞吐,c 代表消费者从单个 partition 消费的最大吞吐,我们的目标吞吐是 t,那么 partition 的数量应该是 t/p 和 t/c 中较大的那一个。实际情况中,p的影响因素有批处理的规模,压缩算法,确认机制和副本数等,通常建议 partition 的数量一定要大于等于消费者的数量来实现最大并发。

常见客户端问题

客户端入门

注意点: 客户端一定要记住客户端版本与kafka 版本对应关系

Producer Demo

package main
 
import (
    "fmt"
    "time"
 
    "github.com/Shopify/sarama"
)
 
func main() {
    kafkaVersion, err := sarama.ParseKafkaVersion("0.10.2.0") // 返回对应版本号
    address := []string{"127.0.0.1:9092","127.0.0.2:9092","127.0.0.3:9092"}
 
    config := sarama.NewConfig()
    config.Version = kafkaVersion
 
    /*
    const (
        // NoResponse doesn't send any response, the TCP ACK is all you get.
        NoResponse RequiredAcks = 0
        // WaitForLocal waits for only the local commit to succeed before responding.
        WaitForLocal RequiredAcks = 1
        // WaitForAll waits for all in-sync replicas to commit before responding.
        // The minimum number of in-sync replicas is configured on the broker via
        // the `min.insync.replicas` configuration key.
        WaitForAll RequiredAcks = -1
    )
     */
 
    config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要 leader 和 follow 都确认
    config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
    config.Producer.Return.Successes = true                   // 成功交付的消息将在 success channel 返回
 
    // 构造一个消息
    msg := &sarama.ProducerMessage{}
    msg.Topic = "cloudinfra_test_topic"
    now := time.Now().Format("2006-01-02 15:04:05")
    msg.Value = sarama.StringEncoder(now +": This is a test log ...")
 
    // 连接 kafka
    client, err := sarama.NewSyncProducer(address, config)
    if err != nil {
        fmt.Println("producer closed, err:", err)
        return
    }
    defer client.Close()
    // 发送消息
    pid, offset, err := client.SendMessage(msg)
    if err != nil {
        fmt.Println("send msg failed, err:", err)
        return
    }
    fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

Consumer Demo

package main
 
import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"
 
    "github.com/Shopify/sarama"
)
 
var (
    wg sync.WaitGroup
)
 
// Consumer represents a Sarama consumer group consumer
type Consumer struct {
    ready chan bool
}
 
// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
    // Mark the consumer as ready
    close(consumer.ready)
    return nil
}
 
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}
 
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    // NOTE:
    // Do not move the code below to a goroutine.
    // The `ConsumeClaim` itself is called within a goroutine, see:
    // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
    for message := range claim.Messages() {
        fmt.Println("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
        session.MarkMessage(message, "")
    }
    return nil
}
 
func main() {
    kafkaVersion, err := sarama.ParseKafkaVersion("0.10.2.0")
    address := []string{"127.0.0.1:9092","127.0.0.2:9092","127.0.0.3:9092"}
    config := sarama.NewConfig()
    config.Version = kafkaVersion
 
    consumer := Consumer{
        ready: make(chan bool),
    }
 
    //创建消费者
    ctx, cancel := context.WithCancel(context.Background())
    consumerGroup, err := sarama.NewConsumerGroup(address, "Cloudinfra-TestConsumerGroup", config)
 
    if err != nil {
        fmt.Println("Failed to start consumer group: %s", err)
        return
    }
 
    wg := &sync.WaitGroup{}
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            // `Consume` should be called inside an infinite loop, when a
            // server-side rebalance happens, the consumer session will need to be
            // recreated to get the new claims
            if err := consumerGroup.Consume(ctx, strings.Split("cloudinfra_test_topic", ","), &consumer); err != nil {
                fmt.Println("Error from consumer: %v", err)
            }
            // check if context was cancelled, signaling that the consumer should stop
            if ctx.Err() != nil {
                return
            }
            consumer.ready = make(chan bool)
        }
    }()
    <-consumer.ready
    fmt.Println("Sarama consumer up and running!...")
 
    sigterm := make(chan os.Signal, 1)
    signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
    select {
    case <-ctx.Done():
        fmt.Println("terminating: context cancelled")
    case <-sigterm:
        fmt.Println("terminating: via signal")
    }
    cancel()
    wg.Wait()
    if err = consumerGroup.Close(); err != nil {
        fmt.Println("Error closing client: ", err)
    }
}

生产者最佳实践

失败重试

分布式环境下,由于网络等原因,偶尔发送失败是常见的。导致这种失败的原因可能是消息已经发送成功,但是 Ack 失败,也有可能是确实没发送成功。

云上很多全托管消息队列 Kafka 是 VIP 网络架构,会主动断开空闲连接(30 秒没活动),因此,不是一直活跃的客户端会经常收到 "connection rest by peer" 错误,建议重试消息发送。

重试参数,您可以根据业务需求,设置以下重试参数:

异步发送

发送接口是异步的,如果你想得到发送的结果,可以调用metadataFuture.get(timeout, TimeUnit.MILLISECONDS)。

线程安全

Producer 是线程安全的,且可以往任何 Topic 发送消息。通常情况下,一个应用对应一个 Producer 就足够了。

Acks

Batch

Batch 的基本思路是:把消息缓存在内存中,并进行打包发送。Kafka 通过 Batch 来提高吞吐,但同时也会增加延迟,生产时应该对两者予以权衡。 在构建 Producer 时,需要考虑以下两个参数:

Rebalance 如何避免

消费者最佳实践

消费消息基本流程

消息队列 Kafka 版订阅者在订阅消息时的基本流程是: 1. 2. 3.

  1. Poll 数据
  2. 执行消费逻辑
  3. 再次 poll 数据

负载均衡

每个 Consumer Group 可以包含多个消费实例,即可以启动多个消息队列 Kafka 版 Consumer,并把参数 group.id 设置成相同的值。属于同一个 Consumer Group 的消费实例会负载消费订阅的 Topic。

举例:Consumer Group A 订阅了 Topic A,并开启三个消费实例 C1、C2、C3,则发送到 Topic A 的每条消息最终只会传给 C1、C2、C3 的某一个。Kafka 默认会均匀地把消息传给各个消息实例,以做到消费负载均衡。

Kafka 负载消费的内部原理是,把订阅的 Topic 的分区,平均分配给各个消费实例。因此,消费实例的个数不要大于分区的数量,否则会有实例分配不到任何分区而处于空跑状态。这个负载均衡发生的时间,除了第一次启动上线之外,后续消费实例发生重启、增加、减少等变更时,都会触发一次负载均衡。

消息队列 Kafka 版的每个 Topic 的分区数量默认是 16 个,已经足够满足大部分场景的需求,且云上服务会根据容量调整分区数。

多个订阅

一个 Consumer Group 可以订阅多个 Topic。 一个 Topic 也可以被多个 Consumer Group 订阅,且各个 Consumer Group 独立消费 Topic 下的所有消息。

举例:Consumer Group A 订阅了 Topic A,Consumer Group B 也订阅了 Topic A,则发送到 Topic A 的每条消息,不仅会传一份给 Consumer Group A 的消费实例,也会传一份给 Consumer Group B 的消费实例,且这两个过程相互独立,相互没有任何影响。

消费位点

每个 Topic 会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点 MaxOffset。

消息队列 Kafka 版 Consumer 会按顺序依次消费分区内的每条消息,记录已经消费了的消息条数,称为 ConsumerOffset。

剩余的未消费的条数(也称为消息堆积量) = MaxOffset - ConsumerOffset

消费位点提交

消息队列 Kafka 版消费者有两个相关参数:

这两个参数组合的结果就是,每次 poll 数据前会先检查上次提交位点的时间,如果距离当前时间已经超过参数 auto.commit.interval.ms 规定的时长,则客户端会启动位点提交动作。

因此,如果将enable.auto.commit设置为 true,则需要在每次 poll 数据时,确保前一次 poll 出来的数据已经消费完毕,否则可能导致位点跳跃。

如果想自己控制位点提交,请把 enable.auto.commit 设为 false,并调用 commit(offsets)函数自行控制位点提交。

消费位点重置

以下两种情况,会发生消费位点重置:

Java 客户端可以通过 auto.offset.reset 来配置重置策略,主要有三种策略:

Kafka 故障转移

Broker 故障转移

在 Topic 创建时,Kafka 集群根据 Partition 配置创建多个 Topic Partition。每个 Topic Partition 有且仅有一个 leader,有一或多个 follower(数量由 replicate 因子而定)。Kafka 使用以下方法将 Topic Partition 分散到多个Broker,使得 Topic Partition 尽可能的分散:

往Kafka集群生产数据时,若将 ack 配置为 all,Broker 将确保在所有 follower 拉取到该消息时才返回给producer确认信号,如下图所示


Broker

leader 收到新消息会有落盘动作,follower 的 IO 线程拉取到新消息后,在落盘之前会回复给 leader 以 ACK 信号,此时新消息只在 follower 的内存中。这样设计是在可靠性和性能之间做权衡,因为 leader 和所有 follower 全部挂掉的概率是极低的,只要有 follower 在内存中保有新消息,就会在未来被落盘。关于 leader 和 follower 之间的数据同步


Leader

Kafka 针对每个 Topic Partition 维护了 ISR 集合,只要 follower 存在于该集合中就意味着 follower 与 leader 的消息延迟在可接受范围内,这个可接受范围是通过消息落后条数、最近一次同步时间来配置的。极端情况下,ISR 集合可能为空,这意味着已存活的但不在 ISR 集合中的 follower 落后于 leader 太多。此时若运行提升 IRS 集合以外的 follower 为 leader 则有丢消息的风险,用户需要在可用性和一致性之间做出选择。

这里引申出另一个问题,Kafka 集群为何不使用 Zookeeper 进行 leader 选举?

Controller 故障转移

上一篇下一篇

猜你喜欢

热点阅读