☆聊聊Dubbo(九):核心源码-服务端启动流程2
3 ServiceConfig#doExportUrlsFor1Protocol 重点分析
3.1 组装URL所需参数
String name = protocolConfig.getName();
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
Step1:用 Map
存储该协议的所有配置参数,包括:协议名称、Dubbo版本、当前系统时间戳、进程ID、application配置、module配置、默认服务提供者参数(ProviderConfig)、协议配置、服务提供 Dubbo:service
的属性。
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
Step2:如果 dubbo:service
有 dubbo:method
子标签,则 dubbo:method
以及其子标签的配置属性,都存入到 Map
中,属性名称加上对应的方法名作为前缀。dubbo:method
的子标签 dubbo:argument
,其键为方法名.参数序号。
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
Step3:添加 methods
键值对,存放 dubbo:service
的所有方法名,多个方法名用 ,
隔开,如果是泛化实现,填充 genric=true,methods
为 “*”
。
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
Step4:根据是否开启令牌机制,如果开启,设置 token
键,值为静态值或 uuid
。
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
Step5:如果协议为本地协议( injvm
),则设置 protocolConfig#register
属性为 false
,表示不向注册中心注册服务,在 map
中存储键为 notify
,值为 false
,表示当注册中心监听到服务提供者发生变化(服务提供者增加、服务提供者减少等)事件时不通知。
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
Step6:设置协议的 contextPath
,如果未配置,默认为 /interfacename
。
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
Step7:解析服务提供者的IP地址与端口。
private String findConfigedHosts(ProtocolConfig protocolConfig, List<URL> registryURLs, Map<String, String> map) {
boolean anyhost = false;
String hostToBind = getValueFromConfig(protocolConfig, Constants.DUBBO_IP_TO_BIND);
if (hostToBind != null && hostToBind.length() > 0 && isInvalidLocalHost(hostToBind)) {
throw new IllegalArgumentException("Specified invalid bind ip from property:" + Constants.DUBBO_IP_TO_BIND + ", value:" + hostToBind);
}
// if bind ip is not found in environment, keep looking up
if (hostToBind == null || hostToBind.length() == 0) {
hostToBind = protocolConfig.getHost();
if (provider != null && (hostToBind == null || hostToBind.length() == 0)) {
hostToBind = provider.getHost();
}
if (isInvalidLocalHost(hostToBind)) {
anyhost = true;
try {
hostToBind = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
logger.warn(e.getMessage(), e);
}
if (isInvalidLocalHost(hostToBind)) {
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
if (Constants.MULTICAST.equalsIgnoreCase(registryURL.getParameter("registry"))) {
// skip multicast registry since we cannot connect to it via Socket
continue;
}
try {
Socket socket = new Socket();
try {
SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
socket.connect(addr, 1000);
hostToBind = socket.getLocalAddress().getHostAddress();
break;
} finally {
try {
socket.close();
} catch (Throwable e) {
}
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
}
}
}
if (isInvalidLocalHost(hostToBind)) {
hostToBind = getLocalHost();
}
}
}
}
map.put(Constants.BIND_IP_KEY, hostToBind);
// registry ip is not used for bind ip by default
String hostToRegistry = getValueFromConfig(protocolConfig, Constants.DUBBO_IP_TO_REGISTRY);
if (hostToRegistry != null && hostToRegistry.length() > 0 && isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + Constants.DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
} else if (hostToRegistry == null || hostToRegistry.length() == 0) {
// bind ip is used as registry ip by default
hostToRegistry = hostToBind;
}
map.put(Constants.ANYHOST_KEY, String.valueOf(anyhost));
return hostToRegistry;
}
服务IP地址解析顺序:(序号越小越优先)
- 系统环境变量,变量名:
DUBBO_DUBBO_IP_TO_BIND
- 系统属性,变量名:
DUBBO_DUBBO_IP_TO_BIND
- 系统环境变量,变量名:
DUBBO_IP_TO_BIND
- 系统属性,变量名:
DUBBO_IP_TO_BIND
dubbo:protocol
标签的host
属性 -->dubbo:provider
标签的host
属性- 默认网卡IP地址,通过
InetAddress.getLocalHost().getHostAddress()
获取,如果IP地址不符合要求,继续下一个匹配。// 判断IP地址是否符合要求的标准 public static boolean isInvalidLocalHost(String host) { return host == null || host.length() == 0 || host.equalsIgnoreCase("localhost") || host.equals("0.0.0.0") || (LOCAL_IP_PATTERN.matcher(host).matches()); }
- 选择第一个可用网卡,其实现方式是建立
socket
,连接注册中心,获取socket
的IP地址。Socket socket = new Socket(); try { SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort()); socket.connect(addr, 1000); hostToBind = socket.getLocalAddress().getHostAddress(); break; } finally { try { socket.close(); } catch (Throwable e) { } }
private Integer findConfigedPorts(ProtocolConfig protocolConfig, String name, Map<String, String> map) {
Integer portToBind = null;
// parse bind port from environment
String port = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_BIND);
portToBind = parsePort(port);
// if there's no bind port found from environment, keep looking up.
if (portToBind == null) {
portToBind = protocolConfig.getPort();
if (provider != null && (portToBind == null || portToBind == 0)) {
portToBind = provider.getPort();
}
final int defaultPort = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(name).getDefaultPort();
if (portToBind == null || portToBind == 0) {
portToBind = defaultPort;
}
if (portToBind == null || portToBind <= 0) {
portToBind = getRandomPort(name);
if (portToBind == null || portToBind < 0) {
portToBind = getAvailablePort(defaultPort);
putRandomPort(name, portToBind);
}
logger.warn("Use random available port(" + portToBind + ") for protocol " + name);
}
}
// save bind port, used as url's key later
map.put(Constants.BIND_PORT_KEY, String.valueOf(portToBind));
// registry port, not used as bind port by default
String portToRegistryStr = getValueFromConfig(protocolConfig, Constants.DUBBO_PORT_TO_REGISTRY);
Integer portToRegistry = parsePort(portToRegistryStr);
if (portToRegistry == null) {
portToRegistry = portToBind;
}
return portToRegistry;
}
服务提供者端口解析顺序:(序号越小越优先)
- 系统环境变量,变量名:
DUBBO_DUBBO_PORT_TO_BIND
- 系统属性,变量名:
DUBBO_DUBBO_PORT_TO_BIND
- 系统环境变量,变量名:
DUBBO_PORT_TO_BIND
- 系统属性,变量名:
DUBBO_PORT_TO_BIND
dubbo:protocol
标签port
属性 -->dubbo:provider
标签的port
属性。- 随机选择一个端口。
3.2 封装URL实例
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
Step8:根据协议名称、协议 host
、协议端口、contextPath
、相关配置属性(application
、module
、provider
、protocolConfig
、service
及其子标签)构建服务提供者URI。
URL运行效果图,如下:
URL运行效果图3.3 构建Invoker实例
String scope = url.getParameter(Constants.SCOPE_KEY);
// don't export when none is configured
if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { // @ 代码1
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { // @ 代码2
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && !registryURLs.isEmpty()) { // @ 代码3
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY)); // @ 代码4
URL monitorUrl = loadMonitor(registryURL); // @ 代码5
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // @ 代码6
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker); // 代码7
exporters.add(exporter);
}
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
Step9:获取 dubbo:service
标签的 scope
属性,其可选值为 none
(不暴露)、local
(本地)、remote
(远程),如果配置为 none
,则不暴露。默认为 local
。
Step10:根据 scope
来暴露服务,如果 scope
不配置,则默认本地与远程都会暴露,如果配置成 local
或 remote
,那就只能是二选一。
通过动态代理机制创建Invoker代码1:如果
scope
不为remote
,则先在本地暴露(injvm
),具体暴露服务的具体实现,将在remote 模式中详细分析。代码2:如果
scope
不为local
,则将服务暴露在远程。代码3:
remote
方式,检测当前配置的所有注册中心,如果注册中心不为空,则遍历注册中心,将服务依次在不同的注册中心进行注册。代码4:如果
dubbo:service
的dynamic
属性未配置, 尝试取dubbo:registry
的dynamic
属性,该属性的作用是否启用动态注册,如果设置为false
,服务注册后,其状态显示为disable
,需要人工启用,当服务不可用时,也不会自动移除,同样需要人工处理,此属性不要在生产环境上配置。代码5:根据注册中心URL,构建监控中心的URL,如果监控中心URL不为空,则在服务提供者URL上追加
monitor
,其值为监控中心URL(已编码)。1)如果dubbo spring xml配置文件中没有配置监控中心(dubbo:monitor),就从系统属性-Ddubbo.monitor.address,-Ddubbo.monitor.protocol构建MonitorConfig对象,否则从dubbo的properties配置文件中寻找这个两个参数,如果没有配置,则返回null。 2)如果有配置,则追加相关参数,dubbo:monitor标签只有两个属性:address、protocol,其次会追加interface(MonitorService)、协议等。
代码6:通过动态代理机制创建
Invoker
,Dubbo的远程调用实现类。
Dubbo远程调用器如何构建,这里不详细深入,重点关注WrapperInvoker的url为:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider
&dubbo=2.0.0
&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D6328%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527255510215
&pid=6328
&qos.port=22222
®istry=zookeeper
×tamp=1527255510202
这里有两个重点值得关注:
Protocol 适配实现类实例
- path属性:
com.alibaba.dubbo.registry.RegistryService
,注册中心也类似于服务提供者。- export属性:值为服务提供者的URL,为什么需要关注这个URL呢?请看代码7,
protocol
属性为Protocol$Adaptive
,Dubbo在加载组件实现类时采用SPI(关于SPI细节,可参阅《☆聊聊Dubbo(五):核心源码-SPI扩展》
),在这里我们只需要知道,根据URL冒号之前的协议名将会调用相应的方法。
其映射关系(列出与服务启动相关协议实现类):
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol // 文件位于dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
registry=com.alibaba.dubbo.registry.integration.RegistryProtocol // 文件位于dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
代码7:根据代码6的分析,将调用
RegistryProtocol#export
方法。
这里很重要的是 Invoker
实例,作为Dubbo的核心模型,其它模型都向它靠扰,或转换成它,它代表一个可执行体,可向它发起invoke调用,它有可能是一个本地的实现,也可能是一个远程的实现,也可能一个集群实现。
所以,下面重点分析 代码6 & 代码7
两处代码实现,源码如下:
// 使用ProxyFactory将服务实现封装成一个Invoker对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())); // @ 代码6
// 根据指定协议本地暴露和向注册中心注册服务
Exporter<?> exporter = protocol.export(invoker); // @ 代码7
//用于unexport
exporters.add(exporter);
上面 proxyFactory
和 protocol
两个变量,具体定义如下:
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
Invoker
实例从 proxyFactory
获取,而 proxyFactory
在这里实际是个适配器,通过调用 getAdaptiveExtension()
方法,会以拼接源码的方式动态生成目标ProxyFactory Class,生成的Class方法中会获取 url
中的参数来构建合适的具体实现对象,如果 url
中未配置,则使用 @SPI
配置的默认值。
查看 ProxyFactory
和 Protocol
接口,默认 ProxyFactory
实现为 JavassistProxyFactory
,默认 Protocol
实现为 DubboProtocol
。源码如下:
// 默认javassist
@SPI("javassist")
public interface ProxyFactory {
...
}
// 默认dubbo
@SPI("dubbo")
public interface Protocol {
...
}
ExtensionLoader#getAdaptiveExtension()
调用栈,如下:
ExtensionLoader<T>.getAdaptiveExtension()
ExtensionLoader<T>.createAdaptiveExtension()
ExtensionLoader<T>.getAdaptiveExtensionClass()
ExtensionLoader<T>.createAdaptiveExtensionClass()
ExtensionLoader<T>.createAdaptiveExtensionClassCode()
最终,生成目标ProxyFactory Class,源码如下:
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object {
if (arg2 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
可以看到,在上面的 getInvoker
方法中,会优先获取 proxy
扩展,否则默认获取 javassist
扩展。一般情况下,我们未主动扩展配置代理工厂的话,使用 JavassistProxyFactory
,源码如下:
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper类不能正确处理带$的类名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
注意到这里的入参包括 proxy
服务实例和其接口类型,因为需要对服务进行代理封装,最终是生成一个 AbstractProxyInvoker
实例,其 doInvoker
方法成为服务调用的入口。以下是具体的封装过程:
public static Wrapper getWrapper(Class<?> c) {
while( ClassGenerator.isDynamicClass(c) ) // can not wrapper on dynamic class.
c = c.getSuperclass();
if( c == Object.class )
return OBJECT_WRAPPER;
Wrapper ret = WRAPPER_MAP.get(c);
if( ret == null )
{
ret = makeWrapper(c);
WRAPPER_MAP.put(c,ret);
}
return ret;
}
具体的 makeWrapper
方法是利用 javassist
技术动态构造 Wapper
类型并创建实例,源码较长这里不再列出,以下是 Wapper
类型的 invokeMethod
方法源码(注意是 javasssit
语法形式):
public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
indi.cesc.inno.learn.dubbo.HelloService w;
try{
w = ((indi.cesc.inno.learn.dubbo.HelloService)$1);
}catch(Throwable e){
throw new IllegalArgumentException(e);
}
try{
if( "sayHello".equals( $2 ) && $3.length == 1 ) {
return ($w)w.sayHello((indi.cesc.inno.learn.dubbo.HelloRequest)$4[0]); // 真实方法调用
}
} catch(Throwable e) {
throw new java.lang.reflect.InvocationTargetException(e);
}
throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \""+$2+"\" in class indi.cesc.inno.learn.dubbo.HelloService.");
}
可以看到 w.sayHello()
这就是直接通过服务的实现对象调用具体方法,并不是通过反射,效率会高些。默认使用Javassist而不是JDK动态代理也是出于效率的考虑。
这里就将真实服务加入到整体调用链条之中,后续再将 Invoker
往上层传递,打通整个链条。
继续上面 代码7
处的代码,protocol
实例调用 export
方法进入后续流程。这里的 protocol
类型实际依旧是个适配器,export
方法源码如下:
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker {
if (arg0 == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
注意 invoker
的 url
不是服务暴露的 url
,而是协议注册的 url
,因此 url
里面的协议是 registry
。尝试获取名为 registry
的 Protocol
扩展,但进入 ExtensionLoader
后被拦截,实际拿到了其封装类 ProtocolFilterWrapper
,其负责组装过滤器链。
在/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol
中配置有:
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
则获取 RegistryProtocol
实例会被 ProtocolFilterWrapper
和 ProtocolListenerWrapper
装饰,分别用来实现拦截器和监听器功能,查看这两个Wrapper的代码可以看出,对于注册url都做了特别处理,向注册中心发布url不会触发拦截器和监听器功能,只有在真正暴露服务时才会注册拦截器,触发监听器。
ProtocolFilterWrapper#export
方法,源码如下:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 此处,将直接进入 RegistryProtocol 的 export 方法
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
此处,将直接进入 RegistryProtocol
的 export
方法。
3.4 注册发布服务
依据上面分析,最终注册发布服务调用链:ServiceBean#afterPropertiesSet —> ServiceConfig#export —> ServiceConfig#doExport —> ServiceConfig#doExportUrlsFor1Protocol —> RegistryProtocol#export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker); // @ 代码1
URL registryUrl = getRegistryUrl(originInvoker); // @ 代码2
//registry provider
final Registry registry = getRegistry(originInvoker); // @ 代码3
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker); // @ 代码4 start
//to judge to delay publish whether or not
boolean register = registeredProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
if (register) {
register(registryUrl, registeredProviderUrl); // @ 代码4 end
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl); // @ 代码5 start
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); // @ 代码5 end
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
代码1:启动服务提供者服务,监听指定端口,准备服务消费者的请求,这里其实就是从
WrapperInvoker
中的url
(注册中心url
)中提取export
属性,描述服务提供者的url
,然后启动服务提供者。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
调用DubboProtocol#export完成Dubbo服务的启动
从上图中,可以看出,将调用 DubboProtocol#export
完成Dubbo服务的启动,利用netty构建一个微型服务端,监听端口,准备接受服务消费者的网络请求,然后将 dubbo:service
的服务handler加入到命令处理器中,当有消息消费者连接该端口时,通过网络解包,将需要调用的服务和参数等信息解析处理后,转交给对应的服务实现类处理即可。
代码2:获取真实注册中心的URL,例如:zookeeper注册中心的URL。
zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider
&dubbo=2.0.0
&export=dubbo%3A%2F%2F192.168.56.1%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26bind.ip%3D192.168.56.1%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D10252%26qos.port%3D22222%26side%3Dprovider%26timestamp%3D1527263060882
&pid=10252
&qos.port=22222
×tamp=1527263060867
代码3:根据注册中心URL,从注册中心工厂中获取指定的注册中心实现类:zookeeper注册中心的实现类为:
ZookeeperRegistry
。代码4:获取服务提供者URL中的
register
属性,如果为true
,则调用注册中心的ZookeeperRegistry#register
方法向注册中心注册服务(实际由其父类FailbackRegistry
实现)。
RegistryProtocol#register
方法,源码如下:
public void register(URL registryUrl, URL registedProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registedProviderUrl);
}
FailbackRegistry#register
方法,源码如下:
@Override
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// Sending a registration request to the server side
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
// Record a failed registration request to a failed list, retry regularly
failedRegistered.add(url);
}
}
ZookeeperRegistry#doRegister
方法,源码如下:
@Override
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
代码5:服务提供者向注册中心订阅自己,主要是为了服务提供者URL发生变化的时候,会触发
overrideSubscribeListener
的notify
方法重新暴露服务。当然,会将dubbo:reference
的check
属性设置为false
。
为了感知注册中心的一些配置变化,提供者会监听注册中心路径 /dubbo/${interfaceClass}/configurators
的节点,监听该节点在注册中心的一些配置信息变更。Zookeeper注册中心通过zookeeper框架的监听回调接口进行监听(redis注册中心通过订阅命令(subscribe)监听),服务器缓存注册中心的配置,当配置发生变更时,服务会刷新本地缓存。
FailbackRegistry#subscribe
订阅方法,源码如下:
@Override
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (urls != null && !urls.isEmpty()) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
ZookeeperRegistry#doSubscribe
订阅方法,源码如下:
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
}
});
zkListener = listeners.get(listener);
}
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (services != null && !services.isEmpty()) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<URL>();
for (String path : toCategoriesPath(url)) {
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
if (listeners == null) {
zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
listeners = zkListeners.get(url);
}
ChildListener zkListener = listeners.get(listener);
if (zkListener == null) {
listeners.putIfAbsent(listener, new ChildListener() {
@Override
public void childChanged(String parentPath, List<String> currentChilds) {
ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
}
});
zkListener = listeners.get(listener);
}
zkClient.create(path, false);
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
3.5 打通服务网络
本节是切实最最核心的,重点关注 RegistryProtocol#export
中调用 doLocalExport
方法,其实主要是 根据各自协议,服务提供者建立网络服务器,在特定端口建立监听,监听来自消息消费端服务的请求。
RegistryProtocol#doLocalExport,源码如下:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker)); // @ 代码1
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); // @ 代码2
bounds.put(key, exporter);
}
}
}
return exporter;
}
代码1:如果服务提供者以
dubbo
协议暴露服务,getProviderUrl(originInvoker)返回的URL将以dubbo://
开头。代码2:根据Dubbo内置的SPI机制,将调用
DubboProtocol#export
方法。
DubboProtocol#export,源码如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl(); // @ 代码1
// export service.
String key = serviceKey(url); // @ 代码2
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT); // @ 代码3 start
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
} // @ 代码3 end
openServer(url); // @ 代码4
optimizeSerialization(url); // @ 代码5
return exporter;
}
代码1:获取服务提供者URL,以协议名称,这里是
dubbo://
开头。代码2:从服务提供者URL中获取服务名,
key: interface:port
,例如:com.alibaba.dubbo.demo.DemoService:20880
。代码3:是否将转发事件导出成
stub
。代码4:根据url打开服务。
代码5:根据url优化器序列化方式。
DubboProtocol#openServer,源码如下:
private void openServer(URL url) {
// find server.
String key = url.getAddress(); // @ 代码1
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key); // @ 代码2
if (server == null) {
serverMap.put(key, createServer(url)); // @ 代码3
}
}
} else {
// server supports reset, use together with override
server.reset(url); // @代码4
}
}
}
代码1:根据url获取网络地址:
ip:port
,例如:192.168.56.1:20880
,服务提供者IP与暴露服务端口号。代码2:根据key从服务器缓存中获取,如果存在,则执行代码4,如果不存在,则执行代码3.
代码3:根据URL创建一服务器,Dubbo服务提供者服务器实现类为
ExchangeServer
。代码4:如果服务器已经存在,用当前URL重置服务器,这个不难理解,因为一个Dubbo服务中,会存在多个
dubbo:service
标签,这些标签都会在服务台提供者的同一个IP地址、端口号上暴露服务。
DubboProtocol#createServer,源码如下:
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString()); // @ 代码1
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // @ 代码2
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER); // @ 代码3
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) // @ 代码4
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME); // @ 代码5
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler); // @ 代码6
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY); // @ 代码7
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
代码1:为服务提供者url增加
channel.readonly.sent
属性,默认为true
,表示在发送请求时,是否等待将字节写入socket后再返回,默认为true
。代码2:为服务提供者url增加
heartbeat
属性,表示心跳间隔时间,默认为60*1000
,表示60s。代码3:为服务提供者url增加
server
属性,可选值为netty,mina
等等,默认为netty
。代码4:根据SPI机制,判断
server
属性是否支持。代码5:为服务提供者url增加
codec
属性,默认值为dubbo
,协议编码方式。代码6:根据服务提供者URI,服务提供者命令请求处理器
requestHandler
构建ExchangeServer
实例。requestHandler
的实现具体在以后详细分析Dubbo服务调用时再详细分析。代码7:验证客户端类型是否可用。
Exchangers#bind方法,根据 URL
、ExchangeHandler
构建服务器,源码如下:
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
上述代码不难看出,首先根据 url
获取 Exchanger
实例,然后调用 bind
方法构建 ExchangeServer
,Exchanger
接口方法如下:
ExchangeServer bind(URL url, ExchangeHandler handler)
:服务提供者调用。ExchangeClient connect(URL url, ExchangeHandler handler)
:服务消费者调用。
Dubbo提供的实现类为:HeaderExchanger
,其 bind
方法如下:
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
从此处可以看到,端口的绑定由 Transporters
的 bind
方法实现。源码如下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
从这里得知,Dubbo网络传输的接口有 Transporter
接口实现,其继承类图所示:
本文以netty版本来查看一下 Transporter
实现。 NettyTransporter
源码如下:
public class NettyTransporter implements Transporter {
public static final String NAME = "netty3";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
return new NettyClient(url, listener);
}
}
创建 NettyServer
实例时,其父类构造函数会调用 doOpen()
建立网络连接,源码如下:
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); // @ 代码1
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler); // @ 代码2
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
从本方法 代码1 & 代码2
了解,首先创建 NettyServer
必须传入一个服务提供者 URL
,但从 DubboProtocol#createServer
中可以看出,Server是基于网络套接字 (ip:port)
缓存的,一个JVM应用中,必然会存在多个 dubbo:service
标签,就会有多个 URL
,这里为什么可以这样做呢?
从 DubboProtocol#createServer
中可以看出,在解析第二个 dubbo:service
标签时并不会调用 createServer
,而是会调用 Server#reset
方法,是不是这个方法有什么魔法,在reset方法时能将URL也注册到Server上?
那接下来分析 NettyServer#reset
方法是如何实现的?DubboProtocol#reset
方法最终将调用 Server
的 reset
方法,同样还是以netty版本的 NettyServer
为例,查看reset方法的实现原理。 NettyServer#reset—>父类(AbstractServer) AbstractServer#reset
,源码如下:
@Override
public void reset(URL url) {
if (url == null) {
return;
}
try {
if (url.hasParameter(Constants.ACCEPTS_KEY)) {
int a = url.getParameter(Constants.ACCEPTS_KEY, 0);
if (a > 0) {
this.accepts = a;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.IDLE_TIMEOUT_KEY)) {
int t = url.getParameter(Constants.IDLE_TIMEOUT_KEY, 0);
if (t > 0) {
this.idleTimeout = t;
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
}
try {
if (url.hasParameter(Constants.THREADS_KEY)
&& executor instanceof ThreadPoolExecutor && !executor.isShutdown()) { // @ 代码1 start
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int threads = url.getParameter(Constants.THREADS_KEY, 0);
int max = threadPoolExecutor.getMaximumPoolSize();
int core = threadPoolExecutor.getCorePoolSize();
if (threads > 0 && (threads != max || threads != core)) {
if (threads < core) {
threadPoolExecutor.setCorePoolSize(threads);
if (core == max) {
threadPoolExecutor.setMaximumPoolSize(threads);
}
} else {
threadPoolExecutor.setMaximumPoolSize(threads);
if (core == max) {
threadPoolExecutor.setCorePoolSize(threads);
}
}
}
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
} // @ 代码1 end
super.setUrl(getUrl().addParameters(url.getParameters())); // @ 代码2
}
代码1:首先是调整线程池的相关线程数量,这个好理解。
代码2:然后设置调用
setUrl
覆盖原先NettyServer
的private volatile URL url
的属性,那为什么不会影响原先注册的dubbo:service
呢?原来NettyHandler
上加了注解:@Sharable
,由该注解去实现线程安全。