dubbo的服务发布过程
dubbo发布服务的流程
1、具体的服务转为invoker: ServiceConfig类通过ProxyFactory类的getInvoker方法,将服务提供类ref生成invoker。
2、Invoker转换成Exporter:打开通信端口,接听来自客户端的申请。
具体解析
1、当Spring容器实例化bean完成,ServiceBean会执行onApplicationEvent方法,该方法调用ServiceConfig的export方法。
2、ServiceConfig的父类ServiceConfig在初始化时,会率先完成protocol和proxyFactory的spi扩展
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
其中,protocol 是协议的扩展,proxyFactory 是代理扩展(用于生成invoker)。
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
//根据URL配置信息获取Protocol协议,默认是dubbo
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
//根据协议名,获取Protocol的实现
//获得Protocol的实现过程中,会对Protocol先进行依赖注入,然后进行Wrapper包装,最后返回被修改过的Protocol
//包装经过了ProtocolFilterWrapper,ProtocolListenerWrapper,RegistryProtocol
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
}
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
}
关键点
//转为invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//invoker转为exporter
Exporter<?> exporter = protocol.export(invoker);
ref转为invoker 过程
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
dubbo默认情况下用javassist动态代理方式,将ref转为invoker。
invoker的定义:
public interface Invoker<T> extends Node {
/**
* get service interface.
*
* @return service interface.
*/
Class<T> getInterface();
/**
* invoke.
*
* @param invocation
* @return result
* @throws RpcException
*/
Result invoke(Invocation invocation) throws RpcException;
}
Invocation 包含了ref类的相关方法名,参数等。invoker可以根据这个invocation得到对应的结果值。????
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
//下面这句话应该是javassist动态代理的内容。结果就是wrapper这个实例里,有invokeMethod方法,里面传入类的实例,方法名等,可以得到类的实例的方法的结果
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//返回一个Invoker实例,doInvoke方法中直接返回上面wrapper的invokeMethod
//关于生成的wrapper,请看下面列出的生成的代码,其中invokeMethod方法中就有实现类对实际方法的调用
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
这里,就将服务类实例ref转为了invoker。
invoker转为exporter
Exporter<?> exporter = protocol.export(invoker);
转为exporter的过程中,主要做了两个工作:打开指定的端口号,监听来自客户端的申请;向Zookeeper等注册中心注册、订阅服务;
向注册中心注册,订阅服务
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
//OverrideListener是RegistryProtocol的内部类
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
//订阅override数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
final Registry registry = getRegistry(originInvoker) 是根据originInvoker的url获取注册中心的地址,生成注册中心的客户端。其中 originInvoker的url是
registry://192.168.25.128:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-test-service&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.86.1%3A20880%2Fcn.andy.dubbo.DataService%3Fanyhost%3Dtrue%26application%3Ddubbo-test-service%26dispatcher%3Dall%26dubbo%3D2.5.3%26interface%3Dcn.andy.dubbo.DataService%26methods%3DdubboTest2%2CdubboTest%2CgetStringData%26mock%3Dtrue%26pid%3D47756%26retries%3D0%26service.filter%3DandyFilter%26side%3Dprovider%26threadpool%3Dfixed%26threads%3D100%26timeout%3D60000%26timestamp%3D1543287819642%26token%3D1234567&pid=47756®istry=zookeeper×tamp=1543287819603
cn.andy.dubbo.impl.DataServiceImpl@4f071df8
com.alibaba.dubbo.rpc.proxy.javassist.JavassistProxyFactory@4de41af9
interface cn.andy.dubbo.DataService
registry://192.168.25.128:2181/com.alibaba.dubbo.registry.RegistryService?application=dubbo-test-service&dubbo=2.5.3&export=dubbo%3A%2F%2F192.168.86.1%3A20880%2Fcn.andy.dubbo.DataService%3Fanyhost%3Dtrue%26application%3Ddubbo-test-service%26dispatcher%3Dall%26dubbo%3D2.5.3%26interface%3Dcn.andy.dubbo.DataService%26methods%3DdubboTest2%2CdubboTest%2CgetStringData%26mock%3Dtrue%26pid%3D47756%26retries%3D0%26service.filter%3DandyFilter%26side%3Dprovider%26threadpool%3Dfixed%26threads%3D100%26timeout%3D60000%26timestamp%3D1543287819642%26token%3D1234567&pid=47756®istry=zookeeper×tamp=1543287819603
registry.register(registedProviderUrl)是将自己(url)注册到注册中心。registedProviderUrl的地址是:
dubbo://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=true&pid=47756&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000×tamp=1543287819642&token=1234567
而 overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl)的地址添加了category=configurators&check=false这两项,添加了configurators(配置规则)的写入,为后续的监听做准备
provider://192.168.86.1:20880/cn.andy.dubbo.DataService?anyhost=true&application=dubbo-test-service&category=configurators&check=false&dispatcher=all&dubbo=2.5.3&interface=cn.andy.dubbo.DataService&methods=dubboTest2,dubboTest,getStringData&mock=true&pid=47756&retries=0&service.filter=andyFilter&side=provider&threadpool=fixed&threads=100&timeout=60000×tamp=1543287819642&token=1234567
//订阅override数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
这里是指,服务发布之后,当我们通过监控中心或者治理中心或者直接通过代码向注册中心写入配置规则时,注册中心会通知dubbo,重新发布添加了配置规则的这个服务。
打开监听服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
上面这句就是完成了invoker转为exporter。
最终,最转为dubboProtocol的
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispaching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice){
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
if (logger.isWarnEnabled()){
logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
return exporter;
}
openServer(url)就是打开url所对应的netty的服务器端,进行监听。
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
} else {
//server支持reset,配合override功能使用
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
//默认开启server关闭时发送readonly事件
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
//默认开启heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
其中, server = Exchangers.bind(url, requestHandler)里的requestHandler就包含了我们的invoker。
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
} else {
String[] methods = methodsStr.split(",");
for (String method : methods){
if (inv.getMethodName().equals(method)){
hasMethod = true;
break;
}
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
reply((ExchangeChannel) channel, message);
} else {
super.received(channel, message);
}
}
@Override
public void connected(Channel channel) throws RemotingException {
invoke(channel, Constants.ON_CONNECT_KEY);
}
@Override
public void disconnected(Channel channel) throws RemotingException {
if(logger.isInfoEnabled()){
logger.info("disconected from "+ channel.getRemoteAddress() + ",url:" + channel.getUrl());
}
invoke(channel, Constants.ON_DISCONNECT_KEY);
}
private void invoke(Channel channel, String methodKey) {
Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
if (invocation != null) {
try {
received(channel, invocation);
} catch (Throwable t) {
logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
}
}
}
private Invocation createInvocation(Channel channel, URL url, String methodKey) {
String method = url.getParameter(methodKey);
if (method == null || method.length() == 0) {
return null;
}
RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
invocation.setAttachment(Constants.PATH_KEY, url.getPath());
invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
if (url.getParameter(Constants.STUB_EVENT_KEY, false)){
invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
}
return invocation;
}
};
在received时间中调用reply方法,reply里有invoker.invoke(inv),里面有对方法的最终调用。