Kafka Source Code Analysis - Consumer (8) - Rebalance Analysis

2018-12-28  陈阳001

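Phase 2:

Once the corresponding GroupCoordinator has been found, the consumer enters the Join Group phase. In this phase the consumer sends a JoinGroupRequest to the GroupCoordinator and handles the response.

Message formats of JoinGroupRequest and JoinGroupResponse

(Figures: JoinGroup Request, JoinGroup Response)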

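Meaning of each field in JoinGroupRequest:

| Name | Type | Meaning |
| --- | --- | --- |
| group_id | String | Id of the Consumer Group |
| session_timeout | int | If the GroupCoordinator receives no heartbeat from the consumer within session_timeout (ms), it considers the consumer offline |
| member_id | String | Id assigned to the consumer by the GroupCoordinator (empty on the first join) |
| protocol_type | String | Protocol implemented by the Consumer Group; "consumer" by default |
| group_protocols | List | All PartitionAssignor types this consumer supports; each entry has the two fields below |
| protocol_name | String | Name of the PartitionAssignor |
| protocol_metadata | byte array | The consumer's subscription information serialized for that PartitionAssignor, including the user-defined userData |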
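
The protocol_metadata field carries a serialized subscription. The sketch below writes and reads such a payload with java.nio.ByteBuffer. It assumes the version-0 layout of the consumer protocol: an int16 version, then an array of topic strings, then userData as length-prefixed bytes. SubscriptionCodecSketch is a hypothetical name. In Kafka this work is done by ConsumerProtocol, whose API is not reproduced here. For brevity, userData is assumed to be non-null.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not Kafka's ConsumerProtocol. Assumed v0 layout:
// version:int16 | topics: int32 count, then (int16 length + UTF-8 bytes) per topic
// | user_data: int32 length + bytes
final class SubscriptionCodecSketch {
    static final short VERSION_0 = 0;

    static ByteBuffer serialize(List<String> topics, byte[] userData) {
        List<byte[]> encoded = new ArrayList<>();
        int size = 2 + 4;                      // version + topic count
        for (String topic : topics) {
            byte[] bytes = topic.getBytes(StandardCharsets.UTF_8);
            encoded.add(bytes);
            size += 2 + bytes.length;          // int16 length prefix + topic bytes
        }
        size += 4 + userData.length;           // int32 length prefix + user data
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putShort(VERSION_0);
        buf.putInt(encoded.size());
        for (byte[] bytes : encoded) {
            buf.putShort((short) bytes.length);
            buf.put(bytes);
        }
        buf.putInt(userData.length);
        buf.put(userData);
        buf.flip();                            // make the buffer readable from the start
        return buf;
    }

    static List<String> deserializeTopics(ByteBuffer buf) {
        ByteBuffer in = buf.duplicate();       // leave the caller's position untouched
        short version = in.getShort();
        if (version != VERSION_0)
            throw new IllegalArgumentException("unsupported version " + version);
        int count = in.getInt();
        List<String> topics = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            byte[] bytes = new byte[in.getShort()];
            in.get(bytes);
            topics.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return topics;
    }
}
```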

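Meaning of each field in JoinGroupResponse:

| Name | Type | Meaning |
| --- | --- | --- |
| error_code | short | Error code |
| generation_id | int | Generation assigned by the GroupCoordinator |
| group_protocol | String | PartitionAssignor selected by the GroupCoordinator |
| leader_id | String | member_id of the group leader |
| member_id | String | Id assigned to this consumer by the GroupCoordinator |
| members | Map | All members of the Consumer Group (member_id to member_metadata); only the leader receives a non-empty list |
| member_metadata | byte array | Subscription metadata of the corresponding consumer |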
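
Below is a minimal model of the response fields. It assumes the leader check is a plain comparison of member_id and leader_id; the code later calls joinResponse.isLeader() for this. JoinGroupResponseSketch is hypothetical and keeps only the fields from the table.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for JoinGroupResponse; field names follow the table above.
final class JoinGroupResponseSketch {
    final short errorCode;
    final int generationId;
    final String groupProtocol;
    final String leaderId;
    final String memberId;
    // member_id -> member_metadata; only the leader receives a non-empty map
    final Map<String, ByteBuffer> members;

    JoinGroupResponseSketch(short errorCode, int generationId, String groupProtocol,
                            String leaderId, String memberId, Map<String, ByteBuffer> members) {
        this.errorCode = errorCode;
        this.generationId = generationId;
        this.groupProtocol = groupProtocol;
        this.leaderId = leaderId;
        this.memberId = memberId;
        this.members = Collections.unmodifiableMap(new HashMap<>(members));
    }

    // The consumer is the leader when the coordinator named its own member_id as leader_id.
    boolean isLeader() {
        return memberId.equals(leaderId);
    }
}
```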

With the message formats of JoinGroupRequest and JoinGroupResponse covered, let's look at how phase 2 is processed. Its entry point is ensurePartitionAssignment(), which proceeds as follows:

(Figure: processing flow of ensurePartitionAssignment())

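1) Call subscriptions.partitionsAutoAssigned() to check whether the Consumer's subscription type is AUTO_TOPICS or AUTO_PATTERN. A USER_ASSIGNED subscription needs no Rebalance, because the user specifies the partitions manually.
2) If the subscription type is AUTO_PATTERN, check whether Metadata needs to be updated.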

 /**
     * Ensure our metadata is fresh (if an update is expected, this will block
     * until it has completed).
     */
    public void ensureFreshMetadata() {
        // Update Metadata if an update was requested (Metadata.needUpdate is true) or it has not been refreshed for too long
        if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
            awaitMetadataUpdate(); // blocks until the update completes
    }
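
The condition in ensureFreshMetadata() depends on two inputs: an explicit update request and a periodic expiry. The following is a simplified sketch built on that assumption. MetadataFreshnessSketch and its fields are hypothetical. The real Metadata.timeToNextUpdate() also honors a refresh backoff, which is left out here.

```java
// Hypothetical, simplified model of the freshness check; refresh backoff is omitted.
final class MetadataFreshnessSketch {
    private final long metadataExpireMs;
    private long lastRefreshMs;
    private boolean needUpdate;

    MetadataFreshnessSketch(long metadataExpireMs, long lastRefreshMs) {
        this.metadataExpireMs = metadataExpireMs;
        this.lastRefreshMs = lastRefreshMs;
    }

    void requestUpdate() { needUpdate = true; }

    boolean updateRequested() { return needUpdate; }

    // 0 means "refresh now"; otherwise the milliseconds left until the cached metadata expires.
    long timeToNextUpdate(long nowMs) {
        return needUpdate ? 0 : Math.max(lastRefreshMs + metadataExpireMs - nowMs, 0);
    }

    // Same shape as the condition in ensureFreshMetadata().
    boolean shouldBlockForUpdate(long nowMs) {
        return updateRequested() || timeToNextUpdate(nowMs) == 0;
    }

    // Called once a metadata response has been applied.
    void onUpdate(long nowMs) {
        lastRefreshMs = nowMs;
        needUpdate = false;
    }
}
```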

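As mentioned when ConsumerCoordinator was introduced, its constructor registers a listener on Metadata. Whenever Metadata is updated, the listener filters topics with the regular expression stored in SubscriptionState and updates the subscription in SubscriptionState accordingly. It also records the current Metadata snapshot in the metadataSnapshot field. Metadata is refreshed at this point so that the Rebalance does not run on stale Metadata. A stale view would trigger several Rebalance operations in a row.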
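
For an AUTO_PATTERN subscription, the listener's work reduces to two things. It filters the cluster's topic names with the subscribed regular expression, and it keeps a snapshot for later comparison. The sketch below assumes a whole-name match (Matcher.matches()) and represents a snapshot as a map from topic to partition count. Both helper names are hypothetical.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical sketch of what the metadata listener does for an AUTO_PATTERN subscription.
final class PatternSubscriptionSketch {
    // Keep the cluster topics whose whole name matches the subscribed pattern.
    static Set<String> matchTopics(Pattern pattern, Collection<String> clusterTopics) {
        Set<String> matched = new HashSet<>();
        for (String topic : clusterTopics)
            if (pattern.matcher(topic).matches())
                matched.add(topic);
        return matched;
    }

    // Snapshot = partition count of each subscribed topic; equal snapshots mean nothing
    // relevant to the partition assignment has changed.
    static Map<String, Integer> snapshot(Set<String> subscribed, Map<String, Integer> partitionsPerTopic) {
        Map<String, Integer> snap = new HashMap<>();
        for (String topic : subscribed) {
            Integer count = partitionsPerTopic.get(topic);
            if (count != null)
                snap.put(topic, count);
        }
        return snap;
    }
}
```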
3) Call ConsumerCoordinator.needRejoin() to decide whether a JoinGroupRequest must be sent to join the Consumer Group. It checks that the subscription type is AUTO_TOPICS or AUTO_PATTERN, and then checks the rejoinNeeded and needsPartitionAssignment fields.

@Override
    public boolean needRejoin() {
        return subscriptions.partitionsAutoAssigned() && // check subscriptionType
                (super.needRejoin() // check the value of rejoinNeeded
                        || subscriptions.partitionAssignmentNeeded()); // check needsPartitionAssignment
    }
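
needRejoin() combines the subscription type with two flags. The enum below uses the SubscriptionState subscription type names as I understand them for this client version (NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED). Both types are hypothetical stand-ins, not Kafka classes.

```java
// Hypothetical stand-in for SubscriptionState's subscription type.
enum SubscriptionTypeSketch { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

final class RejoinCheckSketch {
    static boolean partitionsAutoAssigned(SubscriptionTypeSketch type) {
        return type == SubscriptionTypeSketch.AUTO_TOPICS || type == SubscriptionTypeSketch.AUTO_PATTERN;
    }

    // Same shape as ConsumerCoordinator.needRejoin(): only auto-assigned consumers rejoin,
    // and only when rejoinNeeded or needsPartitionAssignment asks for it.
    static boolean needRejoin(SubscriptionTypeSketch type, boolean rejoinNeeded,
                              boolean partitionAssignmentNeeded) {
        return partitionsAutoAssigned(type) && (rejoinNeeded || partitionAssignmentNeeded);
    }
}
```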

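4) Call onJoinPrepare() to prepare for sending the JoinGroupRequest. It does three things. First, if auto-commit is enabled, it commits offsets synchronously. Second, it calls onPartitionsRevoked() on the ConsumerRebalanceListener registered in SubscriptionState. Third, it resets assignmentSnapshot to null and sets needsPartitionAssignment to true.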

@Override
    protected void onJoinPrepare(int generation, String memberId) {
        // commit offsets prior to rebalance if auto-commit enabled (a synchronous offset commit)
        maybeAutoCommitOffsetsSync();
        // invoke the ConsumerRebalanceListener registered in SubscriptionState
        // execute the user's callback before rebalance
        ConsumerRebalanceListener listener = subscriptions.listener();
        log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
        try {
            Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
            listener.onPartitionsRevoked(revoked);
        } catch (WakeupException e) {
            throw e;
        } catch (Exception e) {
            log.error("User provided listener {} for group {} failed on partition revocation",
                    listener.getClass().getName(), groupId, e);
        }

        assignmentSnapshot = null;
        subscriptions.needReassignment(); // set needsPartitionAssignment to true
    }
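
onJoinPrepare() deliberately isolates the user's callback. A WakeupException is rethrown, while any other listener failure is only logged so that the rebalance can continue. Below is a hedged sketch of that pattern. RevocationListenerSketch and WakeupSketchException are hypothetical stand-ins for ConsumerRebalanceListener and WakeupException.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for ConsumerRebalanceListener (revocation side only).
interface RevocationListenerSketch {
    void onPartitionsRevoked(Collection<String> partitions);
}

// Hypothetical stand-in for WakeupException.
final class WakeupSketchException extends RuntimeException {}

final class JoinPrepareSketch {
    // Returns true if the listener ran cleanly, false if it failed and the failure was swallowed.
    static boolean invokeRevoked(RevocationListenerSketch listener, Set<String> assigned) {
        try {
            // hand the listener a copy, as onJoinPrepare() does with new HashSet<>(...)
            listener.onPartitionsRevoked(Collections.unmodifiableSet(new HashSet<>(assigned)));
            return true;
        } catch (WakeupSketchException e) {
            throw e;                                   // a wakeup must reach the caller
        } catch (RuntimeException e) {
            System.err.println("listener failed on partition revocation: " + e);
            return false;                              // logged only; the rebalance continues
        }
    }
}
```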

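5) Call needRejoin() again, then call ensureCoordinatorReady() to make sure the GroupCoordinator has been found and a connection to it is established.
6) If requests to the Node hosting the GroupCoordinator are still pending, block until all of them have been sent and answered, that is, until the corresponding queues in unsent and InFlightRequests are empty. Then go back to step 5. This avoids sending a duplicate JoinGroupRequest.
7) Call sendJoinGroupRequest() to create the JoinGroupRequest. It calls ConsumerNetworkClient.send() to buffer the request in unsent until it is sent: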

/**
     * Join the group and return the assignment for the next generation. This function handles both
     * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, Map)} if
     * elected leader by the coordinator.
     * @return A request future which wraps the assignment returned from the group leader
     */
    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown()) // check the GroupCoordinator
            return RequestFuture.coordinatorNotAvailable();

        // send a join group request to the coordinator
        log.info("(Re-)joining group {}", groupId);
        // create the JoinGroupRequest
        JoinGroupRequest request = new JoinGroupRequest(
                groupId,
                this.sessionTimeoutMs,
                this.memberId,
                protocolType(),
                metadata());

        log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
        // put the JoinGroupRequest into the unsent collection to wait for sending
        // note: JoinGroupResponseHandler is the entry point for handling the JoinGroupResponse
        return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                .compose(new JoinGroupResponseHandler());
    }

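8) Add a RequestFutureListener to the RequestFuture<ByteBuffer> returned in step 7.
9) Call ConsumerNetworkClient.poll() to send the JoinGroupRequest. This blocks until a JoinGroupResponse or an exception arrives.
10) Check RequestFuture.failed(). UnknownMemberIdException, RebalanceInProgressException and IllegalGenerationException trigger an immediate retry. Other retriable exceptions are retried after a backoff of retryBackoffMs. Any other exception is thrown. If the request did not fail, phase 2 is complete.
Here are ensurePartitionAssignment() and ensureActiveGroup():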

/**
     * Ensure that we have a valid partition assignment from the coordinator.
     */
    public void ensurePartitionAssignment() {
        if (subscriptions.partitionsAutoAssigned()) { // Step 1: check the subscription type
            // Due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
            // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
            // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
            // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
            // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
            // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
            if (subscriptions.hasPatternSubscription()) // Step 2: check whether Metadata needs to be updated
                client.ensureFreshMetadata();

            ensureActiveGroup();
        }
    }

 /**
     * Ensure that the group is active (i.e. joined and synced)
     */
    public void ensureActiveGroup() {
        // always ensure that the coordinator is ready because we may have been disconnected
        // when sending heartbeats and does not necessarily require us to rejoin the group.
        ensureCoordinatorReady();

        if (!needRejoin()) // Step 3: check whether a JoinGroupRequest needs to be sent
            return;

        if (needsJoinPrepare) {
            // Step 4: preparation before sending the JoinGroupRequest
            onJoinPrepare(generation, memberId);
            needsJoinPrepare = false;
        }

        while (needRejoin()) {
            ensureCoordinatorReady(); // Step 5: check the GroupCoordinator state

            // ensure that there are no pending requests to the coordinator. This is important
            // in particular to avoid resending a pending JoinGroup request.
            if (client.pendingRequestCount(this.coordinator) > 0) {
                // Step 6: wait until all requests to the GroupCoordinator's node have completed
                client.awaitPendingRequests(this.coordinator);
                continue;
            }
            // Step 7: create and buffer the request
            RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
            // Step 8: add a listener
            future.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    onJoinComplete(generation, memberId, protocol, value);
                    needsJoinPrepare = true;
                    heartbeatTask.reset();
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // we handle failures below after the request finishes. if the join completes
                    // after having been woken up, the exception is ignored and we will rejoin
                }
            });
            client.poll(future); // Step 9: block until the JoinGroupRequest completes

            if (future.failed()) { // Step 10: exception handling
                RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;
                time.sleep(retryBackoffMs); // back off for a while, then retry
            }
        }
    }
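
The failure branch at the end of ensureActiveGroup() separates three outcomes. The sketch below replays that decision with local stand-in exception classes, all hypothetical. countBackoffs() walks a scripted sequence of attempt results, where null stands for a successful attempt. It counts how often the loop would call time.sleep(retryBackoffMs).

```java
import java.util.List;

enum JoinFailureAction { REJOIN_IMMEDIATELY, BACKOFF_AND_RETRY, THROW }

// Local stand-ins for Kafka's exception types (hypothetical, for illustration only).
class RetriableSketchException extends RuntimeException {}
class CoordinatorNotAvailableSketchException extends RetriableSketchException {}
class UnknownMemberIdSketchException extends RuntimeException {}
class RebalanceInProgressSketchException extends RuntimeException {}
class IllegalGenerationSketchException extends RuntimeException {}

final class JoinRetrySketch {
    // Same order of checks as the failure branch of ensureActiveGroup().
    static JoinFailureAction classify(RuntimeException e) {
        if (e instanceof UnknownMemberIdSketchException
                || e instanceof RebalanceInProgressSketchException
                || e instanceof IllegalGenerationSketchException)
            return JoinFailureAction.REJOIN_IMMEDIATELY;
        if (!(e instanceof RetriableSketchException))
            return JoinFailureAction.THROW;
        return JoinFailureAction.BACKOFF_AND_RETRY;
    }

    // Replays scripted attempt outcomes (null = success) and counts the backoff sleeps.
    static int countBackoffs(List<RuntimeException> outcomes) {
        int backoffs = 0;
        for (RuntimeException outcome : outcomes) {
            if (outcome == null)
                return backoffs;
            switch (classify(outcome)) {
                case REJOIN_IMMEDIATELY:
                    continue;              // loop again without sleeping
                case THROW:
                    throw outcome;         // non-retriable: surface to the caller
                default:
                    backoffs++;            // retriable: sleep, then loop again
            }
        }
        throw new IllegalStateException("no successful attempt in script");
    }
}
```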

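The sending flow of JoinGroupRequest shows that JoinGroupResponse handling starts in JoinGroupResponseHandler.handle(). That method also sends the SyncGroupRequest, which is covered in detail later. The JoinGroupResponse is handled as follows:

(Figure: JoinGroupResponse processing flow)
1) Parse the JoinGroupResponse, take the memberId, generation and other information assigned by the GroupCoordinator, and store them locally.
2) Use leaderId to check whether this consumer is the Leader. The Leader goes into onJoinLeader(), and a Follower goes into onJoinFollower(). onJoinFollower() does a subset of what onJoinLeader() does, so the rest of this walkthrough focuses on onJoinLeader().
3) The Leader looks up the PartitionAssignor for the partition assignment strategy named in the group_protocol field of the JoinGroupResponse.
4) The Leader deserializes the members field of the JoinGroupResponse to get the topics subscribed by every consumer in the Consumer Group. It adds these topics to its SubscriptionState.groupSubscription set. A Follower only cares about the topics it subscribes to itself.
5) Step 4 may add new topics, so Metadata has to be updated.
6) Once the Metadata update has finished, a Metadata snapshot (created by the Metadata listener) is stored in the assignmentSnapshot field.
7) Call PartitionAssignor.assign() to perform the partition assignment.
8) Serialize the assignment result and return it in a Map. The key is the consumer's member_id, and the value is its serialized assignment as a ByteBuffer.
Now let's analyze JoinGroupResponseHandler.handle():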
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {

        @Override
        public JoinGroupResponse parse(ClientResponse response) {
            return new JoinGroupResponse(response.responseBody());
        }

        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(joinResponse.errorCode());
            if (error == Errors.NONE) {
                // Step 1: parse the JoinGroupResponse and store the results locally
                log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
                AbstractCoordinator.this.memberId = joinResponse.memberId();
                AbstractCoordinator.this.generation = joinResponse.generationId();
                AbstractCoordinator.this.rejoinNeeded = false; // rejoinNeeded is reset to false here
                AbstractCoordinator.this.protocol = joinResponse.groupProtocol();
                sensors.joinLatency.record(response.requestLatencyMs());
                if (joinResponse.isLeader()) { // Step 2: check whether this consumer is the leader
                    /*
                     * Note: this future is the RequestFuture returned earlier by sendJoinGroupRequest().
                     * Both onJoinLeader() and onJoinFollower() send a SyncGroupRequest, and the
                     * RequestFuture they return tracks the completion of that SyncGroupRequest.
                     * chain() ensures that this future is notified only after the
                     * SyncGroupResponse has been handled.
                     */
                    onJoinLeader(joinResponse).chain(future);
                } else {
                    onJoinFollower().chain(future);
                }
            } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
                        coordinator);
                // backoff and retry
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                // reset the member id and retry immediately
                AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                // re-discover the coordinator and retry with backoff
                coordinatorDead();
                log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
                future.raise(error);
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                    || error == Errors.INVALID_SESSION_TIMEOUT
                    || error == Errors.INVALID_GROUP_ID) {
                // log the error and re-throw the exception
                log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
                future.raise(error);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                // unexpected error, throw the exception
                future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
            }
        }
    }
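
compose() in sendJoinGroupRequest() and chain() in handle() both wire one future's outcome into another. The single-threaded MiniFuture below is a hypothetical simplification of RequestFuture. It keeps only complete/raise, addListener, compose and chain. In the mapping used here, the future being composed plays the JoinGroupResponse. The future handed to the handler plays the RequestFuture<ByteBuffer> polled by ensureActiveGroup(). Chaining a SyncGroup future into it shows why that outer future completes only after the SyncGroupResponse arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, single-threaded simplification of RequestFuture; not Kafka's implementation.
final class MiniFuture<T> {
    interface Listener<V> {
        void onSuccess(V value);
        void onFailure(RuntimeException e);
    }

    private boolean done;
    private T value;
    private RuntimeException exception;
    private final List<Listener<T>> listeners = new ArrayList<>();

    void complete(T v) {
        if (done) throw new IllegalStateException("future already completed");
        done = true;
        value = v;
        for (Listener<T> l : listeners) l.onSuccess(v);
    }

    void raise(RuntimeException e) {
        if (done) throw new IllegalStateException("future already completed");
        done = true;
        exception = e;
        for (Listener<T> l : listeners) l.onFailure(e);
    }

    boolean failed() { return done && exception != null; }

    T value() { return value; }

    // Listeners added after completion fire immediately.
    void addListener(Listener<T> l) {
        if (!done) listeners.add(l);
        else if (exception != null) l.onFailure(exception);
        else l.onSuccess(value);
    }

    // Like compose(): on success the handler decides how the new future finishes;
    // a failure is forwarded unchanged.
    <S> MiniFuture<S> compose(BiConsumer<T, MiniFuture<S>> handler) {
        MiniFuture<S> adapted = new MiniFuture<>();
        addListener(new Listener<T>() {
            @Override public void onSuccess(T v) { handler.accept(v, adapted); }
            @Override public void onFailure(RuntimeException e) { adapted.raise(e); }
        });
        return adapted;
    }

    // Like chain(): when this future finishes, finish 'other' the same way.
    void chain(MiniFuture<T> other) {
        addListener(new Listener<T>() {
            @Override public void onSuccess(T v) { other.complete(v); }
            @Override public void onFailure(RuntimeException e) { other.raise(e); }
        });
    }
}
```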
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
        try {
            // perform the leader synchronization and send back the assignment for the group
            // Steps 3-8 are all done in performAssignment()
            Map<String, ByteBuffer> groupAssignment = performAssignment(joinResponse.leaderId(), joinResponse.groupProtocol(),
                    joinResponse.members());
            // create and send the SyncGroupRequest
            SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
            log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
            return sendSyncGroupRequest(request);
        } catch (RuntimeException e) {
            return RequestFuture.failure(e);
        }
    }

@Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId,
                                                        String assignmentStrategy,
                                                        Map<String, ByteBuffer> allSubscriptions) {
        // Step 3: look up the PartitionAssignor used for partition assignment
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null)
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

        Set<String> allSubscribedTopics = new HashSet<>();
        Map<String, Subscription> subscriptions = new HashMap<>();
        for (Map.Entry<String, ByteBuffer> subscriptionEntry : allSubscriptions.entrySet()) {
            Subscription subscription = ConsumerProtocol.deserializeSubscription(subscriptionEntry.getValue());
            subscriptions.put(subscriptionEntry.getKey(), subscription);
            allSubscribedTopics.addAll(subscription.topics());
        }

        // the leader will begin watching for changes to any of the topics the group is interested in,
        // which ensures that all metadata changes will eventually be seen
        // Step 4: the leader must track the topics subscribed by every consumer in the Consumer Group
        this.subscriptions.groupSubscribe(allSubscribedTopics);
        metadata.setTopics(this.subscriptions.groupSubscription());

        // update metadata (if needed) and keep track of the metadata used for assignment so that
        // we can check after rebalance completion whether anything has changed
        client.ensureFreshMetadata(); // Step 5: update Metadata
        assignmentSnapshot = metadataSnapshot; // Step 6: record the snapshot

        log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
                groupId, assignor.name(), subscriptions);
        // Step 7: perform the partition assignment
        Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

        log.debug("Finished assignment for group {}: {}", groupId, assignment);
        // Step 8: serialize the assignment result and store it in groupAssignment
        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
        for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
            ByteBuffer buffer = ConsumerProtocol.serializeAssignment(assignmentEntry.getValue());
            
            groupAssignment.put(assignmentEntry.getKey(), buffer);
        }

        return groupAssignment;
    }
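
Here is a toy leader-side assignment that illustrates steps 4, 7 and 8. It assumes the subscriptions have already been deserialized into topic lists. It then uses a simple round-robin over sorted member ids. This policy is for illustration only and is not claimed to match RangeAssignor or any other built-in PartitionAssignor. Partition names stand in for the serialized ByteBuffer values.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// Hypothetical toy version of steps 4, 7 and 8 of performAssignment().
final class LeaderAssignmentSketch {
    // memberTopics: member_id -> subscribed topics; partitionsPerTopic: topic -> partition count.
    static Map<String, List<String>> assign(Map<String, List<String>> memberTopics,
                                            Map<String, Integer> partitionsPerTopic) {
        // Step 4: the leader tracks the union of every member's topics (sorted for determinism).
        TreeSet<String> groupSubscription = new TreeSet<>();
        for (List<String> topics : memberTopics.values())
            groupSubscription.addAll(topics);

        List<String> members = new ArrayList<>(memberTopics.keySet());
        Collections.sort(members);
        Map<String, List<String>> result = new HashMap<>();
        for (String member : members)
            result.put(member, new ArrayList<>());

        // Step 7: hand each partition to the next member (in sorted order) subscribed to its topic.
        int next = 0;
        for (String topic : groupSubscription) {
            int count = partitionsPerTopic.getOrDefault(topic, 0);
            for (int p = 0; p < count; p++) {
                for (int tries = 0; tries < members.size(); tries++) {
                    String member = members.get(next % members.size());
                    next++;
                    if (memberTopics.get(member).contains(topic)) {
                        result.get(member).add(topic + "-" + p);
                        break;
                    }
                }
            }
        }
        // Step 8 would serialize each value into a ByteBuffer keyed by member_id.
        return result;
    }
}
```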

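Phase 3

After partition assignment, the consumer enters the Synchronizing Group State phase. It sends a SyncGroupRequest to the GroupCoordinator and handles the SyncGroupResponse. First, the message formats of SyncGroupRequest and SyncGroupResponse:

(Figures: SyncGroup Request, SyncGroup Response)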

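Meaning of each field in SyncGroupRequest:

| Name | Type | Meaning |
| --- | --- | --- |
| group_id | String | Id of the Consumer Group |
| generation_id | int | Generation stored by the consumer |
| member_id | String | Id assigned to the consumer by the GroupCoordinator |
| member_assignment | byte array | Result of the partition assignment |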

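Meaning of each field in SyncGroupResponse:

| Name | Type | Meaning |
| --- | --- | --- |
| error_code | short | Error code |
| member_assignment | byte array | Partitions assigned to the current consumer |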

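The analysis of onJoinLeader() above shows that the SyncGroupRequest is sent right after the partition assignment, also inside onJoinLeader(). The flow is:
1) With the serialized assignment result in hand, the Leader wraps it into a SyncGroupRequest. In a Follower's SyncGroupRequest this part is empty.
2) Call ConsumerNetworkClient.send() to put the request into the unsent collection until it is sent.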
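
Here is a hedged sketch of the assignment payload on both sides. The leader sends the full map and a follower sends an empty one. The coordinator then answers each member with only its own entry. SyncPayloadSketch is hypothetical. Returning an empty buffer for a member the leader left out is a simplifying assumption about the coordinator.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the SyncGroup assignment payload; not Kafka's request classes.
final class SyncPayloadSketch {
    // Only the leader carries the assignment; a follower sends an empty map.
    static Map<String, ByteBuffer> requestPayload(boolean isLeader, Map<String, ByteBuffer> groupAssignment) {
        return isLeader
                ? Collections.unmodifiableMap(new HashMap<>(groupAssignment))
                : Collections.<String, ByteBuffer>emptyMap();
    }

    // Coordinator side (simplified): each member gets only its own member_assignment,
    // or an empty buffer if the leader assigned it nothing.
    static ByteBuffer memberAssignment(Map<String, ByteBuffer> leaderPayload, String memberId) {
        ByteBuffer assignment = leaderPayload.get(memberId);
        return assignment == null ? ByteBuffer.allocate(0) : assignment.duplicate();
    }
}
```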
SyncGroupResponse handling starts in SyncGroupResponseHandler.handle(). On success, it parses the SyncGroupResponse, takes the partition assignment result and propagates it. On error, it sets rejoinNeeded to true and handles each error code differently.

private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {

        @Override
        public SyncGroupResponse parse(ClientResponse response) {
            return new SyncGroupResponse(response.responseBody());
        }

        @Override
        public void handle(SyncGroupResponse syncResponse,
                           RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(syncResponse.errorCode());
            if (error == Errors.NONE) {
                // call RequestFuture.complete() to propagate the partition assignment result
                log.info("Successfully joined group {} with generation {}", groupId, generation);
                sensors.syncLatency.record(response.requestLatencyMs());
                future.complete(syncResponse.memberAssignment());
            } else {
                // set rejoinNeeded to true
                AbstractCoordinator.this.rejoinNeeded = true;
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    // call RequestFuture.raise() to propagate the exception
                    future.raise(new GroupAuthorizationException(groupId));
                } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                    log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId);
                    future.raise(error);
                } else if (error == Errors.UNKNOWN_MEMBER_ID
                        || error == Errors.ILLEGAL_GENERATION) {
                    log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                    AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
                    future.raise(error);
                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                        || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                    log.debug("SyncGroup for group {} failed due to {}", groupId, error);
                    coordinatorDead();
                    future.raise(error);
                } else {
                    future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
                }
            }
        }
    }
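
The handler's side effects for each error code can be summarized as a small state machine. In the hypothetical sketch below, UNKNOWN_MEMBER_ID is assumed to be the empty string used by JoinGroupRequest.UNKNOWN_MEMBER_ID. coordinatorKnown = false stands in for coordinatorDead(). The raised exceptions themselves are not modeled.

```java
// Hypothetical stand-ins named after the Errors constants handled above.
enum SyncErrorSketch { NONE, GROUP_AUTHORIZATION_FAILED, REBALANCE_IN_PROGRESS, UNKNOWN_MEMBER_ID,
    ILLEGAL_GENERATION, GROUP_COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR_FOR_GROUP, OTHER }

final class SyncStateSketch {
    static final String UNKNOWN_MEMBER_ID = "";  // assumed value of JoinGroupRequest.UNKNOWN_MEMBER_ID
    boolean rejoinNeeded;
    boolean coordinatorKnown = true;
    String memberId;

    SyncStateSketch(String memberId) {
        this.memberId = memberId;
    }

    void onSyncError(SyncErrorSketch error) {
        if (error == SyncErrorSketch.NONE)
            return;                               // success: the assignment is propagated instead
        rejoinNeeded = true;                      // every failure forces another JoinGroup round
        switch (error) {
            case UNKNOWN_MEMBER_ID:
            case ILLEGAL_GENERATION:
                memberId = UNKNOWN_MEMBER_ID;     // rejoin as a brand-new member
                break;
            case GROUP_COORDINATOR_NOT_AVAILABLE:
            case NOT_COORDINATOR_FOR_GROUP:
                coordinatorKnown = false;         // like coordinatorDead(): rediscover the coordinator
                break;
            default:
                break;                            // only the raised exception differs
        }
    }
}
```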

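The assignment result obtained from the SyncGroupResponse is finally handled by ConsumerCoordinator.onJoinComplete(). That method is called from the RequestFutureListener added in step 8 of ensureActiveGroup() in phase 2. onJoinComplete() works as follows:

(Figure: onJoinComplete() processing flow)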

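1) In phase 2, before assigning partitions, the Leader recorded a Metadata snapshot in assignmentSnapshot. The Leader now compares that snapshot with the latest Metadata snapshot. A mismatch means topics were added or deleted, or partition counts changed, during the assignment. In that case needsPartitionAssignment is set to true and the partitions must be reassigned.
2) Deserialize the partitions assigned to this consumer and add them to the SubscriptionState.assignment collection. From then on, the consumer consumes the partitions in this collection. needsPartitionAssignment is set to false.
3) Call the PartitionAssignor's onAssignment() callback, which is empty by default. A user-defined PartitionAssignor can override it.
4) If offset auto-commit is enabled, reschedule the AutoCommitTask.
5) Invoke the ConsumerRebalanceListener registered in SubscriptionState.
6) Reset needsJoinPrepare to true to prepare for the next Rebalance. This is done in the RequestFutureListener after onJoinComplete() returns.
7) Restart the HeartbeatTask so that heartbeats are sent periodically. This also happens in the RequestFutureListener.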
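
Steps 1 and 2 decide whether the received assignment is used at all. The sketch below models the Metadata snapshot as a map from topic to partition count, and SubscriptionState.assignment as a set of partition names. JoinCompleteSketch and its fields are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of steps 1 and 2 of onJoinComplete().
final class JoinCompleteSketch {
    Map<String, Integer> assignmentSnapshot;             // set only by the leader before assigning
    Map<String, Integer> metadataSnapshot = new HashMap<>();
    boolean needsPartitionAssignment = true;
    final Set<String> assignment = new LinkedHashSet<>();

    // Returns false when the result is discarded because metadata changed during the rebalance.
    boolean onJoinComplete(Set<String> assignedPartitions) {
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
            needsPartitionAssignment = true;             // like subscriptions.needReassignment()
            return false;
        }
        assignment.clear();                              // like assignFromSubscribed(...)
        assignment.addAll(assignedPartitions);
        needsPartitionAssignment = false;
        return true;
    }
}
```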
The code of onJoinComplete():

 @Override
    protected void onJoinComplete(int generation,
                                  String memberId,
                                  String assignmentStrategy,
                                  ByteBuffer assignmentBuffer) {
        // if we were the assignor, then we need to make sure that there have been no metadata updates
        // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
        // Step 1: the Leader needs to compare the snapshots; a Follower does not
        if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
            subscriptions.needReassignment();
            return;
        }
        // look up the assignment strategy in use
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null)
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        // Step 2: deserialize and update the assignment
        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
        // set needsFetchCommittedOffsets to true so the last committed offsets are fetched from the server
        // set the flag to refresh last committed offsets
        subscriptions.needRefreshCommits();

        // update partition assignment
        // populate the assignment collection
        subscriptions.assignFromSubscribed(assignment.partitions());

        // give the assignor a chance to update internal state based on the received assignment
        assignor.onAssignment(assignment); // Step 3: callback

        // reschedule the auto commit starting from now
        if (autoCommitEnabled) // Step 4: reschedule the AutoCommitTask
            autoCommitTask.reschedule();

        // execute the user's callback after rebalance
        ConsumerRebalanceListener listener = subscriptions.listener();
        log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
        try {
            Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
            // Step 5: invoke the ConsumerRebalanceListener
            listener.onPartitionsAssigned(assigned);
        } catch (WakeupException e) {
            throw e;
        } catch (Exception e) {
            log.error("User provided listener {} for group {} failed on partition assignment",
                    listener.getClass().getName(), groupId, e);
        }
    }

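This completes the analysis of how the Rebalance operation is executed and implemented. When a Consumer leaves the Consumer Group normally, it sends a LeaveGroupRequest, which also triggers a Rebalance.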
