【ElasticJob源码解析】事件

2018-02-03  本文已影响0人  农蓝

ElasticJob有一套自己的事件发布机制,核心部件使用的Guava的EventBus,如果不是很了解,出门左转看我的关于Guava的EventBus文章;

1,事件配置接口-JobEventConfiguration

public interface JobEventConfiguration extends JobEventIdentity {
    JobEventListener createJobEventListener() throws JobEventListenerConfigurationException;
}

2,事件的观察者-JobEventRdbListener

先来看看这个接口中定义了什么;

public interface JobEventListener extends JobEventIdentity {
    
    @Subscribe
    @AllowConcurrentEvents
    void listen(JobExecutionEvent jobExecutionEvent);
    
    @Subscribe
    @AllowConcurrentEvents
    void listen(JobStatusTraceEvent jobStatusTraceEvent);
}

再来看看该接口的实现类:

public final class JobEventRdbListener extends JobEventRdbIdentity implements JobEventListener {
    
    private final JobEventRdbStorage repository;
    
    public JobEventRdbListener(final DataSource dataSource) throws SQLException {
        repository = new JobEventRdbStorage(dataSource);
    }
    
    @Override
    public void listen(final JobExecutionEvent executionEvent) {
        repository.addJobExecutionEvent(executionEvent);
    }
    
    @Override
    public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
        repository.addJobStatusTraceEvent(jobStatusTraceEvent);
    }
}

3,事件总线-JobEventBus

有了事件监听器(JobEventListener),并且有了能够生产事件监听器的工厂(JobEventConfiguration),那么下一步就是将事件监听器注册到事件总线中了,先来看一下ElasticJob的事件总线;

public final class JobEventBus {
    private final JobEventConfiguration jobEventConfig;
    private final ExecutorServiceObject executorServiceObject;
    private final EventBus eventBus;
    private boolean isRegistered;
    
    public JobEventBus() {
        jobEventConfig = null;
        executorServiceObject = null;
        eventBus = null;
    }
    
    public JobEventBus(final JobEventConfiguration jobEventConfig) {
        this.jobEventConfig = jobEventConfig;
        executorServiceObject = new ExecutorServiceObject("job-event", Runtime.getRuntime().availableProcessors() * 2);
        eventBus = new AsyncEventBus(executorServiceObject.createExecutorService());
        register();
    }
    
    private void register() {
        try {
            eventBus.register(jobEventConfig.createJobEventListener());
            isRegistered = true;
        } catch (final JobEventListenerConfigurationException ex) {
            log.error("Elastic job: create JobEventListener failure, error is: ", ex);
        }
    }
    
    public void post(final JobEvent event) {
        if (isRegistered && !executorServiceObject.isShutdown()) {
            eventBus.post(event);
        }
    }
}

解读:

上一篇 下一篇

猜你喜欢

热点阅读