dubbo的服务注册

2020-06-07  本文已影响0人  剑道_7ffc

RegistryProtocol.export

// Excerpt from RegistryProtocol.export: register the provider's dubbo:// URL with the
// registry (ZooKeeper in this walkthrough).
// NOTE(review): `registry` is not used within this excerpt; presumably it is used further
// down in export() - confirm against the full method.
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
// Record the invoker together with its registry URL and provider URL in the provider/consumer table.
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
        registryUrl, registeredProviderUrl);
// Decide whether to register now: the provider URL's "register" parameter is read,
// defaulting to true when it is absent.
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
    register(registryUrl, registeredProviderUrl);
    // Flag the wrapper so that it reflects that registration was performed.
    providerInvokerWrapper.setReg(true);
}

RegistryProtocol#register

/**
 * Registers a provider URL with the registry identified by {@code registryUrl}.
 *
 * @param registryUrl           URL used to look up the {@code Registry} through the registry factory
 * @param registeredProviderUrl provider URL handed to {@code Registry.register}
 */
public void register(URL registryUrl, URL registeredProviderUrl) {
    // Resolve the registry for this URL and register the provider on it in one step.
    registryFactory.getRegistry(registryUrl).register(registeredProviderUrl);
}

RegistryFactory

根据url的协议来确定具体的扩展实现,默认值是dubbo;因为registryUrl是zookeeper://ip:port,所以实际调用的是ZookeeperRegistryFactory的getRegistry方法。

/**
 * Extension point that supplies a {@link Registry} for a registry URL.
 *
 * <p>{@code @SPI("dubbo")} names "dubbo" as the default extension, and
 * {@code @Adaptive({"protocol"})} means the concrete factory is selected from the URL's
 * protocol; for a {@code zookeeper://ip:port} URL this walkthrough ends up in
 * {@code ZookeeperRegistryFactory}.
 */
@SPI("dubbo")
public interface RegistryFactory {
    /**
     * Returns the registry that corresponds to the given URL.
     *
     * @param url registry URL; its protocol drives the choice of implementation
     * @return the registry for {@code url}
     */
    @Adaptive({"protocol"})
    Registry getRegistry(URL url);
}

AbstractRegistryFactory#getRegistry

若缓存中已存在对应的Registry,则直接从缓存中取;否则调用createRegistry创建。

ZookeeperRegistryFactory#createRegistry

创建Registry对象;其中zookeeperTransporter由自适应扩展点注入,实际使用的是CuratorZookeeperTransporter对象。

/**
 * Builds a new {@code ZookeeperRegistry} for the given URL.
 *
 * @param url registry URL passed straight to the {@code ZookeeperRegistry} constructor
 * @return the newly constructed registry
 */
public Registry createRegistry(URL url) {
    // The factory's zookeeperTransporter is handed to the registry together with the URL.
    final Registry zookeeperRegistry = new ZookeeperRegistry(url, zookeeperTransporter);
    return zookeeperRegistry;
}

ZookeeperRegistry#ZookeeperRegistry

public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(PATH_SEPARATOR)) {
        group = PATH_SEPARATOR + group;
    }
    this.root = group;
    //连接注册中心
    zkClient = zookeeperTransporter.connect(url);
    //添加监听
    zkClient.addStateListener(state -> {
        if (state == StateListener.RECONNECTED) {
            try {
                recover();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    });
}

registry.register(registeredProviderUrl)

因为registry是ZookeeperRegistry类的对象,而ZookeeperRegistry是FailbackRegistry的子类,所以调用FailbackRegistry的register方法

/**
 * Registers a provider URL with the registry identified by {@code registryUrl}.
 *
 * @param registryUrl           URL used to look up the {@code Registry} through the registry factory
 * @param registeredProviderUrl provider URL handed to {@code Registry.register}
 */
public void register(URL registryUrl, URL registeredProviderUrl) {
    // Sample registeredProviderUrl: dubbo://169.254.108.117:20880/com.my.dubbo.IPayService
    registryFactory.getRegistry(registryUrl).register(registeredProviderUrl);
}

FailbackRegistry#register

失败重试的机制

/**
 * Registers {@code url}, falling back to a retry list when registration fails and the
 * failure is allowed to be deferred.
 *
 * @param url the URL to register
 * @throws IllegalStateException when registration fails and either the check flags are
 *                               enabled for a non-consumer URL, or the failure is a
 *                               {@code SkipFailbackWrapperException}
 */
public void register(URL url) {
    super.register(url);
    // NOTE(review): presumably clears stale entries for this URL from the failed lists - confirm.
    removeFailedRegistered(url);
    removeFailedUnregistered(url);
    try {
        // Sending a registration request to the server side
        doRegister(url);
    } catch (Exception e) {
        // Fail fast only when the check flag is on for both the registry URL and the
        // URL being registered, and that URL is not a consumer URL.
        final boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                && url.getParameter(Constants.CHECK_KEY, true)
                && !CONSUMER_PROTOCOL.equals(url.getProtocol());
        final boolean skipFailback = e instanceof SkipFailbackWrapperException;

        if (!check && !skipFailback) {
            logger.error("Failed to register " + url + ", waiting for retry, cause: " + e.getMessage(), e);
            // Record a failed registration request to a failed list, retry regularly
            addFailedRegistered(url);
            return;
        }

        // For a skip-failback wrapper, report the wrapped cause instead of the wrapper itself.
        final Throwable cause = skipFailback ? e.getCause() : e;
        throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + cause.getMessage(), cause);
    }
}

ZookeeperRegistry#doRegister

/**
 * Creates the node for {@code url} through the ZooKeeper client.
 *
 * @param url the URL to register; its path comes from {@code toUrlPath(url)} and the
 *            second argument to {@code create} is its {@code DYNAMIC_KEY} parameter
 *            (default {@code true})
 * @throws RpcException wrapping any {@code Throwable} raised while creating the node
 */
public void doRegister(URL url) {
    try {
        final String nodePath = toUrlPath(url);
        // NOTE(review): the flag presumably selects an ephemeral vs. persistent node - confirm.
        final boolean dynamic = url.getParameter(DYNAMIC_KEY, true);
        zkClient.create(nodePath, dynamic);
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}
上一篇下一篇

猜你喜欢

热点阅读