Java 程序员Java

部门老大狂问我:RabbitMQ多消费者顺序性消费消息实现

2021-11-03  本文已影响0人  程序花生

最近起了个项目消息中心,用来中转各个系统中产生的消息,用到的是RabbitMQ,由于UAT环境、生产环境每台消费者服务都是多台,有些消息要求按顺序消费,所以需要采取一定的措施保证消息的顺序消费,下面讲下我们不断优化的三种方法:

1、我们最开始考虑的比较简单,采用的direct交换机,指定特定消费者服务器监听队列,其他消费者服务器不监听。比如现在有C1、C2、C3三台消费者机器,我们决定C1消费消息,C2、C3不监听。我们在启动C1的时候,启动脚本中添加C1_IP,在代码中做处理,消费者服务器启动时,如果当前服务器IP就是启动脚本的C1_IP,那就会由这台C1来监听并消费消息。这种方式有个单点故障问题,如果C1服务器宕机,那么整个消息中心剩余两个节点都无法消费这个队列,导致队列消息堆积。如果有丰富的监控措施,那么监控到C1宕机后,可通过手动配置C2_IP(或者C3_IP)到启动脚本,重启C2服务器(C3服务器)消费消息。

2、为了解决单点故障问题,我们采用了fanout交换机,每个消费者创建一个专用的queue,这样如果生产者产生两条有先后顺序的消息m1和m2(它们有公共的批次号batchNo和唯一的消息编号msgID),就会给每个queue都推送,如下图所示。同时消费者消费的时候需要配合数据库共同实施,消费者监听到消息后就入库(落库内容包括m1消息信息和消费者IP),根据msgID唯一索引性如果入库了则自己抛弃消息,消费m2时,需要从库表中取出m1的消费者IP是否是当前IP,如果不是则抛弃消息。但是这个方案有个缺点:如果consumer1消费了m1后挂掉了,m2只能等到consumer1正常后才能消费,无法转移到其他消费者进行消费,这样会对一些业务场景不友好(当然这个地方可以考虑死信交换机死信队列进行转移,只不过架构更复杂了)。

3.第三种方式跟第二种类似,采用fanout交换机,每个消费者创建一个专用的queue。但是没有借助数据库,而是通过访问rabbitMQ的API接口,获取这三个队列的所有消费者的IP放到list中,消费者监听到消息后,判断自己的ip是否是ip集合里面的最小值,如果是则消费,如果否则抛弃消息。一旦最小IP的消费者宕机后,则list种就会只剩下两个IP,后续的消息选定的消费者就会从这两个IP中选择最小IP消费。同理它也有第二种方案的缺点。

最后附上通过rabbitmq的api获取minIP的代码(入参consumerIps是初始size=0的list),如下:

 private String findUsefulMinIP(List<String> consumerIps) {
        String minIp = null;
        SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
        requestFactory.setConnectTimeout(20000);
        try {
            RestTemplate rest = new RestTemplateBuilder().basicAuthentication(username, password).build();
            rest.setRequestFactory(requestFactory);
            JSONArray result2 = rest.getForObject(moccMQApiUrl, JSONArray.class);
            if(result2 != null && result2.size() > 0) {
                log.info("===clear the ips===new query start===");
                consumerIps.clear();
            }
            for(int m=0; m<result2.size(); m++) {
                LinkedHashMap itmap = (LinkedHashMap) result2.get(m);
                LinkedHashMap queueMap = (LinkedHashMap)itmap.get("queue");
                if(!queueMap.values().stream().anyMatch(v -> v.toString().indexOf(moccQueue)>=0)) {
                   continue; 
                }

                LinkedHashMap consumerMap = (LinkedHashMap)itmap.get("channel_details");
                consumerIps.add((String)consumerMap.get("peer_host"));
            }
            log.info("===query from mq===consumerIps={}", consumerIps);
        } catch (RestClientException e) {
            log.error(e.getMessage(), e);
        }
        minIp = Collections.min(consumerIps);
        return minIp;
    }

原文地址:http://adkx.net/9qh8y

上一篇下一篇

猜你喜欢

热点阅读