Kafka 通信协议(译)
Kafka 的 Producer、Broker 和 Consumer 之间采用的基于 TCP 的二进制协议,完全是为了 Kafka 自身的业务需求而定制的,协议定义了所有 API 的请求及响应消息。所有的消息都是通过长度来分隔,并且由后面描述的数据类型组成。
数据类型
-
定长基本类型
INT8
、INT16
、INT32
、INT64
、UINT32
-
变长基本类型
STRING
、BYTES
一个表示长度的带符号整数 N 以及后续 N 字节的内容组成,长度如果为-1则表示空(NULL)。
STRING
使用INT16
表示长度,BYTES
使用INT32
表示长度。 -
数组
用来处理重复的结构体数据,总是由一个代表元素个数的整数 N(INT32
),以及后续 N 个重复结构体组成。
这些结构体自身是由其他的基本数据类型组成。后面会把结构foo
的数组显示为[foo]
。
通用的请求和响应格式
所有请求和响应都源于以下语法,将会对这些语法逐步进行描述:
RequestOrResponse => Size (RequestMessage | ResponseMessage)
Size => int32
Field | Description |
---|---|
MessageSize | 以字节为单位给出后续请求或响应消息的大小 |
请求(Request)
所有请求都具有以下格式:
RequestMessage => ApiKey ApiVersion CorrelationId ClientId RequestMessage
ApiKey => int16
ApiVersion => int16
CorrelationId => int32
ClientId => string
RequestMessage => MetadataRequest | ProduceRequest | FetchRequest | OffsetRequest | OffsetCommitRequest | OffsetFetchRequest
Field | Description |
---|---|
ApiKey | 表示正在调用的API的ID(即它是元数据请求、生产请求、获取请求等)。 |
ApiVersion | 表示正在调用的API的版本号。 版本号允许服务器正确地解释请求内容。响应消息也始终对应于所述请求的版本的格式 |
CorrelationId | 这是一个用户提供的整数。 将会被服务器回传给客户端。用于在客户机和服务器之间匹配请求和响应 |
ClientId | 客户端的自定义的标识。 可以使用任意标识符,会被用在记录错误、监测统计信息等场景 |
响应(Responses)
Response => CorrelationId ResponseMessage
CorrelationId => int32
ResponseMessage => MetadataResponse | ProduceResponse | FetchResponse | OffsetResponse | OffsetCommitResponse | OffsetFetchResponse
Field | Description |
---|---|
CorrelationId | 服务器回传给客户端的信息于请求中的信息一致 |
所有响应都是与请求成对匹配
消息集(Message sets)
生产和获取消息指令请求使用同一个消息集结构。Kafka 中的消息由一个键值对以及少量相关的元数据组成。消息集只是一系列带有偏移量和大小信息的消息序列。这种格式同时用于 broker 上的磁盘存储和线上数据格式(on-the-wire format
)。
在 Kafka 中,消息集也是压缩的基本单位,同时允许消息递归地包含压缩的消息集,以允许批压缩。
在通讯协议中,消息集的前面没有类似的数组元素前面的 INT32
整数。
MessageSet => [Offset MessageSize Message]
Offset => int64
MessageSize => int32
消息体:
v0
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Key => bytes
Value => bytes
v1 (supported since 0.10.0)
Message => Crc MagicByte Attributes Key Value
Crc => int32
MagicByte => int8
Attributes => int8
Timestamp => int64
Key => bytes
Value => bytes
Field | Description |
---|---|
Offset | Kafka 中消息的偏移量 |
Crc | 剩余消息字节的 CRC32 值,用来检查信息的完整性 |
MagicByte | 这是一个版本ID,用于允许消息二进制格式的向后兼容演进,当前值是1 |
Attributes | 此字节保存有关消息的元数据属性 |
Timestamp | 消息的时间戳 |
Key | 一个可选项,主要用来进行指派分区。可以为 null |
Value | 消息的实际内容,类型是字节数组,也可能是一个消息集。可以为 null |
在 Kafka 0.11 中,消息集和消息的结构发生了变化。不仅添加了新字段以支持新功能,如一次性语义和记录头,而且消除了以前版本消息格式的递归性质,消息集(Messageset
)称为 RecordBatch
,包含一个或多个 Record
(而不是 Message
)。当启用压缩时,RecordBatch
将保持未压缩状态,但 Records
被压缩在一起。
RecordBatch =>
FirstOffset => int64
Length => int32
PartitionLeaderEpoch => int32
Magic => int8
CRC => int32
Attributes => int16
LastOffsetDelta => int32
FirstTimestamp => int64
MaxTimestamp => int64
ProducerId => int64
ProducerEpoch => int16
FirstSequence => int32
Records => [Record]
Record =>
Length => varint
Attributes => int8
TimestampDelta => varint
OffsetDelta => varint
KeyLen => varint
Key => data
ValueLen => varint
Value => data
Headers => [Header]
Header => HeaderKey HeaderVal
HeaderKeyLen => varint
HeaderKey => string
HeaderValueLen => varint
HeaderValue => data
压缩(Compression)
Kafka 支持压缩消息以提高效率,当然,这比压缩一条消息要更复杂。单个消息可能没有足够的冗余信息以达到良好的压缩比,因此必须以特殊的批处理方式发送压缩消息。要被发送的消息被包装(未压缩)在一个 MessageSet
结构中,然后将其压缩并存储在一个 Message
的 Value
中,一起保存的还有相应的压缩编解码集。接收系统通过解压缩得到实际的消息集。
Kafka 目前支持两个压缩编解码器,编解码器编号如下:
Compression | Codec |
---|---|
None | 0 |
GZIP | 1 |
Snappy | 2 |
接口(APIs)
Kafka 六个核心的客户端请求的 API:
- 元数据接口(Metadata API)
- 生产消息接口(Produce API)
- 获取消息接口(Fetch API)
- 偏移量接口(Offset API)
- 偏移量提交接口(Offset Commit API)
- 偏移量获取接口(Offset Fetch API)
元数据接口(Metadata API)
这个 API 能获取以下信息:
- 存在哪些 Topic?
- 每个 Topic 有几个 Partition?
- 每个 Partition 的 Leader 是哪个 broker?
- 这些 broker 的地址和端口分别是什么?
这是唯一一个能发往集群中任意一个 broker 的请求消息。
因为可能有很多 Topic 存在,客户端可以提供一个可选 Topic 名称列表,只返回这些 Topic 的元数据。
返回的元数据是 Partition 级别的信息,以 Topic 分组集中在一起。每个分区的元数据中包含了 leader 以及所有副本,包括正在同步的副本的信息。
Topic Metadata Request
TopicMetadataRequest => [TopicName]
TopicName => string
Field | Description |
---|---|
TopicName | 请求元数据的 Topic。如果为空,则请求所有主题元数据 |
Metadata Response
响应包含的每个分区的元数据,以 Topic 分组,使用 Broker id 指向具体的 Broker。
MetadataResponse => [Broker][TopicMetadata]
Broker => NodeId Host Port (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
Replicas => [int32]
Isr => [int32]
Field | Description |
---|---|
Leader | 分区的 Leader 节点的 Broker id。如果在 Leader 选举过程中,没有 Leader 存在,值为 -1 |
Replicas | 分区的活着的 slave 节点集合 |
Isr | Replicas 集合中,所有处在与 Leader 跟随(表示数据已经完全复制到这些节点)状态的子集 |
Broker | Broker 节点ID、主机名和端口信息 |
生产消息接口(Produce API)
用于向服务器发送消息集。为了提高效率,允许在一个请求中为多个主题分区发送消息集,使用通用消息集格式。
Produce Request
v0, v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
ProduceRequest => RequiredAcks Timeout [TopicName [Partition MessageSetSize MessageSet]]
RequiredAcks => int16
Timeout => int32
Partition => int32
MessageSetSize => int32
Field | Description |
---|---|
RequiredAcks | 表示服务端收到多少确认后才发送 Response 给客户端。 如果设置为0,那么服务端将不发送 Response。 如果设置为1,那么服务器将等到数据写入到本地日之后发送。 如果设置为-1,那么服务端将等到数据被所有的同步副本写入后再发送 |
Timeout | 服务器等待接收 RequiredAcks 中确认数的最长时间(毫秒) |
TopicName | 发布到的 Topic |
Partition | 发布到的 Partition |
MessageSetSize | 后续消息集的长度(字节) |
MessageSet | 标准格式的消息集合 |
Produce Response
v0
ProduceResponse => [TopicName [Partition ErrorCode Offset]]
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
v1 (supported in 0.9.0 or later)
ProduceResponse => [TopicName [Partition ErrorCode Offset]] ThrottleTime
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
ThrottleTime => int32
v2 (supported in 0.10.0 or later)
ProduceResponse => [TopicName [Partition ErrorCode Offset Timestamp]] ThrottleTime
TopicName => string
Partition => int32
ErrorCode => int16
Offset => int64
Timestamp => int64
ThrottleTime => int32
Field | Description |
---|---|
Topic | 此响应对应的 Topic |
Partition | 此响应对应的 Partition |
ErrorCode | 分区对应的错误信息 |
Offset | 追加到该分区的消息集中的为第一个消息分配的偏移量 |
Timestamp | 如果该主题使用了 LogAppendTime,消息集中的所有消息都有相同的时间戳 |
ThrottleTime | 由于配额冲突而阻止请求的持续时间(毫秒) |
获取消息接口(Fetch API)
用于获取某些主题分区的一个或多个日志块(a chunk of one or more logs),指定了主题、分区和起始偏移量,开始提取并返回消息块。通常,返回消息的偏移量大于等于起始偏移量。但是,对于压缩的消息,返回的消息的偏移量可能小于起始偏移量。这类的消息的数量通常较少,并且调用者必须负责过滤掉这些消息。
接口使用长轮询模型,如果没有足够的数据立即可用,可以在一段时间内阻塞。
服务器被允许在消息集末尾返回部分消息,客户端应该处理这种情况。
Fetch Request
FetchRequest => ReplicaId MaxWaitTime MinBytes [TopicName [Partition FetchOffset MaxBytes]]
ReplicaId => int32
MaxWaitTime => int32
MinBytes => int32
TopicName => string
Partition => int32
FetchOffset => int64
MaxBytes => int32
Field | Description |
---|---|
ReplicaId | 发起这个请求的副本节点ID,普通消费者客户端应该始终将其指定为-1 |
MaxWaitTime | 如果没有足够的数据可发送时,最大阻塞等待时间(毫秒) |
MinBytes | 返回响应消息的最小字节数目,必须设置。 如果设为0,服务器将会立即返回,如果没有新的数据,会返回一个空消息集。 如果设为1,服务器将在至少一个分区收到一个字节的数据的情况下立即返回,或者等到超时时间达到 |
TopicName | 获取数据的 Topic |
Partition | 获取数据的 Partition |
FetchOffset | 获取数据的起始 Offset |
MaxBytes | 分区返回消息集所能包含的最大字节数 |
Fetch Response
v0
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
TopicName => string
Partition => int32
ErrorCode => int16
HighwaterMarkOffset => int64
MessageSetSize => int32
v1 (supported in 0.9.0 or later) and v2 (supported in 0.10.0 or later)
FetchResponse => ThrottleTime [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet]]
ThrottleTime => int32
TopicName => string
Partition => int32
ErrorCode => int16
HighwaterMarkOffset => int64
MessageSetSize => int32
Field | Description |
---|---|
ThrottleTime | 由于配额冲突而阻止请求的持续时间(毫秒) |
TopicName | 此响应对应的 Topic |
Partition | 此响应对应的 Partition |
HighwaterMarkOffset | 该分区日志结尾处的偏移量,可以用来确定日志结尾后面有多少条消息。 |
MessageSetSize | 此分区的消息集的大小(字节) |
MessageSet | 此分区获取到的消息集 |
偏移量接口(Offset API,AKA ListOffset)
描述了一组主题分区的偏移量有效范围。
对于 v0 版本,响应包含请求分区的每个段的起始偏移量以及“日志结束偏移量(log end offset)”,也就是将附加到给定分区的下一条消息的偏移量。
对于 v1 版本(0.10.1.0+),Kafka 支持按消息中使用的时间戳搜索偏移量的时间索引,并对此 API 进行了更改以支持这一点。
Offset Request
// v0
ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
ReplicaId => int32
TopicName => string
Partition => int32
Time => int64
MaxNumberOfOffsets => int32
// v1 (supported in 0.10.1.0 and later)
ListOffsetRequest => ReplicaId [TopicName [Partition Time]]
ReplicaId => int32
TopicName => string
Partition => int32
Time => int64
Field | Description |
---|---|
Time | 用来请求一定时间(毫秒)前的所有消息。 两个特殊取值: -1 表示获取最后一个offset -2 表示获取最早的有效偏移量 |
Offset Response
// v0
OffsetResponse => [TopicName [PartitionOffsets]]
PartitionOffsets => Partition ErrorCode [Offset]
Partition => int32
ErrorCode => int16
Offset => int64
// v1
ListOffsetResponse => [TopicName [PartitionOffsets]]
PartitionOffsets => Partition ErrorCode Timestamp [Offset]
Partition => int32
ErrorCode => int16
Timestamp => int64
Offset => int64
偏移量提交接口(Offset Commit API)
Offset Commit Request
v0 (supported in 0.8.1 or later)
OffsetCommitRequest => ConsumerGroupId [TopicName [Partition Offset Metadata]]
ConsumerGroupId => string
TopicName => string
Partition => int32
Offset => int64
Metadata => string
v1 (supported in 0.8.2 or later)
OffsetCommitRequest => ConsumerGroupId ConsumerGroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
ConsumerGroupId => string
ConsumerGroupGenerationId => int32
ConsumerId => string
TopicName => string
Partition => int32
Offset => int64
TimeStamp => int64
Metadata => string
v2 (supported in 0.9.0 or later)
OffsetCommitRequest => ConsumerGroup ConsumerGroupGenerationId ConsumerId RetentionTime [TopicName [Partition Offset Metadata]]
ConsumerGroupId => string
ConsumerGroupGenerationId => int32
ConsumerId => string
RetentionTime => int64
TopicName => string
Partition => int32
Offset => int64
Metadata => string
在 v0、v1 版本中,每个分区的时间戳定义为提交时间戳,偏移量协调者将保存消费者所提交的偏移量,直到当前时间超过提交时间加偏移量保留时间(在 broker 配置中);如果时间戳没有设值,那么 broker 会将此值设定为接收到提交偏移量请求的时间。
在v2版本中,移除了时间戳,但是增加了一个全局保存时间域(KAFKA-1634)代替。
Offset Commit Response
v0, v1 and v2:
OffsetCommitResponse => [TopicName [Partition ErrorCode]]]
TopicName => string
Partition => int32
ErrorCode => int16
偏移量获取接口(Offset Fetch API)
Offset Fetch Request
v0 and v1 (supported in 0.8.2 or after):
OffsetFetchRequest => ConsumerGroup [TopicName [Partition]]
ConsumerGroup => string
TopicName => string
Partition => int32
Offset Fetch Response
v0 and v1 (supported in 0.8.2 or after):
OffsetFetchResponse => [TopicName [Partition Offset Metadata ErrorCode]]
TopicName => string
Partition => int32
Offset => int64
Metadata => string
ErrorCode => int16
另外还有一些管理接口,这里不做介绍。再来看看下面的内容。
Some Common Philosophical Questions
有些人问,为什么不使用 HTTP。有许多原因,最主要的是客户端实现可以使用一些更高级的 TCP 特性,同时 HTTP 库在许多编程语言中是令人惊讶的烂。
还有人问,为什么不支持许多不同的协议。此前的经验是,如果必须在许多协议实现中移植新特性,是很难添加和测试的,并且大多数用户并不在乎支持多个协议这些特性,只是希望在自己选择的语言中实现了良好可靠的客户端。
另一个问题是,为什么不采用 XMPP,STOMP,AMQP 或现有的协议。这一问题的答案因协议而异,共同的问题是,协议决定了大部分的实现,如果没有对协议的控制权,就不能做一些想要做的事情。也许现在的实现方式比现有的协议更好。
最后一个问题是,为什么不使用的 Protocol Buffers 或 Thrift 来定义请求消息格式。这些库擅长帮助管理非常多的序列化的消息,然而,这里只有几个种类的消息,而且这些库跨语言的支持程度不同。
最后,通过对二进制日志格式和传输协议之间映射的管理,让 API 有明确的版本并且更细致地控制兼容性。