Dubbo系列--消费者服务调用《六》

2023-11-29  本文已影响0人  Teddy_b

接口调用

在示例代码中

HelloService demoService = bootstrap.getCache().get(reference);
        String message = demoService.sayHello("dubbo");

这里第一步就是从缓存中获取创建的代理对象,就是上一篇Dubbo系列--消费者服务引用《五》提及的代理对象

然后调用代理对象的接口方法

InvocationHandler

回顾下代理对象的接口方法实现

public java.lang.String sayHello(java.lang.String arg0) {
                     Object[] args = new Object[1];
                    // $w是javassist中的类型转换,基础类型转为包装类型,其它类型会忽略这个
                     args[0] = ($w)$1; 
                     Object ret = handler.invoke(this, methods[0], args); 
                     return (java.lang.String)ret;
            }

可以看到它调用的是InvocationHandlerinvoke方法

@Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ...
        return InvocationUtil.invoke(invoker, rpcInvocation);
    }

public static Object invoke(Invoker<?> invoker, RpcInvocation rpcInvocation) throws Throwable {
        ...
                return invoker.invoke(rpcInvocation).recreate();
       ...
    }

而这个invoke方法实际调用的是上一篇Dubbo系列--消费者服务引用《五》中创建的Invoke对象的invoke方法

在回顾下我们创建的代理对象


image.png

可以看到它引用的是MigrationInvoker对象,而MigrationInvoker对象又同时持有两个invoker

上一篇中我们也提及了默认的currentAvailableInvoker指向的是应用级别的Invoker对象

Invoker

我们进一步分析下MigrationInvokerinvoker方法

 @Override
    public Result invoke(Invocation invocation) throws RpcException {
        if (currentAvailableInvoker != null) {
            ...
                return decideInvoker().invoke(invocation);
          ...
        }
}

这里需要先决定使用哪一个Invoker对象,因为默认的currentAvailableInvoker指向的是应用级别的Invoker对象,所以只要应用级别的Invoker对象可用,就会优先使用它(只要Netty客户端连接还存活就是可用)

@Override
    public Result invoke(Invocation invocation) throws RpcException {
        ...
        return invoker.invoke(invocation);
    }
@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result;

        String value = getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (ConfigUtils.isEmpty(value)) {
            //no mock
            result = this.invoker.invoke(invocation);
}
@Override
        public Result invoke(Invocation invocation) throws RpcException {
            return filterInvoker.invoke(invocation);
        }
@Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
                asyncResult = filter.invoke(nextNode, invocation);
}
@Override
        public Result invoke(Invocation invocation) throws RpcException {
            Result asyncResult;
            try {
                asyncResult = filter.invoke(nextNode, invocation);
}

它会依次经过

这些过滤器都是针对特性定制的,一般情况下是啥也没干,经过这些过滤器后,才会到达FailoverClusterInvoker,由于它没有重写invoke方法,因此会直接使用父类AbstractClusterInvoker中的

@Override
    public Result invoke(final Invocation invocation) throws RpcException {
        ...
        List<Invoker<T>> invokers = list(invocation);
        ...

        LoadBalance loadbalance = initLoadBalance(invokers, invocation);
        ...
            return doInvoke(invocation, invokers, loadbalance);
        ...
    }

这里主要包括三步

@Override
    public List<Invoker<T>> doList(SingleRouterChain<T> singleRouterChain,
                                   BitList<Invoker<T>> invokers, Invocation invocation) {
        ...
            List<Invoker<T>> result = singleRouterChain.route(getConsumerUrl(), invokers, invocation);
            return result == null ? BitList.emptyList() : result;
        ...
    }

可用看到RegistryDirectory是从RouterChain中获取的Invoker,这里的RouterChain对应的也是上一篇文章里保存在RegistryDirectory中的

image.png

默认使用的是当前的RouterChain,对应的是mainChain

public List<Invoker<T>> simpleRoute(URL url, BitList<Invoker<T>> availableInvokers, Invocation invocation) {
        BitList<Invoker<T>> resultInvokers = availableInvokers.clone();

        // 1. route state router
        resultInvokers = headStateRouter.route(resultInvokers, url, invocation, false, null);
        ...

        if (routers.isEmpty()) {
            return resultInvokers;
        }
        ...
    }

最终会进入路由链里找Invoker

首先进入的是StateRouter链,这个链通过抽象类AbstractStateRouter中的方法串起来

@Override
    public final BitList<Invoker<T>> route(BitList<Invoker<T>> invokers, URL url, Invocation invocation, boolean needToPrintMessage, Holder<RouterSnapshotNode<T>> nodeHolder) throws RpcException {
       ...
        BitList<Invoker<T>> routeResult;

        routeResult = doRoute(invokers, url, invocation, needToPrintMessage, nodeHolder, messageHolder);
        if (routeResult != invokers) {
            routeResult = invokers.and(routeResult);
        }
        // check if router support call continue route by itself
        if (!supportContinueRoute()) {
            // use current node's result as next node's parameter
            if (!shouldFailFast || !routeResult.isEmpty()) {
                routeResult = continueRoute(routeResult, url, invocation, needToPrintMessage, nodeHolder);
            }
        }

       ...
        return routeResult;
    }

它会调用子类的doRoute方法寻找Invoker,如果子类返回的Invoker和当前的Invoker不同,则把子类的Invoker添加到当前的Invoker中

这里当前的Invoker对应的就是RegistryDirectory中记录的Invoker

然后通过continueRoute继续寻找下一个子类,它会依次经过

经过所有StateRouter的子类链处理后,得到所有的Invoker,一般情况下得到的Invoker仍然是只有RegistryDirectory中记录的Invoker

然后会进入Router链,由于默认情况下Router链为空,所以会直接返回

最终返回的仍然是RegistryDirectory中记录的Invoker,但是经过了StateRouter链和Router链的处理

@SPI("random")
public interface LoadBalance {
}
@Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        // 重试次数,默认是3次
        int len = calculateInvokeTimes(methodName);
        ...
        for (int i = 0; i < len; i++) {
            ...
             // 从众多Invoker中挑选一个,通过负载均衡策略来挑选
            Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
            invoked.add(invoker);
            RpcContext.getServiceContext().setInvokers((List) invoked);
            boolean success = false;
            try {
                Result result = invokeWithContext(invoker, invocation);
                ...
                success = true;
                return result;
            }...

这里通过重试的方式来执行Invoker,默认情况下重试3次

然后通过负载均衡策略挑选一个Invoker执行,由于我们这里只有一个Invoker,所有会直接返回,还没到负载均衡去挑选的那一步

最后就是通过这个Invoker去执行了,按照上一篇文章中,我们的Invoker是通过Wrapper包装的DubboInvoker

image.png
@Override
    public Result invoke(Invocation invocation) throws RpcException {
        try {
            ...
            return invoker.invoke(invocation);
        ...
    }

然后又要经过过滤器链

最终达到DubboInvoker

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        ...
        List<? extends ExchangeClient> exchangeClients = clientsProvider.getClients();
        if (exchangeClients.size() == 1) {
            currentClient = exchangeClients.get(0);
        } else {
            currentClient = exchangeClients.get(index.getAndIncrement() % exchangeClients.size());
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

            Request request = new Request();
            if (payload != null) {
                request.setPayload(payload);
            }
            request.setData(inv);
            request.setVersion(Version.getProtocolVersion());

            if (isOneway) {
                ...
            } else {
                request.setTwoWay(true);
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                    currentClient.request(request, timeout, executor).thenApply(AppResponse.class::cast);
                ...
                return result;
            }
        }...
    }

可用看到这里就是取出之前创建的Netty客户端,然后判断是否oneWay(即不需要返回结果的)

然后通过Netty客户端把请求发送出去,请求包装了Invocation对象

拿到返回结果后在进行解析

至此才完成了一次远程调用

上一篇 下一篇

猜你喜欢

热点阅读