xxl-系列

2019-11-21  本文已影响0人  93张先生

xxl-register

register-client

XxlRegisteryBaseClient

功能:

提供最简单的register remove discovery monitor方法,向注册中心发送请求并返回结果.

XxlRegisteryClient

属性:

1.registeryData (HashSet):要注册的数据;
2.discoveryData (ConcurrentHashMap<String,TreeSet<<String>>) :需要发现的数据;

后台线程:

1.registeryThread:一个注册的后台线程:向注册中心注册数据,每隔10秒钟注册一次;通过Thread.sleep()实现;
2.discoveryThread:一个发现的后台线程:没有要查找的数据,sleep(3 second);有要查找的数据,监控注册中心数据;然后sleep(10 second);刷新注册的数据;通过已经缓存数据的集合对比,没有改变不需要更新;

功能:

regisitery:注册数据加入缓存,调用基础的注册方法(registryBaseClient.registry());
remove:移除数据移除缓存,调用基础的移除方法(registryBaseClient.remove());
discovery:先从缓存查询注册的数据,数据不一致,然后去刷新注册中心的数据,根据更新的数据,刷新本地缓存;

// registry thread
registryThread = new Thread(new Runnable() {
    @Override
    public void run() {
        while (!registryThreadStop) {
            try {
                if (registryData.size() > 0) {

                    boolean ret = registryBaseClient.registry(new ArrayList<XxlRegistryDataParamVO>(registryData));
                    logger.debug(">>>>>>>>>>> xxl-registry, refresh registry data {}, registryData = {}", ret?"success":"fail",registryData);
                }
            } catch (Exception e) {
                if (!registryThreadStop) {
                    logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                }
            }
            try {
                // per 10 seconds register once
                TimeUnit.SECONDS.sleep(10);
            } catch (Exception e) {
                if (!registryThreadStop) {
                    logger.error(">>>>>>>>>>> xxl-registry, registryThread error.", e);
                }
            }
        }
        logger.info(">>>>>>>>>>> xxl-registry, registryThread stoped.");
    }
});
 // discovery thread
    discoveryThread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!registryThreadStop) {

                if (discoveryData.size() == 0) {
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    } catch (Exception e) {
                        if (!registryThreadStop) {
                            logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
                        }
                    }
                } else {
                    try {
                        // monitor
                        boolean monitorRet = registryBaseClient.monitor(discoveryData.keySet());

                        // avoid fail-retry request too quick
                        if (!monitorRet){
                            TimeUnit.SECONDS.sleep(10);
                        }

                        // refreshDiscoveryData, all
                        refreshDiscoveryData(discoveryData.keySet());
                    } catch (Exception e) {
                        if (!registryThreadStop) {
                            logger.error(">>>>>>>>>>> xxl-registry, discoveryThread error.", e);
                        }
                    }
                }

            }
            logger.info(">>>>>>>>>>> xxl-registry, discoveryThread stoped.");
        }
    });

/**
 * refreshDiscoveryData, some or all
 */
private void refreshDiscoveryData(Set<String> keys){
    if (keys==null || keys.size() == 0) {
        return;
    }

    // discovery mult
    Map<String, TreeSet<String>> updatedData = new HashMap<>();

    Map<String, TreeSet<String>> keyValueListData = registryBaseClient.discovery(keys);
    if (keyValueListData!=null) {
        for (String keyItem: keyValueListData.keySet()) {

            // list > set
            TreeSet<String> valueSet = new TreeSet<>();
            valueSet.addAll(keyValueListData.get(keyItem));

            // valid if updated
            boolean updated = true;
            TreeSet<String> oldValSet = discoveryData.get(keyItem);
            if (oldValSet!=null && BasicJson.toJson(oldValSet).equals(BasicJson.toJson(valueSet))) {
                updated = false;
            }

            // set
            if (updated) {
                discoveryData.put(keyItem, valueSet);
                updatedData.put(keyItem, valueSet);
            }

        }
    }

    if (updatedData.size() > 0) {
        logger.info(">>>>>>>>>>> xxl-registry, refresh discovery data finish, discoveryData(updated) = {}", updatedData);
    }
    logger.debug(">>>>>>>>>>> xxl-registry, refresh discovery data finish, discoveryData = {}", discoveryData);
}

register-admin

register

服务注册 & 续约 API

说明:新服务注册上线1s内广播通知接入方;需要接入方循环续约,否则服务将会过期(三倍于注册中心心跳时间)下线;
将消息实体放入LinkedBlockingQueue registerQueue;

remove

服务摘除 API
说明:新服务摘除下线1s内广播通知接入方;
将消息实体放入LinkedBlockingQueue registerQueue;

discovery

说明:查询在线服务地址列表;
通过磁盘文件发现注册服务

monitor

说明:long-polling 接口,主动阻塞一段时间(三倍于注册中心心跳时间);直至阻塞超时或服务注册信息变动时响应;

token使用:

每次请求(register remove discovery ...)都携带token;服务端token和客户端token进行比对;查看是否一致合法;

if (this.accessToken!=null && this.accessToken.trim().length()>0 && !this.accessToken.equals(accessToken)) {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "AccessToken Invalid");
}
客户端和服务端心跳频率:10s;

信息注册:

服务端通过广播机制实时同步服务注册信息向客户端;客户端连接服务端的long polling技术: 新注册和移除的服务 1 秒内通知客户端;客户端和服务端实时通信,进行续约;
文件和数据库内容的一致性:先处理数据库,然后处理磁盘文件;先更新数据库,然后写更新事件到registery message,最后通过广播线程处理registery message;

discovery:只从文件读取;一致性都是按照磁盘文件为准;
registry():加入registryQueue;
remove():加入removeQueue;
discovery():只从文件获取注册数据信息;
monitor():

返回DeferredResult;他是一个返回延迟结果的对象;它的结果在brocadcast Thread中处理;有结果或者超时自动返回结果.从registryDeferredResultMap中获取list,每一个key,有一个List<DeferredResult>> list;逐个处理list中的DeferredResult对象.
private Map<String, List<DeferredResult>> registryDeferredResultMap = new ConcurrentHashMap<>();

// brocast monitor client
List<DeferredResult> deferredResultList = registryDeferredResultMap.get(fileName);
if (deferredResultList != null) {
    registryDeferredResultMap.remove(fileName);
    for (DeferredResult deferredResult: deferredResultList) {
        deferredResult.setResult(new ReturnT<>(ReturnT.SUCCESS_CODE, "Monitor key update."));
    }
}
10个注册线程:

加入registryQueue;先更新registery data,然后更新registery,最后添加registery message;

10个移除线程:

加入removeQueue;先删除registery data,然后更新registery,最后添加registery message;

一个广播线程:

广播线程从registeryMessage表获取事件(registry,remove....)信息,然后同步注册器磁盘文件和处理客户端监控结果,并返回客户端;客户端结果,两种情况;1. 成功;2. 3个心跳时间(10s)超时返回;

一个旧数据清楚线程:

清除registery data 10 * 3 时间之前的;
然后更新registery数据库;
然后更新registery磁盘文件;
最后删除磁盘文件;

集群信息共享:
通过同一个数据库实例实现;

上一篇 下一篇

猜你喜欢

热点阅读