Dubbo Service Publishing

2020-06-06  剑道_7ffc

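Business functions

1 Parsing the configuration file
2 Service registration: the exported objects are kept in a map, and a node is created in ZooKeeper
3 Starting a server-side listener
4 Network communication, serialization and deserialization

Dubbo's extension of Spring

Spring custom tag extension

Custom configuration is implemented through the spring.handlers file, which holds key-value pairs with the namespace URL as the key and the corresponding handler as the value. Spring resolves the handler for a namespace in the resolve method of DefaultNamespaceHandlerResolver.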
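
As a rough illustration of the lookup described above, the sketch below loads a spring.handlers-style mapping with plain java.util.Properties and resolves a handler class name from a namespace URL. The class HandlerMappingSketch and the helper resolveHandlerName are hypothetical, and the mapping line is only an assumed example of what such a file might contain. Spring's real resolver also instantiates the handler and calls its init method, which this sketch leaves out.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical sketch; this is not Spring's actual resolver.
class HandlerMappingSketch {

    // Assumed content of a spring.handlers-style file.
    // The colon in the key is escaped, as usual in .properties files.
    static final String MAPPINGS =
            "http\\://dubbo.apache.org/schema/dubbo="
            + "org.apache.dubbo.config.spring.schema.DubboNamespaceHandler\n";

    // Returns the handler class name registered for the namespace URL,
    // or null when nothing is registered for it.
    static String resolveHandlerName(String namespaceUrl) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(MAPPINGS));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props.getProperty(namespaceUrl);
    }

    public static void main(String[] args) {
        System.out.println(resolveHandlerName("http://dubbo.apache.org/schema/dubbo"));
    }
}
```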



org.apache.dubbo.config.spring.schema.DubboNamespaceHandler.init

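Each configuration element is parsed into its corresponding Config class. Three are special: ServiceBean, ReferenceBean, and ConfigCenterBean, each of which extends its corresponding Config class.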

public void init() {
    registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
    registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
    registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
    registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
    registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
    registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
    registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
    registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
    registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
    registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
    registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
    registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
    registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}

org.apache.dubbo.config.spring.schema.DubboBeanDefinitionParser.parse

This method parses the configuration file. For example, the code that handles a ServiceBean is as follows:

String className = element.getAttribute("class");
if (className != null && className.length() > 0) {
    RootBeanDefinition classDefinition = new RootBeanDefinition();
    classDefinition.setBeanClass(ReflectUtils.forName(className));
    classDefinition.setLazyInit(false);
    parseProperties(element.getChildNodes(), classDefinition);
    beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}

Implementation of ServiceBean

The ServiceBean class implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener,
BeanNameAware, and ApplicationEventPublisherAware.

org.springframework.beans.factory.InitializingBean#afterPropertiesSet

Invoked during initialization, after the bean's properties have been set.

org.springframework.beans.factory.DisposableBean#destroy

Invoked when the bean is destroyed.

org.springframework.context.ApplicationContextAware#setApplicationContext

Gives the bean access to the ApplicationContext container.

org.springframework.context.ApplicationListener#onApplicationEvent

Listens for ApplicationEvent instances.

org.springframework.beans.factory.BeanNameAware#setBeanName

Sets the bean's name.

org.springframework.context.ApplicationEventPublisherAware#setApplicationEventPublisher

Supplies the publisher used to publish events.

Spring event publishing and listening

ApplicationEvent: the event itself
ApplicationEventPublisherAware: gives the bean access to the event publisher
ApplicationListener: the event listener
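
To make the three roles concrete, here is a minimal Spring-free sketch of the same pattern. All names in it (EventSketch, ExportedEvent, Listener, Publisher) are hypothetical stand-ins for the Spring types listed above, not Spring's API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Spring-free sketch of the event / publisher / listener roles.
class EventSketch {

    // The event itself (plays the role of ApplicationEvent).
    static class ExportedEvent {
        final String serviceName;

        ExportedEvent(String serviceName) {
            this.serviceName = serviceName;
        }
    }

    // The listener (plays the role of ApplicationListener).
    interface Listener {
        void onEvent(ExportedEvent event);
    }

    // The publisher (plays the role of the event publisher).
    static class Publisher {
        private final List<Listener> listeners = new ArrayList<>();

        void addListener(Listener listener) {
            listeners.add(listener);
        }

        // Delivers the event to every registered listener, in registration order.
        void publish(ExportedEvent event) {
            for (Listener listener : listeners) {
                listener.onEvent(event);
            }
        }
    }

    // Publishes one event and returns what the listener received.
    static List<String> demo() {
        List<String> received = new ArrayList<>();
        Publisher publisher = new Publisher();
        publisher.addListener(event -> received.add(event.serviceName));
        publisher.publish(new ExportedEvent("IPayService"));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```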

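Service exposure process in ServiceBean

afterPropertiesSet

Stores the Dubbo configuration in the ServiceBean so that it can be used later.

onApplicationEvent

Triggered when the container is initialized or refreshed.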

public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
            logger.info("The service ready on spring started. service: " + getInterface());
        }
        export(); // export (publish) the service
    }
}
export

Publishes the service.

    public void export() {
        super.export();
        // Publish ServiceBeanExportedEvent
        publishExportEvent();
    }

The ServiceConfig configuration class

export
    public synchronized void export() {
        checkAndUpdateSubConfigs(); // check and update the configuration

        if (!shouldExport()) { // should this service be published at all?
            return;
        }

        if (shouldDelay()) { // should publishing be delayed?
            delayExportExecutor.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }
    }
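
The delay branch above can be sketched with a plain ScheduledExecutorService. This is only an illustration under assumptions: DelayedExportSketch, its delayMillis field, and the demo helper are hypothetical names, and the real ServiceConfig performs many more checks before exporting.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of "export now, or schedule the export for later".
class DelayedExportSketch {
    private final AtomicBoolean exported = new AtomicBoolean(false);
    private final long delayMillis;

    DelayedExportSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    boolean isExported() {
        return exported.get();
    }

    private void doExport() {
        exported.set(true);
    }

    // Returns the pending task when the export was scheduled,
    // or null when the export ran immediately.
    synchronized ScheduledFuture<?> export(ScheduledExecutorService executor) {
        if (delayMillis > 0) {
            return executor.schedule(this::doExport, delayMillis, TimeUnit.MILLISECONDS);
        }
        doExport();
        return null;
    }

    // Runs one export with the given delay and reports whether it completed.
    static boolean demo(long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            DelayedExportSketch service = new DelayedExportSketch(delayMillis);
            ScheduledFuture<?> pending = service.export(executor);
            if (pending != null) {
                pending.get(); // wait for the delayed export to finish
            }
            return service.isExported();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(0) + " " + demo(20));
    }
}
```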
doExportUrls
private void doExportUrls() {
    // Load the registries and generate their URLs.
    // The URL drives the rest of the flow, e.g.
    // registry://47.110.245.187:2181/org.apache.dubbo.registry.RegistryService?application=pay-service
    // &dubbo=2.0.2&pid=18104&registry=zookeeper&release=2.7.2&timestamp=1591225813156
    List<URL> registryURLs = loadRegistries(true);
    for (ProtocolConfig protocolConfig : protocols) {
        // key composed of the interface path, group, and version
        String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
        // store the metadata of the published service
        ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
        ApplicationModel.initProviderModel(pathKey, providerModel);
        doExportUrlsFor1Protocol(protocolConfig, registryURLs);
    }
}
doExportUrlsFor1Protocol

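1 Parse the <dubbo:method> parameters configured for the current service and store them in the map
2 Obtain the IP address and port to expose

// host binding
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);

3 Assemble the URL object, for example:
dubbo://169.254.108.117:20880/com.my.dubbo.IPayService

URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

4 scope
There are two scopes, local and remote. local exposes the service for in-JVM invocation, i.e. local Dubbo service calls; remote publishes the service remotely through the registry. When no scope is configured the service is exported both ways, and none disables exporting.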

String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

    // export to local unless the scope is explicitly remote
    if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
        exportLocal(url);
    }
    // export to remote if the config is not local (export to local only when config is local)
    if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        for (URL registryURL : registryURLs) { // registryURL: registry://ip:port...
            // invoker -> proxy for the service implementation
            Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
            // delegate that carries the provider metadata
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
            Exporter<?> exporter = protocol.export(wrapperInvoker);
            exporters.add(exporter);
        }
    }
}
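
The scope check reduces to a small decision table. The sketch below, with a hypothetical ScopeSketch class and targets helper, returns which export paths would be taken for a given scope value; the literal strings "none", "local", and "remote" are assumed to match the SCOPE_* constants.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the scope decision; not Dubbo's code.
class ScopeSketch {

    // Returns the export targets ("local", "remote") chosen for the scope value.
    // A null scope means "not configured", which selects both targets.
    static List<String> targets(String scope) {
        List<String> result = new ArrayList<>();
        if ("none".equalsIgnoreCase(scope)) {
            return result; // nothing is exported
        }
        if (!"remote".equalsIgnoreCase(scope)) {
            result.add("local");
        }
        if (!"local".equalsIgnoreCase(scope)) {
            result.add("remote");
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(targets(null));     // [local, remote]
        System.out.println(targets("local"));  // [local]
        System.out.println(targets("remote")); // [remote]
        System.out.println(targets("none"));   // []
    }
}
```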
protocol.export

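protocol is a method-level adaptive extension point, so a Protocol$Adaptive class is generated dynamically. Because the URL starts with registry, the call is dispatched to RegistryProtocol.export.

1 Protocol$Adaptive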

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
}
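
Stripped of the generated boilerplate, the adaptive class does one thing: it reads the protocol name from the URL, falls back to "dubbo" when there is none, and delegates to the extension registered under that name. The sketch below shows that idea with a plain map in place of ExtensionLoader; AdaptiveSketch, its nested Protocol interface, and the string-based URLs are all hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of name-based dispatch; a map stands in for ExtensionLoader.
class AdaptiveSketch {

    interface Protocol {
        String export(String url);
    }

    // Extensions keyed by protocol name.
    static final Map<String, Protocol> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("registry", url -> "RegistryProtocol exported " + url);
        EXTENSIONS.put("dubbo", url -> "DubboProtocol exported " + url);
    }

    // Returns the scheme part of the URL, or null when the URL has none.
    static String protocolOf(String url) {
        int index = url.indexOf("://");
        return index < 0 ? null : url.substring(0, index);
    }

    // The "adaptive" instance: picks the real extension per call, based on the URL.
    static final Protocol ADAPTIVE = url -> {
        String name = protocolOf(url);
        if (name == null) {
            name = "dubbo"; // default extension name
        }
        Protocol extension = EXTENSIONS.get(name);
        if (extension == null) {
            throw new IllegalStateException("No extension named " + name);
        }
        return extension.export(url);
    };

    public static void main(String[] args) {
        System.out.println(ADAPTIVE.export("registry://127.0.0.1:2181/demo"));
        System.out.println(ADAPTIVE.export("dubbo://127.0.0.1:20880/demo"));
    }
}
```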

RegistryProtocol.export

Exposes a service.

// registryUrl -> zookeeper://ip:port
URL registryUrl = getRegistryUrl(originInvoker);
// providerUrl -> dubbo://ip:port
URL providerUrl = getProviderUrl(originInvoker);
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// register the dubbo:// URL with ZooKeeper
register(registryUrl, registeredProviderUrl);
doLocalExport

Starts a Netty server.

return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);

DubboProtocol.export

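Wrapper decoration

The Protocol is decorated by wrapper classes, which enhance its methods.
1 org.apache.dubbo.common.extension.ExtensionLoader#isWrapperClass
If the class has a constructor that takes the extension type as its only parameter, it is a wrapper class; otherwise it is not.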

private boolean isWrapperClass(Class<?> clazz) {
    try {
        //type=Protocol.class
        clazz.getConstructor(type);
        return true;
    } catch (NoSuchMethodException e) {
        return false;
    }
}

2 org.apache.dubbo.common.extension.ExtensionLoader#cacheWrapperClass
Puts the wrapper class into the cache.

private void cacheWrapperClass(Class<?> clazz) {
    if (cachedWrapperClasses == null) {
        cachedWrapperClasses = new ConcurrentHashSet<>();
    }
    cachedWrapperClasses.add(clazz);
}

3 org.apache.dubbo.common.extension.ExtensionLoader#createExtension
Fetches the wrapper classes from the cache and wraps the instance with each of them.

Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
    for (Class<?> wrapperClass : wrapperClasses) {
        instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
    }
}

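According to Dubbo's extension configuration files, there are three wrapper classes: ProtocolFilterWrapper, ProtocolListenerWrapper, and QosProtocolWrapper. The decoration chain, from the outermost wrapper to the wrapped protocol, is: QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol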
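
The three steps (detect a wrapper by its constructor, collect the wrappers, wrap the instance) can be reproduced in a few lines of plain reflection. This is a sketch under assumptions: WrapperSketch and its nested classes are hypothetical, the wrapper list is passed in explicitly in place of a cache, and no dependency injection is performed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of constructor-based wrapper detection and decoration.
class WrapperSketch {

    interface Protocol {
        String export();
    }

    static class CoreProtocol implements Protocol {
        public CoreProtocol() {
        }

        public String export() {
            return "core";
        }
    }

    static class FilterWrapper implements Protocol {
        private final Protocol inner;

        public FilterWrapper(Protocol inner) {
            this.inner = inner;
        }

        public String export() {
            return "filter(" + inner.export() + ")";
        }
    }

    static class ListenerWrapper implements Protocol {
        private final Protocol inner;

        public ListenerWrapper(Protocol inner) {
            this.inner = inner;
        }

        public String export() {
            return "listener(" + inner.export() + ")";
        }
    }

    // A class counts as a wrapper when it has a public constructor taking a Protocol.
    static boolean isWrapperClass(Class<?> clazz) {
        try {
            clazz.getConstructor(Protocol.class);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Instantiates the core class, then wraps it with every wrapper candidate in order.
    static Protocol create(Class<?> core, List<Class<?>> candidates) {
        try {
            Protocol instance = (Protocol) core.getConstructor().newInstance();
            for (Class<?> candidate : candidates) {
                if (isWrapperClass(candidate)) {
                    instance = (Protocol) candidate.getConstructor(Protocol.class).newInstance(instance);
                }
            }
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // Wraps CoreProtocol with FilterWrapper first, then ListenerWrapper.
    static String demo() {
        List<Class<?>> candidates = new ArrayList<>();
        candidates.add(FilterWrapper.class);
        candidates.add(ListenerWrapper.class);
        candidates.add(CoreProtocol.class); // not a wrapper, so it is skipped
        return create(CoreProtocol.class, candidates).export();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // listener(filter(core))
    }
}
```

The last wrapper applied ends up outermost, which is why the order in which wrappers are iterated determines the shape of the chain.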

The ProtocolFilterWrapper wrapper class

Uses activate extension points to activate the Filter chain.

protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
return new CallbackRegistrationInvoker<>(last, filters);
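
The fragments above come from building a chain of filters in front of the real invoker. A simplified, hypothetical version of that idea is sketched below: FilterChainSketch, its Invoker and Filter interfaces, and buildChain are illustrative names only, requests are plain strings, and the callback handling of CallbackRegistrationInvoker is not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of wrapping an invoker with a list of filters.
class FilterChainSketch {

    interface Invoker {
        String invoke(String request);
    }

    interface Filter {
        String invoke(Invoker next, String request);
    }

    // Builds the chain back to front so that filters run in list order
    // and the original invoker runs last.
    static Invoker buildChain(Invoker invoker, List<Filter> filters) {
        Invoker last = invoker;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // Runs a request through two tracing filters and returns the trace plus the result.
    static String demo() {
        List<String> trace = new ArrayList<>();
        Filter a = (next, request) -> {
            trace.add("A");
            return next.invoke(request);
        };
        Filter b = (next, request) -> {
            trace.add("B");
            return next.invoke(request);
        };
        Invoker target = request -> {
            trace.add("target");
            return "ok:" + request;
        };
        String result = buildChain(target, Arrays.asList(a, b)).invoke("pay");
        return String.join(">", trace) + " " + result;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // A>B>target ok:pay
    }
}
```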

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();

    // export service.
    // Get the service key, which can be thought of as the service's coordinates. It is made up of the
    // group, service name, version, and port, e.g. ${group}/com.my.practice.dubbo.ISayHelloService:${version}:20880
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);

    openServer(url); // start a server that listens on port 20880
    optimizeSerialization(url);  // optimize serialization

    return exporter;
}
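
As a small illustration of the key format described in the comment, the sketch below assembles a key from its four parts and uses it to index a map, as exporterMap does. ServiceKeySketch and its serviceKey helper are hypothetical and follow only the format stated above; Dubbo's own implementation may treat some values specially.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the group/service:version:port key format.
class ServiceKeySketch {

    // Builds "group/serviceName:version:port", leaving out empty group or version parts.
    static String serviceKey(String group, String serviceName, String version, int port) {
        StringBuilder key = new StringBuilder();
        if (group != null && !group.isEmpty()) {
            key.append(group).append('/');
        }
        key.append(serviceName);
        if (version != null && !version.isEmpty()) {
            key.append(':').append(version);
        }
        key.append(':').append(port);
        return key.toString();
    }

    public static void main(String[] args) {
        // A plain string stands in for the exporter object.
        Map<String, String> exporterMap = new ConcurrentHashMap<>();
        String key = serviceKey("g1", "com.my.dubbo.IPayService", "1.0.0", 20880);
        exporterMap.put(key, "exporter for IPayService");
        System.out.println(key + " -> " + exporterMap.get(key));
    }
}
```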

org.apache.dubbo.remoting.exchange.Exchangers#bind

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");

    //HeaderExchanger.
    return getExchanger(url).bind(url, handler);
}

org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind

public Server bind(URL url, ChannelHandler listener) throws RemotingException {
    return new NettyServer(url, listener);
}

org.apache.dubbo.remoting.transport.AbstractServer#AbstractServer

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();

    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
    try {
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    //fixme replace this with better method
    DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
    executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}

org.apache.dubbo.remoting.transport.netty.NettyServer#doOpen

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

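What is an Invoker

Service publishing has three phases:
Phase 1 creates an Invoker.
Phase 2 passes the Invoker through a series of wrappers and stores it in exporterMap in DubboProtocol.
Phase 3 registers the dubbo:// URL with the registry.

org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol

// obtain the Invoker that proxies the service implementation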
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper.getInvoker

This is a wrapper class; it delegates to the wrapped ProxyFactory.

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
    return proxyFactory.getInvoker(proxy, type, url);
}

org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory.getInvoker

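The adaptive extension point falls back to its default value, @SPI("javassist"), so JavassistProxyFactory is used.

public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    /**
     * proxy: the object that implements the interface
     * type: the interface type
     * url: registry://ip:port...
     */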
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}

Dynamic proxy code generated by Javassist

org.apache.dubbo.common.bytecode.Wrapper#makeWrapper: generates a class dynamically from bytecode and instantiates it.

public Object invokeMethod (Object o, String n, Class[]p, Object[]v) throws
java.lang.reflect.InvocationTargetException {
    com.my.dubbo.PayServiceImpl w;
    try {
        w = ((com.my.dubbo.PayServiceImpl) $1);
    } catch (Throwable e) {
        throw new IllegalArgumentException(e);
    }
    try {
        if ("pay".equals($2) && $3.length == 1) {
            return ($w) w.pay((java.lang.String) $4[0]);
        }
    } catch (Throwable e) {
        throw new java.lang.reflect.InvocationTargetException(e);
    }
    throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.my.dubbo.PayServiceImpl.");
}
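
Written by hand, the generated method amounts to a dispatch on the method name followed by a direct call, with no java.lang.reflect.Method lookup at invocation time. The sketch below shows that shape in ordinary Java. HandWrittenWrapperSketch is hypothetical, and the String return type and parameter of pay are assumptions, since the source only shows that pay takes a single String argument.

```java
// Hypothetical hand-written equivalent of the generated invokeMethod.
class HandWrittenWrapperSketch {

    // Assumed service interface and implementation, for illustration only.
    interface IPayService {
        String pay(String info);
    }

    static class PayServiceImpl implements IPayService {
        public String pay(String info) {
            return "paid:" + info;
        }
    }

    // Dispatches by method name and argument count, then calls the method directly.
    static Object invokeMethod(Object target, String methodName,
                               Class<?>[] parameterTypes, Object[] arguments) {
        PayServiceImpl impl;
        try {
            impl = (PayServiceImpl) target;
        } catch (ClassCastException e) {
            throw new IllegalArgumentException(e);
        }
        if ("pay".equals(methodName) && parameterTypes.length == 1) {
            return impl.pay((String) arguments[0]);
        }
        throw new IllegalArgumentException("Not found method \"" + methodName + "\" in class PayServiceImpl.");
    }

    public static void main(String[] args) {
        Object result = invokeMethod(new PayServiceImpl(), "pay",
                new Class<?>[]{String.class}, new Object[]{"100"});
        System.out.println(result); // paid:100
    }
}
```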