soul网关学习6-dubbo协议转换1

2021-01-20  本文已影响0人  niuxin

我们知道协议转换也是API网关常见的一个功能,这次我们看下soul网关是如何实现协议转换的。

请求流图

请求流图

大致流程:

一、dubbo服务提供者注册服务到soul-admin

  1. 使用@SoulDubboClient注解,暴露http服务地址到soul-admin
    dubbo服务提供者注册服务
  2. 接下来在启动过程中,通过spring的BeanPostProcessor机制,对dubbo服务接口做拦截处理。主要逻辑就是先获取到所有dubbo服务的ServiceBean对象,再遍历这些ServiceBean,其接口方法上看是否存在@SoulDubboClient注解;若是,则会调用soul-admin的dubbo服务client的注册接口/soul-client/dubbo-register,完成路由信息的注册。该逻辑由 org.dromara.soul.client.alibaba.dubbo.AlibabaDubboServiceBeanPostProcessor完成
    private void handler(final ServiceBean<?> serviceBean) {
        Class<?> clazz = serviceBean.getRef().getClass();
        if (ClassUtils.isCglibProxyClass(clazz)) {
            String superClassName = clazz.getGenericSuperclass().getTypeName();
            try {
                clazz = Class.forName(superClassName);
            } catch (ClassNotFoundException e) {
                log.error(String.format("class not found: %s", superClassName));
                return;
            }
        }
        // 获取class的去重后的方法,包括父类
        final Method[] methods = ReflectionUtils.getUniqueDeclaredMethods(clazz);
        for (Method method : methods) {
            SoulDubboClient soulDubboClient = method.getAnnotation(SoulDubboClient.class);
            // 如果SoulDubboClient不为空,则注册服务信息到admin
            if (Objects.nonNull(soulDubboClient)) {
                RegisterUtils.doRegister(buildJsonParams(serviceBean, soulDubboClient, method), url, RpcTypeEnum.DUBBO);
            }
        }
    }
  1. 【soul-admin】接收到注册请求并处理org.dromara.soul.admin.controller.SoulClientController
    @PostMapping("/dubbo-register")
    public String registerRpc(@RequestBody final MetaDataDTO metaDataDTO) {
        return soulClientRegisterService.registerDubbo(metaDataDTO);
    }
  1. dubbo的注册请求逻辑:将元数据、选择器、规则进行落库(更新或插入);org.dromara.soul.admin.service.impl.SoulClientRegisterServiceImpl
    register-dubbo
  2. 落库完成后,会发布配置变更事件,给到soul-bootstrap
    private void publishEvent(final RuleDO ruleDO, final List<RuleConditionDTO> ruleConditions) {
        SelectorDO selectorDO = selectorMapper.selectById(ruleDO.getSelectorId());
        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());

        List<ConditionData> conditionDataList =
                ruleConditions.stream().map(ConditionTransfer.INSTANCE::mapToRuleDTO).collect(Collectors.toList());
        // publish change event.
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.RULE, DataEventTypeEnum.UPDATE,
                Collections.singletonList(RuleDO.transFrom(ruleDO, pluginDO.getName(), conditionDataList))));
    }

二、配置变更事件发送给soul-bootstrap

  1. 从上我们得知每次的配置变更会通过spring的应用事件机制发送一个DataChangedEvent事件,经过在源码中搜索关键字ApplicationListener<DataChangedEvent>能定位到事件的回调处理类为org.dromara.soul.admin.listener.DataChangedEventDispatcher
    public void onApplicationEvent(final DataChangedEvent event) {
        // 遍历事件变更的监听者,根据不同的事件类型,调用相应事件的处理逻辑
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
                case APP_AUTH:
                    listener.onAppAuthChanged((List<AppAuthData>) event.getSource(), event.getEventType());
                    break;
                case PLUGIN:
                    listener.onPluginChanged((List<PluginData>) event.getSource(), event.getEventType());
                    break;
                case RULE:
                    listener.onRuleChanged((List<RuleData>) event.getSource(), event.getEventType());
                    break;
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
                case META_DATA:
                    listener.onMetaDataChanged((List<MetaData>) event.getSource(), event.getEventType());
                    break;
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
  1. 再看下这里的DataChangedListener又是如何来的?为一个接口,查看其实现,得知存在如下几种事件处理的方式

    DataChangedListeners
  2. 结合官网得知soul-admin的配置信息同步到soul-bootstap存在如下几种方式:http长轮询、Nacos配置同步、Zookeeper配置同步、websocket连接同步

  3. 我们查看到数据同步支持的方式,org.dromara.soul.admin.config.DataSyncConfiguration

    DataSyncConfiguration
    default sync
  4. 从上图得知,具体使用哪种配置同步方式,又由属性参数决定

soul.sync.http.enabled=true
# 默认
soul.sync.websocket.enabled=true
soul.sync.zookeeper.url=xxx
soul.sync.nacos.url=xxx
  1. 默认同步方式为websocket,我们继续跟踪源码,看下websocket的同步方式是怎么实现的
  2. 当前存在的配置数据类型如下:
    • APP_AUTH --- app权限配置
    • PLUGIN --- 插件配置
    • RULE --- 规则配置
    • SELECTOR --- 选择器配置
    • META_DATA --- 元数据配置
  3. org.dromara.soul.admin.listener.websocket.WebsocketDataChangedListener为数据变更的监听的webscoket实现,存在上述几种配置数据的处理
    WebsocketDataChangedListener
  4. 我们跟踪下一个具体配置的处理onPluginChanged,会发现调用org.dromara.soul.admin.listener.websocket.WebsocketCollector的send方法,将消息发送给webscoket的客户端,也就是soul-bootstrap
    public void onPluginChanged(final List<PluginData> pluginDataList, final DataEventTypeEnum eventType) {
        WebsocketData<PluginData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.PLUGIN.name(), eventType.name(), pluginDataList);
        // 通过websocket收集器发送变更消息
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
  1. WebsocketCollector.send逻辑
public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isNotBlank(message)) {
            // 自身的消息
            if (DataEventTypeEnum.MYSELF == type) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
                return;
            }
            // 其他消息则遍历所有的websocket请求,依次发送消息;这里有多个session,因soul-admin可能是集群
            for (Session session : SESSION_SET) {
                try {
                    session.getBasicRemote().sendText(message);
                } catch (IOException e) {
                    log.error("websocket send result is exception: ", e);
                }
            }
        }
    }
  1. 这里还有全量同步的情况,当soul-bootstrap启动的时候,会给soul-admin发一个self的消息,soul-admin接收到消息后,会获取当前所有的配置类型数据,每次生成一个类型的消息,发给soul-bootstrap。这样就完成了配置的全量同步。
public void onMessage(final String message, final Session session) {
        // 收到myself的消息,用于bootstrap初始连接上来的时候,相当于是给admin自己的消息
        // 该消息会同步所有类型的配置数据,给到连上来的这个bootstrap
        // 如果有多台bootstrap同时连接上来,又会怎样?onMessage保证了有序性?这里应该存在并发问题
        // TODO question
        if (message.equals(DataEventTypeEnum.MYSELF.name())) {
            WebsocketCollector.session = session;
            SpringBeanUtils.getInstance().getBean(SyncDataService.class).syncAll(DataEventTypeEnum.MYSELF);
        }
    }
  1. 使用websocket方式的同步时,需要soul-adminsoul-bootstrap是互通的
上一篇下一篇

猜你喜欢

热点阅读