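apache-shenyu: register center integration and metadata synchronization
(apache-shenyu version 2.4.3) This article studies the design and implementation of apache-shenyu, looking at how shenyu supports multiple register centers and multiple data synchronization protocols.
[Figure: apache-shenyu data synchronization and registration logic]
From the earlier article on shenyu's use of disruptor, we know that the sending and receiving of all data synchronization, instance registration, and metadata registration go through disruptor, which decouples producers from consumers and improves availability.
This article follows two threads to study register center integration and metadata synchronization in apache-shenyu: SyncDataService/DataChangedListener with websocket, and ShenyuClientRegisterRepository/ShenyuClientServerRegisterRepository with nacos.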
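Register center integration
shenyu-register-center is the register center module. It has three sub-modules worth understanding:
- shenyu-register-client: ShenyuClientRegisterRepository wraps the registration logic on the backend service side. Business services include this jar and select one of the five currently supported register centers (consul, etcd, http, nacos, zookeeper) through configuration. It covers both metadata and URI registration.
- shenyu-register-client-server: ShenyuClientServerRegisterRepository is the receiving end for the data reported by business services. It receives and processes the reported data, including initialization and setting up listeners. Subclasses exist for consul, etcd, nacos, and zookeeper. http is a special case with no implementation in this module: shenyu-admin's ShenyuClientHttpRegistryController exposes two POST endpoints directly for reporting metadata and URIs.
- shenyu-register-instance: registers business service instances. The core class here, ShenyuInstanceRegisterRepository, has only three implementations: consul, zookeeper, and etcd. This module listens for spring's WebServerInitializedEvent and registers the instance once the web container has initialized. The registered instances can be called by the shenyu gateway and can also back logic such as load balancing. Why only these three register centers get instance registration is not clear to me at this point.
Now let's look at the code.
spring-boot entry point
The entry point is the ShenyuClientCommonBeanConfiguration class. It is declared in spring.factories and initializes a register center strategy.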
ShenyuClientRegisterRepositoryFactory
public final class ShenyuClientRegisterRepositoryFactory {
// Keyed by register type, so more than one register center can be supported
private static final Map<String, ShenyuClientRegisterRepository> REPOSITORY_MAP = new ConcurrentHashMap<>();
public static ShenyuClientRegisterRepository newInstance(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig) {
if (!REPOSITORY_MAP.containsKey(shenyuRegisterCenterConfig.getRegisterType())) {
// Load the register center implementation selected by registerType through SPI
ShenyuClientRegisterRepository result = ExtensionLoader.getExtensionLoader(ShenyuClientRegisterRepository.class).getJoin(shenyuRegisterCenterConfig.getRegisterType());
result.init(shenyuRegisterCenterConfig);
ShenyuClientShutdownHook.set(result, shenyuRegisterCenterConfig.getProps());
REPOSITORY_MAP.put(shenyuRegisterCenterConfig.getRegisterType(), result);
return result;
}
return REPOSITORY_MAP.get(shenyuRegisterCenterConfig.getRegisterType());
}
}
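The factory above keeps one repository per register type, using a containsKey check followed by put. As a minimal sketch of the same caching idea, the version below uses ConcurrentHashMap.computeIfAbsent so the check and the creation happen as one atomic step. SketchRepository and RepositoryCacheSketch are hypothetical names for illustration only; they are not ShenYu classes, and the real factory also calls init and installs a shutdown hook.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ShenyuClientRegisterRepository.
interface SketchRepository {
    String type();
}

// Hypothetical cache holding one repository instance per register type.
final class RepositoryCacheSketch {
    private final Map<String, SketchRepository> cache = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    SketchRepository get(final String registerType) {
        // computeIfAbsent invokes the creation function at most once per key.
        return cache.computeIfAbsent(registerType, type -> {
            created.incrementAndGet();
            return () -> type;
        });
    }

    // Number of repositories actually created so far.
    int createdCount() {
        return created.get();
    }
}
```

Asking twice for "nacos" returns the same instance, while asking for "zookeeper" creates a second one.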
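As you can see, once a business service pulls in the apache-shenyu dependency in its pom file, spring-boot auto-configuration injects a ShenyuClientRegisterRepository through the following configuration class: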
@Configuration
public class ShenyuClientCommonBeanConfiguration {
/**
* Register the register repository for http client bean post processor.
*
* @param config the config
* @return the client register repository
*/
@Bean
public ShenyuClientRegisterRepository shenyuClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
return ShenyuClientRegisterRepositoryFactory.newInstance(config);
}
/**
* Shenyu Register Center Config.
*
* @return the Register Center Config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}
/**
* Shenyu client config shenyu client config.
*
* @return the shenyu client config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu")
public ShenyuClientConfig shenyuClientConfig() {
return new ShenyuClientConfig();
}
}
The core API interface: ShenyuClientRegisterRepository
@SPI
public interface ShenyuClientRegisterRepository {
/**
* Init.
* Initializes the registration client of the business service.
* @param config the config
*/
default void init(ShenyuRegisterCenterConfig config) {
}
/**
* Persist metadata.
* Entry point for registering controller-layer interfaces.
* @param metadata metadata
*/
void persistInterface(MetaDataRegisterDTO metadata);
/**
* Persist uri.
* Registers the IP and port of the current business service.
* @param registerDTO the register dto
*/
default void persistURI(URIRegisterDTO registerDTO) {
}
/**
* Close.
*/
default void close() {
// Close; no-op by default
}
}
We only look at the nacos implementation.
nacos client registration
@Join
public class NacosClientRegisterRepository implements ShenyuClientRegisterRepository {
private static final Logger LOGGER = LoggerFactory.getLogger(NacosClientRegisterRepository.class);
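// Key used to read the nacos namespace from the config props
private static final String NAMESPACE = "nacosNameSpace";
// Key under which the URI registration data is passed in the instance metadata
private static final String URI_META_DATA = "uriMetadata";
// Nacos API for operating on the config server
private ConfigService configService;
// Nacos API for operating on the naming server
private NamingService namingService;
// Metadata cache: the business service reports its interface metadata through the nacos config server; entries accumulate in this cache and the whole cache is published on every report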
private final ConcurrentLinkedQueue<String> metadataCache = new ConcurrentLinkedQueue<>();
public NacosClientRegisterRepository() { }
public NacosClientRegisterRepository(final ShenyuRegisterCenterConfig config) {
init(config);
}
@Override
public void init(final ShenyuRegisterCenterConfig config) {
// Build the properties used to connect to nacos
String serverAddr = config.getServerLists();
Properties properties = config.getProps();
Properties nacosProperties = new Properties();
nacosProperties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
nacosProperties.put(PropertyKeyConst.NAMESPACE, properties.getProperty(NAMESPACE));
// the nacos authentication username
nacosProperties.put(PropertyKeyConst.USERNAME, properties.getProperty(PropertyKeyConst.USERNAME, ""));
// the nacos authentication password
nacosProperties.put(PropertyKeyConst.PASSWORD, properties.getProperty(PropertyKeyConst.PASSWORD, ""));
// access key for namespace
nacosProperties.put(PropertyKeyConst.ACCESS_KEY, properties.getProperty(PropertyKeyConst.ACCESS_KEY, ""));
// secret key for namespace
nacosProperties.put(PropertyKeyConst.SECRET_KEY, properties.getProperty(PropertyKeyConst.SECRET_KEY, ""));
try {
this.configService = ConfigFactory.createConfigService(nacosProperties);
this.namingService = NamingFactory.createNamingService(nacosProperties);
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
@Override
public void close() {
try {
configService.shutDown();
namingService.shutDown();
} catch (NacosException e) {
LOGGER.error("NacosClientRegisterRepository close error!", e);
}
}
@Override
public void persistInterface(final MetaDataRegisterDTO metadata) {
String rpcType = metadata.getRpcType();
String contextPath = ContextPathUtils.buildRealNode(metadata.getContextPath(), metadata.getAppName());
// Register the interface metadata
registerConfig(rpcType, contextPath, metadata);
}
/**
* Persist uri.
* Registers the IP and port.
* @param registerDTO the register dto
*/
@Override
public void persistURI(final URIRegisterDTO registerDTO) {
String rpcType = registerDTO.getRpcType();
String contextPath = ContextPathUtils.buildRealNode(registerDTO.getContextPath(), registerDTO.getAppName());
String host = registerDTO.getHost();
int port = registerDTO.getPort();
registerService(rpcType, contextPath, host, port, registerDTO);
}
// This method is synchronized
private synchronized void registerService(final String rpcType,
final String contextPath,
final String host,
final int port,
final URIRegisterDTO registerDTO) {
// Register the IP and port of the current instance
Instance instance = new Instance();
instance.setEphemeral(true);
instance.setIp(host);
instance.setPort(port);
Map<String, String> metadataMap = new HashMap<>();
metadataMap.put(Constants.CONTEXT_PATH, contextPath);
metadataMap.put(URI_META_DATA, GsonUtils.getInstance().toJson(registerDTO));
instance.setMetadata(metadataMap);
String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType);
try {
// Register the current service instance through the nacos naming server
namingService.registerInstance(serviceName, instance);
} catch (NacosException e) {
throw new ShenyuException(e);
}
LOGGER.info("register service uri success: {}", serviceName);
}
private synchronized void registerConfig(final String rpcType,
final String contextPath,
final MetaDataRegisterDTO metadata) {
metadataCache.add(GsonUtils.getInstance().toJson(metadata));
String configName = RegisterPathConstants.buildServiceConfigPath(rpcType, contextPath);
try {
final String defaultGroup = NacosPathConstants.GROUP;
// Publish the interface metadata through the config server
if (configService.publishConfig(configName, defaultGroup, GsonUtils.getInstance().toJson(metadataCache))) {
LOGGER.info("register metadata success: {}", metadata.getRuleName());
} else {
throw new ShenyuException("register metadata fail , please check ");
}
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
}
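The registerConfig method above appends each new metadata entry to metadataCache and then publishes the entire cache, so the config always holds the full list rather than the latest entry. The following is a minimal sketch of that full-publication behavior against an in-memory map. FullPublishSketch and its configStore are hypothetical stand-ins for illustration; the real code serializes with Gson and calls the nacos ConfigService.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch; configStore plays the role of the nacos config server.
final class FullPublishSketch {
    private final ConcurrentLinkedQueue<String> metadataCache = new ConcurrentLinkedQueue<>();
    private final Map<String, List<String>> configStore = new HashMap<>();

    synchronized void registerConfig(final String configName, final String metadataJson) {
        metadataCache.add(metadataJson);
        // Publish a snapshot of everything registered so far, not only the new entry.
        configStore.put(configName, new ArrayList<>(metadataCache));
    }

    // Returns what is currently stored under the given config name.
    synchronized List<String> published(final String configName) {
        return configStore.get(configName);
    }
}
```

After two registrations under the same config name, the stored value contains both entries in insertion order.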
Next, let's look at how the shenyu-admin side receives this data. The core class is ShenyuClientServerRegisterRepository.
First, the configuration class in the shenyu-admin module:
@Configuration
public class RegisterCenterConfiguration {
/**
* Shenyu register center config shenyu register center config.
*
* @return the shenyu register center config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.register")
public ShenyuRegisterCenterConfig shenyuRegisterCenterConfig() {
return new ShenyuRegisterCenterConfig();
}
@Bean(destroyMethod = "close")
public ShenyuClientServerRegisterRepository shenyuClientServerRegisterRepository(final ShenyuRegisterCenterConfig shenyuRegisterCenterConfig,
final List<ShenyuClientRegisterService> shenyuClientRegisterService) {
String registerType = shenyuRegisterCenterConfig.getRegisterType();
// Resolve the SPI implementation that matches the configured register type
ShenyuClientServerRegisterRepository registerRepository = ExtensionLoader.getExtensionLoader(ShenyuClientServerRegisterRepository.class).getJoin(registerType);
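// Data received from the register center is not handed to shenyu's internal logic directly; it passes through a disruptor layer that decouples the two sides and smooths traffic peaks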
RegisterClientServerDisruptorPublisher publisher = RegisterClientServerDisruptorPublisher.getInstance();
Map<String, ShenyuClientRegisterService> registerServiceMap = shenyuClientRegisterService.stream().collect(Collectors.toMap(ShenyuClientRegisterService::rpcType, e -> e));
publisher.start(registerServiceMap);
registerRepository.init(publisher, shenyuRegisterCenterConfig);
return registerRepository;
}
}
Next, NacosClientServerRegisterRepository:
@Join
public class NacosClientServerRegisterRepository implements ShenyuClientServerRegisterRepository {
private static final Logger LOGGER = LoggerFactory.getLogger(NacosClientServerRegisterRepository.class);
// RPC types of registered business services whose URIs are supported; currently RpcTypeEnum.GRPC, RpcTypeEnum.HTTP, RpcTypeEnum.TARS, RpcTypeEnum.SPRING_CLOUD, RpcTypeEnum.DUBBO
private static final List<RpcTypeEnum> RPC_URI_TYPE_SET = RpcTypeEnum.acquireSupportURIs();
// The default nacos group
private final String defaultGroup = NacosPathConstants.GROUP;
// Nacos API for operating on the config server
private ConfigService configService;
// Nacos API for operating on the naming server
private NamingService namingService;
// Received data is handed to disruptor, which decouples processing and smooths peaks
private ShenyuClientServerRegisterPublisher publisher;
// Thread-safe sorted set backed by a skip list; holds the names of the metadata configs that are already subscribed
private final ConcurrentSkipListSet<String> metadataConfigCache = new ConcurrentSkipListSet<>();
// key: service name per backend protocol (grpc, springcloud, ...) -> contextPaths seen for that protocol
private final ConcurrentMap<String, ConcurrentSkipListSet<String>> uriServiceCache = new ConcurrentHashMap<>();
@Override
public void close() {
publisher.close();
}
@Override
public void init(final ShenyuClientServerRegisterPublisher publisher,
final ShenyuRegisterCenterConfig config) {
this.publisher = publisher;
String serverAddr = config.getServerLists();
Properties properties = config.getProps();
Properties nacosProperties = new Properties();
nacosProperties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
nacosProperties.put(PropertyKeyConst.NAMESPACE, properties.getProperty("nacosNameSpace"));
// the nacos authentication username
nacosProperties.put(PropertyKeyConst.USERNAME, properties.getProperty(PropertyKeyConst.USERNAME, ""));
// the nacos authentication password
nacosProperties.put(PropertyKeyConst.PASSWORD, properties.getProperty(PropertyKeyConst.PASSWORD, ""));
// access key for namespace
nacosProperties.put(PropertyKeyConst.ACCESS_KEY, properties.getProperty(PropertyKeyConst.ACCESS_KEY, ""));
// secret key for namespace
nacosProperties.put(PropertyKeyConst.SECRET_KEY, properties.getProperty(PropertyKeyConst.SECRET_KEY, ""));
try {
// Connect to nacos
this.configService = ConfigFactory.createConfigService(nacosProperties);
this.namingService = NamingFactory.createNamingService(nacosProperties);
} catch (NacosException e) {
throw new ShenyuException(e);
}
// Subscribe to every backend instance and to its config (apache-shenyu uses the config to synchronize the interface metadata of business services)
subscribe();
}
private void subscribe() {
// Subscribe once for every supported RPC type
RpcTypeEnum.acquireSupportMetadatas().forEach(this::subscribeRpcTypeService);
}
private void subscribeRpcTypeService(final RpcTypeEnum rpcType) {
final String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType.getName());
try {
Map<String, List<URIRegisterDTO>> services = new HashMap<>();
// Healthy instances currently available from the nacos naming server
List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
healthyInstances.forEach(healthyInstance -> {
String contextPath = healthyInstance.getMetadata().get("contextPath");
String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
// Subscribe to the config of each instance; shenyu uses it to synchronize interface metadata
subscribeMetadata(serviceConfigName);
metadataConfigCache.add(serviceConfigName);
String metadata = healthyInstance.getMetadata().get("uriMetadata");
URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
// Collect the URI data per contextPath
services.computeIfAbsent(contextPath, k -> new ArrayList<>()).add(uriRegisterDTO);
// Put the contextPath into the cache
uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
});
// If the protocol supports URI registration, publish the data received from nacos as events through disruptor
if (RPC_URI_TYPE_SET.contains(rpcType)) {
services.values().forEach(this::publishRegisterURI);
}
LOGGER.info("subscribe uri : {}", serviceName);
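// Listen for business service instances going online or offline through the listener provided by the nacos namingService; on a change, the callback updates the local cache and calls refreshURIService, which queries the instances again and refreshes the metadata held in config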
namingService.subscribe(serviceName, event -> {
if (event instanceof NamingEvent) {
List<Instance> instances = ((NamingEvent) event).getInstances();
instances.forEach(instance -> {
String contextPath = instance.getMetadata().get("contextPath");
uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
});
refreshURIService(rpcType, serviceName);
}
});
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
// Metadata subscription
private void subscribeMetadata(final String serviceConfigName) {
// Register the interface metadata read from the nacos configService
registerMetadata(readData(serviceConfigName));
LOGGER.info("subscribe metadata: {}", serviceConfigName);
try {
configService.addListener(serviceConfigName, defaultGroup, new Listener() {
@Override
public Executor getExecutor() {
return null;
}
// Listener provided by configService: register the metadata again whenever the config changes
@Override
public void receiveConfigInfo(final String config) {
registerMetadata(config);
}
});
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
@SuppressWarnings("unchecked")
private void registerMetadata(final String metadataConfig) {
List<String> metadataList = GsonUtils.getInstance().fromJson(metadataConfig, List.class);
metadataList.forEach(this::publishMetadata);
}
private void publishMetadata(final String data) {
LOGGER.info("publish metadata: {}", data);
publisher.publish(Lists.newArrayList(GsonUtils.getInstance().fromJson(data, MetaDataRegisterDTO.class)));
}
private void refreshURIService(final RpcTypeEnum rpcType, final String serviceName) {
Optional.ofNullable(uriServiceCache.get(serviceName)).ifPresent(services -> services.forEach(contextPath -> registerURI(contextPath, serviceName, rpcType)));
}
// Registers service instance URIs; the logic resembles subscribeRpcTypeService, which does this per protocol
private void registerURI(final String contextPath, final String serviceName, final RpcTypeEnum rpcType) {
try {
List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
List<URIRegisterDTO> registerDTOList = new ArrayList<>();
healthyInstances.forEach(healthyInstance -> {
if (contextPath.equals(healthyInstance.getMetadata().get("contextPath"))) {
String metadata = healthyInstance.getMetadata().get("uriMetadata");
URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
registerDTOList.add(uriRegisterDTO);
String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
// Only configs (interface metadata) that are not yet in the local cache are subscribed
if (!metadataConfigCache.contains(serviceConfigName)) {
subscribeMetadata(serviceConfigName);
metadataConfigCache.add(serviceConfigName);
}
}
});
if (!RPC_URI_TYPE_SET.contains(rpcType)) {
return;
}
if (registerDTOList.isEmpty()) {
URIRegisterDTO uriRegisterDTO = URIRegisterDTO.builder()
.contextPath(Constants.PATH_SEPARATOR + contextPath)
.rpcType(rpcType.getName()).build();
registerDTOList.add(uriRegisterDTO);
}
// Register the IP and port of the service instances
publishRegisterURI(registerDTOList);
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
private void publishRegisterURI(final List<URIRegisterDTO> registerDTOList) {
LOGGER.info("publish uri: {}", registerDTOList);
publisher.publish(registerDTOList);
}
private String readData(final String configName) {
try {
return configService.getConfig(configName, defaultGroup, 5000);
} catch (NacosException e) {
throw new ShenyuException(e);
}
}
}
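The metadata handling above follows a read-then-listen pattern: read the current config once, attach a listener for later changes, and use a set so that each config name is subscribed only once. The sketch below shows that pattern with an in-memory config store. InMemoryConfigSketch and MetadataSubscriberSketch are hypothetical illustration classes and do not use the nacos API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.function.Consumer;

// Hypothetical in-memory replacement for a config service with listeners.
final class InMemoryConfigSketch {
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();

    String get(final String name) {
        return values.get(name);
    }

    void addListener(final String name, final Consumer<String> listener) {
        listeners.computeIfAbsent(name, k -> new ArrayList<>()).add(listener);
    }

    void publish(final String name, final String value) {
        values.put(name, value);
        // Notify every listener registered for this config name.
        listeners.getOrDefault(name, new ArrayList<>()).forEach(l -> l.accept(value));
    }
}

// Hypothetical subscriber: reads the current value, then listens for changes.
final class MetadataSubscriberSketch {
    private final InMemoryConfigSketch config;
    private final Set<String> subscribed = new ConcurrentSkipListSet<>();
    final List<String> received = new ArrayList<>();

    MetadataSubscriberSketch(final InMemoryConfigSketch config) {
        this.config = config;
    }

    void subscribeOnce(final String configName) {
        // Set.add returns false if the name is already present, so duplicates are skipped.
        if (!subscribed.add(configName)) {
            return;
        }
        String current = config.get(configName);
        if (current != null) {
            received.add(current);
        }
        config.addListener(configName, received::add);
    }
}
```

Calling subscribeOnce twice for the same name attaches a single listener, so a later publish is delivered once.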
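The above covers how business service instance information and interface metadata are synchronized. Through the nacos implementation we saw how a business service registers to nacos via shenyu-client, and how shenyu-admin receives the data from nacos.
Next, let's see how shenyu-admin and shenyu-bootstrap synchronize this data. We focus on websocket, where the main actor is SyncDataService.
spring-boot
When spring-boot is used and the corresponding jar is on the classpath, the bean is injected through spring.factories plus configuration: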
@Configuration
@ConditionalOnClass(WebsocketSyncDataService.class)
// Enabled only when this property is configured
@ConditionalOnProperty(prefix = "shenyu.sync.websocket", name = "urls")
public class WebsocketSyncDataConfiguration {
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncDataConfiguration.class);
@Bean
public SyncDataService websocketSyncDataService(final ObjectProvider<WebsocketConfig> websocketConfig, final ObjectProvider<PluginDataSubscriber> pluginSubscriber,
final ObjectProvider<List<MetaDataSubscriber>> metaSubscribers, final ObjectProvider<List<AuthDataSubscriber>> authSubscribers) {
LOGGER.info("you use websocket sync shenyu data.......");
return new WebsocketSyncDataService(websocketConfig.getIfAvailable(WebsocketConfig::new), pluginSubscriber.getIfAvailable(),
metaSubscribers.getIfAvailable(Collections::emptyList), authSubscribers.getIfAvailable(Collections::emptyList));
}
/**
* Config websocket config.
*
* @return the websocket config
*/
@Bean
@ConfigurationProperties(prefix = "shenyu.sync.websocket")
public WebsocketConfig websocketConfig() {
return new WebsocketConfig();
}
}
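Note the difference from the client side. The client injects one implementation chosen by configuration (multiple implementations could be supported, but there is no logical need for it), that is, a single @Bean that provides one implementation of the interface. The data sync implementations are each registered in their own module through spring.factories plus configuration, so several data sync methods can coexist, depending on configuration.
shenyu also abstracts the kinds of data that need synchronizing: AuthDataSubscriber for auth data, MetaDataSubscriber for interface metadata, and PluginDataSubscriber for plugin data.
[Figure: all abstracted dimensions]
These are the data that need synchronizing, and the methods of these interfaces are the respective entry points. The interfaces are abstractions for shenyu-bootstrap to use, and their implementations are provided through apache-shenyu's plugin architecture. On the producing side, data changes are sent from admin through the DataChangedListener and DataChangedInit abstractions, and bootstrap subscribes to them and reacts once it perceives a change. The consumers of these changes live in the module of each data sync implementation, as follows.
[Figure: classes in the websocket module]
[Figure: the data sync packages in shenyu-admin]
[Figure: all data sync implementations, with the packages on the left]
The core handling interface is DataHandler#handle(String json, String eventType), with AbstractDataHandler as the base implementation. Metadata, plugin, rule, selectorData, and auth data each have their own implementation. The business entry class is WebsocketDataHandler: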
public class WebsocketDataHandler {
// An enum plus a map removes the need for if/else or switch
private static final EnumMap<ConfigGroupEnum, DataHandler> ENUM_MAP = new EnumMap<>(ConfigGroupEnum.class);
/**
* Instantiates a new Websocket data handler.
*
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public WebsocketDataHandler(final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers) {
ENUM_MAP.put(ConfigGroupEnum.PLUGIN, new PluginDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.SELECTOR, new SelectorDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.RULE, new RuleDataHandler(pluginDataSubscriber));
ENUM_MAP.put(ConfigGroupEnum.APP_AUTH, new AuthDataHandler(authDataSubscribers));
ENUM_MAP.put(ConfigGroupEnum.META_DATA, new MetaDataHandler(metaDataSubscribers));
}
/**
* Executor.
*
* @param type the type
* @param json the json
* @param eventType the event type
*/
public void executor(final ConfigGroupEnum type, final String json, final String eventType) {
ENUM_MAP.get(type).handle(json, eventType);
}
}
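WebsocketDataHandler above replaces branching with a lookup: each ConfigGroupEnum value maps to one handler in an EnumMap, and executor simply fetches and invokes it. A minimal sketch of that dispatch style follows. GroupSketch and EnumDispatchSketch are hypothetical names, and the handlers here only record what they received instead of updating subscribers.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, shortened version of a config group enum.
enum GroupSketch { PLUGIN, RULE, META_DATA }

// Hypothetical dispatcher: one handler per enum value, looked up instead of switched on.
final class EnumDispatchSketch {
    private final Map<GroupSketch, BiConsumer<String, String>> handlers = new EnumMap<>(GroupSketch.class);
    final List<String> log = new ArrayList<>();

    EnumDispatchSketch() {
        for (GroupSketch group : GroupSketch.values()) {
            // Each handler records the group it belongs to, the event type, and the payload.
            handlers.put(group, (json, eventType) -> log.add(group + ":" + eventType + ":" + json));
        }
    }

    void executor(final GroupSketch type, final String json, final String eventType) {
        handlers.get(type).accept(json, eventType);
    }
}
```

Adding a new data type then means adding one enum value and one map entry, with no change to executor.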
ShenyuWebsocketClient acts as the adapter between the websocket protocol and the internal business handling:
public final class ShenyuWebsocketClient extends WebSocketClient {
/**
* logger.
*/
private static final Logger LOG = LoggerFactory.getLogger(ShenyuWebsocketClient.class);
private volatile boolean alreadySync = Boolean.FALSE;
private final WebsocketDataHandler websocketDataHandler;
private final Timer timer;
private TimerTask timerTask;
/**
* Instantiates a new shenyu websocket client.
*
* @param serverUri the server uri
* @param pluginDataSubscriber the plugin data subscriber
* @param metaDataSubscribers the meta data subscribers
* @param authDataSubscribers the auth data subscribers
*/
public ShenyuWebsocketClient(final URI serverUri, final PluginDataSubscriber pluginDataSubscriber,
final List<MetaDataSubscriber> metaDataSubscribers,
final List<AuthDataSubscriber> authDataSubscribers
) {
super(serverUri);
this.websocketDataHandler = new WebsocketDataHandler(pluginDataSubscriber, metaDataSubscribers, authDataSubscribers);
this.timer = WheelTimerFactory.getSharedTimer();
this.connection();
}
private void connection() {
this.connectBlocking();
// Run the health check periodically on the timer thread
this.timer.add(timerTask = new AbstractRoundTask(null, TimeUnit.SECONDS.toMillis(10)) {
@Override
public void doRun(final String key, final TimerTask timerTask) {
// Heartbeat check
healthCheck();
}
});
}
@Override
public boolean connectBlocking() {
// Blocking connect of the websocket client; logs whether the connection succeeded
boolean success = false;
try {
success = super.connectBlocking();
} catch (Exception ignored) {
}
if (success) {
LOG.info("websocket connection server[{}] is successful.....", this.getURI().toString());
} else {
LOG.warn("websocket connection server[{}] is error.....", this.getURI().toString());
}
return success;
}
@Override
public void onOpen(final ServerHandshake serverHandshake) {
// Websocket on-open callback: the connection has been established; request a full sync once
if (!alreadySync) {
send(DataEventTypeEnum.MYSELF.name());
alreadySync = true;
}
}
@Override
public void onMessage(final String result) {
// Websocket on-message callback
handleResult(result);
}
@Override
public void onClose(final int i, final String s, final boolean b) {
// Websocket on-close callback
this.close();
}
@Override
public void onError(final Exception e) {
this.close();
}
@Override
public void close() {
alreadySync = false;
if (this.isOpen()) {
super.close();
}
}
/**
* Now close.
* now close. will cancel the task execution.
*/
public void nowClose() {
this.close();
timerTask.cancel();
}
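// The shenyu-bootstrap side drives the communication: while the connection is closed it keeps trying to reconnect to the shenyu-admin websocket, and while it is open it sends heartbeats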
private void healthCheck() {
try {
if (!this.isOpen()) {
this.reconnectBlocking();
} else {
this.sendPing();
LOG.debug("websocket send to [{}] ping message successful", this.getURI().toString());
}
} catch (Exception e) {
LOG.error("websocket connect is error :{}", e.getMessage());
}
}
@SuppressWarnings("ALL")
private void handleResult(final String result) {
LOG.info("handleResult({})", result);
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());
// Business entry point
websocketDataHandler.executor(groupEnum, json, eventType);
}
}
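The healthCheck method above has two branches on every timer tick: reconnect when the connection is closed, otherwise send a ping, and never let an exception escape into the timer. The following is a minimal sketch of that decision, separated from any websocket library. ConnectionSketch, HeartbeatSketch, and FakeConnectionSketch are hypothetical names introduced for illustration only.

```java
// Hypothetical connection abstraction; not a ShenYu or Java-WebSocket type.
interface ConnectionSketch {
    boolean isOpen();

    void reconnect();

    void ping();
}

// Hypothetical heartbeat logic: one call to tick() corresponds to one timer run.
final class HeartbeatSketch {
    private final ConnectionSketch connection;

    HeartbeatSketch(final ConnectionSketch connection) {
        this.connection = connection;
    }

    void tick() {
        try {
            if (!connection.isOpen()) {
                connection.reconnect();
            } else {
                connection.ping();
            }
        } catch (RuntimeException e) {
            // Swallow the failure so the next tick can try again.
        }
    }
}

// Fake connection that counts calls; reconnect always succeeds here.
final class FakeConnectionSketch implements ConnectionSketch {
    boolean open;
    int reconnects;
    int pings;

    @Override
    public boolean isOpen() {
        return open;
    }

    @Override
    public void reconnect() {
        reconnects++;
        open = true;
    }

    @Override
    public void ping() {
        pings++;
    }
}
```

With a connection that starts closed, the first tick reconnects and the second tick pings.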
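From the code above, the shenyu-bootstrap gateway pushes SyncDataService down into the module of each implementation and operates on the data through the abstracted per-type data interfaces. The source of these data change events is shenyu-admin, which works through two abstract interfaces, DataChangedInit and DataChangedListener.
DataChangedInit holds initialization logic that runs after the service starts, and it has no websocket implementation. It is essentially a post-startup hook: nacos, consul, etcd, and zookeeper all rely on a middleware service to synchronize data, so a sync is needed on every startup, whereas websocket is a direct connection in which both sides can see each other come online and go offline, so the sync can be handled in the connect and disconnect callbacks.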
public abstract class AbstractDataChangedInit implements DataChangedInit {
/**
* SyncDataService, sync all data.
*/
@Resource
private SyncDataService syncDataService;
// Initialization runs through the CommandLineRunner startup hook
@Override
public void run(final String... args) throws Exception {
if (notExist()) {
syncDataService.syncAll(DataEventTypeEnum.REFRESH);
}
}
/**
* @return boolean.
*/
protected abstract boolean notExist();
}
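AbstractDataChangedInit above is a template method: the base class decides when to run the full sync, and each subclass only answers whether the data is still missing. Here is a minimal sketch of that structure. StartupSyncSketch is a hypothetical class, and the Runnable stands in for the call to syncDataService.syncAll.

```java
// Hypothetical template-method sketch; not the ShenYu class.
abstract class StartupSyncSketch {
    private final Runnable syncAll;

    StartupSyncSketch(final Runnable syncAll) {
        this.syncAll = syncAll;
    }

    // Called once after startup; the original hooks this in through CommandLineRunner.
    final void run() {
        if (notExist()) {
            syncAll.run();
        }
    }

    // Subclasses report whether the data is still missing in their middleware.
    protected abstract boolean notExist();
}
```

A subclass whose notExist returns false skips the sync entirely, which is what keeps a restart from overwriting data that is already present.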
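All data change logic goes through spring's observer mechanism: events are published as ApplicationEvent, and event listeners synchronize the data.
The class that dispatches the events is DataChangedEventDispatcher: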
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
private final ApplicationContext applicationContext;
// Multiple data synchronization protocols are supported here as well
private List<DataChangedListener> listeners;
public DataChangedEventDispatcher(final ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Override
@SuppressWarnings("unchecked")
public void onApplicationEvent(final DataChangedEvent event) {
for (DataChangedListener listener : listeners) {
// Cast per data type; the publishing side is responsible for keeping the types consistent
switch (event.getGroupKey()) {
case APP_AUTH:
listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
break;
case PLUGIN:
listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
break;
case RULE:
listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
break;
case SELECTOR:
listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
applicationContext.getBean(LoadServiceDocEntry.class).loadDocOnSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
break;
case META_DATA:
listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
break;
default:
throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
}
}
}
@Override
public void afterPropertiesSet() {
Collection<DataChangedListener> listenerBeans = applicationContext.getBeansOfType(DataChangedListener.class).values();
this.listeners = Collections.unmodifiableList(new ArrayList<>(listenerBeans));
}
}
Now the websocket implementation of DataChangedListener.
Everything is delegated to WebsocketCollector:
public class WebsocketDataChangedListener implements DataChangedListener {
@Override
public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
WebsocketData<PluginData> websocketData =
new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
@Override
public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
WebsocketData<SelectorData> websocketData =
new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
}
@Override
public void onRuleChanged(final List<RuleData> ruleDataList, final DataEventTypeEnum eventType) {
WebsocketData<RuleData> configData =
new WebsocketData<>(ConfigGroupEnum.RULE.name(), eventType.name(), ruleDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
@Override
public void onAppAuthChanged(final List<AppAuthData> appAuthDataList, final DataEventTypeEnum eventType) {
WebsocketData<AppAuthData> configData =
new WebsocketData<>(ConfigGroupEnum.APP_AUTH.name(), eventType.name(), appAuthDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
@Override
public void onMetaDataChanged(final List<MetaData> metaDataList, final DataEventTypeEnum eventType) {
WebsocketData<MetaData> configData =
new WebsocketData<>(ConfigGroupEnum.META_DATA.name(), eventType.name(), metaDataList);
WebsocketCollector.send(GsonUtils.getInstance().toJson(configData), eventType);
}
}
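Now WebsocketCollector. It is the websocket server endpoint on the shenyu-admin side, as the @ServerEndpoint annotation shows.
// shenyu-bootstrap connects to shenyu-admin through this websocket endpoint; heartbeats and reconnect attempts are driven by the timer task of ShenyuWebsocketClient on the bootstrap side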
@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {
private static final Logger LOG = LoggerFactory.getLogger(WebsocketCollector.class);
private static final Set<Session> SESSION_SET = new CopyOnWriteArraySet<>();
private static final String SESSION_KEY = "sessionKey";
@OnOpen
public void onOpen(final Session session) {
LOG.info("websocket on client[{}] open successful,maxTextMessageBufferSize:{}",
getClientIp(session), session.getMaxTextMessageBufferSize());
SESSION_SET.add(session);
}
private static String getClientIp(final Session session) {
Map<String, Object> userProperties = session.getUserProperties();
if (MapUtils.isEmpty(userProperties)) {
return StringUtils.EMPTY;
}
return Optional.ofNullable(userProperties.get(WebsocketListener.CLIENT_IP_NAME))
.map(Object::toString)
.orElse(StringUtils.EMPTY);
}
/**
* On message.
*
* @param message the message
* @param session the session
*/
@OnMessage
public void onMessage(final String message, final Session session) {
if (!Objects.equals(message, DataEventTypeEnum.MYSELF.name())) {
return;
}
try {
ThreadLocalUtils.put(SESSION_KEY, session);
SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
} finally {
ThreadLocalUtils.clear();
}
}
@OnClose
public void onClose(final Session session) {
clearSession(session);
LOG.warn("websocket close on client[{}]", getClientIp(session));
}
@OnError
public void onError(final Session session, final Throwable error) {
clearSession(session);
LOG.error("websocket collection on client[{}] error: ", getClientIp(session), error);
}
/**
* Send.
*
* @param message the message
* @param type the type
*/
public static void send(final String message, final DataEventTypeEnum type) {
if (StringUtils.isBlank(message)) {
return;
}
if (DataEventTypeEnum.MYSELF == type) {
Session session = (Session) ThreadLocalUtils.get(SESSION_KEY);
if (Objects.nonNull(session)) {
sendMessageBySession(session, message);
}
} else {
SESSION_SET.forEach(session -> sendMessageBySession(session, message));
}
}
// Synchronized data is always sent through this synchronized method
private static synchronized void sendMessageBySession(final Session session, final String message) {
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
LOG.error("websocket send result is exception: ", e);
}
}
private void clearSession(final Session session) {
SESSION_SET.remove(session);
ThreadLocalUtils.clear();
}
}
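The send method above distinguishes two targets: a MYSELF event goes only to the session that asked for a full sync, found through a thread-local, while every other event is broadcast to all open sessions. The sketch below reproduces that routing with plain objects. SessionSketch and CollectorSketch are hypothetical names, and a boolean flag stands in for the DataEventTypeEnum check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Hypothetical session that just stores the messages it was sent.
final class SessionSketch {
    final List<String> inbox = new ArrayList<>();
}

// Hypothetical collector: targeted reply for a full sync, broadcast for everything else.
final class CollectorSketch {
    private static final ThreadLocal<SessionSketch> CURRENT = new ThreadLocal<>();
    private final Set<SessionSketch> sessions = new CopyOnWriteArraySet<>();

    void open(final SessionSketch session) {
        sessions.add(session);
    }

    // A client asked for the full data set: remember who asked for the duration of the call.
    void onFullSyncRequest(final SessionSketch requester, final String fullData) {
        CURRENT.set(requester);
        try {
            send(fullData, true);
        } finally {
            CURRENT.remove();
        }
    }

    void send(final String message, final boolean myselfOnly) {
        if (myselfOnly) {
            SessionSketch session = CURRENT.get();
            if (session != null) {
                session.inbox.add(message);
            }
        } else {
            sessions.forEach(session -> session.inbox.add(message));
        }
    }
}
```

With two open sessions, a full sync requested by the first reaches only that session, and a later change reaches both.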
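That covers both the sending and the receiving logic. To summarize:
Registration logic
- Client (business backend): ShenyuClientRegisterRepository reports data
- Server (shenyu-admin): ShenyuClientServerRegisterRepository receives data
- disruptor, through ShenyuClientServerRegisterPublisher, decouples the register center data and smooths peaks
- For metadata changes, spring events carry the change on so that it is synchronized to shenyu-bootstrap
(This article only studied nacos. For the springcloud protocol, the gateway obtains the backend instance list through Spring Cloud's load balancer logic, where ribbon pulls the instance list from the register center on a scheduled thread. Register centers other than nacos were not examined.)
Data synchronization
- At startup, DataChangedInit performs a full data sync through the middleware (except for websocket, which is a direct connection and can detect connects and disconnects itself)
- DataChangedEventDispatcher listens for data change events through spring's ApplicationEvent
- It then calls the data change logic in DataChangedListener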