08-dubbo服务引入和调用源码分析

2021-07-06  本文已影响0人  Coding626

开头

上一节讲到了服务的导出,即服务端如何将自己的接口提供成dubbo服务的过程,这一节就是讲服务的调用了,消费端是如何调用服务端的接口的呢?

主要流程

1.spring启动时,会给@Reference注解的属性赋值,赋值的时候会调用referenceBean.get方法
2.准备初始化invoker对象,MockClusterInvoker,生成这个是最终目的
3.在注册中心初始化服务目录RegistryDirectory
4.将消费端信息注册到zk
5.构造路由链、服务订阅
6.根据服务目录得到最终的invoker对象MockClusterInvoker
8.最终调用MockClusterInvoker.invoke方法执行请求发送数据,里面调用了netty.send方法
9.通过netty channel,执行nettyServerHandler方法处理请求和结果返回

源码流程
流程图地址:https://www.processon.com/view/link/60e02b8d637689510d6c4184

服务引入.jpg

1.程序入口

在spring启动的时候,会对@Reference注解的属性赋值,生成ReferenceBean,在ReferenceAnnotationBeanPostProcessor.doGetInjectedBean方法中
可以看到,最终调用了 referenceBean.get()方法,这个方法最后返回了一个ref对象,这个ref对象看到最后就是一个Invoke代理对象,也就是主要流程的第二步,准备初始化invoker对象,MockClusterInvoker,生成这个是最终目的

@Override
    protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                       InjectionMetadata.InjectedElement injectedElement) throws Exception {

    
        return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
    }
 private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
        if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
            return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
                    wrapInvocationHandler(referenceBeanName, referenceBean));
        } else {                                    // ReferenceBean should be initialized and get immediately
            // 这里
            return referenceBean.get();
        }
    }

public synchronized T get() {
        checkAndUpdateSubConfigs();

        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            // 入口
            init();
        }
        return ref;  // Invoke代理
    }

2.准备初始化invoker对象,MockClusterInvoker

由init()->createProxy(map),这个方法太长了,留了三个主要的方法:
1.加载注册中心url地址

  1. invoker = REF_PROTOCOL.refer调用registry.refer,这里又是spi机制,最终调用了registryProtocol.refer方法
private T createProxy(Map<String, String> map) {
     List<URL> us = loadRegistries(false);
    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
     invoker = CLUSTER.join(new StaticDirectory(u, invokers));

    }

3.在注册中心初始化服务目录RegistryDirectory
留下了主要代码,可以看到这里初始化了一个注册目录,也就是我们最终在zk上看到的consumers节点文件夹。
registry.register(directory.getRegisteredConsumerUrl());这里最终会调用ZookeeperRegistry.doRegister方法,用zk客户端向zk服务端创建节点,将消费端信息注册到zk,可以看到这里创建的是临时节点

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
      
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);

         
          registry.register(directory.getRegisteredConsumerUrl());
       

    
        directory.buildRouterChain(subscribeUrl);

      
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
         
    

        return invoker;
    }

 @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

4.构造路由链、服务订阅
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY

3.生成最终的invoker对象MockClusterInvoker

    Invoker invoker = cluster.join(directory);

这里又是SPI机制,由于Cluster有一个包装类,所以会先调用MockClusterWrapper.join方法,原理可参照我之前单独写的一节SPI源码分析
可以看到,这里最终生成MockClusterInvoker

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }

}

4.服务调用

第3步骤中生成了一个MockClusterInvoker对象,所以最终调用服务的方法实际上就是调用MockClusterInvoker.invoke方法,会依次调用AbstractClusterInvoker.invoke->FailoverClusterInvoker.doInvoke->DubboInvoker.doInvoke

@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);

                //fix:#4585
                if(result.getException() != null && result.getException() instanceof RpcException){
                    RpcException rpcException= (RpcException)result.getException();
                    if(rpcException.isBiz()){
                        throw  rpcException;
                    }else {
                        result = doMockInvoke(invocation, rpcException);
                    }
                }

            }
        return result;
    }

我们直接看DubboInvoker.doInvoke方法
1.首先会拿到一个 ExchangeClient客户端
2.异步请求currentClient.request,最终调用HeaderExchangeChannel.request->调用netty的方法channel.send

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

 
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
           
            currentClient = clients[index.getAndIncrement() % clients.length];
        }

        try {
         
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);

                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);

                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);

                asyncRpcResult.subscribeTo(responseFuture);

                return asyncRpcResult;
            }
        }
    }

5.服务请求处理

由于使用的netty通信,所有客户端发送消息后,netty服务端会在NettyServerHandler.channelRead中接到消息,这里调用了很多handler,就不展开看了。
1.MultiMessageHandler
2.HeartbeatHandler
3.AllChannelHandler
4.DecodeHandler
5.HeaderExchangeHandler
6.ExchangeHandlerAdapter

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {

            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

总结

服务的引入的目的就是在消费端@Reference标注一个服务端接口,这个注解会去将消费端消息注册到zk,最终会生成一个调用服务端的代理对象invoker,消费端调用服务端接口的时候最后调用的就是invoker.invoke方法,而这个方法采用的通信框架是netty,实现了远程调用。
dubbo源码写的很好,比如里面的SPI机制运用的很巧妙,还有一些抽象工厂设计模式等,源码值得品读。

上一篇下一篇

猜你喜欢

热点阅读