Spring Event

2018-11-01  本文已影响0人  Vanes丶

Spring Event

Observer Design Pattern

Scenario

Design

Model
image
Abstract
image

Source code

Company.java#notifyObserver()
    @Override
    public void notifyObserver() {
        employees.forEach(employee -> employee.takeAction(announcement));
    }

    public void announcenMsg(String msg) {
        this.setAnnouncement(msg);
        notifyObserver();
    }
Employee.java#takeAction(String msg)
    @Override
    public void takeAction(String msg) {
        this.msg = msg;
        interview();
    }

    public void interview() {
        System.out.println(name + " received msg : " + msg + " and will go to interview");
    }

Differences with Publish-Subscribe

image

Spring Events

image

Event Publisher

1. @DomainEvents - Spring Data JPA
    @DomainEvents
    AccountSaveEvent accountSaveEvent()
    {
        AccountSaveEvent accountSaveEvent = new AccountSaveEvent();
        accountSaveEvent.setAccount(this);
        accountSaveEvent.setEventType("AccountSaveEventByJPA");
        return accountSaveEvent;
    }

    @AfterDomainEventPublication
    void callbackMethod() {
        System.out.println("DATA SAVED!\n"+"WELL DONE");
    }
2. Mannual Publish
@Component
public class AccountEventPublisher {

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private ApplicationEventPublisher publisher;

    public void publishEventByContext(Account account)
    {
        AccountSaveEvent event = new AccountSaveEvent();
        event.setAccount(account);
        event.setEventType("AccountSaveEventByContext");
        System.out.println("====================================");
        System.out.println("Start to publish AccountSaveEvent by context");
        applicationContext.publishEvent(event);
        System.out.println("End");
        System.out.println("====================================");
    }

    public void publishEventByPublisher(Account account)
    {
        AccountSaveEvent event = new AccountSaveEvent();
        event.setAccount(account);
        event.setEventType("AccountSaveEventByPublisher");
        System.out.println("====================================");
        System.out.println("Start to publish AccountSaveEvent by publisher");
        publisher.publishEvent(event);
        System.out.println("End");
        System.out.println("====================================");
    }
}

Event Listener

1. @EventListener
    @EventListener(condition = "#accountSaveApplicationEvent.valid")
    public void handleEvent(AccountSaveApplicationEvent accountSaveApplicationEvent) {
        System.out.println("======================================");
        System.out.println("Listener listened AccountSaveApplicationEvent");
        System.out.println(accountSaveApplicationEvent.getEventType());
        System.out.println("======================================");
    }
2. @TransactionEventListener
    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
    public void handleEvent(AccountSaveEvent accountSaveEvent) {
        System.out.println("======================================");
        System.out.println("Listener listened AccountSaveEvent");
        System.out.println(accountSaveEvent.getEventType());
        System.out.println("======================================");
    }

Explore

How to event process method in listener class
    @Override
    public void multicastEvent(ApplicationEvent event) {
        multicastEvent(event, resolveDefaultEventType(event));
    }

    @Override
    public void multicastEvent(final ApplicationEvent event, ResolvableType eventType) {
        ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
        for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) {
            Executor executor = getTaskExecutor();
            if (executor != null) {
                // new Runnable(){...} just define Runnable variable  
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        invokeListener(listener, event);
                    }
                });
            }
            else {
                invokeListener(listener, event);
            }
        }
    }
@Configuration
public class AsynchronousSpringEventsConfig implements AsyncConfigurer {

    @Bean(name = "applicationEventMulticaster")
    public ApplicationEventMulticaster simpleApplicationEventMulticaster() {
        SimpleApplicationEventMulticaster eventMulticaster
                = new SimpleApplicationEventMulticaster();
        // Inject the ThreadPoolTaskExecutor
        eventMulticaster.setTaskExecutor(getAsyncExecutor());
        return eventMulticaster;
    }


    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(50);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.initialize();

        return taskExecutor;
    }
}
public class AsyncExecutionInterceptor extends ... {
    @Nullable
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
        // Find executors configured in configuration class.
        AsyncTaskExecutor executor = this.determineAsyncExecutor(userDeclaredMethod);
        if (executor == null) {
            throw new IllegalStateException("No executor specified and no default executor set on AsyncExecutionInterceptor either");
        } else {
            Callable<Object> task = () -> {
                try {
                    Object result = invocation.proceed();
                    if (result instanceof Future) {
                        return ((Future)result).get();
                    }
                } catch (ExecutionException var4) {
                    this.handleError(var4.getCause(), userDeclaredMethod, invocation.getArguments());
                } catch (Throwable var5) {
                    this.handleError(var5, userDeclaredMethod, invocation.getArguments());
                }

                return null;
            };
            return this.doSubmit(task, executor, invocation.getMethod().getReturnType());
        }
    }
}
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
Where to call method multicastEvent()
    protected void publishEvent(Object event, ResolvableType eventType) {
        Assert.notNull(event, "Event must not be null");
        if (logger.isTraceEnabled()) {
            logger.trace("Publishing event in " + getDisplayName() + ": " + event);
        }

        // Decorate event as an ApplicationEvent if necessary
        ApplicationEvent applicationEvent;
        if (event instanceof ApplicationEvent) {
            applicationEvent = (ApplicationEvent) event;
        }
        else {
            applicationEvent = new PayloadApplicationEvent<Object>(this, event);
            if (eventType == null) {
                eventType = ((PayloadApplicationEvent) applicationEvent).getResolvableType();
            }
        }

        // Multicast right now if possible - or lazily once the multicaster is initialized
        if (this.earlyApplicationEvents != null) {
            this.earlyApplicationEvents.add(applicationEvent);
        }
        else {
            getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
        }

        // Publish event via parent context as well...
        if (this.parent != null) {
            if (this.parent instanceof AbstractApplicationContext) {
                ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
            }
            else {
                this.parent.publishEvent(event);
            }
        }
    }
Reference :

[1] Baeldung: How to use events in Spring
[2] Spring Blog: Better application events in Spring Framework 4.2

[3] Zoltan Altfatter: Publishing domain events from aggregate roots
[4] Spring IO: Spring Data JPA - Reference Documentation
[5] Pursue: Simple Analysis Domain Driven Design
[6] Spring IO: Spring 5.1.1 RELEASE Reference Events
[7] luohanguo: The Obeserver Design Pattern
[8] miaoyu: Differences between Observer and Publish-Subscribe Pattern
[9] Github Source Code Example of Spring Events by Elvis
[10] Github Source Code Example of Observer Design Pattern by Elvis
[11] CSDN Blog: Spring Event System

上一篇 下一篇

猜你喜欢

热点阅读