dubbo的服务发布
业务功能
1 配置文件的解析
2 服务注册:map保存注册的对象,向zookeeper创建结点
3 启动一个服务端监听
4 网络通信,序列化和反序列化
Dubbo 对于 sping 的扩展
Spring 的标签扩展
通过spring.handlers来实现自定义配置,以NamespaceUrl作为key,对应的Handler作为value的键值对,解析配置spring的DefaultNamespaceHandlerResolver的resolve方法来处理的。
image.png
org.apache.dubbo.config.spring.schema.DubboNamespaceHandler.init
将对一个的配置解析对应的Config,有3个比较特殊ServiceBean,ReferenceBean和ConfigCenterBean都继承对应的config。
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
org.apache.dubbo.config.spring.schema.DubboBeanDefinitionParser.parse
你解析配置文件,如ServiceBean解析代码如下
String className = element.getAttribute("class");
if (className != null && className.length() > 0) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
parseProperties(element.getChildNodes(), classDefinition);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}
ServiceBean 的实现
ServiceBean 这个类,分别实现了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener,
BeanNameAware, ApplicationEventPublisherAware
org.springframework.beans.factory.InitializingBean#afterPropertiesSet
在对象初始化执行该方法
org.springframework.beans.factory.DisposableBean#destroy
在对象销毁时执行该方法
org.springframework.context.ApplicationContextAware的setApplicationContext
可以获取ApplicationContext容器
org.springframework.context.ApplicationListener#onApplicationEvent
ApplicationEvent的时间监听
org.springframework.beans.factory.BeanNameAware#setBeanName
设置bean的name值
org.springframework.context.ApplicationEventPublisherAware的setApplicationEventPublisher
事件发布
spring事件发送监听
ApplicationEvent:事件本身
ApplicationEventPublisherAware:事件发送器
ApplicationListener:事件监听器
ServiceBean 中服务暴露过程
afterPropertiesSet
将dubbo的配置ServiceBean中,方便后面使用。
onApplicationEvent
当容器初始化或刷新会触发
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export(); //导出、发布
}
}
export
服务发布
public void export() {
super.export();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
ServiceConfig 配置类
export
public synchronized void export() {
checkAndUpdateSubConfigs(); //检查或这个更新配置
if (!shouldExport()) { //当前服务是否要发布
return;
}
if (shouldDelay()) {//是否延迟
delayExportExecutor.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport();
}
}
doExportUrls
private void doExportUrls() {
//(N)加载注册中心,并且声称URL地址
//URL(来驱动流程的执行)->[ registry://47.110.245.187:2181/org.apache.dubbo.registry.RegistryService?application=pay-service
// &dubbo=2.0.2&pid=18104®istry=zookeeper&release=2.7.2×tamp=1591225813156]
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
//iterface , version ,group组成的key
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
//存储服务发布的元数据
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
doExportUrlsFor1Protocol
1 把当前服务下所配置的<dubbo:method>参数进行解析,保存到 map 集合中
2 获取暴露的ip和端口
//主机绑定
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
3 组装URL对象
dubbo://169.254.108.117:20880/com.my.dubbo.IPayService
URL url = new URL (name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
4 scope
分为两种local和remote,Local提供 jvm调用方式即本地的dubbo服务调用;remote表示根据配置中心进行远程发布。
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// 如果是本地发布,则直接调用exportLocal
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url); //TODO
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
}
for (URL registryURL : registryURLs) { //registryURL: registry://ip:port...
//invoker -> 代理类
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
//MetaData元数据的委托
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
protocol.export
是方法级别的自适应扩展点,会动态生成ProtocolAdaptive
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
}
RegistryProtocol.export
暴露一个服务
// registryUrl -> zookeeper://ip:port
URL registryUrl = getRegistryUrl(originInvoker);
// providerUrl -> dubbo:// ip:port
URL providerUrl = getProviderUrl(originInvoker);
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
把dubbo:// url注册到zk上
register(registryUrl, registeredProviderUrl);
doLocalExport
启动一个netty服务
protocol.export(invokerDelegate), originInvoker);
DubboProtocol.export
Wrapper 包装
会通过 Wrapper 对 Protocol 进行装饰,利用方法增强。
1 org.apache.dubbo.common.extension.ExtensionLoader#isWrapperClass
若该类中有某个类型的构造器,则是包装类,否则则不是
private boolean isWrapperClass(Class<?> clazz) {
try {
//type=Protocol.class
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
2 org.apache.dubbo.common.extension.ExtensionLoader#cacheWrapperClass
把包装类放入缓存中
private void cacheWrapperClass(Class<?> clazz) {
if (cachedWrapperClasses == null) {
cachedWrapperClasses = new ConcurrentHashSet<>();
}
cachedWrapperClasses.add(clazz);
}
3 org.apache.dubbo.common.extension.ExtensionLoader#createExtension
从缓存中获取包装类,对实例进行包装
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
根据dubbo的配置文件获取,有三个包装类ProtocolFilterWrapper和ProtocolListenerWrapper和QosProtocolWrapper,装饰器分别为: QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol
ProtocolFilterWrapper包装类
使用激活扩展点来激活Filter链路。
protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
return new CallbackRegistrationInvoker<>(last, filters);
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
//获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成如${group}/copm.my.practice.dubbo.ISayHelloService:${version}:20880
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
openServer(url); //openServer(url) 开启一个服务 ,暴露20880端口
optimizeSerialization(url); //优化序列化
return exporter;
}
org.apache.dubbo.remoting.exchange.Exchangers#bind
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
//HeaderExchanger.
return getExchanger(url).bind(url, handler);
}
org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger
.bind
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
org.apache.dubbo.remoting.transport.netty4.NettyTransporter#bind
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
org.apache.dubbo.remoting.transport.AbstractServer#AbstractServer
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
//fixme replace this with better method
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
org.apache.dubbo.remoting.transport.netty.NettyServer#doOpen
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
Invoker是什么
服务发布分三个阶段:
第一个阶段会创造一个invoker
第二个阶段会把经历过一系列处理的invoker(各种包装),在DubboProtocol中保存到exporterMap中
第三个阶段把dubbo协议的url地址注册到注册中心上.
org.apache.dubbo.config.ServiceConfig#doExportUrlsFor1Protocol
//获取代理类
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper.getInvoker
包装类
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
return proxyFactory.getInvoker(proxy, type, url);
}
org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory.getInvoker
使用自适应扩展点默认的值即@SPI("javassist")
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
/**
* proxy:接口的实现类的对象
* type:接口的类型
* url: registry://ip:port...
*/
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
javassist生成的动态代理代码
org.apache.dubbo.common.bytecode.Wrapper#makeWrapper:使用字节码动态生成类并初始化对象
public Object invokeMethod (Object o, String n, Class[]p, Object[]v) throws
java.lang.reflect.InvocationTargetException {
com.my.dubbo.PayServiceImpl w;
try {
w = ((com.my.dubbo.PayServiceImpl) $1);
} catch (Throwable e) {
throw new IllegalArgumentException(e);
}
try {
if ("pay".equals($2) && $3.length == 1) {
return ($w) w.pay((java.lang.String) $4[0]);
}
} catch (Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new org.apache.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + $2 + "\" in class com.my.dubbo.PayServiceImpl.");
}