dubbo的服务注册
2020-06-07 本文已影响0人
剑道_7ffc
RegistryProtocol.export
// Register the provider's dubbo:// URL with the registry (ZooKeeper in this walkthrough).
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// registerProvider returns the wrapper whose "reg" flag is set further down.
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
// Decide whether to publish now: the "register" URL parameter is read with a default of true.
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
register(registryUrl, registeredProviderUrl);
// Flag the wrapper as registered only after register(...) has returned normally.
providerInvokerWrapper.setReg(true);
}
RegistryProtocol#register
/**
 * Registers a provider URL with the registry addressed by {@code registryUrl}.
 *
 * @param registryUrl           URL used to look up the {@link Registry} from the registry factory
 * @param registeredProviderUrl provider URL handed to {@link Registry#register}
 */
public void register(URL registryUrl, URL registeredProviderUrl) {
    // Resolve the registry for this URL and register the provider in one step.
    registryFactory.getRegistry(registryUrl).register(registeredProviderUrl);
}
RegistryFactory
根据url的协议(protocol)来确定使用哪个RegistryFactory扩展实现,默认扩展名是dubbo;因为registryUrl是zookeeper://ip:port,所以调用的是ZookeeperRegistryFactory的getRegistry方法。
/**
 * Factory that resolves a {@link Registry} from a registry URL.
 *
 * <p>Declared as an SPI extension point with "dubbo" as the annotation value
 * (described in the accompanying notes as the default extension).
 */
@SPI("dubbo")
public interface RegistryFactory {
/**
 * Returns the registry for the given URL.
 *
 * <p>The adaptive key is "protocol"; per the accompanying notes the implementation
 * is chosen from the URL's protocol (e.g. a zookeeper:// URL selects the ZooKeeper factory).
 *
 * @param url registry URL
 * @return the registry for that URL
 */
@Adaptive({"protocol"})
Registry getRegistry(URL url);
}
AbstractRegistryFactory#getRegistry
若缓存中有则从缓存中取,否则创建。
ZookeeperRegistryFactory#createRegistry
创建Registry对象;zookeeperTransporter通过自适应扩展点得到,实际是CuratorZookeeperTransporter的对象。
/**
 * Creates a new {@link ZookeeperRegistry} for the given registry URL.
 *
 * @param url registry URL passed straight to the {@code ZookeeperRegistry} constructor
 * @return the newly constructed registry, built with this factory's {@code zookeeperTransporter}
 */
public Registry createRegistry(URL url) {
    final Registry zookeeperRegistry = new ZookeeperRegistry(url, zookeeperTransporter);
    return zookeeperRegistry;
}
ZookeeperRegistry#ZookeeperRegistry
/**
 * Sets up the root path, connects to the registry and installs a state listener.
 *
 * @param url                  registry URL; its {@code GROUP_KEY} parameter (default
 *                             {@code DEFAULT_ROOT}) becomes the root path
 * @param zookeeperTransporter transporter used to open the client connection
 */
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    // The root path must start with the path separator; prepend it when missing.
    String configuredGroup = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    this.root = configuredGroup.startsWith(PATH_SEPARATOR)
            ? configuredGroup
            : PATH_SEPARATOR + configuredGroup;

    // Connect to the registry center.
    zkClient = zookeeperTransporter.connect(url);

    // Listen for connection state changes; on RECONNECTED, run recover().
    zkClient.addStateListener(state -> {
        if (state != StateListener.RECONNECTED) {
            return;
        }
        try {
            recover();
        } catch (Exception e) {
            // A failed recovery is logged rather than propagated to the listener caller.
            logger.error(e.getMessage(), e);
        }
    });
}
registry.register(registeredProviderUrl)
因为registry是ZookeeperRegistry类的对象,而ZookeeperRegistry是FailbackRegistry的子类,所以调用FailbackRegistry的register方法
/**
 * Registers a provider URL with the registry addressed by {@code registryUrl}.
 *
 * <p>Example provider URL: dubbo://169.254.108.117:20880/com.my.dubbo.IPayService
 *
 * @param registryUrl           URL used to look up the {@link Registry} from the registry factory
 * @param registeredProviderUrl provider URL handed to {@link Registry#register}
 */
public void register(URL registryUrl, URL registeredProviderUrl) {
    // Resolve the registry for this URL and register the provider in one step.
    registryFactory.getRegistry(registryUrl).register(registeredProviderUrl);
}
FailbackRegistry#register
失败重试的机制
/**
 * Registers {@code url}, falling back to a retry list when the attempt fails.
 *
 * <p>After delegating to the superclass and clearing any earlier failure records for
 * this URL, the registration is attempted via {@code doRegister}. On failure the
 * exception is either rethrown as {@link IllegalStateException} or logged, in which
 * case the URL is recorded for a later retry.
 *
 * @param url the URL to register
 * @throws IllegalStateException if registration fails and either the check flags are
 *         enabled for a non-consumer URL, or the failure is a
 *         {@code SkipFailbackWrapperException}
 */
public void register(URL url) {
    super.register(url);
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Send the registration request to the server side.
        doRegister(url);
    } catch (Exception e) {
        // Fail fast when the check parameter is enabled on both the registry URL and
        // the registered URL, and the registered URL is not a consumer URL.
        boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        boolean skipFailback = e instanceof SkipFailbackWrapperException;

        if (check || skipFailback) {
            // For a skip-failback wrapper, report the wrapped cause instead of the wrapper.
            Throwable cause = skipFailback ? e.getCause() : e;
            throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + cause.getMessage(), cause);
        }

        logger.error("Failed to register " + url + ", waiting for retry, cause: " + e.getMessage(), e);
        // Remember the failed request so it can be retried later.
        addFailedRegistered(url);
    }
}
ZookeeperRegistry#doRegister
/**
 * Creates the node for {@code url} through the ZooKeeper client.
 *
 * @param url the URL to register; its {@code DYNAMIC_KEY} parameter (default {@code true})
 *            is passed as the second argument to {@code zkClient.create}
 * @throws RpcException wrapping any {@link Throwable} raised while building the path
 *         or creating the node
 */
public void doRegister(URL url) {
    try {
        String nodePath = toUrlPath(url);
        boolean dynamic = url.getParameter(DYNAMIC_KEY, true);
        zkClient.create(nodePath, dynamic);
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}