Kafka设计

2021-03-12  本文已影响0人  一剑光寒十九洲

全局架构图

全局架构图

磁盘结构

记录格式

type VarInt int // 变长整型,使用Varints和ZigZag编码的整型

type RecordBatch struct {
    FirstOffset          int64 // 起始偏移
    Length               int32 // 从PartitionLeaderEpoch开始的长度
    PartitionLeaderEpoch int32 // 分区Leader纪元
    Magic                int8  // 消息版本号,当前为2,表示V2
    Crc32                int32 // crc校验和
    Attributes           int16 // [0-2]压缩格式, 4时间戳类型, 5是否出于事务中, 6控制消息
    LastOffsetDelta      int32 // 最后一个Record的offset与FirstOffset的差值,用于保证消息组装的正确性
    FirstTimestamp       int64 // 第一个Record的时间戳
    MaxTimestamp         int64 // 最后一个Record的时间戳,用于保证消息组装的正确性
    ProducerID           int64 // 用于支持幂等性和事务
    ProducerEpoch        int32 // 用于支持幂等性和事务
    FirstSequence        int32 // 用于支持幂等性和事务
    RecordsCount         int32 // RecordsCount数组元素个数
    Records              []Record
}

type Record struct {
    Length         VarInt   // Record长度
    Attributions   int8     // 属性,暂时没用
    TimestampDelta VarInt   // 相对于RecordBatch的FirstTimestamp的偏移量
    OffsetDelta    VarInt   // 相对于RecordBatch的FirstOffset的偏移量
    KeyLength      VarInt   // key长度
    Key            []byte   // key内容
    ValueLength    VarInt   // value长度
    Value          []byte   // value内容
    HeadersCount   VarInt   // Headers数组元素个数
    Headers        []Header // Headers数组,用于支持应用级别扩展
}

type Header struct {
    HeaderKeyLength   VarInt
    HeaderKey         string
    HeaderValueLength VarInt
    HeaderValue       string
}

日志文件存储

disk

使用时间戳查找消息

  1. 通过时间戳日志分段索引文件名查找对应的日志分段文件
  2. 在该日志分段中通过二分法查找到最近的偏移量
  3. 通过该偏移量在偏移量日志分段索引文件中查找对应的消息位置
  4. 从该位置开始,向后查找,直到找到不小于指定时间戳的消息

日志清理

日志删除

日志压缩/合并

对于相同的key的不同value值,只保留最后一个版本。当应用仅关心消息的最新value时,可以开启日志合并功能。

消费位移

  1. 保存在_comsumer_offset主题中
  2. 可以通过offset或者时间戳进行定位
  3. 利用seek功能,我们可以将消费位移保存在外部存储中

消费者重均衡

消费组分区分配策略

RangeAssignor

RoundRobinAssignor

StickyAssignor

自定义Assignor

发生时机

流程

kafka重均衡

分区重分配

基本原理:先通过控制器为每个分区添加新副本(增加副本因子),待复制完成后,将旧的副本从副本清单中删除(恢复为原先的副本因子)

事务

幂等性

实现原理:
Kafka 的幂等只能保证单个生产者会话(session)中单分区的幂等。对于每一个生产者,kafka会为其分配一个pid,每一对<pid,partiton>都对应一个序列号,在生产者发送消息的时候,序列号递增。当kafka收到新消息时,如果序列号sn<so+1,则说明发生了重复写入,则丢弃;如果序列号sn>so+1,说明出现了消息乱序,抛出异常OutOfOrderSequenceException。

事务

概念:kafka的事务可以保证应用程序将多个的消费消息、生产消息、提交消费位移当作原子操作来处理,同时成功或失败,即使该生产或消费会跨多个分区。

应用场景:Consume-Transform-Produce,以支持流失计算

事务流程
  1. 使用transactionID获取计算获取TransactionCoordinator的broker地址。
  2. 使用transactionID请求得到PID信息,TC在收到该请求后会将transaction和pid保存到__transaction_state中,以进行持久化。
  3. 生产者使用beginTransaction()开启一个事务。
  4. 消费-转换-生产
    1. 应用程序通过消费者消费到消息,转换完成后,在生产者向新的分区写入消息之前,先通过AddPartitionsToTxnRequest将新的分区记录到__transaction_state中,包括<transactionID,pid,topic-partitions>。
    2. 生产者向对应的分区所在的broker发送消息,消息中会包含<pid,seq_num>,注意由于写入的消息的事务控制字段都是1,所以在read_commited级别下对应用程序是不可见的。
    3. 通过AddOffsetsToTxnRequest将所有要提交的分区的offset的信息和group_id写入__transaction_state中,TC可以通过对应的group_id来计算出GC,GC也会保存在__transaction_state中,从而在生产者宕机后,支持后续TC的崩溃恢复。
    4. 生产者通过TxnOffsetCommitRequest将所有分区的偏移量条,写入到__consumer_offsets中,注意由于写入的消息的事务控制字段都是1,所以在read_commited级别下对应用程序是不可见的。
    5. 生产者通过EndTxnRequest向TC提交或者中止事务,TC会将PREPARE_COMMIT或PREPARE_ABORT信息写入到__transaction_state中,然后在通过WriteTxnMarkersRequest请求向分区(GC和生产者写入的分区)写入COMMIT或ABORT消息,再之后将COMPLETE_COMMIT或COMPLETE_ABORT写入到__transaction_state中。

复制

复制

如上一主三从

上一篇 下一篇

猜你喜欢

热点阅读