一起读源码

一起学RPC(五)

2018-08-21  本文已影响0人  MR丿VINCENT

之前的文章介绍了一个RPC请求有哪些经历。这篇文章将介绍一直遗漏的provider发布服务的相关细节。

以前都是以JNettyTcpAcceptor作为程序的入口来一步一步抽丝剥茧探索细节。现在就得寻根问底从DefaultServer来继续探索。

程序包中有个example,给了最基础的用法,一看就知道的那种。

public static void main(String[] args) {
        JServer server = new DefaultServer().withAcceptor(new JNettyTcpAcceptor(18090));
        MonitorServer monitor = new MonitorServer();
        try {
            monitor.start();

            ServiceWrapper provider = server.serviceRegistry()
                    .provider(new GenericServiceTestImpl())
                    .flowController(new FlowController<JRequest>() {

                        private AtomicLong count = new AtomicLong();

                        @Override
                        public ControlResult flowControl(JRequest request) {
                            if (count.getAndIncrement() > 100) {
                                return new ControlResult(false, "fuck out!!!");
                            }
                            return ControlResult.ALLOWED;
                        }
                    })
                    .register();

//            server.withGlobalFlowController(); // 全局限流器
            server.connectToRegistryServer("127.0.0.1:20001");
            server.publish(provider);
            server.start();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

很明显JNettyTcpAcceptor仅仅是一个小弟,正真的老大其实是JServer.他才是有至高无上的权利。这也是一个接口类型,这么做目的也是很明显,就是为了方便拓展--其他的实现只需要实现这个接口就完事了。从demo中可以看到几个非常明显的操作如connectToRegistryServerpublishstart以及serviceRegistry。接下来就对这些动作逐一解读。

首先通过serviceRegistry()获取“本地”注册工具。什么意思呢?也就是说通过这个方法获取的玩意儿实际上是用来在本机上将所要发布的服务给“注册”上。所谓的注册其实简单理解为用一个map给保存起来。而这个注册工具也提供几个动作。

这几个动作的命名也是非常语义化--provider()方法就是将一个服务的实现(就是普通的Javabean)给hold住:

    public ServiceRegistry provider(Object serviceProvider, ProviderInterceptor... interceptors) {
            this.serviceProvider = serviceProvider;
            this.interceptors = interceptors;
            return this;
        }

这里将其保存为成员变量,放到后面会有用到的,现在先别慌,继续往下面看。看完了啥都会知道的。

接下来就是flowController()流控器,也是非常直白的命名。这个玩意具体是做什么的呢,其实也很简单。在一些场景下,会出现下游业务大量请求上游的接口,翻译成人话就是consumer会发起很多请求到provider,这时候provider会觉得难受,压力很大,可能被搞崩溃。因为一旦崩溃了全部都完蛋,因此provider就想个法子:你们consumer可以来请求,随便你们怎么搞,一旦把我搞烦了我就不搭理你了。这样就不会出现崩溃的情形了。当然这个解释是很简陋的,实际上要比这个复杂的多。流量控制是一个比较大的话题,绝非一句两句能说的清道的明的。现有的成熟的限流中间件也有很多,如近期阿里开源的Sentinel等。这个demo中给的限流策略也很简单:如果你调用这服务超过100次我就把你踢掉,具体做法是抛出异常。

接下来就是register()方法了。顾名思义就是注册。这个注册就是本地的注册,也就是之前说的将这个服务在本地用一个map保存下来。这个逻辑稍微有点复杂。先不急,继续往后看。

通过register()会返回一个ServiceWrapper实例,Jupiter根据这个对象将注册信息上传给注册中心。这里的注册才是正真意义上的注册,其实逻辑也非常简单,无非就是将这个实例相关的信息如类名,版本信息和组名等一些元数据放到注册中心上去,这里的注册中心简单理解为zk就好了。

connectToRegistryServer()方法用来连接到注册中心,注册中心本质上就是一个server,这里的provider实际上是一个客户端。然后通过publish()方法将ServiceWrapper中的相关信息通过tcp网络传送给注册中心,注册中心将其保存下来。最后start()用来启动provider的server,用来接受consumer的请求。

下面来看看这个register()是怎么实现的。

    public ServiceWrapper register() {
            checkNotNull(serviceProvider, "serviceProvider");

            Class<?> providerClass = serviceProvider.getClass();

            ServiceProviderImpl implAnnotation = null;
            ServiceProvider ifAnnotation = null;
            for (Class<?> cls = providerClass; cls != Object.class; cls = cls.getSuperclass()) {
                if (implAnnotation == null) {
                    implAnnotation = cls.getAnnotation(ServiceProviderImpl.class);
                }

                Class<?>[] interfaces = cls.getInterfaces();
                if (interfaces != null) {
                    for (Class<?> i : interfaces) {
                        ifAnnotation = i.getAnnotation(ServiceProvider.class);
                        if (ifAnnotation == null) {
                            continue;
                        }

                        checkArgument(
                                interfaceClass == null,
                                i.getName() + " has a @ServiceProvider annotation, can't set [interfaceClass] again"
                        );

                        interfaceClass = i;
                        break;
                    }
                }

                if (implAnnotation != null && ifAnnotation != null) {
                    break;
                }
            }

            if (ifAnnotation != null) {
                checkArgument(
                        group == null,
                        interfaceClass.getName() + " has a @ServiceProvider annotation, can't set [group] again"
                );
                checkArgument(
                        providerName == null,
                        interfaceClass.getName() + " has a @ServiceProvider annotation, can't set [providerName] again"
                );

                group = ifAnnotation.group();
                String name = ifAnnotation.name();
                providerName = Strings.isNotBlank(name) ? name : interfaceClass.getName();
            }

            if (implAnnotation != null) {
                checkArgument(
                        version == null,
                        providerClass.getName() + " has a @ServiceProviderImpl annotation, can't set [version] again"
                );

                version = implAnnotation.version();
            }

            checkNotNull(interfaceClass, "interfaceClass");
            checkArgument(Strings.isNotBlank(group), "group");
            checkArgument(Strings.isNotBlank(providerName), "providerName");
            checkArgument(Strings.isNotBlank(version), "version");

            // method's extensions
            //
            // key:     method name
            // value:   pair.first:  方法参数类型(用于根据JLS规则实现方法调用的静态分派)
            //          pair.second: 方法显式声明抛出的异常类型
            Map<String, List<Pair<Class<?>[], Class<?>[]>>> extensions = Maps.newHashMap();
            for (Method method : interfaceClass.getMethods()) {
                String methodName = method.getName();
                List<Pair<Class<?>[], Class<?>[]>> list = extensions.get(methodName);
                if (list == null) {
                    list = Lists.newArrayList();
                    extensions.put(methodName, list);
                }
                list.add(Pair.of(method.getParameterTypes(), method.getExceptionTypes()));
            }

            return registerService(
                    group,
                    providerName,
                    version,
                    serviceProvider,
                    interceptors,
                    extensions,
                    weight,
                    executor,
                    flowController
            );
        }

首先第一个for循环目的是通过给定的Javabean找到其接口,如果没有就去找父类,直到最终为Object为止。在暴露一个服务的时候可以在其实现类及接口上加注解,因此这里需要做一下筛选,只会去找有注解的接口以及实现类。通过注解去找接口是一种方式,其实也能手动指定接口,原则是不能既手动指定接口又给接口指定注解。注解中有提供相关信息如group、name等。最后将这些参数全部丢给registerService处理。

    ServiceWrapper registerService(
            String group,
            String providerName,
            String version,
            Object serviceProvider,
            ProviderInterceptor[] interceptors,
            Map<String, List<Pair<Class<?>[], Class<?>[]>>> extensions,
            int weight,
            Executor executor,
            FlowController<JRequest> flowController) {

        ProviderInterceptor[] allInterceptors = null;
        List<ProviderInterceptor> tempList = Lists.newArrayList();
        if (globalInterceptors != null) {
            Collections.addAll(tempList, globalInterceptors);
        }
        if (interceptors != null) {
            Collections.addAll(tempList, interceptors);
        }
        if (!tempList.isEmpty()) {
            allInterceptors = tempList.toArray(new ProviderInterceptor[tempList.size()]);
        }

        ServiceWrapper wrapper =
                new ServiceWrapper(group, providerName, version, serviceProvider, allInterceptors, extensions);

        wrapper.setWeight(weight);
        wrapper.setExecutor(executor);
        wrapper.setFlowController(flowController);

        providerContainer.registerService(wrapper.getMetadata().directoryString(), wrapper);

        return wrapper;
    }

这里有几个参数没有提及到,如interceptors,weight,executor等。这些实际上和核心逻辑关系不大,只是可选项而已。不必太多关注。这里的逻辑也十分简单,主旨就是将这些参数封装到一个ServiceWrapper对象中,最后将这个对象保存在本地(放一个map中)。这里的key的生成逻辑是这样的:组-服务名-版本号。这种结构就像目录一样,一级又一级。

public String directoryString() {
        if (directoryCache != null) {
            return directoryCache;
        }

        StringBuilder buf = StringBuilderHelper.get();
        buf.append(getGroup())
                .append('-')
                .append(getServiceProviderName())
                .append('-')
                .append(getVersion());

        directoryCache = buf.toString();

        return directoryCache;
    }

本地注册逻辑也非常简单:

    private final ConcurrentMap<String, ServiceWrapper> serviceProviders = Maps.newConcurrentMap();
    public void registerService(String uniqueKey, ServiceWrapper serviceWrapper) {
            serviceProviders.put(uniqueKey, serviceWrapper);

            logger.info("ServiceProvider [{}, {}] is registered.", uniqueKey, serviceWrapper);
        }

整个“本地注册”的逻辑实际上也讲完了(除了那些不影响逻辑的可选参数)。接下来就是连接到注册中心。

    public void connectToRegistryServer(String connectString) {
        registryService.connectToRegistryServer(connectString);
    }

实际上也是委派给registryService完成的,这个老大很懒,很多事情不自己做。而这个registryService是这样创建的:


    public DefaultServer() {
        this(RegistryService.RegistryType.DEFAULT);
    }
    public DefaultServer(RegistryService.RegistryType registryType) {
        registryType = registryType == null ? RegistryService.RegistryType.DEFAULT : registryType;
        registryService = JServiceLoader.load(RegistryService.class).find(registryType.getValue());
    }

这个操作比较高级,通过SPI去加载具体实现。具体怎么整的先不去看了,反正一时半会也看不懂。Jupiter中实现有两种:DefaultRegistryServiceZookeeperRegistryService,默认的是DefaultRegistryService。实际上默认的是作者自己写的一个实现,属于玩票级别的。虽然是业余的,但是也是值得学习的。这个DefaultRegistryService的核心实际上就是内部维护一个tcp客户端,然后去连接注册中心。注册中心也有两个实现:自己实现的和基于zk的。这里的细节以后会说到。

连接完事了之后,接下来的动作就是发布publish:

    public void publish(ServiceWrapper serviceWrapper) {
        ServiceMetadata metadata = serviceWrapper.getMetadata();

        RegisterMeta meta = new RegisterMeta();
        meta.setPort(acceptor.boundPort());
        meta.setGroup(metadata.getGroup());
        meta.setServiceProviderName(metadata.getServiceProviderName());
        meta.setVersion(metadata.getVersion());
        meta.setWeight(serviceWrapper.getWeight());
        meta.setConnCount(JConstants.SUGGESTED_CONNECTION_COUNT);

        registryService.register(meta);
    }

这个发布的实际操作是将这些信息传给注册中心,准确来讲应该叫做“远程注册”。其中的逻辑无非就是将服务中的信息封装到RegisterMeta对象中,然后将这个对象丢给注册中心。后续的处理就留个悬念,后续的文章继续讲。

至此,一个provider的服务发布流程就差不多整理完了。这么回头以看,其实代码组织逻辑也不是那么复杂。接下来的内容该是RegistryService的核心实现了。

上一篇 下一篇

猜你喜欢

热点阅读