Kafka源码分析-Consumer(3)-ConsumerNe

2018-11-12  本文已影响0人  陈阳001

一.概述KafkaConsumer类

Kafka Consumer不是个线程安全的类。为了便于分析,我们认为消费者所有的操作都是在同一个线程中完成,所以不用考虑锁的问题。这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单,快速。还可以使用两个线程池实现“生产者-消费者”模式,解耦消息消费和消息处理的逻辑。其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。
Kafka Consumer实现了Consumer接口,Consumer接口中定义了Kafka Consumer对外的api,核心接口有以下六类。

ConsumerNetworkClient:

NetworkClient依赖KSeelctor,InFlightRequests,Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector类实现了发送请求的功能,并通过一系列handle*()方法处理请求响应,超时请求以及断线重连。ConsumerNetworkClient在NetworkClient的基础上进行了封装,提供了更高级的功能和更加易用的API。


image.png

下面介绍下流程:
(1)调用ConsumerNetworkClient.trySend()方法循环处理unsent中缓存的请求。具体逻辑是:对每个Node节点,循环遍历所有对应的ClientRequest列表,每次循环都调用NetworkClient.ready()方法检测消费者与此节点之间的连接,已经发送请求的条件。若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队列中等待响应,然后放入KafkaChannel的send字段中等待发送,并将此消息从列表删除。代码如下:

 private boolean trySend(long now) {
        // send any requests that can be sent now
        boolean requestsSent = false;
        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
            Node node = requestEntry.getKey();
            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
            while (iterator.hasNext()) {//便利unsent集合
                ClientRequest request = iterator.next();
                //调用NetworkClient.ready()检测是否可以发送请求
                if (client.ready(node, now)) {
                    //等待发送请求
                    client.send(request, now);
                    //从unsent集合中删除此请求。
                    iterator.remove();
                    requestsSent = true;
                }
            }
        }
        return requestsSent;
    }

(2)计算超时时间,由timeout和delayedTasks队列中最近要执行的定时任务的时间共同决定。下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时间,避免影响定时任务的执行。
(3)调用NetworkClient.poll()方法,将KafkaChannel.send字段指定的消息发送出去。NetworkClient.poll()方法可能会更新Metadata,并且使用一系列handle*()方法处理请求响应,连接断开,超时等情况,并调用每个请求的回调函数。
(4)调用ConsumerNetworkClient.maybeTriggerWakeup()方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求抛出WakeupException异常,中断当前ConsumerNetworkClient.poll()方法。

private void maybeTriggerWakeup() {
        //通过wakeupDisabledCount检测是否在执行不可中断的方法,通过wakeup检测是否有中断请求
        if (wakeupDisabledCount == 0 && wakeup.get()) {
            wakeup.set(false);//重置中断标志
            throw new WakeupException();
        }
    }

(5)调用checkDisconnects()方法检测连接状态。检测消费者和每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。

    private void checkDisconnects(long now) {
        // any disconnects affecting requests that have already been transmitted will be handled
        // by NetworkClient, so we just need to check whether connections for any of the unsent
        // requests have been disconnected; if they have, then we complete the corresponding future
        // and set the disconnect flag in the ClientResponse
        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
        while (iterator.hasNext()) {//遍历unsent集合中的每个 Node
            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
            Node node = requestEntry.getKey();
            if (client.connectionFailed(node)) {//检测消费者与每个node的连接状态。
                // Remove entry before invoking request callback to avoid callbacks handling
                // coordinator failures traversing the unsent list again.
                iterator.remove();//从unsent集合中删除此Node对应的全部 ClientRequest
                for (ClientRequest request : requestEntry.getValue()) {
                    RequestFutureCompletionHandler handler =
                            (RequestFutureCompletionHandler) request.callback();
                    //调用ClientRequest的回调函数。
                    handler.onComplete(new ClientResponse(request, now, true, null));
                }
            }
        }
    }

断开连接的Node对应的已经发出去的请求,由NetworkClient进行异常处理,具体参照前面生产者的分析。
(6)根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中执行的任务,则调用delayedTasks.poll()方法。
(7)再次调用trySend()方法。在步骤3中调用了NetworkClient.poll()方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这次再次尝试调用trySend()方法。
(8)调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求,他会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合删除。

private void failExpiredRequests(long now) {
        // clear all expired unsent requests and fail their corresponding futures
        Iterator<Map.Entry<Node, List<ClientRequest>>> iterator = unsent.entrySet().iterator();
        while (iterator.hasNext()) {//遍历unsent集合
            Map.Entry<Node, List<ClientRequest>> requestEntry = iterator.next();
            Iterator<ClientRequest> requestIterator = requestEntry.getValue().iterator();
            while (requestIterator.hasNext()) {
                ClientRequest request = requestIterator.next();
                if (request.createdTimeMs() < now - unsentExpiryMs) {
                    RequestFutureCompletionHandler handler =
                            (RequestFutureCompletionHandler) request.callback();
                    handler.raise(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms."));//调用回调函数
                    requestIterator.remove();//删除ClientRequest
                } else
                    break;
            }
            if (requestEntry.getValue().isEmpty())//队列已经为空,则从unsent集合中删除
                iterator.remove();
        }
    }

分析完poll()方法的详细步骤后,我们看看其实现代码:

private void poll(long timeout, long now, boolean executeDelayedTasks) {
        // send all the requests we can send now
        // 步骤1:检测发送条件,并将请求放入KafkaChannel.send字段,待发送。
        
        trySend(now);

        // ensure we don't poll any longer than the deadline for
        // the next scheduled task 步骤2:计算超时时间
        timeout = Math.min(timeout, delayedTasks.nextTimeout(now));
        //步骤3,4:调用NetworkClient.poll()方法,并检测是否有中断请求。
        clientPoll(timeout, now);
        now = time.milliseconds();//重置当前时间。

        // handle any disconnects by failing the active requests. note that disconnects must
        // be checked immediately following poll since any subsequent call to client.ready()
        // will reset the disconnect status
        checkDisconnects(now);//步骤5:根据连接状态,处理unsent中的请求

        // execute scheduled tasks
        if (executeDelayedTasks)//步骤6:处理定时任务
            delayedTasks.poll(now);

        // try again to send requests since buffer space may have been
        // cleared or a connect finished in the poll
        //步骤7:检测发送条件,重新设置KafkaChannel.send字段,并超时断线重连。
        trySend(now);

        // fail requests that couldn't be sent if they have expired
        failExpiredRequests(now);//步骤8:处理unsent中的超时任务
    }

pollNoWakeup()方法是poll()方法的变体,表示执行不可中断的poll()方法。具体方法是:在执行poll()方法前,调用disableWakeups()方法将wakeupDisabledCount加一,然后调用poll()方法。这样即使别的线程请求中断,也不会响应。
poll(future)是poll()方法的另一个实现阻塞发送请求的功能,代码如下:

/**
     * Block indefinitely until the given request future has finished.
     * @param future The request future to await.
     * @throws WakeupException if {@link #wakeup()} is called from another thread
     */
    public void poll(RequestFuture<?> future) {
        while (!future.isDone())//循环检测Future,即请求的完成情况。
            poll(Long.MAX_VALUE);//请求未完成,则调用poll()方法
    }

在ConsumerNetworkClient.send()方法中,会将待发送的请求封装成ClientRequest,然后保存在unsent集合中等待发送,代码如下:

/**
     * Send a new request. Note that the request is not actually transmitted on the
     * network until one of the {@link #poll(long)} variants is invoked. At this
     * point the request will either be transmitted successfully or will fail.
     * Use the returned future to obtain the result of the send. Note that there is no
     * need to check for disconnects explicitly on the {@link ClientResponse} object;
     * instead, the future will be failed with a {@link DisconnectException}.
     * @param node The destination of the request
     * @param api The Kafka API call
     * @param request The request payload
     * @return A future which indicates the result of the send.
     */
    public RequestFuture<ClientResponse> send(Node node,
                                              ApiKeys api,
                                              AbstractRequest request) {
        long now = time.milliseconds();
        RequestFutureCompletionHandler future = new RequestFutureCompletionHandler();
        RequestHeader header = client.nextRequestHeader(api);
        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
        //创建ClientRequest对象,并保存到unsent集合中
        put(node, new ClientRequest(now, true, send, future));
        return future;
    }

这里用到了回调对象RequestFutureCompletionHandler。RequestFutureHandler接口只有onComplete()方法,此接口的另一个实现是前面介绍的KafkaProducer端的请求回调(Sender的一个内部类)。


image.png

ConsumerNetworkClient.RequestFutureCompletionHandler.onComplete()方法的代码如下:

public void onComplete(ClientResponse response) {
            if (response.wasDisconnected()) {//因连接故障而产生的ClientResponse对象
                ClientRequest request = response.request();
                RequestSend send = request.request();
                ApiKeys api = ApiKeys.forId(send.header().apiKey());
                int correlation = send.header().correlationId();
                log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
                        api, request, correlation, send.destination());
                //调用继承自父类 RequestFuture 的raise()方法
                raise(DisconnectException.INSTANCE);
            } else {
                complete(response);//调用继承自父类RequestFuture的complete()方法
            }
        }

从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,还继承了RequestFuture类。RequestFuture是一个泛型类,核心字段如下:

/**
     *
     * 它将RequestFuture<T>适配成RequestFuture<S>
     * Convert from a request future of one type to another type
     * @param adapter The adapter which does the conversion
     * @param <S> The type of the future adapted to
     * @return The new future
     */
    public <S> RequestFuture<S> compose(final RequestFutureAdapter<T, S> adapter) {
        final RequestFuture<S> adapted = new RequestFuture<S>();//添加配置器后返回的结果
        //在当前RequestFuture上添加监听器。
        addListener(new RequestFutureListener<T>() {
            @Override
            public void onSuccess(T value) {
                adapter.onSuccess(value, adapted);
            }

            @Override
            public void onFailure(RuntimeException e) {
                adapter.onFailure(e, adapted);
            }
        });
        return adapted;
    }

下图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播过程。当调用RequestFuture<T>对象的complete()或onFailure方法,然后调用RequestFutureAdapter<T, S>的对应方法,最终调用RequestFuture<S>对象的对应方法。


使用compose()方法适配.jpg

RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture直接传递事件。代码如下:

public void chain(final RequestFuture<T> future) {
        addListener(new RequestFutureListener<T>() {//添加监听器
            @Override
            public void onSuccess(T value) {
                //通过监听器value传递给下一个RequestFuture对象
                future.complete(value);
            }

            @Override
            public void onFailure(RuntimeException e) {
                future.raise(e);//通过监听器将异常传递给下一个RequestFuture对象。
            }
        });
    }

RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法。
简单介绍ConsumerNetworkClient几个常用的功能:

上一篇 下一篇

猜你喜欢

热点阅读