soul网关学习10-配置数据同步1-HttpLongPolli
2021-01-23 本文已影响0人
niuxin
前言
我们知道soul-bootstrap
作为网关入口,需要能承载这些流量,同时又能实现网关插件功能(路由、限流、熔断)的动态配置,其配置动态生效的原理大致如下:
- 网关
soul-boostrap
会将配置数据存放到内存HashMap
中,当流量请求进来的时候,直接从内存中匹配对应配置,从而实现插件功能的逻辑 - 网关
soul-admin
控制台,提供了可视化的管理界面,能够供开发和运维人员维护功能插件的配置数据,该配置数据是会存储到soul-admin
数据库的 - 通过
soul-boostrap
与soul-admin
中数据同步机制,将配置数据从soul-admin
同步到soul-boostrap
-
实现流程如下:
config-data-sync - 源码模块如下:
soul-sync-data-center
接下来我们分析HttpLongPolling
的配置同步方式。
HttpLongPolling(Http长轮询)
使用
soul-bootstrap
- 引入依赖,把数据同步相关的
starter
引入进来
<!--soul data sync start use zookeeper-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<!--soul data sync start use websocket-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-websocket</artifactId>
<version>${project.version}</version>
</dependency>
<!--soul data sync start use http-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-http</artifactId>
<version>${project.version}</version>
</dependency>
<!--soul data sync start use nacos-->
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-sync-data-nacos</artifactId>
<version>${project.version}</version>
</dependency>
-
配置
bootstrap.applicaiton.sync-http
soul-admin
-
配置
admin.applcaiton.sync-http
使用注意
- 启动
soul-bootstrap
程序时需注意:采用http长轮询同步,需确保soul-admin
是正常启动的,否则会导致soul-bootstrap
启动失败 - 具体采用哪种同步方式,需注意
soul-admin
与soul-bootstrap
两边配置的同步方式要保持一致,否则会同步异常 - 如果
soul-bootstrap
启动输出如下日志,则表明成功使用HttpLongPoling
的数据同步方式
2021-01-24 08:27:45.086 INFO 5667 --- [ main] .s.s.b.s.s.d.h.HttpSyncDataConfiguration : you use http long pull sync soul data
2021-01-24 08:27:45.702 INFO 5667 --- [ main] o.d.s.s.data.http.HttpSyncDataService : request configs: [http://localhost:9095/configs/fetch?groupKeys=APP_AUTH&groupKeys=PLUGIN&groupKeys=RULE&groupKeys=SELECTOR&groupKeys=META_DATA]
2021-01-24 08:27:46.324 INFO 5667 --- [onPool-worker-1] o.d.s.s.d.h.refresh.AppAuthDataRefresh : clear all appAuth data cache
源码分析
soul-bootstrap
- 引入
starter
依赖。把四种同步类型的starter
全部引入也没有关系,因为最终生效还需要对应属性配置有进行配置才可。具体可参考对应同步类型starter
中的xxxSyncDataConfiguration
类
// 配置生效关系
HttpLongPoling -> HttpSyncDataConfiguration -> soul.sync.http.url
Websocket -> WebsocketSyncDataConfiguration -> soul.sync.websocket.url
Zookeeper -> ZookeeperSyncDataConfiguration -> soul.sync.zookeeper.url
Nacos -> NacosSyncDataConfiguration -> soul.sync.nacos.url
- 启动类入口为
org.dromara.soul.sync.data.http.HttpSyncDataService
。其关键逻辑:HttpSyncDataService
实例化 -> 启动start
->拉取配置fetchGroupConfig
-> 执行拉取配置doFetchGroupConfig
->更新内存配置updateCache
-> 启动长轮询线程池 -> 线程池长轮询任务HttpLongPollingTask
-> 长轮询逻辑实现doLongPolling
- 更详细一些的逻辑就需要看下面的源码注释拉。
-
HttpSyncDataService
实例化
public HttpSyncDataService(final HttpConfig httpConfig, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers, final List<AuthDataSubscriber> authDataSubscribers) {
this.factory = new DataRefreshFactory(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
this.httpConfig = httpConfig;
// 多个admin配置用,分割
this.serverList = Lists.newArrayList(Splitter.on(",").split(httpConfig.getUrl()));
this.httpClient = createRestTemplate();
// 实例化过程会调用启动逻辑,启动时admin必须在线,否则会导致bootstrap也启动不起来
this.start();
}
- 启动
start
private void start() {
// It could be initialized multiple times, so you need to control that.
if (RUNNING.compareAndSet(false, true)) {
// fetch all group configs.
// 初始启动时拉取一次
this.fetchGroupConfig(ConfigGroupEnum.values());
// TODO question: 如果admin的配置使用的是负载均衡呢?这个逻辑是不是就会存在问题了?
// 线程池的大小为admin服务器数目,需要分别使用一个线程去连接每个admin,去做长轮询
int threadSize = serverList.size();
this.executor = new ThreadPoolExecutor(threadSize, threadSize, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
SoulThreadFactory.create("http-long-polling", true));
// start long polling, each server creates a thread to listen for changes.
// 开启长轮询,每个server都创建一个线程去监听配置变化
// 这里与启动过程拉配置是不一样的,启动过程拉取初始配置,则需要从某一台admin机器拉到就行;
// 监听配置变更则不一样,因为用户操作时,不知道是在哪台节点上面响应的,所有需要全部监听
this.serverList.forEach(server -> this.executor.execute(new HttpLongPollingTask(server)));
} else {
log.info("soul http long polling was started, executor=[{}]", executor);
}
}
- 拉取配置
fetchGroupConfig
-> 执行拉取配置doFetchGroupConfig
private void fetchGroupConfig(final ConfigGroupEnum... groups) throws SoulException {
// 从配置的多台服务器遍历拉取配置,如果只要有一台拉取成功,则直接结束遍历
for (int index = 0; index < this.serverList.size(); index++) {
String server = serverList.get(index);
try {
this.doFetchGroupConfig(server, groups);
break;
} catch (SoulException e) {
// no available server, throw exception.
// 如果为最后一台的异常,则向上抛出异常,会中断启动程序
if (index >= serverList.size() - 1) {
throw e;
}
log.warn("fetch config fail, try another one: {}", serverList.get(index + 1));
}
}
}
private void doFetchGroupConfig(final String server, final ConfigGroupEnum... groups) {
// 即使有多个group的配置,也是一次请求拉取到的
StringBuilder params = new StringBuilder();
for (ConfigGroupEnum groupKey : groups) {
params.append("groupKeys").append("=").append(groupKey.name()).append("&");
}
String url = server + "/configs/fetch?" + StringUtils.removeEnd(params.toString(), "&");
log.info("request configs: [{}]", url);
String json = null;
try {
// 拉取admin配置
json = this.httpClient.getForObject(url, String.class);
} catch (RestClientException e) {
String message = String.format("fetch config fail from server[%s], %s", url, e.getMessage());
log.warn(message);
throw new SoulException(message, e);
}
// update local cache
// 拉取成功后,则更新本地配置缓存
boolean updated = this.updateCacheWithJson(json);
if (updated) {
log.info("get latest configs: [{}]", json);
return;
}
// not updated. it is likely that the current config server has not been updated yet. wait a moment.
log.info("The config of the server[{}] has not been updated or is out of date. Wait for 30s to listen for changes again.", server);
// 若从当前配置服务器没有拉到配置,则睡眠30s后再继续执行
// TODO questtion: 如果每个server都没有拉到配置,然后都会阻塞30s,会导致整个bootstrap启动很慢,是否考虑将启动时拉取配置逻辑异步化?
ThreadUtils.sleep(TimeUnit.SECONDS, 30);
}
- 更新内存配置updateCache:调用数据更新的策略工厂
/**
* update local cache.
* @param json the response from config server.
* @return true: the local cache was updated. false: not updated.
*/
private boolean updateCacheWithJson(final String json) {
JsonObject jsonObject = GSON.fromJson(json, JsonObject.class);
JsonObject data = jsonObject.getAsJsonObject("data");
// if the config cache will be updated?
// factory为数据更新的策略功能
return factory.executor(data);
}
- 更新内存配置updateCache:
DataRefreshFactory
数据更新策略工厂,工厂会调用不同类型数据的更新操作
public boolean executor(final JsonObject data) {
// TODO question 为什么会用一个奇怪的数组?
final boolean[] success = {false};
// 对于不同的数据类型,去进行数据更新;这里使用了并行流,实现并行更新配置
ENUM_MAP.values().parallelStream().forEach(dataRefresh -> success[0] = dataRefresh.refresh(data));
return success[0];
}
-
更新内存配置updateCache:数据更新的类图
dataRefresh - 启动长轮询线程池。参考
start
流程后部分 - 线程池长轮询
HttpLongPollingTask
public void run() {
// 需判断bootstrap的状态是否正常,避免出现当bootstap停止了,http长轮询的线程还在跑,这是无意义的
while (RUNNING.get()) {
for (int time = 1; time <= retryTimes; time++) {
try {
// 长轮询失败的情况下,会进行重试,每隔5s重试一次,重试3次
doLongPolling(server);
} catch (Exception e) {
// print warnning log.
if (time < retryTimes) {
log.warn("Long polling failed, tried {} times, {} times left, will be suspended for a while! {}",
time, retryTimes - time, e.getMessage());
ThreadUtils.sleep(TimeUnit.SECONDS, 5);
continue;
}
// print error, then suspended for a while.
log.error("Long polling failed, try again after 5 minutes!", e);
// 3次重试失败后,则先睡眠5min钟,再次去轮询
ThreadUtils.sleep(TimeUnit.MINUTES, 5);
}
}
}
log.warn("Stop http long polling.");
}
- 长轮询逻辑
doLongPolling
private void doLongPolling(final String server) {
MultiValueMap<String, String> params = new LinkedMultiValueMap<>(8);
for (ConfigGroupEnum group : ConfigGroupEnum.values()) {
ConfigData<?> cacheConfig = factory.cacheConfigData(group);
// 将缓存的md5值与最后更新时间传递到admin
String value = String.join(",", cacheConfig.getMd5(), String.valueOf(cacheConfig.getLastModifyTime()));
params.put(group.name(), Lists.newArrayList(value));
}
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
HttpEntity httpEntity = new HttpEntity(params, headers);
String listenerUrl = server + "/configs/listener";
log.debug("request listener configs: [{}]", listenerUrl);
JsonArray groupJson = null;
try {
// 调用admin的配置监听接口,如果admin有配置变更则会返回变更的配置数据类型给到bootstrap
String json = this.httpClient.postForEntity(listenerUrl, httpEntity, String.class).getBody();
log.debug("listener result: [{}]", json);
groupJson = GSON.fromJson(json, JsonObject.class).getAsJsonArray("data");
} catch (RestClientException e) {
String message = String.format("listener configs fail, server:[%s], %s", server, e.getMessage());
throw new SoulException(message, e);
}
if (groupJson != null) {
// fetch group configuration async.
// 如果存在结果返回,则自己再去拉取相应配置
ConfigGroupEnum[] changedGroups = GSON.fromJson(groupJson, ConfigGroupEnum[].class);
if (ArrayUtils.isNotEmpty(changedGroups)) {
log.info("Group config changed: {}", Arrays.toString(changedGroups));
// 拉取有变化的group的配置,这里的group有:APP_AUTH、PLUGIN、RULE、SELECTOR、META_DATA
this.doFetchGroupConfig(server, changedGroups);
}
}
}
至此,soul-bootstrap
端的http长轮询就分析完了,接下来看看soul-admin
端