Guava--EventBus学习

2018-11-22  本文已影响0人  tom_xin

    最近在项目中,发现有人在用EventBus做事件监听,当时我就觉得很厉害。赶紧学习一下,Goole真是厉害。

一个简单的例子

    下面让我们来看一下EventBus的使用实例。

/**
 * 订单发布
 *
 * @author tomxin
 * @date 2018-11-21
 * @since v1.0.0
 */
public class OrderPublish {

    private EventBus eventBus;

    public OrderPublish(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    public void orderStateChange(int state) {
        if (0 == state) {
            eventBus.post("the order is close");
            return;
        }
        if (1 == state) {
            eventBus.post(1);
            return;
        }

    }
}

/**
 * 事件监听器
 *
 * @author tomxin
 * @date 2018-11-21
 * @since v1.0.0
 */
public class OrderCloseListener {

    /**
     * 构造函数
     *
     * @param eventBus
     */
    public OrderCloseListener(EventBus eventBus) {
        eventBus.register(this);
    }

    /**
     * 接收消息
     *
     * @param message
     */
    @Subscribe
    public void receiver(String message) {
        System.out.println(message);
    }
}

/**
 * 订单开启监听者
 *
 * @author tomxin
 * @date 2018-11-21
 * @since v1.0.0
 */
public class OrderOpenListener {

    /**
     * 构造函数
     *
     * @param eventBus
     */
    public OrderOpenListener(EventBus eventBus) {
        eventBus.register(this);
    }

    /**
     * 接收消息
     *
     * @param orderState
     */
    @Subscribe
    public void receiver(Integer orderState) {
        System.out.println("this order is open,state = " + orderState);
    }
}
/**
 * Main函数
 *
 * @author tomxin
 * @date 2018-11-21
 * @since v1.0.0
 */
public class OrderMain {

    public static void main(String[] args) {
        // 初始化一个EventBus
        EventBus eventBus = new EventBus();
        // 初始化事件发布者
        OrderPublish orderPublish = new OrderPublish(eventBus);
        // 初始化事件监听者
        OrderCloseListener orderCloseListener = new OrderCloseListener(eventBus);
        OrderOpenListener orderOpenListener = new OrderOpenListener(eventBus);
        //发布消息
        orderPublish.orderStateChange(1);
        orderPublish.orderStateChange(0);
    }
}

记得添加上这个依赖

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>27.0-jre</version>
        </dependency>

    可以看到,这个这的是太像之前写的 观察者模式+中介者模式。对于我们Spring项目中,可以通过@PostConstruct来向EventBus中注册监听者。EvenBus也可以通过类似的方法初始化。通过与@Subscribe注解就可以达到效果啦。


EventBus源码学习

首先来看一下EventBus的构造函数,

  EventBus(
      String identifier,
      Executor executor,
      Dispatcher dispatcher,
      SubscriberExceptionHandler exceptionHandler) {
    // EventBus的身份信息,可以简单的理解为起了一个别名,如果不显式赋值的话,会默认为“default”。
    this.identifier = checkNotNull(identifier);
    // executor是定义了一个线程池,事件的运行需要提交到这个线程池中。
    this.executor = checkNotNull(executor);
    // 处理事件的策略接口,默认PerThreadQueuedDispatcher,还有LegacyAsyncDispatcher,ImmediateDispatcher。
    this.dispatcher = checkNotNull(dispatcher);
    // 异常处理类,默认LoggingHandler.INSTANCE
    this.exceptionHandler = checkNotNull(exceptionHandler);
  }

下面来看一下,如何注册监听一个事件

  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);

  public void register(Object object) {
    // 调用了SubscriberRegistry的register方法
    subscribers.register(object);
  }
  // 我们来看一下register中的方法具体是怎么实现的
  // SubscriberRegistry类中的register方法
   private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();

  void register(Object listener) {
    // 查找listener中所有被@Subscribe注解的方法,将对应的方法参数与被listener封装成的Subscriber,封装为一个Map
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
    // 遍历当前HashMultimap,这里的eventType表示方法的参数
    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();
      // 以eventType(方法参数)作为key,查询subscribers中是否已经存在相同的订阅者集合。
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
     // 如果不存在,则新建一个CopyOnWriteArraySet类型的新集合。
      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        // 将newSet,eventType赋值,返回引用赋值给eventSubscribers
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }
     // 将新的listener集合,添加到eventSubscribers的集合中。
      eventSubscribers.addAll(eventMethodsInListener);
    }
  }
 // 我们再来看一下,findAllSubscribers(listeners)方法是如何实现的。不难想到这是以反射的方式来做的,但是有一点大家要注意,这里规定了方法的参数只能有一个,我们一起来看一下。

  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    // 首先创建了一个HashMultimap来存储这些listener
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    // 这个for循环,先调用getAnnotatedMethods(clazz)方法,找到所有带有@Subscriber注解的方法。
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      // 取第一个参数
      Class<?> eventType = parameterTypes[0];
      // 将eventType和Subscriber放入Map中。
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }

// 再来看一下getAnnotatedMethods(class)方法。
  private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
      CacheBuilder.newBuilder()
          .weakKeys()
          .build(
              new CacheLoader<Class<?>, ImmutableList<Method>>() {
                @Override
                public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
                  return getAnnotatedMethodsNotCached(concreteClass);
                }
              });
  
  // 如上subscriberMethodsCache使用了CacheBuilder.newBuilder(),然后定义了回调方法。getAnnotatedMethodsNotCached(concreteClass);
  private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
    return subscriberMethodsCache.getUnchecked(clazz);
  }

  // 我们来看一下这个方法是如何实现的
  private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    // 首先获取父类的Class
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    for (Class<?> supertype : supertypes) {
      for (Method method : supertype.getDeclaredMethods()) {
        if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
          // TODO(cgdecker): Should check for a generic parameter type and error out
          // 获取对应的方法参数。
          Class<?>[] parameterTypes = method.getParameterTypes();
          // 判断参数的长度,参数的长度不能大于1,否则会抛出异常。
          checkArgument(
              parameterTypes.length == 1,
              "Method %s has @Subscribe annotation but has %s parameters."
                  + "Subscriber methods must have exactly 1 parameter.",
              method,
              parameterTypes.length);

          MethodIdentifier ident = new MethodIdentifier(method);
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    return ImmutableList.copyOf(identifiers.values());
  }

下面我们在看一下post方法是如何发布事件的

  public void post(Object event) {
    // 通过getSubscribers(event)方法,获取所有订阅者
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      // 通过dispacth(event,eventSubscribers)方法,将所有任务分发出去。
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }
// 下面是getSubscribers方法
  Iterator<Subscriber> getSubscribers(Object event) {
    // 这一步的主要目的是,获取实体类event继承体系中所有的父类
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
    // 根据eventTypes的容量,初始化subscriberIterators
    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());
    // 从subscribers中获取对应的事件监听者
    for (Class<?> eventType : eventTypes) {
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }
    
    return Iterators.concat(subscriberIterators.iterator());
  }
 // 再来看一下,dispatcher方法是如何工作的
    // ThreadLocal维护的queue
    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };
    // 维护线程是否正在分发任务的状态
    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };

    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      // 首先对event,subscribers进行判空
      checkNotNull(event);
      checkNotNull(subscribers);
      // 调用ThreadLocal的get方法,获取当前线程对应的队列。
      Queue<Event> queueForThread = queue.get();
      // 先讲新的事件放入到队列中
      queueForThread.offer(new Event(event, subscribers));
      // 判断当前线程是否在分发任务,如果没有在分发任务,则开始分发任务,如果已经在分发任务,则直接返回。
      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          // 从队列中poll出对应的事件,判断是否为空。
          while ((nextEvent = queueForThread.poll()) != null) {
            // 循环该事件的监听者,将任务分发出去。
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }
  // dispatchEvent方法
  final void dispatchEvent(final Object event) {
    // 事件进入到这里之后,在给定的线程池中主动启动一个新的线程,通过invoke的方法,运行监听者对应的方法。
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }

好了,EventBus的代码就讲解到这里了。可以看到EventBus里用到了很多内容,对于好的代码,我们还是要多学习的。

上一篇 下一篇

猜你喜欢

热点阅读