Kafka2.0消费者协调器源码
2019-07-17 本文已影响0人
O_Neal
消费组和消费者
- 消费组和消费者是一对多的关系。
- 同一个消费组的消费者可以消费多个分区,且是独占的。
- 消费者的分区分配策略由接口
PartitionAssignor
定义,内置三种分配策略RangeAssignor
、RoundRobinAssignor
、StickyAssignor
,支持自定义策略。 - 不同消费组可以消费相同的分区,互不干扰。
消费者协调器和组协调器
- 客户端的消费者协调器
ConsumerCoordinator
和服务端的组协调器GroupCoordinator
通过心跳不断保持通信。 - 消费者进行消费之前,需要确保协调器是 ready 的。
- 选择具有最少请求的节点
Node
,即具有最少的InFlightRequests
的节点。 - 向该节点发送获取协调器节点的请求,发送流程类似发送拉取请求。
- 向找到的协调器节点发送加入组请求,此时会禁止心跳线程。
- 加入组响应处理器
JoinGroupResponseHandler
对响应进行处理,响应包含generationId
、memberId
、leaderId
、protocol
。 - 如果是 leader 消费者,即
memberId=leaderId
,则需要根据分配策略protocol
计算分区分配。 - 将分区分配结果封装到同步组请求,再向协调器节点发送同步组请求。
- 同步组响应处理器
SyncGroupResponseHandler
对上述请求的响应进行处理。 - 如果第5步判断不是 follower 消费者,同样需要向协调器发送同步组请求,只是请求中不需要封装分区分配结果,而是从组协调器获取。
- 加入组成功后,启动心跳线程。
- 更新本地缓存的分区分配,此处会调用消费者再平衡监听器。
- 选择具有最少请求的节点
消费者状态
- UNJOINED:消费者初始状态为
UNJOINED
,表示未加入消费组。 - REBALANCING:消费者向协调器发送加入组请求之前,状态变更为
REBALANCING
,表示再平衡状态 - STABLE:消费者监听到消息成功返回,状态变更为
STABLE
,表示稳定状态,如果是失败的消息,状态重置为UNJOINED
心跳线程
- 消费者加入消费组之后会启动心跳线程,并保持和组协调器的通信。
- 如果消费者状态不是
STABLE
,则不发送心跳。 - 如果组协调器未知,则等待一段时间重试。
- 如果心跳会话超时,则标记协调器节点未知。
- 如果心跳轮询超时,则发送离开组请求。
- 如果暂不需要发送心跳,则等待一段时间重试。
- 发送心跳,注册响应监听器,接收到响应后,设置接收时间,并进行下一轮的心跳。
偏移量
拉取偏移量
- 如果有指定的分区,消费者协调器从组协调器拉取一组分区和已提交偏移量的映射关系,缓存到
SubscriptionState
。 - 设置偏移量重置策略:
LATEST
,EARLIEST
,NONE
。 - 异步地更新消费的偏移量位置。
提交偏移量
- 消费者协调器获取当前的协调器节点。
- 向该节点发送提交偏移量请求,返回
Future
。
加入组流程
加入组流程消费者加入组流程的源码分析
boolean updateAssignmentMetadataIfNeeded(final long timeoutMs) {
final long startMs = time.milliseconds();
if (!coordinator.poll(timeoutMs)) { // 获取协调器
return false;
}
// 更新偏移量
return updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs));
}
// 获取协调器
public boolean poll(final long timeoutMs) {
final long startTime = time.milliseconds();
long currentTime = startTime;
long elapsed = 0L;
if (subscriptions.partitionsAutoAssigned()) { // 是自动分配主题类型
// 更新心跳的上一次的轮询时间
pollHeartbeat(currentTime);
if (coordinatorUnknown()) { // 协调器未知
// 确保协调器已经 ready
if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
return false;
}
}
if (rejoinNeededOrPending()) { // 需要加入消费组
// 加入组、同步组
if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
return false;
}
currentTime = time.milliseconds();
}
} else { // 指定分区类型
if (metadata.updateRequested() && !client.hasReadyNodes(startTime)) {// 如果没有准备就绪的节点
// 阻塞等待元数据更新
final boolean metadataUpdated = client.awaitMetadataUpdate(remainingTimeAtLeastZero(timeoutMs, elapsed));
if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
return false; // 更新元数据失败
}
currentTime = time.milliseconds();
}
}
maybeAutoCommitOffsetsAsync(currentTime); // 异步自动提交偏移量
return true;
}
// 确保协调器已经 ready
protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
final long startTimeMs = time.milliseconds();
long elapsedTime = 0L;
while (coordinatorUnknown()) { // 如果协调器未知
final RequestFuture<Void> future = lookupCoordinator(); // 向当前请求队列最少的节点,发送获取协调器的请求
client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (!future.isDone()) {
break; // 响应未完成,退出
}
}
return !coordinatorUnknown();
}
// 加入组、同步组
boolean ensureActiveGroup(long timeoutMs, long startMs) {
startHeartbeatThreadIfNeeded(); // 启动心跳线程
return joinGroupIfNeeded(joinTimeoutMs, joinStartMs);
}
boolean joinGroupIfNeeded(final long timeoutMs, final long startTimeMs) {
long elapsedTime = 0L;
while (rejoinNeededOrPending()) {
// 发送加入组请求
final RequestFuture<ByteBuffer> future = initiateJoinGroup();
client.poll(future, remainingTimeAtLeastZero(timeoutMs, elapsedTime));
if (!future.isDone()) {
// we ran out of time
return false;
}
if (future.succeeded()) { // 加入成功,回调处理响应,更新缓存的分区分配
ByteBuffer memberAssignment = future.value().duplicate();
onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
}
}
return true;
}
// 发送加入组请求
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
if (joinFuture == null) {
disableHeartbeatThread(); // 暂停心跳线程
state = MemberState.REBALANCING; // 状态改为 REBALANCING
joinFuture = sendJoinGroupRequest(); // 向协调器发送加入组请求
joinFuture.addListener(new RequestFutureListener<ByteBuffer>() { // 响应监听器
@Override
public void onSuccess(ByteBuffer value) { // 成功
synchronized (AbstractCoordinator.this) {
state = MemberState.STABLE; // 状态改为 STABLE
rejoinNeeded = false; // 不需要加入了
if (heartbeatThread != null)
heartbeatThread.enable(); // 启动暂停了的心跳
}
}
@Override
public void onFailure(RuntimeException e) { // 失败
synchronized (AbstractCoordinator.this) {
state = MemberState.UNJOINED; // 状态改为 UNJOINED
}
}
});
}
return joinFuture;
}
// 向协调器发送加入组请求
RequestFuture<ByteBuffer> sendJoinGroupRequest() {
JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
groupId,
this.sessionTimeoutMs,
this.generation.memberId,
protocolType(),
metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
int joinGroupTimeoutMs = Math.max(rebalanceTimeoutMs, rebalanceTimeoutMs + 5000);
return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
.compose(new JoinGroupResponseHandler()); // 异步回调响应处理类
}
// 异步回调响应处理类
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
Errors error = joinResponse.error();
if (error == Errors.NONE) {
synchronized (AbstractCoordinator.this) {
if (state != MemberState.REBALANCING) { // 如果是 REBALANCING,状态异常
future.raise(new UnjoinedGroupException());
} else {
AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(), joinResponse.memberId(), joinResponse.groupProtocol());
if (joinResponse.isLeader()) { // 当前消费组是 leader
onJoinLeader(joinResponse).chain(future);
} else { // 当消费者是 follower
onJoinFollower().chain(future);
}
}
}
}
}
}
// 发送 leader 消费者同步组请求
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
try {
// 根据响应的分配策略,给消费者分配分区
Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(), joinResponse.members());
SyncGroupRequest.Builder requestBuilder = new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId, groupAssignment);
return sendSyncGroupRequest(requestBuilder);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
}
}
// 发送 follower 消费者同步组请求
private RequestFuture<ByteBuffer> onJoinFollower() {
SyncGroupRequest.Builder requestBuilder =
new SyncGroupRequest.Builder(groupId, generation.generationId, generation.memberId,
Collections.<String, ByteBuffer>emptyMap()); // 发送不带分配信息的请求
return sendSyncGroupRequest(requestBuilder);
}