金融基础技术与业务

Guava之EventBus源码

2019-05-23  本文已影响13人  小猫无痕

最近需要使用事件驱动,打算使用EventBus管理事件的注册和分发。于是仔细阅读了下Guava的EventBus实现,并在此做了些整理。
EventBus是基于设计模式中的Observer模式的实现。Observer模式是非常常用的设计模式之一,jdk中的EventObject、EventListener、Observable、Observer都是为观察者模式服务的。但随着业务场景复杂度的不断提高,我们希望能在管理事件的同时提供更多的扩展。所以我们通过EventBus来优雅的实现这些。


Observer模式

我们先简单回顾下Observer模式:


观察者模式.png

定义对象之间的一对多依赖关系,以便当一个对象更改状态时,它的所有依赖关系都会被通知并自动更新。

Observer Pattern: Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

一些文中的“发布-订阅(Publish/Subscribe)模式”其实就是Observer模式,他所做的事情和我们将要做的事情一样:丰富Subject的功能。


EventBus

首先,我们先来看一下EventBus模块的类:


EventBus.jpg

EventBus.class

EventBus.class:它对应于Subject类,是整个模块的核心,也是功能扩展的中心点。
首先看下EventBus.class包含的以下几个变量:

  private final String identifier;
  private final Executor executor;
  private final SubscriberExceptionHandler exceptionHandler;
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
  private final Dispatcher dispatcher;
private static Logger logger(SubscriberExceptionContext context) {
      return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
    }
  public EventBus(String identifier) {
    this(identifier, MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE);
  }
  EventBus(String identifier, Executor executor, Dispatcher dispatcher,
      SubscriberExceptionHandler exceptionHandler) {...}

然后我们看下EventBus的方法,作为一个核心类,它一共只有三个public方法:

 public void register(Object object) {
    subscribers.register(object);
  }
public void unregister(Object object) {
    subscribers.unregister(object);
  }
public void post(Object event) {
  ...
      dispatcher.dispatch(event, eventSubscribers);
  ...
  }

仅有的三个方法也都异常的简单,'register'和'unregister'都调用了SubscriberRegistry类,'post'交给了Dispatcher类。而多线程的控制也通过'executor'交给了Subscriber,异常的处理不在自身管理同样传递给了Subscriber,作为中心的EventBus只做了功能的定义和分配,事件的转发,完美的实现了功能的解耦,做到了职责单一原则。

AsyncEventBus.class

AsyncEventBus.class是EventBus.class的异步多线程的子类,上面也有提到过,二者之间只在构造器中有两处区别:

  1. Executor:EventBus默认使用'DirectExecutor.class',他是一个线程执行器,简单的直接执行传入的Runnable。AsyncEventBus正好相反,它的Executor必须是传入的。
 private enum DirectExecutor implements Executor {
    INSTANCE;
    @Override public void execute(Runnable command) {
      command.run();
    }
}
  1. Dispatcher:在'EventBus'中默认使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默认使用'LegacyAsyncDispatcher'。前者是单线程同步,后者是多线程同步。两者的具体区别在下面介绍。

通过上面的描述,两者并不能通过他们类名简单的区别为一个单线程,一个多线程。他们的区别同样可以总结为两点:

Dispatcher.class

Dispatcher是一个抽象类,它本身是default的,因此无法被外部继承,EventBus也没有可以传入Dispatcher的构造器,所以对于Dispatcher我们是无法正常扩展的。
Dispatcher中只有一个抽象方法:来实现消息的分发。

abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

还有三个静态方法来创建它的三个实现类:

private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {...};
private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {...};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
...
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
}

里面的dispatching用于避免重入事件分派,例如循环发起Event的场景。

private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();
 @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }
      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

值得注意的是由于分发也是多线程共同完成,这使得它将无法保证Event的顺序性。

Subscriber.class

对应于Observer的抽象类,但它更像是一种封装,Subscriber自身提供了静态创建方法,将真正的Observer实现类和执行Event的方法都与EventBus封装在了一起,通过反射实现了对应于不同Observer的抽象。

static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
  }
final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        try {
         method.invoke(target, checkNotNull(event));
        } catch (InvocationTargetException e) {
          bus.handleSubscriberException(e.getCause(), context(event));
        }
      }
    });
  }

SynchronizedSubscriber:在dispatchEvent()方法上加了synchronized同步锁,如果正在的Observer方法是线程不安全的话就需要用到此类。他是通过@AllowConcurrentEvents注解来判断的,这里就不多讲了。

SubscriberRegistry.class

维护了Subscriber与Event的对应关系,对EventBus进行了解耦,使EventBus职责单一。

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();
  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }

最后多提一句,@Subscriber标注的那些Method都是事先通过'getAnnotatedMethodsNotCached' 方法获取,保存在了一个LoadingCache中的。由于和EventBus的机制没有太大关系,这里就不展开了。

  private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
      CacheBuilder.newBuilder().weakKeys()
          .build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
            @Override
            public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
              return getAnnotatedMethodsNotCached(concreteClass);
            }
          });

总结

Guava的EventBus以EventBus类为中心,对于Event的发布、订阅者的管理、异常的处理都提供了专门的实现类,流程非常清楚。而且基于Annotation扫描绑定的方式会使代码非常的简洁。但由于这种方式,在EventBus中对于事件类型和事件参数等等并不能提供很好的支撑,而且由于基本所有的类都是default权限的,这使得扩展异常的艰难T~T


Classfier扩展

待续...

上一篇 下一篇

猜你喜欢

热点阅读