AskMe项目 异步队列

2018-02-20  本文已影响0人  当麻真实

异步队列简单介绍

队列实现异步可以用单向队列,任务放到队列中,先进先出,或者使用优先队列,按照优先级来选择谁先执行,来防止某一个用户执行大量的请求,如一个用户发送了100个请求,如果用单向队列,其它用户必须要等这个用户的100个请求结束后才能执行,这就不合理,所以可以给第2个请求设置比较低的优先级,这样其他用户的请求也可以被执行

使用异步队列可以让主线程继续运行,减少请求响应时间和解耦,主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。

主要编写两个类

  1. EventProducer把事件推到Redis队列中去
  2. EventConsumer把事件从队列中取出来,找到关联的handler,一件一件地去处理

其他用到了一些辅助类

  1. 枚举型的EventType,用来表示不同的Event类型
  2. EventModel来存放Event中的相关信息
  3. 接口类EventHandler,其中有两个函数,一个是执行Handler,另一个是返回需要使用这个Handler的EventType列表

具体实现

1. 枚举型EventType

public enum EventType {
    LIKE(0),
    COMMENT(1),
    LOGIN(2),
    ASK(3),
    Mail(4);

    private int value;
    EventType(int value)
    {
        this.value=value;
    }
    public int getValue()
    {
        return this.value;
    }

}

2.EventModel

public class EventModel {
    private int userid;//事件执行者,如点赞的人
    private EventType eventype;//事件类型,如点赞
    private  int entitytype;//操作对象类型,如给评论点赞,则为评论类型
    private int entityid;//操作对象ID,如评论的ID
    private int entityownerid;//对象的拥有者,如发表该评论的用户

    public int getUserid() {
        return userid;
    }

    public EventModel setUserid(int userid) {
        this.userid = userid;
        return this;
    }

    public EventType getEventype() {
        return eventype;
    }

    public EventModel setEventype(EventType eventype) {
        this.eventype = eventype;
        return this;
    }

    public int getEntitytype() {
        return entitytype;
    }

    public EventModel setEntitytype(int entitytype) {
        this.entitytype = entitytype;
        return this;
    }

    public int getEntityid() {
        return entityid;
    }

    public EventModel setEntityid(int entityid) {
        this.entityid = entityid;
        return this;
    }

    public int getEntityownerid() {
        return entityownerid;
    }

    public EventModel setEntityownerid(int entityownerid) {
        this.entityownerid = entityownerid;
        return this;
    }

    public Map<String, String> getMap() {
        return map;
    }

    public EventModel setMap(Map<String, String> map) {
        this.map = map;
        return this;
    }

    private Map<String,String> map=new HashMap<String, String>();

    public EventModel setkeyvalue(String key,String value)
    {
        map.put(key,value);
        return this;
    }
    public String getkeyvalue(String key)
    {
        return map.get(key);
    }


}

3. EventHandler

    public interface EventHandler {
    void doHandle(EventModel event);//执行Event的具体操作函数
    List<EventType> getSupportEventType();//返回该handler对那些类型的Event是关心的,即这些EventType进入队列需要运行时,需要调用该Handler
}

4. EventProducer

@Service
public class EventProducer {
    @Autowired
    JedisService jedis;
    //将EVENT加入到redis队列中
    public boolean fireEvent(EventModel event)
    {
        try
        {
            String eventstring= JSONObject.toJSONString(event);
            //将event转换为JSON字符串,取出时Parse回到EventModel类型
            String key= RedisKeyUtil.eventKey;
            jedis.lpush(key,eventstring);
            //将该event加入list中
            return true;
        }
        catch(Exception e)
        {
            return false;
        }
    }
}

5.EventConsumer

@Service
public class EventConsumer implements InitializingBean,ApplicationContextAware{
    @Autowired
    JedisService jedis;
    
    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class) ;
    
    private Map<EventType,List<EventHandler>> eventConsumerMap=new HashMap<EventType,List<EventHandler>>();
    //一种类型的EventType进来,就寻找这件Event所需要的Handler的列表
    
    private ApplicationContext applicationContext;
    //运行的上下文
    
    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String,EventHandler> beans=applicationContext.getBeansOfType(EventHandler.class);
        //找到所有的EventHandler,注意在具体实现handler的时候一定要加上@Component,否则找不到这个handler
        if(beans!=null)
        {
            for(Map.Entry<String,EventHandler> entry:beans.entrySet())
            {
                List<EventType> tmp=entry.getValue().getSupportEventType();
                for(EventType a:tmp)
                {
                    if(eventConsumerMap.containsKey(a)==false)
                    {
                        eventConsumerMap.put(a,new ArrayList<EventHandler>());
                    }
                        eventConsumerMap.get(a).add(entry.getValue());
                }
            }
        }
        //初始化Map<EventType,List<EventHandler>>将Event类型和需要用到的EventHandler关联起来

        //新建线程,该线程不断地循环,从redis的list中找event,找到了就将Json字符串parse成event,然后执行它所需要的handler
        Thread thread=new Thread(new Runnable() {
            @Override
            public void run() {
                while(true)
                {
                    String key= RedisKeyUtil.eventKey;
                    List<String> eventlist=jedis.brpop(0,key);
                    for(String tmp:eventlist)
                    {
                        if(tmp.equals(key))
                            continue;
                        EventModel model= JSON.parseObject(tmp,EventModel.class);
                        if(!eventConsumerMap.containsKey(model.getEventype()))
                        {
                            logger.error("不能识别的事件");
                            continue;
                        }
                            for(EventHandler handler:eventConsumerMap.get(model.getEventype()))
                        {
                            handler.doHandle(model);
                        }
                    }
                }
            }
        });
        thread.start();

    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext=applicationContext;
    }
}

解释一下其中的brpop指令:
假如在指定时间内没有任何元素被弹出,则返回一个 nil 和等待时长(即第一个参数,表示等待time时间后返回nil)。 反之,返回一个含有两个元素的列表,第一个元素是被弹出元素所属的 key (所以下面遍历的时候要把Key过滤掉),第二个元素是被弹出元素的值。

6.具体的Handler实现 LikeHandler

@Component
public class LikeHandler implements EventHandler {
    @Autowired
    UserService userService;
    @Autowired
    MessageService messageService;
    //给被点赞用户发送站内信
    @Override
    public void doHandle(EventModel event) {
        Message msg=new Message();
        msg.setToid(event.getEntityownerid());
        msg.setCreateddate(new Date());
        msg.setFromid(888);
        User user=userService.getuserbyid(event.getUserid());
        msg.setContent("用户"+user.getName()+"赞了您的评论 "+event.getkeyvalue("questionid"));
        msg.setHasread(0);
        msg.setConversationid(event.getEntityownerid(),888);
        messageService.insertMessage(msg);
    }

    @Override
    public List<EventType> getSupportEventType() {
        return Arrays.asList(EventType.LIKE);
    }
}

7. Controller中将任务放入异步队列中

eventProducer.fireEvent(new EventModel().setEntityid(entity_id).setEntitytype(EntityType.ENTITY_COMMENT).setUserid(user.getId()).setEventype(EventType.LIKE).setkeyvalue("questionid","http://127.0.0.1:8080/question/"+String.valueOf(comment.getEntityid())).setEntityownerid(comment.getUserid()));

总结:

通过异步队列,如果用户有一个耗时且不需要同步响应的事件时,可以将事件通过事件生产者,将事件推入异步队列中,消费者线程不断从异步队列中取出事件,来执行,适用于Message Service等应用场景

上一篇下一篇

猜你喜欢

热点阅读