【Kafka零基础学习】Kafka请求分类
概要
Kafka 0.10.0.1支持的请求有19种,可以从KafkaApis.handle中查看。如下:
def handle(request: RequestChannel.Request) {
try {
trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s".
format(request.requestDesc(true), request.connectionId, request.securityProtocol, request.session.principal))
ApiKeys.forId(request.requestId) match {
case ApiKeys.PRODUCE => handleProducerRequest(request)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA_KEY => handleUpdateMetadataRequest(request)
case ApiKeys.CONTROLLED_SHUTDOWN_KEY => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.GROUP_COORDINATOR => handleGroupCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
}
后面部分将根据作用域对这些请求进行分类并做简要说明。个人认为只要对这些请求的创建,处理过程有一个熟练的掌握,将会对Kafka有一个质的认识。所以后续有一系列文章将会围绕这些请求逐一进行详细介绍。
生产者
- ProducerRequest
用于发送消息到Kafka服务端,服务端也即Broker。
消费者
-
GroupCoordinatorRequest
Reblance第一步:用于从Broker查找GroupCoordinator。每个ConsumerGroup映射为服务端的一个GroupCoordinator。 -
JoinGroupRequest
Reblance第二步:Consumer首先向GroupCoordinator发送JoinGroupRequest请求,其中包含消费者的相关消息;服务端的GroupCoordinator收到JoinGroupRequest后会暂存消息,收集到全部消费者后,根据JoinGroupRequest中的信息来确定Consumer Group中可用的消费者,从中选取一个消费者成为Group Leader,还会选取使用的分区分配策略,最后将这些消息封装成JoinGroupResponse返回给消费者。
虽然每个消费者都会收到JoinGroupResponse,但是只有Group Leader收到的JoinGroupReponse中封装了所有消费者的消息。当消费者确定自己时Group Leader后,会根据消费者的信息及选定的分区分配策略进行分区分配。 -
SyncGroupRequest
Reblance第三步:消费者会发送SyncGroupRequest到GroupCoordinator,但是只有GroupLeader的SyncGroupRequest请求包含了分区的分配结果,GroupCoordinator根据Group Leader的分区分配结果,形成SyncGroupResponse返回给所有消费者。消费者收到SyncGroupResponse后进行解析,便可获取分配给自身的分区。 -
HeartbeatRequest
发送此请求是为了告诉GroupCoordinator此消费者正常在线。 -
FetchRequest
用于从Kafka服务端获取消息。 -
LeaveGroupRequest
当Consumer正常离开ConsumerGroup时会发送LeaveGroupRequest。 -
OffsetCommitRequest
在进行消费者正常消费过程中以及Rebalance操作开始之前,都会提交一次offset记录Consumer的当前消费位置。 -
OffsetFetchRequest
在Rebalance操作结束后,每个消费者都确定了其需要消费的分区。在开始消费之前,消费者需要确定拉取消息的起始位置。 -
OffsetRequest
在有些场景下,例如第一次消费某个Topic的分区,服务端的内部Offsets Topic中并没有记录当前消费者在此分区上的消费位置,所以消费者无法从服务端获取最近提交的offset。此时,如果用户手动指定消费的其实offset,则可以从指定offset开始消费,否者就需要重置TopicPartitionState.position字段。重置TopicPartitionState.position字段的过程中涉及OffsetsRequest和OffsetResponse。
Broker
-
LeaderAndIsrRequest
用于从ISR集合中选取新的Leader。 -
StopReplicaRequest
在分区的副本进行重新分配、关闭Broker等过程中会使用到此请求。当Broker收到来至KafkaController的StopReplicaRequest请求时,会关闭其指定的副本,并根据StopReplicaRequest中的字段决定是否删除副本对应的log。 -
UpdateMetadataRequest
MetadataCache是Broker用来缓存整个集群中全部分区状态的组件。KafkaController通过向集群中的Broker发送UpdateMetadataRequest来更新Metadata中缓存的数据,每个Broker在收到该请求后会异步更新MetadataCache中的数据。 -
ControlledShutdownRequest
在关闭JVM时触发(作为JVM的关闭钩子触发)。有两个好处,一是可以让日志文件完全同步到磁盘上,在Broker下次重现上线时不需要进行Log的恢复操作;二是对其上的Leader副本进行迁移。
脚本
-
ListGroupsRequest
用于kafka-consumer-groups脚本。 -
DescribeGroupRequest
用于kafka-consumer-groups脚本
测试
- ApiVersionsRequest
测试类中用到了。
通用
-
TopicMetadataRequest
用于生产者/消费者请求Topic metadata。 -
SaslHandshakeRequest
Kafka的身份认证请求。