详解Dubbo(三):消费端构造Invoker

2020-04-18  本文已影响0人  空挡

前言

上一篇讲Proxy的文章中看到,构建Proxy需要传入Invoker参数。除基本方法外,其它接口方法的调用最终都是调用的invoker.invoke()方法。从rpc调用的整个流程来说,Invoker正好处在中间的位置,它的左边是用户的应用,调用的都是对象和方法。而它的右边是传输层,操作的是Request/Response,所以Invoker就是中间的桥梁。

Invoker结构

下面Invoker相关类的关系图,这只是其中最重要的部分:

Invoker
从上面图中可以看到,Invoker大体上分成两个部分,针对集群的ClusterInvoker和针对特定协议的Invoker。下面先从针对特定协议的Invoker开始。

Protocol和Invoker

从上一篇的ReferenceBean初始化中可以知道,消费端针对某个服务接口创建Invoker的时候,首先需要获取到URL。最简单的例子就是在@Reference注解上配置了url地址,而且这个地址不是注册中心的地址。

指定协议的URL

最简单的url比如dubbo://10.0.75.1:20880/org.apache.dubbo.demo.DemoService?&interface=org.apache.dubbo.demo.DemoService&methods=sayHello&side=provider&timestamp=1585553085050
ReferenceBean拿到这个url后就会去找它对应的Protocol类,根据url的schema, Dubbo可以找到DubboProtocol,然后调用Protocol的refer方法获取到Invoker,这个方法在AbstractProtocol类里面。

   @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }

实际上调用的是子类的protocolBindingRefer()方法,这里外层封装的AsyncToSyncInvoker是一个装饰类,因为新版本的dubbo把所有Invoker调用都改成了异步返回,如果Consumer仍然希望同步调用,则用这个装饰类转换一下。下面看下DubboProtocol的protocolBindingRefer()方法实现:

    @Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // 创建Dubbo Invoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

该方法直接创建了一个DubboInvoker,总共传入四个参数,除了接口和url外,第三个参数是构建传输层Client,前面讲过Invoker连接了Proxy和传输层,当Invoker发起调用时,就需要这个ExchangeClient来发送请求和接收Response,Exchange层的解析会包含在后续的文章中。第四个参数是Invoker的缓存集合,不是Protocol用的,所以不去管它。
前一篇文章讲过,当Proxy最终接收到方法调用后,会调用Invoker.invoke()来发起远程调用,下面来看下DubboInvoker.invoke()是怎么实现的。
DubboInvoker
对invoke()方法的调用首先会进到DubboInvoker的父类AbstractInvoker中:

    @Override
    public Result invoke(Invocation inv) throws RpcException {
        // 判断invoker是否已经destroy了,是则打印警告,调用继续
        if (destroyed.get()) {
            logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "
                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");
        }
        //追加RpcContext中的附加信息到Invocation中,比如链路追踪的Id等
        RpcInvocation invocation = (RpcInvocation) inv;
        invocation.setInvoker(this);
        if (CollectionUtils.isNotEmptyMap(attachment)) {
            invocation.addObjectAttachmentsIfAbsent(attachment);
        }
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
            invocation.addObjectAttachments(contextAttachments);
        }
        //设置是同步还是异步调用
        invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation));
        //如果是异步调用,给这次请求加一个唯一id
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);

        AsyncRpcResult asyncResult;
        try {
            //调用子类的doInvoke()方法
            asyncResult = (AsyncRpcResult) doInvoke(invocation);
        } catch (InvocationTargetException e) { // biz exception
            //异常处理
            ...
        } catch (RpcException e) {
            //异常处理
            ...
        } catch (Throwable e) {
            //异常处理
            asyncResult = AsyncRpcResult.newDefaultAsyncResult(null, e, invocation);
        }
        RpcContext.getContext().setFuture(new FutureAdapter(asyncResult.getResponseFuture()));
        return asyncResult;
    }

AbstractInvoker最终调用了DubboInvokerdoInvoke()方法。

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);
        //获取Dubbo协议的exchangeClient
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            //如果Oneway调用,即Consumer端不关心调用是否成功,则发送请求后直接返回结果。多用在日志发送这种可以容忍数据丢失的场景
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                //2.7之后所有调用都改成异步,讲Future放入result中,如果Consumer调用是同步的,上面的Protocol的refer()会阻塞等待异步结果返回
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

上面就是当@Reference上配置了单个url,并且这个url指定了具体协议的情况,下面看下当url是注册中心的情况。

注册中心的URL

之前的文章讲过,@Reference关联的注册中心的url格式类似于registry://localhost:2181?refer=version%3f1.0.0,所以dubbo可以基于url找到对应的Protocol类为RegistryProtocol,现在看下这个类的refer()方法如何处理的:

    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        //1. 转换成具体注册中心实现的url
        url = getRegistryUrl(url);
        //2. 获取注册中心实现
        Registry registry = registryFactory.getRegistry(url);
        //3. 如果是获取RegistryService的代理,则直接获取本地暴露的invoker
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        //4. 判断url是否指定了分组信息
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                //指定了分组,则使用MergeableCluster
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        //5. 获取Cluster Invoker
        return doRefer(cluster, registry, type, url);
    }

第1步,首先需要将url转换成真实注册中心的地址。dubbo是支持多注册中心的,而配置中获取的是一个通用的注册中心url,以registry://开头,这一步转成真正的注册中心url,比如从registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?refer=interface%3Dorg.apache.dubbo.demo.DemoService&registry=zookeeper 转成 zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?refer=interface%3Dorg.apache.dubbo.demo.DemoService
第2步,根据真实的url获取到注册中心的实现类,比如上面的url获取到的就是使用zookeeper注册中心,获取的就是ZookeeperRegistry
第3步,这里是对获取注册中心实例代理的特殊处理,暂时不看
第4步,dubbo支持将多个远程服务调用结果做合并来做为最终结果,通过配置一个merger类来实现
第5步,没有指定group的话,则使用默认的Cluster构造Invoker
上面方法的主要就是获取Registry的实现,然后调用doRefer()方法:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //1. 构建directory实例
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // 2. 生成consumer URL
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        //3. 将consumer信息写入注册中心
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        //4. 构建RouteChain
        directory.buildRouterChain(subscribeUrl);
        //5. 订阅服务变化通知
        directory.subscribe(toSubscribeUrl(subscribeUrl));
        //6. 生成ClusterInvoker
        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }
        //7. 回调Listener
        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker, subscribeUrl);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

在上面的doRefer()方法中,首先为服务生成RegistryDirectory实例,该类的作用是关联DirectoryRegistry接口,前面的白话Dubbo系列中已经讲过,不清楚的话可以回查一下。随后,Consumer会将自己也注册到注册中心,所以可以通过注册中心的数据看到某个Provider都被谁消费,也可以看到某个Consumer都调用了哪些服务。
第5步中,订阅注册中心的数据变化,在provider变化时可以实时收到通知
第6步中,生成最终的ClusterInvoker,Dubbo默认配置中,这里的Cluster是FailoverCluster,join()方法返回FailoverClusterInvoker

ClusterInoker实现

ClusterInvoker是Dubbo支持集群调用的核心实现,包括负载均衡、特殊路由、容错处理等。默认实现类FailoverClusterInvoker支持用户配置重试次数,可以在一个节点失败重试其它节点。
AbstractClusterInvoker:

@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        //判断Invoker是否已经destroy,是则抛出异常
        checkWhetherDestroyed();
        // 将attachments加到Invocation中
        Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
        }
       // 获取可用invoker列表
        List<Invoker<T>> invokers = list(invocation);
       //根据配置获取指定的负载均衡实现
        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        return doInvoke(invocation, invokers, loadbalance);
    }

ClusterInvoker的invoke()方法首先调用list()方法获取所有可用invoker列表,这里的是直接调用的Directory的list方法,Directory缓存了从注册中心获取的provider url列表,会将每个url生成invoker。
在获取到一组invoker后需要从其中选择一个发起调用,这时候就需要用到负载均衡,最终根据获取的invoker列表和负载均衡器调用子类的具体实现。
FailoverClusterInvoker:

@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyInvokers = invokers;
        checkInvokers(copyInvokers, invocation);
        String methodName = RpcUtils.getMethodName(invocation);
        // 获取重试次数,最低可配置在方法粒度
        int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyInvokers = list(invocation);
                // check again
                checkInvokers(copyInvokers, invocation);
            }
            // 使用负载均衡最终选择一个invoker
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            try {
                Result result = invoker.invoke(invocation);
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn(...);
                }
                return result;
            } catch (RpcException e) {
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(...);
    }

上面的逻辑主要是两点,根据配置的重试次数来决定是否重试,根据负载均衡实现从注册中心返回的可用服务中选择其中一个,然后发起调用,当重试结束还未成功,则抛出异常。

构造带Filter的Invoker

上面讲了两种Invoker的获取和invoke的工作原理,其实Dubbo中上面得到的Invoker不会直接返回给Proxy,而是需要和Filter集成最终返回Invoker链。这部分的代码前面白话部分讲Filter的时候已分解,传送门

总结

消费端的Proxy通过Invoker发起调用,Invoker对Proxy屏蔽了集群和服务治理等一系列逻辑,同时从Invoker层开始,提供了对多协议的支持。从Invoker再往后走,将不存在接口和方法的概念,下一篇将分解传输层的实现。

上一篇下一篇

猜你喜欢

热点阅读