Dubbo服务暴露
1 Dubbo服务暴露介绍
Dubbo服务暴露的整体流程如下:
2 Dubbo服务暴露源码
2.1 延时暴露
ServiceConfig#export
public synchronized void export() {
//检查和更新附属的配置文件
checkAndUpdateSubConfigs();
// shouldExport 默认值为 true 如果export==null?true:export
//如果只想本地启动服务,不把服务暴露出去给别人使用,可以配置<dubbo:provider export="false" />禁止服务导出
if (!shouldExport()) {
return;
}
//如果应该延迟导出,就去延迟导出 ,这里不研究
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 立即导出服务
doExport();
}
}
如果设置了 delay 参数,Dubbo 的处理方式是启动一个守护线程在 sleep 指定时间后再 doExport。
2.2 暴露检查
ServiceConfig#doExport
protected synchronized void doExport() {
// 这个服务不需要暴露
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
//这个服务已经暴露了
if (exported) {
return;
}
exported = true;
//如果path为空,给它赋值
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
//多协议多注册中心暴露服务
doExportUrls();
}
判断服务是否已经暴露或者不需要暴露。
2.3 多协议多注册中心暴露服务
ServiceConfig#doExportUrls
//Dubbo允许我们使用不同的协议暴露服务,也允许我们向多个 注册中心注册服务
//多协议多注册中心暴露服务
private void doExportUrls() {
//通过loadRegistries加载注册中心链接
List<URL> registryURLs = loadRegistries(true);
//遍历协议、在每个协议下暴露服务
for (ProtocolConfig protocolConfig : protocols) {
// 通过(路径+guoup+version)构建一个string类型的 路径key 从协议配置类中,获取上下文路径.
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
//提供者模型
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
//ApplicationModel持有所有的提供者模型 .init初始化提供者模型
ApplicationModel.initProviderModel(pathKey, providerModel);
//组装URL传入协议配置类 注册集合
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
loadRegistries
方法的主要是构造注册配置类的URL。
2.4 组装URL
ServiceConfig#doExportUrlsFor1Protocol
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//省略URL 的构造 (协议名、IP、端口、上下文路径、map ),map包含了url的参数(如generic,token)。
//------------------------ 暴露Dubbo服务-----------------------------
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
// 作用范围三个取值(none:什么都不做、scope != remote,暴露到本地scope != local,暴露到远程)
String scope = url.getParameter(SCOPE_KEY);
//如果scope !=none 则进入方法
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// scope != remote 暴露到本地
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
//导出到本地
exportLocal(url);
}
// scope != local 暴露到远程
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
// 如果协议只是 injvm , 而不是redister
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
// 加载监视器链接
URL monitorUrl = loadMonitor(registryURL);
// 如果 monitor不等于null 将监视器链接作为参数添加到URL中
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
// 对于提供程序,这用于启用自定义代理来生成调用程序
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 为服务提供类(ref),生成Imvoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//暴露 服务 并生成exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
//不存在注册中心 ,仅仅暴露服务
} else {
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
//发布元数据
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
metadataService.publishServiceDefinition(url);
}
}
}
}
注意:
如果配置 scope=none, 则不会进行服务暴露;如果没有配置 scope 或者 scope=local,则会进行本地暴露。本地暴露时使用 injvm 协议,injvm 协议是一个伪协议,它不开启端口,不能被远程调用,只在 JVM 内直接关联,但执行 Dubbo 的 Filter 链。
上面远程暴露服务会根据是有有注册中心分两种情况:
-
直接暴露服务
-
通过
RegistryProtocol
暴露远程服务后,暴露服务到注册中心。
2.5 创建Invoker
在服务暴露之前需要创建Invoker
,Invoker
是实体域,它是Dubbo的核心模型,其他模型都向它靠拢,或者转换成它。它代表一个可执行体,可向它发起invoke
调用,这个调用有可能是一个本地的实现,也有可能是一个远程的实现,也有可能是一个集群的实现。 Invoker
是ProxyFactory
创建的,Dubbo的默认代理工厂实现类是JavassistProxyFactory
。
JavassistProxyFactory#getInvoker
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 为目标类创建 Wrapper(方法就是,从缓存中获取Wrepper ,如果缓存中没有,创建,写入缓存)
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
2.6 本地暴露
ServiceConfig#exportLocal
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL) //设置协议头为 injvm
.setHost(LOCALHOST_VALUE) //设置Host为localhost
.setPort(0)
.build();
// 创建 Invoker,并暴露服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
Exporter<?> exporter = protocol.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
本地暴露是通过InjvmProtocol
暴露。
2.7 远程暴露
远程暴露的协议通过ExtensionLoader
加载,同时会加载ProtocolFilterWrapper
和ProtocolListenerWrapper
作为Protocol
的包装类。
-
ProtocolFilterWrapper:包装Context,token,echo等的
Filter
到Invoker
; -
ProtocolListenerWrapper:包装
ExporterListener
';
2.8 注册服务
如果使用了注册中心,则在通过具体协议(如 Dubbo 协议)暴露服务之后进入服务注册流程,将服务节点注册到注册中心。
RegistryProtocol#export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//向Zookeeper注册节点
registry.register(registedProviderUrl);
// 订阅override数据
// 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
private Registry getRegistry(final Invoker<?> originInvoker){
URL registryUrl = originInvoker.getUrl();
if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
}
return registryFactory.getRegistry(registryUrl);
}
3 DubboProtocol
Dubbo协议的暴露的服务是通过Netty远程通信实现服务调用。
DubboProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispaching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice){
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
if (logger.isWarnEnabled()){
logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
return exporter;
}
DubboProtocol#openServer
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
Exchanger (默认 HeaderExchanger)封装请求响应模式,同步转异步,以 Request、Response 为中心。
HeaderExchager#bind
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
Transporters#bind
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
底层传输默认使用 NettyTransporter,最终是创建 NettyServer。
NettyTransporter#bind
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
至此Netty连接创建。