guava的EventBus,可以用来做单体服务的消息处理。

2023-05-20  本文已影响0人  zxbyh

guava的EventBus,可以用来做单体服务的消息处理。

加入pom依赖

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>31.1-jre</version>
</dependency>

直接上代码

package com.smooth.common.core.event;

import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.AsyncEventBus;
import com.google.common.eventbus.Subscribe;

import java.lang.annotation.Annotation;
import java.util.concurrent.Executors;

public class EventBusDemo {

    enum msgType{one,two,three}

    record MsgRecord(String id,msgType type,String info,Long timestamp){}

    interface ISubscriber<T>{
        public void deal(T msg) throws InterruptedException;
    }

    class SubscriberOne implements ISubscriber<MsgRecord> {
        // 标记当前订阅者是线程安全的,支持并发接收消息
        @AllowConcurrentEvents
        @Subscribe
        public void deal(MsgRecord msg) throws InterruptedException {
            if (msg.type.equals(msgType.one)) {
                System.out.println("SubscriberOne >>>>>> 收到消息,"+msg);
                Thread.sleep(600);
                System.out.println("SubscriberOne >>>>>> 完成处理,"+msg.id);
            }
        }

    }

    class SubscriberTwo implements ISubscriber<MsgRecord> {
        // 标记当前订阅者是线程安全的,支持并发接收消息
        @AllowConcurrentEvents
        @Subscribe
        public void deal(MsgRecord msg) throws InterruptedException {
            if (msg.type.equals(msgType.two)) {
                System.out.println("SubscriberTwo >>>>>> 收到消息,"+msg);
                Thread.sleep(900);
                System.out.println("SubscriberTwo >>>>>> 完成处理,"+msg.id);
            }
        }
    }

    public void notifySubscriber() {
        //EventBus eventBus = new EventBus();
        //todo 实际开发中不要用这个,请使用ThreadPoolExecutor创建线程池
        AsyncEventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(5));
        /*
        //todo 实际开发中不要用这个,请使用以下方式创建线程池
        final int nThreads = Runtime.getRuntime().availableProcessors()*12; //IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程;
        final int capacity = 10 * nThreads; //设置有界队列的容量
        AsyncEventBus eventBus = new AsyncEventBus(
            new ThreadPoolExecutor(
                nThreads, //核心线程数 = 处理器的核数 * 期望CPU利用率0~1 * (1 + 等待时间/计算时间)
                nThreads+10, //总线程数= 核心线程数+救急线程数 . 如果队列满了,就会创建救急线程.
                60000, //救急线程生存时间.一分钟
                TimeUnit.MILLISECONDS, //救急线程生存时间的单位,毫秒.
                new ResizeableCapacityLinkedBlockingQueue<>(capacity), //设置有界队列的容量
                new ThreadPoolExecutor.AbortPolicy() //拒绝策略, AbortPolicy()当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略:抛出异常,[注意开发人员一定要捕获异常并记录日志!]。
            )
        );
        */
        eventBus.register(new SubscriberOne());
        eventBus.register(new SubscriberTwo());

        eventBus.post(new MsgRecord("msg_01",msgType.one,"今天天气不错",System.currentTimeMillis()));
        eventBus.post(new MsgRecord("msg_02",msgType.two,"今天要下雨",System.currentTimeMillis()));
        eventBus.post(new MsgRecord("msg_03",msgType.two,"今天要打雷",System.currentTimeMillis()));
        eventBus.post(new MsgRecord("msg_04",msgType.three,"今天要刮风",System.currentTimeMillis()));

        System.out.println("message posted! ");

    }



    public static void main(String[] args) {
        new EventBusDemo().notifySubscriber();
    }
    
}

运行结果如下:

message posted!
SubscriberTwo >>>>>> 收到消息,EventMsg\[id=msg\_03, type=two, info=今天要打雷, timestamp=1684626583478]
SubscriberTwo >>>>>> 收到消息,EventMsg\[id=msg\_02, type=two, info=今天要下雨, timestamp=1684626583478]
SubscriberOne >>>>>> 收到消息,EventMsg\[id=msg\_01, type=one, info=今天天气不错, timestamp=1684626583475]
SubscriberOne >>>>>> 完成处理,msg\_01
SubscriberTwo >>>>>> 完成处理,msg\_03
SubscriberTwo >>>>>> 完成处理,msg\_02

然后可以看到 type=three的消息没有被处理。

在实际的项目开发中,可以将 eventBus 设置为static
如果是用springboot的话,可以弄成Bean

@Configuration
public class EventBusConfig {
    @Bean
    public AsyncEventBus asyncEventBus () {
        new AsyncEventBus(
            new ThreadPoolExecutor(
                nThreads, //核心线程数 = 处理器的核数 * 期望CPU利用率0~1 * (1 + 等待时间/计算时间)
                nThreads+10, //总线程数= 核心线程数+救急线程数 . 如果队列满了,就会创建救急线程.
                60000, //救急线程生存时间.一分钟
                TimeUnit.MILLISECONDS, //救急线程生存时间的单位,毫秒.
                new ResizeableCapacityLinkedBlockingQueue<>(capacity), //设置有界队列的容量
                new ThreadPoolExecutor.AbortPolicy() //拒绝策略, AbortPolicy()当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略:抛出异常,[注意开发人员一定要捕获异常并记录日志!]。
            )
        );
    }
}
上一篇 下一篇

猜你喜欢

热点阅读