dubbo系列之-服务暴露-2021-01-16

2021-01-16  本文已影响0人  five_year

背景

服务暴露网上已经有很多文章了,大而全,我们这里主要抓细节😄。

image

疑问

暴露过程做了些啥?

是先启动服务还是先连接注册中心?

服务下线怎么感知注册中心?

暴露

我们从 org.apache.dubbo.config.ServiceConfig#doExportUrls() 方法进去

private void doExportUrls() {
    List<URL> registryURLs = loadRegistries(true);
    //支持多协议暴露就是说 <dubbo:protocol 可以多个
    //<dubbo:protocol name="dubbo" port="20880"/>
    //<dubbo:protocol  name="rest" port="20881"/>
    //像这样,如果有php客户端 和 dubbo客户端都可以同事支持
    for (ProtocolConfig protocolConfig : protocols) {
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}

进入 doExportUrlsFor1Protocol()中,这个方法大家一定要进去瞅一眼,和我们写的代码也差不多,方法长度太长,而且循环嵌套很深。

//org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
    String name = protocolConfig.getName();
    if (StringUtils.isEmpty(name)) {//没有配置协议,默认dubbo
        name = DUBBO;
    }

    Map<String, String> map = new HashMap<String, String>();
    map.put(SIDE_KEY, PROVIDER_SIDE);
    //将所有的配置都放到URL 的key=value 中
    appendRuntimeParameters(map);
    appendParameters(map, metrics);
    appendParameters(map, application);
    appendParameters(map, module);
    appendParameters(map, provider);
    appendParameters(map, protocolConfig);
    appendParameters(map, this);
    if (ProtocolUtils.isGeneric(generic)) {//泛化
        map.put(GENERIC_KEY, generic);
        map.put(METHODS_KEY, ANY_VALUE);
    } else {//版本
        String revision = Version.getVersion(interfaceClass, version);
        if (revision != null && revision.length() > 0) {
            map.put(REVISION_KEY, revision);
        }

        String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
        if (methods.length == 0) {
            logger.warn("No method found in service interface " + interfaceClass.getName());
            map.put(METHODS_KEY, ANY_VALUE);
        } else {
            map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
        }
    }
    //token,dubbo 支持token校验,只有携带对的token才能调用成功
    if (!ConfigUtils.isEmpty(token)) {
        if (ConfigUtils.isDefault(token)) {
            map.put(TOKEN_KEY, UUID.randomUUID().toString());
        } else {
            map.put(TOKEN_KEY, token);
        }
    }

    String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
    Integer port = this.findConfigedPorts(protocolConfig, name, map);
    URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

    if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
            .hasExtension(url.getProtocol())) {
        url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
    }

    String scope = url.getParameter(SCOPE_KEY);
    // don't export when none is configured
    if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
       if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
            exportLocal(url);//先将服务暴露到本地,下面分析
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
            if (CollectionUtils.isNotEmpty(registryURLs)) {
                //注册中心也支持多个,比如可以将服务暴露到集群内,也可以将
                //服务暴露到中台供所其他业务线用
                for (URL registryURL : registryURLs) {
                    //if protocol is only injvm ,not register
                    if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                        continue;
                    }
                    url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                    //加载监控配置
                    URL monitorUrl = loadMonitor(registryURL);
                    if (monitorUrl != null) {
                        url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                    }

                    // 调用具体bean的代理模式,默认为javassist 
                    String proxy = url.getParameter(PROXY_KEY);
                    if (StringUtils.isNotEmpty(proxy)) {
                        registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                    }
                    //组装invoker
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    //暴露服务
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            } else {
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

                Exporter<?> exporter = protocol.export(wrapperInvoker);
                exporters.add(exporter);
            }
            //存储发布信息
            MetadataReportService metadataReportService = null;
            if ((metadataReportService = getMetadataReportService()) != null) {
                metadataReportService.publishProvider(url);
            }
        }
    }
    this.urls.add(url);
}

本地暴露 exportLocal(url)

//org.apache.dubbo.config.ServiceConfig#exportLocal
private void exportLocal(URL url) {
    URL local = URLBuilder.from(url)
       .setProtocol(LOCAL_PROTOCOL)//收到设置协议为injvm,以供下面选择对应的protocol
            .setHost(LOCALHOST_VALUE)
            .setPort(0)
            .build();
//
    Exporter<?> exporter = protocol.export(
            PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
    exporters.add(exporter);
}
static Protocol protocol = ExtensionLoader
.getExtensionLoader(Protocol.class).getAdaptiveExtension();

static ProxyFactory PROXY_FACTORY = ExtensionLoader
.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

protocol 静态变量为 Protocol 接口的自适应扩展点,调用 protocol.export(Invoker<T> invoker) 将会根据传入的invoker 信息决定去往哪个实现类。而 invoker 传入的值为

PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local) ,PROXY_FACTORY 静态变量也是一个 ProxyFactory 的扩展点,从下面可以看到该扩展点为方法扩展点,这里我们并没有个自定义过proxy属性,默认实现为javassist=JavassistProxyFactory;(这里忽略各种包装器)

@SPI("javassist")
public interface ProxyFactory {
    @Adaptive({"proxy"})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) ;
}

进到JavassistProxyFactory 的 getInvoker实现中。

//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { 
//这里的proxy 是我们真正的实现类HelloServiceImpl@xxx,
//如果传进来的是一个代理类实现的花,这里只取接口type=HelloService
Class cls = proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type;
//将 HelloServiceImpl包装成一个Wrapper类,而wrapper对象的创建方式正式默认的javassist
final Wrapper wrapper = Wrapper.getWrapper(cls);
//返回一个匿名内部类对象,对象 doInvoke 方法中持有wrapper对象
//AbstractProxyInvoker 实现了Invoker
return new AbstractProxyInvoker<T>(proxy, type, url) {
    @Override
    protected Object doInvoke(T proxy, String methodName,
                              Class<?>[] parameterTypes,
                              Object[] arguments) throws Throwable {

        return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
    }
};
}

上面这种匿名的写法可能不够具体,我们通过自定义类的方式去实现它,更具象点

//org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory#getInvoker
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new MyProxyInvoker(proxy,type,url,wrapper);
}
public class MyProxyInvoker extends AbstractProxyInvoker {
    private Wrapper wrapper;
    public MyProxyInvoker(Object proxy, Class type, URL url, Wrapper wrapper) {
        super(proxy, type, url);
        this.wrapper = wrapper;
    }
    @Override
    protected Object doInvoke(Object proxy, String methodName, Class[] parameterTypes, Object[] arguments) throws Throwable {
        return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
    }
}

这样写的效果是一样的 JavassistProxyFactory#getInvoker()方法返回的是 MyProxyInvoker 对象,后面我们就用该对象来描述分析。

回到 Exporter<?> exporter = protocol.export(

PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));中,表达式变成了Exporter<?> exporter = protocol.export(MyProxyInvoker),MyProxyInvoker中的url对象为local

URL local = URLBuilder.from(url)
        .setProtocol("injvm")
        .setHost(LOCALHOST_VALUE)
        .setPort(0)
        .build();

所以protocol.export()的实现类为InjvmProtocol

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(),
     exporterMap);
}

该方法返回 InjvmExporter,最后执行 exporters.add(exporter),将InjvmExporter(这里其实外面会包装一层ListenerExporterWrapper包装器) 对象暴露到map中结束了jvm本地暴露。

远程暴露

我们再来看看远程暴露的区别

//同本地暴露一样返回MyProxyInvoker实例
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
//区别本地暴露 将 MyProxyInvoker实例 包装为 DelegateProviderMetaDataInvoker
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//这里写法和本地暴露一样,区别在于 wrapperInvoker 中的url#protocol 并不是injvm
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);

我们dubug 看看 wrapperInvoker 中的url#protocol 是啥

image

Protocol 为registry,所以流程会进入到 RegistryProtocol#export(同样这里也会有Wrapper包装)我们debug进去,这个方法内容太丰富了,这里我们先只分析服务暴露

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // 获取要暴露到注册中心的url
    URL providerUrl = getProviderUrl(originInvoker);

    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //暴露服务 下面分析
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

    //...
    return new DestroyableExporter<>(exporter);
}

暴露服务 doLocalExport()
//org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
    //将要暴露的服务生成唯一的key,避免重复
    String key = getCacheKey(originInvoker);
    //再次包装invoker,然后暴露
    return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
    //providerUrl 为dubbo://xxx
        Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
        //protocol.export 经过各种Wrapper 会进入到Dubbo.export
        return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
    });
}
private String getCacheKey(final Invoker<?> originInvoker) {
    URL providerUrl = getProviderUrl(originInvoker);
    String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
    return key;
}

这里originInvoker为 DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx)),invokerDelegate再次包装为InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))),我们继续debug,到了ProtocolFilterWrapper#export

@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);

    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
           //.....
            };
        }
    }
    return new CallbackRegistrationInvoker<>(last, filters);
}

buildInvokerChain()会将InvokerDelegate关联多个Filter过滤器,然后包装为CallbackRegistrationInvoker对象返回,我们接着debug,最后到了DubboProtocol#export(),此时的invoker为CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))))

image
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    //生成服务key=com.poizon.study.api.service.HelloService:20880,和方法无关
    String key = serviceKey(url);
    //将CallbackRegistrationInvoker包装为DubboExporter,然后存储在map中
    //这个map 很关键,将作为后面调用寻找服务的入口
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }

        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    //开启服务,也就是调用netty,开启20880端口
    openServer(url);
    //加载指定序列化方式 默认采用hessan2
    optimizeSerialization(url);
    return exporter;
}
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer
private void openServer(URL url) {
    //..... createServer()创建服务
   serverMap.put(key, createServer(url));
}
//org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#createServer
private ExchangeServer createServer(URL url) {

    ExchangeServer server;
    try {
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }

    return server;
}
//org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
//org....remoting.Transporters#bind(URL, ChannelHandler...)
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {

    ChannelHandler handler;
    if (handlers.length == 1) {
        handler = handlers[0];
    } else {
        handler = new ChannelHandlerDispatcher(handlers);
    }//默认选择netty4 实现
    return getTransporter().bind(url, handler);
}

//org.apache.dubbo.remoting.transport.netty4.NettyTransporter#connect
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyClient(url, listener);
}
//org.apache.dubbo.remoting.transport.AbstractClient#AbstractClient
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    needReconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false);
    try {
        doOpen();
//org.apache.dubbo.remoting.transport.netty4.NettyClient#doOpen     
protected void doOpen() throws Throwable {
    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);
    bootstrap = new Bootstrap();
    bootstrap.group(nioEventLoopGroup)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .channel(NioSocketChannel.class);

    if (getConnectTimeout() < 3000) {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    } else {
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getConnectTimeout());
    }

    bootstrap.handler(new ChannelInitializer() {

        @Override
        protected void initChannel(Channel ch) throws Exception {

跟到最后看到了熟悉的netty启动,这里有好多我们熟悉的配置,比如第一篇文章我们说到的心跳实现IdleStateHandler,以及心跳默认时间 UrlUtils.getHeartbeat(getUrl()),还有netty 的自定义handler nettyClientHandler(没错这个handler就是处理dubbo消费者请求的)

总结

总结下,我们一根线走到底,走到了最后的socket启动,最后将 DubboExporter 放入了map中,最后层层包装为 DestroyableExporter(ExporterChangeableWrapper(ListenerExporterWrapper(DubboExporter(CallbackRegistrationInvoker(InvokerDelegate(DelegateProviderMetaDataInvoker(MyProxyInvoker(HelloServiceImpl@xx))))))));嵌套虽然多了点,但是Wrapper 类的功能都是为了扩展小功能,后面我们调几个分析

image

后面将分析注册中心和Wrapper 等功能。

上一篇下一篇

猜你喜欢

热点阅读