DDD开发技巧

关于事件驱动架构在工作中的一些想法及实践

2021-12-19  本文已影响0人  javacoo

生活中每天都在发生各种各样的事件,有的被我们关注,有的被我们忽略...

人的精力是有限的,如果对于各种纷繁复杂的事件不加以甄别,筛选,过滤,而一股脑儿的全盘接收,在那些无关紧要的事件上浪费太多的时间与精力,最终只会事倍功半,得不偿失。所以我们只关注我们关心的,重要的事件,当事件发生时才予以相应的回应或动作:我们关注的事件发生,触发了我们的下一步动作------我们的生活不正是由一个一个的事件驱动着前行的吗?:)

应用系统亦是如此,在系统设计时根据不同的需求和使用场景有很多种架构风格可供选择,比如数据流风格、调用/返回风格、独立构件风格、虚拟机风格、仓库风格等:

一个业务系统可能是多种架构风格的综合运用,根据进件业务的特点,在设计进件业务模块的时候,可以考虑采用独立构件风格中的事件驱动系统(隐式调用)风格,或者事件驱动架构风格。

接下来结合事件驱动架构谈谈我对进件业务的一些理解和想法...

进件业务模型中的领域事件

事件订阅.png 原进件系统.png 重构后系统.png 交互模型2.png

现在的交互的逻辑和目前系统现实情况比较贴近,可以在较小的代价下完成现有系统的重构,且可以快速接入新的业务系统。

从上图可以看到进件系统的业务职责进一步抽离到各个业务系统中,只需要关进件事件的订阅及推送,但是此时我们关注的还是进件这一单一领域,我们能不能更进一步,让系统更具通用性,普适性?答案是肯定的:理想状态应该是有一个跨业务领域事件总线(BizEvent-Bus)来统一处理领域事件的发布,订阅与推送,各个系统产生业务领域事件,发布到BizEvent-Bus,同时又从BizEvent-Bus订阅感兴趣的业务领域事件,交互模型如下:

BizEvent-Bus.png

这是理想模型,也是框架后续演进的方向和目标。

接下来谈谈我的一些设计吧...

业务领域事件框架(BizEvents-Framework)

事件驱动架构(Event Driven Architecture,EDA)一个事件驱动框架(EDA)定义了一个设计和实现一个应用系统的方法学,在这个系统里事件可传输于松散耦合的组件和服务之间。一个事件驱动系统典型地由事件消费者和事件产生者组成。事件消费者向事件管理器订阅事件,事件产生者向事件管理器发布事件。当事件管理器从事件产生者那接收到一个事件时,事件管理把这个事件转送给相应的事件消费者。如果这个事件消费者是不可用的,事件管理者将保留这个事件,一段间隔之后再次转送该事件消费者------百度百科

一,简介

二,交互模型

业务系统发布业务领域事件,分为领域内事件和跨领域事件,领域内事件通过本地事件处理机制处理,跨领域事件通过Runtime首先由Register经过过滤和排序查找订阅了此事件的客户端,再由Stroe对事件进行存储,然后推送到MQ中指定客户端队列,再由MQ客户端通道消费端调用Remote,Remote将事件包装成约定好的协议,通过HTTP/RPC方式推送到对应的客户端,如果推送失败,则重试指定次数,如果任然失败,则后续由Task定时任务处理,客户端则通过集成BizEvent-SDK,实现服务端的接入,接收推送的事件通知。

BizEvent-Framework.png

三,核心特性与能力

四,架构说明

1,BizEvents-Framework

plugin.png

插件基于xkernel 提供的SPI机制实现,扩展非常方便,大致步骤如下:

plugin-file.png
  1. Stroe插件:主要负责事件的存储,支持对接多种事件存储机制,默认实现:JPA

    类结构如下:

store.png

系统第一次启动将创建务领域事件信息表,业务领域事件MQ异常信息表,业务领域事件任务表,插件通过业务领域事件持久化服务对外提供服务。

  1. MQ插件:主要负责事件的发布与订阅,支持对接多种消息中间件,默认实现:RabbitMQ。

    类结构如下:

mq.png

基于RabbitMQ 的延迟队列+死信队列实现进件信息的可靠投递与消费,支持根据不同分组生成不同分组交换机,路由KEY和队列,以实现不同分组业务处理的隔离:

  1. Remote插件:主要负责向客户端推送订阅的领域事件,支持HTTP和RPC模式。默认实现分别是:Fegin和Dubbo。

    类结构如下:

Remote.png

为MQ插件提供远程调用服务,实现了HTTP和RPC(dubbo)方式调用:

  1. Task插件:主要负责定时从推送失败消息表中获取推送失败的领域事件消息,重新发送到MQ中,继续推送到指定客户端。默认实现:XXLJob

    类结构如下:

    Task.png

通过定时任务获取推送异常的业务事件信息,重新推送到MQ中,执行推送流程,默认采用XXLJOB:

  1. Register插件:主要负责客户端系统注册业务领域事件,默认实现:基于配置文件

    类结构如下:

Registry.png

通过客户端系统注册信息实现事件的过滤,达到精准推送,按需推送的目的:

2,BizEvent-SDK

BizEventDispatcher.png

接下来分两部分来说明:

  1. SDK初始化

    系统集成BizEvent-SDK ,系统启动中根据SDK包中spring.factories自动装配相关组件(根据接入配置(见安装教程->客户端),初始化如 HTTP和RPC(dubbo),过滤器,解析器等),spring.factories配置如下:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
      com.javacoo.events.client.config.BizEventClientConfig,\
      com.javacoo.events.client.rpc.dubbo.config.DubboConfig,\
      com.javacoo.events.client.rpc.dubbo.DubboAutoConfiguration,\
      com.javacoo.events.client.rpc.dubbo.CustomDubboAutoConfiguration,\
      com.javacoo.events.client.http.HttpAutoConfiguration,\
      com.javacoo.events.client.BizEventListenerRegistrar
    
    • BizEventClientConfig:完成了过滤器,解析器的初始化

      解析器:默认实现基于Spring SpelExpressionParser实现。EL表达式介绍及使用文档参见 Spring官网。这里需要注意是,SDK约定是基于对象方式访问,对象名称为eventObject,具体使用参见 使用说明->客户端,关键代码如下:

      /**
        * 获取上下文
        */
       private EvaluationContext getContext(Object root){
              //设置上下文
              EvaluationContext context = new StandardEvaluationContext();
              context.setVariable("eventObject",root);
              return context;
          }
      

      过滤器:过滤器基于自定义函数式接口BizEventFilter实现,定义了匹配方法及辅助方法,支持并操作,或操作,非操作,可实现不同规则下过滤器的灵活组装。默认提供了事件类型过滤器,事件主题过滤器,事件版本过滤器,事件对象参数过滤器,其中前三个过滤器默认实现是简单等值比较,事件对象参数过滤器则是通过解析器解析领域事件中事件对象,获取对象中指定字段的值,与配置的值比较,支持正则表达式匹配。这些过滤器最终通过事件过滤器规则对象组合在一起,默认规则是并操作,关键代码如下:

      业务领域事件过滤接口

      @FunctionalInterface
      public interface BizEventFilter<U,T>{
          /**
           * 匹配
           */
          boolean matcher(U u,T t);
          /**
           * 并操作
           */
          default BizEventFilter<U,T> and(BizEventFilter<? super U,? super T> other) {
              Objects.requireNonNull(other);
              return (u,t) -> matcher(u,t) && other.matcher(u,t);
          }
          /**
           * 或操作
         */
          default BizEventFilter<U,T> or(BizEventFilter<? super U,? super T> other) {
            Objects.requireNonNull(other);
              return (u,t) -> matcher(u,t) || other.matcher(u,t);
          }
          /**
           * 非操作
           */
          default BizEventFilter<U,T> negate() {
              return (u,t) -> !matcher(u,t);
          }
      }
      

      事件对象参数过滤器

      @Slf4j
      public class ParamBizEventFilter implements BizEventFilter<BizEventReq, BizEventListenerInfo> {
          /**
           * 匹配
           */
          @Override
          public boolean matcher(BizEventReq bizEventReq, BizEventListenerInfo bizEventListenerInfo) {
              Objects.requireNonNull(bizEventReq);
              Objects.requireNonNull(bizEventReq.getEventObject());
              Objects.requireNonNull(bizEventListenerInfo);
              log.info("开始表达式匹配:{}", bizEventListenerInfo.getParamEL());
              if(StringUtils.isBlank(bizEventListenerInfo.getParamEL())){
                  return true;
              }
              //不支持表达式
              if(!ExpressionParserHolder.getSpelExpressionParser().isPresent()){
                  log.warn("不支持表达式匹配");
                  return false;
              }
              //获取表达式解析器
              ExpressionParser expressionParser = ExpressionParserHolder.getSpelExpressionParser().get();
              //获取参数class对象
            Class paramClass = bizEventListenerInfo.getTargetMethod().getParameterTypes()[0];
              //获取业务领域事件对象JSON字符串
            final String eventObjectJsonString = JSONObject.toJSONString(bizEventReq.getEventObject());
              //得到目标参数对象
              Object eventObject = JSONObject.parseObject(eventObjectJsonString,paramClass);
              //获取值
              Object valueObj = expressionParser.getValue(bizEventListenerInfo.getEl(),eventObject);
              log.info("取值表达式:{},取值结果:{}", bizEventListenerInfo.getEl(),valueObj);
              if(valueObj == null){
                  return false;
              }
              boolean matches = DataUtil.matcher(bizEventListenerInfo.getElValue(),valueObj.toString());
              log.info("值匹配表达式:{},待匹配值:{},匹配结果:{}", bizEventListenerInfo.getElValue(),valueObj.toString(),matches);
              return matches;
          }
      }
      

      事件类型过滤器

      @Slf4j
      public class EventTypeBizEventFilter implements BizEventFilter<BizEventReq, BizEventListenerInfo> {
          /**
           * 匹配
           */
          @Override
          public boolean matcher(BizEventReq bizEventReq, BizEventListenerInfo bizEventListenerInfo) {
              Objects.requireNonNull(bizEventReq);
              Objects.requireNonNull(bizEventListenerInfo);
              log.info("开始事件类型匹配,事件中的类型:{},注册的类型:{}",bizEventReq.getEventType(), bizEventListenerInfo.getEventType());
              if(StringUtils.isBlank(bizEventListenerInfo.getEventType())){
                  return false;
              }
              return bizEventReq.getEventType().equals(bizEventListenerInfo.getEventType());
          }
      }
      

      事件过滤器规则

      @Slf4j
      public class SimpleBizEventFilterRule implements BiFunction<BizEventReq, BizEventListenerInfo,Boolean> {
          /** 事件类型过滤器 */
          private BizEventFilter<BizEventReq, BizEventListenerInfo> eventTypeBizEventFilter;
          /** 事件主题过滤器 */
          private BizEventFilter<BizEventReq, BizEventListenerInfo> topicBizEventFilter;
          /** 事件版本过滤器 */
          private BizEventFilter<BizEventReq, BizEventListenerInfo> versionBizEventFilter;
          /** 事件对象参数过滤器 */
          private BizEventFilter<BizEventReq, BizEventListenerInfo> paramBizEventFilter;
          @PostConstruct
          public void init(){
              eventTypeBizEventFilter = new EventTypeBizEventFilter();
              topicBizEventFilter = new TopicBizEventFilter();
              versionBizEventFilter = new VersionBizEventFilter();
              paramBizEventFilter = new ParamBizEventFilter();
              log.info("事件过滤器规则对象初始化完成");
          }
          @Override
          public Boolean apply(BizEventReq bizEventReq, BizEventListenerInfo bizEventListenerInfo) {
              return eventTypeBizEventFilter.and(topicBizEventFilter).and(versionBizEventFilter).and(paramBizEventFilter).matcher(bizEventReq,
                  bizEventListenerInfo);
          }
      }
      
    • DubboConfig,DubboAutoConfiguration,CustomDubboAutoConfiguration三个类,实现了RPC方式接收事件推送通知。

      CustomDubboAutoConfiguration自动配置类,在客户端系统未使用dubbo的情况下使用,需要配置dubbo参数,具体配置参见 安装教程->客户端 代码如下:

      @Slf4j
      @Configuration(proxyBeanMethods = false)
      @DubboComponentScan("com.javacoo.events.client.rpc.dubbo.service")
      @EnableConfigurationProperties(value = BizEventDubboConfig.class)
      @ConditionalOnClass(BizEventDubboConfig.class)
      @ConditionalOnProperty(prefix = BizEventDubboConfig.PREFIX, name=BizEventDubboConfig.CUSTOM_KEY,havingValue = BizEventDubboConfig.CUSTOM_TRUE)
      public class CustomDubboAutoConfiguration {
          @Autowired
          private DubboConfig dubboConfig;
          @Bean("biz-event-annotation-provider")
          public ApplicationConfig applicationConfig() {
              return dubboConfig.getApplication();
          }
          @Bean("biz-event-registry")
          public RegistryConfig registryConfig() {
              return dubboConfig.getRegistry();
          }
      
          @Bean("biz-event-protocol")
          public ProtocolConfig protocolConfig() {
              return dubboConfig.getProtocol();
          }
      
          @Bean("biz-event-centerConfig")
          public ConfigCenterConfig configCenterConfig(){
              return dubboConfig.getConfigCenter();
          }
          @Bean
          public BizEventDispatcher bizEventDispatcher(){
              return new BizEventDispatcher();
          }
      }
      

      业务事件通知实现类

      /**
       * 业务事件通知实现类
       * <li>基于dubbo实现</li>
       * <li>version:版本,根据约定,需与接入平台保持一致</li>
       * <li>group:分组,根据约定,需与接入平台保持一致</li>
       */
      @Slf4j
      @DubboService(interfaceClass = IBizEventNoticeService.class, version = "${biz.event.client.rpc.dubbo.service.version}", retries = -1,group = "${biz.event.client.rpc.dubbo.service.group}")
      public class DubboBizEventNoticeService implements IBizEventNoticeService {
          @Autowired
          private BizEventPersistence bizEventPersistence;
          @Autowired
          private BizEventDispatcher bizEventDispatcher;
          /**
           * 通知业务领域事件
           */
          @Override
          public BaseResp notice(BizEventReq bizEventReq) {
              log.info("收到业务领域事件通知:{}",bizEventReq);
              try{
                  //持久化业务领域事件
                  bizEventPersistence.persist(bizEventReq);
                  //分发业务领域事件
                  bizEventDispatcher.dispatch(bizEventReq);
            }catch (Exception e){
                  log.error("业务领域事件分发失败",e);
                return BaseResp.fail(e.getMessage());
              }
              return BaseResp.ok();
          }
      }
      

      DubboAutoConfiguration自动配置类,在客户端已经使用dubbo的情况下使用,主要思想是利用ServiceConfig暴露服务,代码如下:

      /**
       * dubbo自动配置
       * <li>依赖已有配置,注册服务</li>
       */
      @Slf4j
      @Configuration(proxyBeanMethods = false)
      @AutoConfigureOrder(5)
      @EnableConfigurationProperties(value = BizEventDubboConfig.class)
      @ConditionalOnClass(value = {BizEventDubboConfig.class})
      @ConditionalOnProperty(prefix = BizEventDubboConfig.PREFIX, name=BizEventDubboConfig.CUSTOM_KEY,havingValue = BizEventDubboConfig.CUSTOM_DEFAULT)
      public class DubboAutoConfiguration {
          ...
          @Bean
          public BizEventDispatcher bizEventDispatcher(){
              return new BizEventDispatcher();
          }
          @Bean
          public IBizEventNoticeService bizEventNoticeService(){
              IBizEventNoticeService bizEventNoticeService = new DubboBizEventNoticeService();
              ServiceConfig config = new ServiceConfig();
              config.setApplication(applicationConfig);
              List<RegistryConfig> registryConfigs = new ArrayList<>(1);
              registryConfigs.add(registryConfig);
              config.setRegistries(registryConfigs);
              config.setProtocol(protocolConfig);
              config.setConfigCenter(configCenterConfig);
              config.setGroup(bizEventDubboConfig.getService().getGroup());
            config.setVersion(bizEventDubboConfig.getService().getVersion());
              config.setInterface("com.javacoo.events.api.IBizEventNoticeService");
            config.setRef(bizEventNoticeService);
              config.export();
              return bizEventNoticeService;
          }
      }
      
    • HttpAutoConfiguration自动配置类,完成Http方式接入,实现很简单,按照约定对外暴露/notice/bizEvent接口方法,代码如下:

      @Slf4j
      @Configuration
      @EnableConfigurationProperties(value = HttpConfig.class)
      @ConditionalOnClass(HttpConfig.class)
      @ConditionalOnProperty(prefix = HttpConfig.PREFIX, value = HttpConfig.ENABLED, matchIfMissing = true)
      public class HttpAutoConfiguration {
          /**
           * http配置
           */
          @Autowired
          private HttpConfig HttpConfig;
          @Bean
          public HttpBizEventNoticeController httpBizEventNoticeController() {
              return new HttpBizEventNoticeController();
          }
          @Bean
        public BizEventDispatcher bizEventDispatcher(){
              return new BizEventDispatcher();
          }
      }
      

      业务领域事件通知控制器

      @Slf4j
      @RestController
      @RequestMapping("/notice")
      public class HttpBizEventNoticeController {
      ....
          /**
           * 业务领域事件通知
           */
          @RequestMapping("/bizEvent")
        @ResponseBody
          public BaseResp notice(@RequestBody BizEventReq bizEventReq) {
            log.info("HttpBizEventNoticeController收到业务领域事件通知:{}",bizEventReq);
              try{
                //持久化业务领域事件
                  bizEventPersistence.persist(bizEventReq);
                  //分发业务领域事件
                  bizEventDispatcher.dispatch(bizEventReq);
              }catch (Exception e){
                  log.error("业务领域事件分发失败",e);
                  return BaseResp.fail(e.getMessage());
              }
              return BaseResp.ok();
          }
      }
      
    • BizEventListenerRegistrar业务领域事件监听对象注册服务:系统集成BizEvent-SDK (配置见安装教程->客户端),Spring容器启动之后,便会运行业务领域事件监听对象注册服务(BizEventListenerRegistrar),扫描Spring容器中包含@Service注解的服务,找到其中添加了@BizEventListener注解的方法,解析并组装业务领域事件监听信息(包括事件类型,事件主题,事件版本,事件对象参数匹配:EL表达式,目标方法),完成业务事件监听元数据集合的组装,供后续使用。关键代码如下:

      业务领域事件监听对象注册服务

      @Slf4j
      @Component
      public class BizEventListenerRegistrar implements ApplicationContextAware {
          /** 业务事件监听元数据集合 */
          private static List<BizEventListenerMetaData> bizEventListenerMetaDataList = new ArrayList<>();
          @Override
          public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
              //设置上下文
              ApplicationContextProvider.setApplicationContext(applicationContext);
              //扫描Service注解,初始化业务事件监听元数据集合
              initBizEventMetaDataList(Service.class);
          }
          /**
           * 扫描指定注解服务,初始化业务事件监听元数据集合s对象
           * @return: void
           */
          private void initBizEventMetaDataList(Class<? extends Annotation> annClass){
              //查找Service
              Map<String, Object> serviceMap = ApplicationContextProvider.getApplicationContext().getBeansWithAnnotation(annClass);
              for (Map.Entry<String, Object> entry : serviceMap.entrySet()) {
                  Class entryClass = AopUtils.getTargetClass(entry.getValue());
                  //获取业务领域事件监听信息集合
                  List<BizEventListenerInfo> bizEventListenerInfos = Arrays.stream(entryClass.getDeclaredMethods())
                      //获取本类 public方法
                      .filter(method -> Modifier.isPublic(method.getModifiers()))
                      //找到注解所在方法
                      .filter(method -> method.isAnnotationPresent(BizEventListener.class))
                      //只支持监听一个参数
                      .filter(method -> method.getParameterTypes().length == 1)
                      //排序
                      .sorted(Comparator.comparing(method -> method.getAnnotation(BizEventListener.class).order()))
                      //组装
                      .map(method -> getBizEventInfo(method))
                      .collect(Collectors.toList());
                  if(bizEventListenerInfos.isEmpty()){
                      continue;
                  }
            bizEventListenerMetaDataList.add(BizEventListenerMetaData.builder().beanName(entry.getKey()).bizEventListenerInfos(bizEventListenerInfos).targetClass(entryClass).build());
              }
              log.info("业务领域事件监听对象注册数量:{},对象:{}", bizEventListenerMetaDataList.size(), bizEventListenerMetaDataList);
          }
          /**
           * 获取方法上的注解信息,组装业务领域事件监听信息
           */
          private BizEventListenerInfo getBizEventInfo(Method method){
              BizEventListener bizEventListener = method.getAnnotation(BizEventListener.class);
              return BizEventListenerInfo.builder()
                  .eventType(bizEventListener.eventType())
                  .topic(bizEventListener.topic())
                  .version(bizEventListener.version())
                  .paramEL(bizEventListener.paramEl())
                .targetMethod(method)
                  .build();
        }
          /**
           * 获取业务事件监听元数据集合
           */
          public static Optional<List<BizEventListenerMetaData>> getBizEventListenerMetaDataList(){
              return bizEventListenerMetaDataList.isEmpty() ? Optional.empty() : Optional.of(bizEventListenerMetaDataList);
          }
      }
      

      业务领域事件监听注解

      @Inherited
      @Documented
      @Target(ElementType.METHOD)
      @Retention(RetentionPolicy.RUNTIME)
      public @interface BizEventListener {
          /**
           * 业务事件类型
           */
          String eventType() default "";
          /**
           * 业务事件主题
         */
          String topic() default "";
        /**
           * 业务事件版本
         */
          String version() default "";
        /**
           * 参数EL匹配表达式->格式:EL表达式=取值
           */
          String paramEl() default "";
          /**
           * 排序
           */
          int order() default 0;
      }
      

    至此SDK初始化便完成。

  2. 事件处理

    通知事件的处理,则通过业务领域事件持久化服务BizEventPersistence和业务领域事件分发器BizEventDispatcher完成,业务领域事件持久化服务完成事件的持久化处理,SDK提供默认持久化服务(什么都不做),客户端可根据业务需求,扩展实现。业务领域事件分发器主要是根据接收到业务领域事件通知信息,从系统启动时收集到的业务事件监听元数据集合中依次调用事件过滤器规则服务SimpleBizEventFilterRule找到匹配的事件监听方法,利用反射机制执行。执行分异步和同步执行,由下发的通知事件中async参数决定,默认是异步的,执行时,具有相同事件匹配规则的方法是按照order自然排序后,顺序执行的,且传递的事件对象是同一个对象,支持修改。 代码如下:

    @Slf4j
    public class BizEventDispatcher {
        /**
         * 事件过滤器规则
         */
        @Autowired
        private SimpleBizEventFilterRule simpleBizEventFilterRule;
        /**
         * 任务执行器
         */
        private TaskExecutor taskExecutor;
        @PostConstruct
        public void init(){
            taskExecutor = new TaskExecutor() {
                ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(),
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(), Executors.defaultThreadFactory());
                @Override
                public void execute(Runnable task) {
                    executorService.execute(task);
                }
            };
        }
        public void dispatch(final BizEventReq bizEventReq){
            //异步执行
            if (bizEventReq.async()) {
                log.info("异步分发业务领域事件");
                taskExecutor.execute(()->doDispatch(bizEventReq));
            } else {
                log.info("同步分发业务领域事件");
                doDispatch(bizEventReq);
            }
        }
        private void doDispatch(final BizEventReq bizEventReq){
           if(!BizEventListenerRegistrar.getBizEventListenerMetaDataList().isPresent()){
                log.warn("未注册事件监听方法:{}",bizEventReq.getEventType());
                return;
            }
            //获取业务事件监听元数据集合
            List<BizEventListenerMetaData> bizEventMetaDataList = BizEventListenerRegistrar.getBizEventListenerMetaDataList().get();
            //获取业务领域事件对象JSON字符串
            final String eventObjectJsonString = JSONObject.toJSONString(bizEventReq.getEventObject());
            //业务领域事件对象Map:相同参数监听方法间传递同一业务领域事件对象
            Map<String,Object> eventObjectMap = new HashMap<>();
            //执行分发
            bizEventMetaDataList.forEach(bizEventListenerMetaData -> {
                //根据spring bean名称查找bean对象
                final Object beanObject = ApplicationContextProvider.getBean(bizEventListenerMetaData.getBeanName());
                bizEventListenerMetaData.getBizEventListenerInfos().parallelStream()
                        .filter(bizEventListenerInfo -> simpleBizEventFilterRule.apply(bizEventReq,bizEventListenerInfo))
                        .forEachOrdered(bizEventListenerInfo -> execute(eventObjectJsonString,eventObjectMap,beanObject,bizEventListenerInfo.getTargetMethod()));
            });
        }
        private void execute(String eventObjectJsonString, Map<String, Object> eventObjectMap, Object beanObject, Method method) {
            Class paramClass = method.getParameterTypes()[0];
            Object eventObject = null;
            if (eventObjectMap.containsKey(paramClass.getName())) {
                eventObject = eventObjectMap.get(paramClass.getName());
            } else {
                eventObject = JSONObject.parseObject(eventObjectJsonString,paramClass);
                eventObjectMap.put(paramClass.getName(), eventObject);
            }
            try {
                method.invoke(beanObject, eventObject);
            } catch (IllegalAccessException e) {
                e.printStackTrace();
                log.error("方法访问异常",e);
            } catch (InvocationTargetException e) {
                e.printStackTrace();
                log.error("方法调用异常",e);
            }
        }
    }
    

3,Protocol

五,安装教程

1,服务端
  1. 引入依赖包:

           <dependency>
                <groupId>com.javacoo</groupId>
                <artifactId>bizevents-runtime</artifactId>
                <version>1.0.0</version>
            </dependency>
    
  2. 配置参数:以下是接入的最小配置示例,其中一些基础依赖配置如:RabbitMQ,数据源等未列出,按照与SpringBoot 集成方式配置即可。

    ## =============================业务领域事件组件配置================开始
    ## 业务领域事件注册插件配置-默认配置文件方式
    # TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
    biz.event.registry.plugin.file.groups.TEST001[0].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
    # TEST001 TEST001->表示接入的客户端系统ID,topic->事件对象参数EL表达式,格式:EL表达式=取值->取值精准匹配
    biz.event.registry.plugin.file.groups.TEST001[0].param-eL=#eventObject.userType=A
    # TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
    biz.event.registry.plugin.file.groups.TEST001[1].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
    # TEST001 TEST001->表示接入的客户端系统ID,topic->事件对象参数EL表达式,格式:EL表达式=取值->取值正则表达式匹配
    biz.event.registry.plugin.file.groups.TEST001[1].param-eL=#eventObject.userName=.*徐.*
    # TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
    biz.event.registry.plugin.file.groups.TEST001[2].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
    # TEST001 TEST001->表示接入的客户端系统ID,version->事件版本
    biz.event.registry.plugin.file.groups.TEST001[2].version=1.0
    # TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
    biz.event.registry.plugin.file.groups.TEST001[3].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
    # TEST001 TEST001->表示接入的客户端系统ID,topic->事件主题
    biz.event.registry.plugin.file.groups.TEST001[3].topic=天下第二
    # TEST001 TEST001->表示接入的客户端系统ID,event-type->表示注册的业务领域事件
    biz.event.registry.plugin.file.groups.TEST001[4].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
    # TEST001 TEST001->表示接入的客户端系统ID,version->事件版本
    biz.event.registry.plugin.file.groups.TEST001[4].version=1.0
    # TEST001 TEST001->表示接入的客户端系统ID,topic->事件主题
    biz.event.registry.plugin.file.groups.TEST001[4].topic=桃花剑神
    biz.event.registry.plugin.file.groups.TEST001[5].event-type=com.javacoo.xservice.example.bean.event.ServerExampleEvent
    # TEST002 业务领域事件注册配置-配置文件方式-接入:其中:TEST002-表示接入的客户端系统ID,event-type->表示注册的业务领域事件
    #biz.event.registry.plugin.file.groups.TEST002[0].event-type=com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent
    #biz.event.registry.plugin.file.groups.TEST002[1].event-type=com.javacoo.xservice.example.bean.event.ServerExampleEvent
    
    
    ## 业务领域事件-MQ插件配置-默认RabbitMQ
    # TEST001 推送类型配置-RPC
    biz.event.mq.plugin.groups.TEST001.remote-type=RPC
    # TEST002 推送类型配置-HTTP
    biz.event.mq.plugin.groups.TEST002.remote-type=HTTP
    
    ## 业务领域事件-远程调用插件配置
    #降级配置
    biz.event.remote.plugin.grade=2
    biz.event.remote.plugin.count=10
    # HTTP配置
    # TEST001-推送地址
    biz.event.remote.plugin.http.groups.TEST001.push-url=http://127.0.0.1:8188
    # TEST001-请求超时时间
    biz.event.remote.plugin.http.groups.TEST001.socket-timeout=60000
    
    # TEST002-推送地址
    biz.event.remote.plugin.http.groups.TEST002.push-url=http://127.0.0.1:8080
    # TEST002-请求超时时间
    biz.event.remote.plugin.http.groups.TEST002.socket-timeout=60000
    
    # TEST001 RPC配置-zk地址
    biz.event.remote.plugin.rpc.groups.TEST001.address=zookeeper://zookeeper01-dev.javacoo.com:22181?backup=zookeeper02-dev.javacoo.com:22181,zookeeper03-dev.javacoo.com:22181
    # TEST001 RPC配置-服务分组:格式->{客户端系统ID}_NOTICE_SERVICE
    biz.event.remote.plugin.rpc.groups.TEST001.group=TEST001_NOTICE_SERVICE
    # TEST001 RPC配置-服务版本
    biz.event.remote.plugin.rpc.groups.TEST001.version=1.0.0
    # TEST001 RPC配置-请求超时时间
    biz.event.remote.plugin.rpc.groups.TEST001.socket-timeout=60000
    
    # TEST002 RPC配置-zk地址
    biz.event.remote.plugin.rpc.groups.TEST002.address=zookeeper://zookeeper01-dev.javacoo.com:22181?backup=zookeeper02-dev.javacoo.com:22181,zookeeper03-dev.javacoo.com:22181
    # TEST002 RPC配置-服务分组:格式->{客户端系统ID}_NOTICE_SERVICE
    biz.event.remote.plugin.rpc.groups.TEST002.group=TEST002_NOTICE_SERVICE
    # TEST003 RPC配置-服务版本
    biz.event.remote.plugin.rpc.groups.TEST002.version=1.0.0
    # TEST004 RPC配置-请求超时时间
    biz.event.remote.plugin.rpc.groups.TEST002.socket-timeout=60000
    
    ## 业务领域事件-存储插件配置
    biz.event.store.plugin.jpa.enabled=true
    ## JPA配置
    spring.jpa.show-sql=true
    spring.jpa.generate-ddl=true
    spring.jpa.hibernate.ddl-auto=update
    spring.jpa.database-platform=org.hibernate.dialect.MySQL5InnoDBDialect
    
    ## 业务领域事件-定时任务插件配置
    biz.event.task.plugin.impl=xxlJob
    
    ## 业务领域事件-定时任务xxljob插件配置
    ### 调度中心部署跟地址 [选填]:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调";为空则关闭自动注册;
    biz.event.task.plugin.xxljob.admin-addresses=https://xxl-job-admin-javacoo.com/
    ### 执行器AppName [选填]:执行器心跳注册分组依据;为空则关闭自动注册
    biz.event.task.plugin.xxljob.app-name=bizevent-job
    ### 执行器IP [选填]:默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用;地址信息用于 "执行器注册" 和 "调度中心请求并触发任务";
    biz.event.task.plugin.xxljob.ip=
    ### 执行器端口号 [选填]:小于等于0则自动获取;默认端口为9999,单机部署多个执行器时,注意要配置不同执行器端口;
    biz.event.task.plugin.xxljob.port=9999
    ### 执行器通讯TOKEN [选填]:非空时启用;
    biz.event.task.plugin.xxljob.access-token=
    ### 执行器运行日志文件存储磁盘路径 [选填] :需要对该路径拥有读写权限;为空则使用默认路径;
    biz.event.task.plugin.xxljob.log-path=/volume_logs
    ### 执行器日志保存天数 [选填] :值大于3时生效,启用执行器Log文件定期清理功能,否则不生效;
    biz.event.task.plugin.xxljob.log-retention-days=-1
    ## =============================业务领域事件组件配置================结束
    
2,客户端

所谓客户端都是相对的,只要集成了BizEvent-SDK我们都叫他客户端,目前SDK支持HTTP方式和RPC(dubbo)方式接入,具体配置如下:

  1. 引入依赖包:

           <dependency>
                <groupId>com.javacoo</groupId>
                <artifactId>bizevents-sdk-java</artifactId>
                <version>1.0.0</version>
            </dependency>
    
  2. 配置参数:

    http接入:如果是http接入,则无需任何配置,只需要将客户端服务地址和端口告知服务端(目前是通过人工配置方式),在服务端配置即可,如:http://127.0.0.1:8080

    RPC接入:如果是RPC接入,则分为2种情况:

    • 当前客户端系统已经集成dubbo,则参数配置如下:

      #=====================业务领域事件SDK rpc-dubbo 接入配置===========
      #RPC服务配置
      #是否自定义dubbo配置:false->表示系统已经集成dubbo,无需配置dubbo参数
      biz.event.client.rpc.dubbo.custom=false
      #dubbo服务分组:格式->{系统ID}_NOTICE_SERVICE,系统ID由服务端统一分配
      biz.event.client.rpc.dubbo.service.group=TEST002_NOTICE_SERVICE
      #dubbo服务版本:当前服务的版本
      biz.event.client.rpc.dubbo.service.version=1.0.0
      
    • 当前客户端系统未集成dubbo,则参数配置如下:

      #=====================业务领域事件SDK rpc-dubbo 接入配置===========
      #RPC服务配置
      #是否自定义dubbo配置:false->表示系统已经集成dubbo,无需配置dubbo参数
      biz.event.client.rpc.dubbo.custom=true
      #dubbo服务分组:格式->{系统ID}_NOTICE_SERVICE,系统ID由服务端统一分配
      biz.event.client.rpc.dubbo.service.group=TEST001_NOTICE_SERVICE
      #dubbo服务版本:当前服务的版本
      biz.event.client.rpc.dubbo.service.version=1.0.0
      
      #dubbo配置-application
      dubbo.application.name = fund-api
      dubbo.application.id = fund-api
      dubbo.application.qos-enable = false
      #dubbo配置-registry
      dubbo.registry.id = registry
      dubbo.registry.address = zookeeper://zookeeper01-dev.javacoo.com:22181?backup=zookeeper02-dev.javacoo.com:22181,zookeeper03-dev.javacoo.com:22181
      #dubbo配置-protocol
      dubbo.protocol.name = dubbo
      dubbo.protocol.id = dubbo
      dubbo.protocol.port = 20887
      dubbo.protocol.host = 0.0.0.0
      dubbo.protocol.heartbeat = 30
      dubbo.protocol.accesslog = /volume_logs/rpc-access.log
      #dubbo配置-config-center
      dubbo.config-center.timeout = 60000
      

六,使用说明

1,服务端

服务端使用BizEventPublisher类直接发布业务领域事件即可,如:

服务端发布领域事件类型:com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent,协议如下:

字段 类型 说明
userName String 用户名
userType String 用户分类
amount BigDecimal 金额

对应的领域事件对象:com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServerLoanApplyEvent {
    /**
     * 用户名
     */
    private String userName;
    /**
     * 用户分类
     */
    private String userType;
    /**
     * 金额
     */
    private BigDecimal amount;
}

使用BizEventPublisher发布事件,此时可根据具体业务,设置事件的版本,主题

        //发布普通领域事件
        BizEventPublisher.publish(ServerLoanApplyEvent.builder()
            .userName("徐凤年")
            .userType("A")
            .amount(new BigDecimal("30000"))
            .build());
        //发布普通领域事件
        BizEventPublisher.publish(ServerLoanApplyEvent.builder()
            .userName("徐渭熊")
            .userType("B")
            .amount(new BigDecimal("10000"))
            .build());
        //发布普通领域事件
        BizEventPublisher.publish(ServerLoanApplyEvent.builder()
            .userName("温华")
            .userType("C")
            .amount(new BigDecimal("20000"))
            .build());
        //发布带主题的领域事件
        BizEventPublisher.publish(ServerLoanApplyEvent.builder()
            .userName("王仙芝")
            .userType("A")
            .amount(new BigDecimal("100000"))
            .build(),"天下第二");
        //发布带版本的领域事件
        BizEventPublisher.publish(ServerLoanApplyEvent.builder()
            .userName("李淳罡")
            .userType("B")
            .amount(new BigDecimal("200000"))
            .build(),"","1.0");
        //发布带主题+版本的领域事件
        BizEventPublisher.publish(ServerLoanApplyEvent.builder()
            .userName("邓太阿")
            .userType("C")
            .amount(new BigDecimal("300000"))
            .build(),"桃花剑神","1.0");

如发布带主题+版本的领域事件,生成协议JSON格式如下:

{
    "async":false,
    "eventId":"EventId_2021120900000077",
    "eventVersion":"1.0",
    "eventObject":{
        "amount":300000,
        "userType":"C",
        "userName":"邓太阿"
    },
    "eventTopic":"桃花剑神",
    "eventType":"com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",
    "eventGroup":"TEST001",
    "transactionSn":"TransSn_2021120900000073",
    "timestamp":"20211209150610183"
}
2,客户端

客户端只需在事件处理方法上加上 @BizEventListener注解,填写服务端发布的领域业务事件类型,主题和版本,或者参数过滤条件,即可收到服务端推送的相应业务领域事件。如:

客户端订阅服务端发布的领域事件类型:com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent,则客户端首先按照事件类型协议,定义监听对象参数对象,参数对象字段与协议保持一致:

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ClientLoanApplyEvent {
    /**
     * 用户名
     */
    private String userName;
    /**
     * 用户分类
     */
    private String userType;

    /**
     * 金额
     */
    private BigDecimal amount;
}

接着定义事件监听方法,如下:

在监听方法上添加@BizEventListener注解,填写服务端发布的领域业务事件类型,主题和版本,或者参数过滤条件

@Slf4j
@Service
public class BizEventTestService {
    /**
     * 指定事件类型+事件版本
     */
    @BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",version = "1.0",order = 1)
    public void test(ClientLoanApplyEvent clientLoanApplyEvent){
        log.info("BizEventTestService收到版本事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
        clientLoanApplyEvent.setAmount(new BigDecimal("50000"));
    }
    /**
     * 指定事件类型+事件主题
     */
    @BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",topic = "天下第二",order = 1)
    public void test1(ClientLoanApplyEvent clientLoanApplyEvent){
        log.info("BizEventTestService收到主题事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
        clientLoanApplyEvent.setAmount(new BigDecimal("70000"));
    }
    /**
     * 指定事件类型+事件主题+事件主题
     */
    @BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",version = "1.0",topic = "桃花剑神",order = 1)
    public void test2(ClientLoanApplyEvent clientLoanApplyEvent){
        log.info("BizEventTestService收到版本+主题事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
    }
    /**
     * 指定事件类型
     */
    @BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent")
    public void test4(ClientLoanApplyEvent clientLoanApplyEvent){
        log.info("BizEventTestService收到事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
    }
    /**
     * 指定事件类型+值正则匹配
     */
    @BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",paramEl = "#eventObject.userName=.*徐.*")
    public void test5(ClientLoanApplyEvent clientLoanApplyEvent){
        log.info("BizEventTestService收到值正则匹配事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
        try {
            Thread.sleep(2*1000);
        } catch (InterruptedException interruptedException) {
            interruptedException.printStackTrace();
        }
    }
    /**
     * 指定事件类型+值精准匹配
     */
    @BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerLoanApplyEvent",paramEl = "#eventObject.userType=A")
    public void test6(ClientLoanApplyEvent clientLoanApplyEvent){
        log.info("BizEventTestService收到值精准匹配事件推送:{}", FastJsonUtil.toJSONString(clientLoanApplyEvent));
    }
    /**
     * 指定事件类型
     */
    @BizEventListener(eventType = "com.javacoo.xservice.example.bean.event.ServerExampleEvent")
    public void handleClientExampleEvent(ClientExampleEvent clientExampleEvent){
        log.info("BizEventTestService->handleClientExampleEvent收到事件推送:{}", FastJsonUtil.toJSONString(clientExampleEvent));
    }
}

注意事项:

  1. 业务领域事件监听方法参数对象字段,需要与服务端推送的业务领域对象字段保持一致,即必须根据服务端发布的业务领域事件类型来定义客户端业务领域事件监听方法参数对象。
  2. SDK约定业务领域事件监听方法所在服务类必须添加@Service注解。

效果

  1. 值正则匹配
处理值正则匹配.png
  1. 值精准匹配
处理值精准匹配.png
  1. 事件类型+事件版本
类型和版本匹配.png
  1. 事件类型+事件主题
类型主题匹配.png
  1. 事件类型+事件版本+事件主题
类型版本主题匹配.png
  1. 事件类型匹配
类型匹配.png

七,后续规划

  1. 增加BizEvent-SDK发布事件的能力
  2. 增加预警能力
  3. 升级框架为BizEvents-Bus

八,问题及局限性

  1. 事件的发布,存储,推送MQ,MQ消费,推送都在一个工程,降低了系统的可靠性及可用性:后续可以考虑拆分,分别单独部署。
  2. 事件消费,依赖推送是否成功,依赖客户端的处理能力,虽然目前SDK提供了持久化接口,由客户端决定是否持久化,且事件的分发默认是异步的,可以最大程度提高响应速度,但是客户端性能是不可控的,当遇到客户端响应超时,将导致MQ消息堆积,将影响服务端的性能。
  3. 目前BizEvents-Framework还是一个雏形,是基于对现有业务的梳理,理解和抽象,结合现有相关系统特点加上自己的一些想法而得出的设计原型。这也算是我对于这段时间工作的梳理和总结,对接下来的工作任务也有参考和指导意义。由于本人涉及到的业务领域有限,其设计存在局限性和不合理的地方,仅供学习和交流,不足之处欢迎吐槽和指正,本人将不胜感激:)
上一篇 下一篇

猜你喜欢

热点阅读