Apache Kafka

Kafka相关内容总结(概念和原理)

2018-12-29  本文已影响0人  猴子顶呱呱

说明

为什么需要消息系统

多个消息队列横向对比

多个消息队列横向对比

kafka 架构

拓扑结构

拓扑结构1
拓扑结构2

相关概念

producer 发布消息

写入方式

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

消息路由

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

写入流程

producer 写入消息序列

数据的保证

消息可靠性0
消息可靠性1
消息可靠性2
消息可靠性3

kafka对数据的保证

broker 保存消息

存储方式

物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件),如下:


topic数据在broker的存储方式

存储策略

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:
基于时间:log.retention.hours=168
基于大小:log.retention.bytes=1073741824

文件存储方式

文件存储方式

读取原理

Kafka高效文件存储设计特点

Kafka Broker一些特性

offset管理

kafka会记录offset到zk中。但是,zk client api对zk的频繁写入是一个低效的操作。0.8.2 kafka引入了native offset storage,将offset管理从zk移出,并且可以做到水平扩展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic与partion的组合作为key直接提交到compacted topic中,topic名称为__consumer_offsets。同时Kafka又在内存中维护了的三元组来维护最新的offset信息,consumer来取最新offset信息的时候直接内存里拿即可。当然,kafka允许你快速的checkpoint最新的offset信息到磁盘上。根据以上的信息可以理解,既然生产者有可能因为broker的挂掉而造成丢数据,那么消费成功的offset,如果发送kafka失败,或者kafka写入失败(如broker挂掉)等情况,也有可能造成重复消费(已经消费,但是kafka写入不成功)。

Math.abs(groupID.hashCode()) % numPartitions
//0.11.0.0版本之前
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

//0.11.0.0版本以后(含)
bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

输出结果如下:

...
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092279434,ExpirationTime 1479178679434]
[console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284246,ExpirationTime 1479178684246]
[console-consumer-46965,test,1]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
[console-consumer-46965,test,0]::[OffsetMetadata[22,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
[console-consumer-46965,test,2]::[OffsetMetadata[21,NO_METADATA],CommitTime 1479092284436,ExpirationTime 1479178684436]
 ...

上图可见,该consumer group保存在分区11上,且位移信息都是对的(这里的位移信息是已消费的位移,严格来说不是第3步中的位移。由于consumer已经消费完了所有的消息,所以这里的位移与第3步中的位移相同)。另外,可以看到__consumer_offsets topic的每一日志项的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]

topic创建

kafka的架构图

kafka的partition

创建topic命令

./bin/kafka-topics.sh --zookeeper 10.1.112.57:2181,10.1.112.58:2181,10.1.112.59:2181/kafka --create --topic LvsKafka --replication-factor 2 --partitions 24

流程图

topic创建

流程说明

命令行部分

Controller部分(后台)

topic删除

删除topic命令

流程图

topic删除流程

流程说明(守护线程)

流程说明(后台逻辑,实现删除操作)

leader failover

broker failover

controller failover

当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。

消费消息:The high-level consumer API

high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset。(注:客户端开启自动提交offset,offset由kafka自行保存,这是新版kafka的功能,offset的维护不依赖于zk

消费消息:The SimpleConsumer API(低层次的接口)

The high-level consumer API之消费组(consumer group)

消费方式

ConsumerGroup最佳实践

consumer rebalance

Producer生产者

复制原理和同步方式

ISR

数据可靠性和持久性保证

关于HW的进一步探讨

Leader选举

Kafka的发送模式

消息去重

高可靠性配置

性能测试相关

消息长度对吞吐率的影响1
消息长度对吞吐率的影响2
Partition Number VS. Throughput
Replica Number VS. Throughput
Consumer Only
Producer Only

参考

上一篇 下一篇

猜你喜欢

热点阅读