Soul Gateway Study 10 - Config Data Sync 1 - HttpLongPolling

2021-01-23  niuxin

Preface

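As we know, soul-bootstrap is the gateway entry point: it has to carry the incoming traffic while also letting gateway plugin features (routing, rate limiting, circuit breaking) be reconfigured dynamically. Dynamic configuration roughly works as follows:

  1. soul-bootstrap keeps configuration data in an in-memory HashMap. When a request comes in, the matching configuration is looked up directly in memory and the plugin logic runs against it.
  2. The soul-admin console provides a visual management UI where developers and operators maintain plugin configuration data; this data is stored in the soul-admin database.
  3. A data synchronization mechanism between soul-bootstrap and soul-admin copies the configuration data from soul-admin to soul-bootstrap.
  4. The overall flow is shown below:

    [Figure: config-data-sync]
  5. The source code modules are shown below:
    [Figure: soul-sync-data-center]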
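
Step 1 is what keeps the request path fast: matching a request never touches soul-admin or its database. A minimal sketch of such a cache, assuming a hypothetical LocalPluginCache class keyed by plugin name (Soul's own cache classes are not shown in this article). The article says HashMap; the sketch uses ConcurrentHashMap because the sync thread writes while request threads read at the same time.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory store: plugin name -> plugin config (a JSON string here for brevity).
// The sync thread writes to it; request-handling threads only read from it.
class LocalPluginCache {
    private final Map<String, String> plugins = new ConcurrentHashMap<>();

    // Called by the data sync side after new config arrives from soul-admin.
    void put(String pluginName, String configJson) {
        plugins.put(pluginName, configJson);
    }

    // Called by the data sync side when a plugin config is deleted.
    void remove(String pluginName) {
        plugins.remove(pluginName);
    }

    // Called on the request path: a plain map lookup, no database or network round trip.
    Optional<String> match(String pluginName) {
        return Optional.ofNullable(plugins.get(pluginName));
    }
}
```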

Next, we analyze the HttpLongPolling sync method.

HttpLongPolling (HTTP long polling)

Usage

soul-bootstrap (pom.xml; HTTP long polling relies on soul-spring-boot-starter-sync-data-http)

   <!--soul data sync start use zookeeper-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!--soul data sync start use websocket-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!--soul data sync start use http-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-http</artifactId>
            <version>${project.version}</version>
        </dependency>

        <!--soul data sync start use nacos-->
        <dependency>
            <groupId>org.dromara</groupId>
            <artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
            <version>${project.version}</version>
        </dependency>

soul-admin

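Usage notes

  1. When starting soul-bootstrap with HTTP long polling sync, make sure soul-admin is already up and running; otherwise soul-bootstrap will fail to start.
  2. Whichever sync method you choose, soul-admin and soul-bootstrap must be configured with the same one; otherwise synchronization will fail.
  3. If soul-bootstrap prints logs like the following on startup, it is successfully using HttpLongPolling data sync: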
2021-01-24 08:27:45.086  INFO 5667 --- [           main] .s.s.b.s.s.d.h.HttpSyncDataConfiguration : you use http long pull sync soul data
2021-01-24 08:27:45.702  INFO 5667 --- [           main] o.d.s.s.data.http.HttpSyncDataService    : request configs: [http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATA]
2021-01-24 08:27:46.324  INFO 5667 --- [onPool-worker-1] o.d.s.s.d.h.refresh.AppAuthDataRefresh   : clear all appAuth data cache

Source code analysis

soul-bootstrap

// Sync method -> configuration class -> property that activates it
HttpLongPolling -> HttpSyncDataConfiguration -> soul.sync.http.url
Websocket -> WebsocketSyncDataConfiguration -> soul.sync.websocket.url
Zookeeper -> ZookeeperSyncDataConfiguration -> soul.sync.zookeeper.url
Nacos -> NacosSyncDataConfiguration -> soul.sync.nacos.url
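
The mapping above says a sync method is switched on simply by setting its property. In a Spring Boot starter such switches are commonly expressed with @ConditionalOnProperty; the plain-Java sketch below (a hypothetical SyncModeResolver, not Soul code) only mimics the mapping so the rule is easy to see.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper mimicking the "property present -> sync configuration active" rule.
final class SyncModeResolver {
    // Property key -> sync method it activates (keys taken from the mapping above).
    private static final Map<String, String> MODES = new LinkedHashMap<>();

    static {
        MODES.put("soul.sync.http.url", "HttpLongPolling");
        MODES.put("soul.sync.websocket.url", "Websocket");
        MODES.put("soul.sync.zookeeper.url", "Zookeeper");
        MODES.put("soul.sync.nacos.url", "Nacos");
    }

    private SyncModeResolver() { }

    // Returns every sync method whose property is set to a non-blank value.
    static List<String> activeModes(Properties props) {
        List<String> active = new ArrayList<>();
        for (Map.Entry<String, String> entry : MODES.entrySet()) {
            String value = props.getProperty(entry.getKey());
            if (value != null && !value.trim().isEmpty()) {
                active.add(entry.getValue());
            }
        }
        return active;
    }
}
```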
    public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
                               final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
        this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
        this.httpConfig = httpConfig;
        // multiple admin addresses are separated by ','
        this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
        this.httpClient = createRestTemplate();
        // the constructor calls start(), so admin must be online at this point; otherwise bootstrap fails to start too
        this.start();
    }
    private void start() {
        // It could be initialized multiple times, so you need to control that.
        if (RUNNING.compareAndSet(false, true)) {
            // fetch all group configs.
            // pull all configs once at startup
            this.fetchGroupConfig(ConfigGroupEnum.values());
            // TODO question: what if the admin address points at a load balancer? Would this logic break?
            // the pool size equals the number of admin servers: each admin gets its own long-polling thread
            int threadSize = serverList.size();
            this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>(),
                    SoulThreadFactory.create("http-long-polling", true));
            // start long polling, each server creates a thread to listen for changes.
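            // start long polling: one thread per server listens for config changes
            // unlike the startup fetch, where getting the initial config from any one admin is enough,
            // change listening must cover every admin, since we can't know which node handled a user's change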
            this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
        } else {
            log.info("soul http long polling was started, executor=[{}]", executor);
        }
    }
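
The constructor and start() combine three ideas: a comma-separated admin list, a compareAndSet guard so initialization runs only once, and a pool sized to the number of admin servers with one listener per server. A stripped-down sketch, assuming a hypothetical LongPollingStarter and a caller-supplied listenTask standing in for HttpLongPollingTask; for simplicity it runs the task once per server rather than looping.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical, stripped-down version of the constructor + start() pattern above.
class LongPollingStarter {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final List<String> servers;
    private ExecutorService executor;

    LongPollingStarter(String commaSeparatedUrls) {
        // "a,b" -> [a, b], like Splitter.on(",") in the constructor above
        this.servers = Arrays.asList(commaSeparatedUrls.split(","));
    }

    // Returns true only for the call that actually started the listeners.
    boolean start(Consumer<String> listenTask) {
        // compareAndSet lets exactly one caller flip false -> true
        if (!running.compareAndSet(false, true)) {
            return false;
        }
        // one thread per admin server; daemon threads never block JVM shutdown
        executor = Executors.newFixedThreadPool(servers.size(), r -> {
            Thread t = new Thread(r, "http-long-polling");
            t.setDaemon(true);
            return t;
        });
        servers.forEach(server -> executor.execute(() -> listenTask.accept(server)));
        return true;
    }

    void stop() {
        running.set(false);
        if (executor != null) {
            executor.shutdownNow();
        }
    }
}
```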

    private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
        // try the configured servers in order and stop as soon as one fetch succeeds
        for (int index = 0; index < this.serverList.size(); index++) {
            String server = serverList.get(index);
            try {
                this.doFetchGroupConfig(server, groups);
                break;
            } catch (SoulException e) {
                // no available server, throw exception.
                // if the last server fails too, rethrow; this aborts startup
                if (index >= serverList.size() - 1) {
                    throw e;
                }
                log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
            }
        }
    }
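
fetchGroupConfig() is a plain failover loop. Generalized into a hypothetical helper (the Function passed in stands in for doFetchGroupConfig), it looks like this:

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical failover helper mirroring fetchGroupConfig():
// try servers in order, return on the first success, rethrow only if every server failed.
final class Failover {
    private Failover() { }

    static <T> T firstSuccess(List<String> servers, Function<String, T> fetch) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("no servers configured");
        }
        RuntimeException lastError = null;
        for (String server : servers) {
            try {
                return fetch.apply(server); // success: stop trying other servers
            } catch (RuntimeException e) {
                lastError = e;              // remember the failure and try the next server
            }
        }
        throw lastError; // every server failed: surface the last error (in Soul this aborts startup)
    }
}
```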

    private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
        // all groups are fetched in a single request, however many there are
        StringBuilder params = new StringBuilder();
        for (ConfigGroupEnum groupKey : groups) {
            params.append("groupKeys").append("=").append(groupKey.name()).append("&");
        }
        String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
        log.info("request configs: [{}]", url);
        String json = null;
        try {
            // fetch configs from admin
            json = this.httpClient.getForObject(url, String.class);
        } catch (RestClientException e) {
            String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
            log.warn(message);
            throw new SoulException(message, e);
        }
        // update local cache
        // on success, update the local config cache
        boolean updated = this.updateCacheWithJson(json);
        if (updated) {
            log.info("get latest configs: [{}]", json);
            return;
        }
        // not updated. it is likely that the current config server has not been updated yet. wait a moment.
        log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
        // if this server brought no config update, sleep 30s before continuing
        // TODO question: this blocks startup for 30s and slows bootstrap down; should the startup fetch be made asynchronous?
        ThreadUtils.sleep(TimeUnit.SECONDS, 30);
    }
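
doFetchGroupConfig() puts every requested group into one query string by repeating the groupKeys parameter, which matches the request URL in the startup log shown earlier. The same string can be built without the trailing-& cleanup by using java.util.StringJoiner. Group below is a hypothetical stand-in for Soul's ConfigGroupEnum, with the five constants that appear in that log.

```java
import java.util.StringJoiner;

// Hypothetical stand-in for ConfigGroupEnum, constants in the order seen in the startup log.
enum Group { APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA }

final class FetchUrl {
    private FetchUrl() { }

    // Same shape as doFetchGroupConfig(): one repeated groupKeys parameter per group,
    // so every group is fetched with a single request.
    static String build(String server, Group... groups) {
        StringJoiner params = new StringJoiner("&"); // no trailing '&' to strip afterwards
        for (Group g : groups) {
            params.add("groupKeys=" + g.name());
        }
        return server + "/configs/fetch?" + params;
    }
}
```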
/**
     * update local cache.
     * @param json the response from config server.
     * @return true: the local cache was updated. false: not updated.
     */
    private boolean updateCacheWithJson(final String json) {
        JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
        JsonObject data = jsonObject.getAsJsonObject("data");
        // if the config cache will be updated?
        // factory applies the data-refresh strategy for each config type
        return factory.executor(data);
    }
    public boolean executor(final JsonObject data) {
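        // TODO question: why such an odd array? (A lambda may only capture effectively final locals, so a one-element array serves as a mutable flag.)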
        final boolean[] success = {false};
        // refresh each data type; a parallel stream updates the configs concurrently
        ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
        return success[0];
    }
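
A consequence of the success[0] assignment in executor(): every refresh overwrites the flag, so the method returns whatever the last-finishing refresh reported, not whether any group was updated. A sketch of an order-independent alternative, modelling each refresher as a hypothetical Predicate (dataRefresh.refresh(data) plays that role above). It uses reduce rather than anyMatch on purpose: anyMatch may short-circuit and skip the remaining refreshes.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical order-independent version of executor():
// true if ANY refresher reported an update, regardless of which thread finished last.
final class RefreshAggregator {
    private RefreshAggregator() { }

    static <D> boolean refreshAll(List<Predicate<D>> refreshers, D data) {
        return refreshers.parallelStream()
                .map(refresher -> refresher.test(data))
                // reduce is not short-circuiting, so every refresher still runs
                .reduce(false, Boolean::logicalOr);
    }
}
```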
    public void run() {
            // check the running flag so the long-polling thread does not keep spinning pointlessly after bootstrap stops
            while (RUNNING.get()) {
                for (int time = 1; time <= retryTimes; time++) {
                    try {
                        // a failed long poll is retried every 5s, up to 3 times
                        doLongPolling(server);
                    } catch (Exception e) {
                        // print warnning log.
                        if (time < retryTimes) {
                            log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
                                    time, retryTimes - time, e.getMessage());
                            ThreadUtils.sleep(TimeUnit.SECONDS, 5);
                            continue;
                        }
                        // print error, then suspended for a while.
                        log.error("Long polling failed, try again after 5 minutes!", e);
                        // once all 3 attempts fail, sleep 5 minutes before polling again
                        ThreadUtils.sleep(TimeUnit.MINUTES, 5);
                    }
                }
            }
            log.warn("Stop http long polling.");
        }
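
run() retries a failed poll after 5 seconds and backs off for 5 minutes once the retry budget is used up. One subtlety: the counter time is not reset after a successful poll, so if the third iteration of a round fails, the loop goes straight to the 5-minute pause without a short retry. The sketch below is a hypothetical, testable rendering of the policy that ends a round on the first success instead, so each round starts with the full budget; the Sleeper interface is an assumption added so tests need not really sleep.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical, testable version of the retry policy in run().
final class RetryPolicy {
    // Injected so tests can record pauses instead of really sleeping.
    interface Sleeper {
        void sleep(long millis);
    }

    static final Sleeper THREAD_SLEEP = millis -> {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
        }
    };

    private final int retryTimes;
    private final long shortPauseMillis;
    private final long longPauseMillis;
    private final Sleeper sleeper;

    RetryPolicy(int retryTimes, long shortPauseMillis, long longPauseMillis, Sleeper sleeper) {
        this.retryTimes = retryTimes;
        this.shortPauseMillis = shortPauseMillis;
        this.longPauseMillis = longPauseMillis;
        this.sleeper = sleeper;
    }

    // Values described in the code above: 3 attempts, 5 s between them, 5 min after the last failure.
    static RetryPolicy soulDefaults(Sleeper sleeper) {
        return new RetryPolicy(3, TimeUnit.SECONDS.toMillis(5), TimeUnit.MINUTES.toMillis(5), sleeper);
    }

    // One round: true as soon as an attempt succeeds, false if every attempt failed.
    boolean runRound(Runnable attempt) {
        for (int time = 1; time <= retryTimes; time++) {
            try {
                attempt.run();
                return true;
            } catch (RuntimeException e) {
                // short pause before the next attempt, long pause after the final one
                sleeper.sleep(time < retryTimes ? shortPauseMillis : longPauseMillis);
            }
        }
        return false;
    }
}
```

A caller would wrap it the same way run() does, e.g. `while (running.get()) policy.runRound(() -> doLongPolling(server));`.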
    private void doLongPolling(final String server) {
        MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
        for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
            ConfigData<?> cacheConfig = factory.cacheConfigData(group);
            // send the cached md5 and last modification time to admin
            String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
            params.put(group.name(), Lists.newArrayList(value));
        }
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
        HttpEntity httpEntity = new HttpEntity(params, headers);
        String listenerUrl = server + "/configs/listener";
        log.debug("request listener configs: [{}]", listenerUrl);
        JsonArray groupJson = null;
        try {
            // call admin's listener endpoint; if configs changed, admin returns the changed group types to bootstrap
            String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
            log.debug("listener result: [{}]", json);
            groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
        } catch (RestClientException e) {
            String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
            throw new SoulException(message, e);
        }
        if (groupJson != null) {
            // fetch group configuration async.
            // if groups are returned, fetch the corresponding configs ourselves
            ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
            if (ArrayUtils.isNotEmpty(changedGroups)) {
                log.info("Group config changed: {}", Arrays.toString(changedGroups));
                // fetch configs of the changed groups; the groups are APP_AUTH, PLUGIN, RULE, SELECTOR, META_DATA
                this.doFetchGroupConfig(server, changedGroups);
            }
        }
    }
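
doLongPolling() sends, per group, only the md5 of its cached data plus the last modification time, as a value of the form "md5,lastModifyTime". That is enough for the admin side to tell whether a group is stale without the client uploading any config. How soul-admin actually evaluates these values is left for the next part; the sketch below is a hypothetical illustration (Md5Diff and changedGroups are not Soul code) of building the value and of a simple md5-only comparison.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helpers illustrating the md5-based change check.
final class Md5Diff {
    private Md5Diff() { }

    // MD5 of a config snapshot as 32 lowercase hex characters.
    static String md5Hex(String json) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(json.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // Per-group value sent by doLongPolling(): "md5,lastModifyTime".
    static String listenerValue(String md5, long lastModifyTime) {
        return String.join(",", md5, String.valueOf(lastModifyTime));
    }

    // Hypothetical server-side check: a group changed if its md5 differs (lastModifyTime ignored here).
    static List<String> changedGroups(Map<String, String> clientParams, Map<String, String> serverMd5) {
        List<String> changed = new ArrayList<>();
        for (Map.Entry<String, String> entry : clientParams.entrySet()) {
            String clientMd5 = entry.getValue().split(",", 2)[0];
            if (!clientMd5.equals(serverMd5.get(entry.getKey()))) {
                changed.add(entry.getKey());
            }
        }
        return changed;
    }
}
```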

That completes the analysis of HTTP long polling on the soul-bootstrap side; next, we look at soul-admin.

To be continued ...
