rocketmq功能

2022-08-23  本文已影响0人  sunpy

架构图


导包


<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>${rocketmq.version}</version>
</dependency>

生产者常用方法


方法名 说明
setRetryTimesWhenSendFailed 同步方式发送消息重试次数,默认为2
setRetryTimesWhenSendAsyncFailed 异步方式发送消息重试次数,默认为2
setSendMsgTimeout 发送消息默认超时时间,默认3000ms
setMaxMessageSize 允许发送的最大消息长度,默认为4M
setCompressMsgBodyOverHowmuch 消息体超过该值则启用压缩,默认4k
setRetryAnotherBrokerWhenNotStoreOK 消息重试时选择另外一个Broker时
setNamesrvAddr 设置NameServer的地址
send 发送消息,可以指定回调函数,同步异步
sendOneway 单向发送消息,不等待broker响应
shutdown 关闭当前生产者实例并释放相关资源
start 启动生产者
viewMessage 根据给定的msgId查询消息,还可指定topic
queryMessage 按关键字查询消息

消费者常用方法


方法名 说明
setNamesrvAddr 设置NameServer的地址
setMessageModel 设置消息消费模式(默认集群消费)
setConsumeThreadMin 消费者最小线程数量(默认20)
setConsumeThreadMax 消费者最大线程数量(默认20)
setPullInterval 推模式下任务间隔时间(推模式也是基于不断的轮训拉取的封装)
setPullBatchSize 推模式下任务拉取的条数,默认32条(一批批拉)
setMaxReconsumeTimes 消息重试次数,-1代表16次 (超过 次数成为死信消息)
setConsumeTimeout 消息消费超时时间(消息可能阻塞正在使用的线程的最大时间:以分钟为单位)

普通消息


生产者思路:

消费者思路:

生产者实现:

public class ProducerUtil {

    private static DefaultMQProducer producer = null;

    public static void start() {
        producer = new DefaultMQProducer("defaultGroup");
        producer.setNamesrvAddr("IP:9876");
        producer.setRetryTimesWhenSendFailed(3);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public static ResultModel<String> send(String topic, String tags, String content) {
        Message msg = new Message(topic, tags, "", content.getBytes());
        try {
            producer.send(msg);

            return new ResultModel<>();
        } catch (Exception e) {
            e.printStackTrace();
        }

        return new ResultModel<>();
    }

    public static void shutDownProducer() {
        if(producer != null) {
            producer.shutdown();
        }
    }
}

消费者实现:

@Log
@Service
public class ConsumerService {

    private DefaultMQPushConsumer consumer = null;

    @PostConstruct
    public void initMQConsumer() {
        consumer = new DefaultMQPushConsumer("defaultGroup");
        consumer.setNamesrvAddr("IP:9876");
        consumer.setConsumeTimeout(10000);
        try {
            consumer.subscribe("demo", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        log.info("Message Received: " + new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    @PreDestroy
    public void shutDownConsumer() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }
}

调用工具类测试:

顺序消息


顺序消息设计思路:
顺序消息就是指的,按顺序存入,按顺序取出。实现思路就是将同一类消息按照消费顺序放到同一种队列中去,这样就可以保证消费的先后顺序。
生产者实现:

public class ProducerUtil {

    private static DefaultMQProducer producer = null;

    public static void start() {
        producer = new DefaultMQProducer("defaultGroup");
        producer.setNamesrvAddr("49.235.73.14:9876");
        producer.setRetryTimesWhenSendFailed(3);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public static ResultModel<String> send(String topic) {
        List<Treat> treatList = new ProducerUtil().buildList();
        // tags数组 使用tag区分
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"};

        for (int i = 0; i < treatList.size(); i++) {
            String body = " treat:" + treatList.get(i);
            Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, body.getBytes());

            try {
                producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        /**
                         * 此处代码逻辑:就是找到消息对应的队列,一类消息一种队列
                         * 一种医疗治疗过程对应一种队列
                         * 20220819001L
                         * 20220819002L
                         * 20220819003L
                         * 20220819004L
                         */
                        String dbId = (String) arg;
                        String indexStr = dbId.substring(dbId.length() - 1);
                        return mqs.get(Integer.parseInt(indexStr) - 1);
                    }
                }, treatList.get(i).getDbId());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        return new ResultModel<>();
    }

    public static void shutDownProducer() {
        if(producer != null) {
            producer.shutdown();
        }
    }

    /**
     * 医疗过程
     */
    static class Treat {
        private String dbId;
        private String desc;

        public Treat() {
        }

        public Treat(String dbId, String desc) {
            this.dbId = dbId;
            this.desc = desc;
        }

        public String getDbId() {
            return dbId;
        }

        public void setDbId(String dbId) {
            this.dbId = dbId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "Treat{" +
                    "dbId='" + dbId + '\'' +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * 生成模拟订单数据
     */
    private List<Treat> buildList() {
        List<Treat> treatList = new ArrayList<>();
        treatList.add(new Treat("20220819004", "慢病检查"));
        treatList.add(new Treat("20220819001", "孕产妇产前检查"));
        treatList.add(new Treat("20220819003", "老人身体检查"));
        treatList.add(new Treat("20220819001", "孕产妇产中处理"));
        treatList.add(new Treat("20220819002", "婴儿身体检查"));
        treatList.add(new Treat("20220819001", "孕产妇产后护理"));
        treatList.add(new Treat("20220819004", "慢病治疗"));
        treatList.add(new Treat("20220819003", "老人专家会诊"));
        treatList.add(new Treat("20220819002", "婴儿产房护理"));
        treatList.add(new Treat("20220819003", "老人定期随访"));
        treatList.add(new Treat("20220819002", "婴儿定期随访"));
        treatList.add(new Treat("20220819004", "慢病随访"));
        treatList.add(new Treat("20220819004", "慢病观察"));
        return treatList;
    }
}

测试:

延时消息


延时消息指的是间隔一段时间后传给消费者。
setDelayTimeLevel(4);方法:
level(1~18个等级):1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

     public static ResultModel<String> send() {
        List<Treat> treatList = new ProducerUtil().buildList();
        // tags数组 使用tag区分
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"};

        for (int i = 0; i < treatList.size(); i++) {
            String body = " treat:" + treatList.get(i);
            Message msg = new Message(TOPIC, tags[i % tags.length], "KEY" + i, body.getBytes());
            // 第四个等级 30s
            msg.setDelayTimeLevel(4);
            try {
                producer.send(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        return new ResultModel<>();
    }

批量消息


批量消息就是一次性传入多个Message。

     public static ResultModel<String> send() {
        List<Treat> treatList = new ProducerUtil().buildList();
        // tags数组 使用tag区分
        String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD"};
        List<Message> msgList = new ArrayList<>();

        for (int i = 0; i < treatList.size(); i++) {
            String body = " treat:" + treatList.get(i);
            Message msg = new Message(TOPIC, tags[i % tags.length], "KEY" + i, body.getBytes());
            msgList.add(msg);
        }

        try {
            producer.send(msgList);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new ResultModel<>();
    }

过滤消息


通过Tag标签来过滤不同消息,在消费者端实现。

    @PostConstruct
    public void initMQConsumer() {
        consumer = new DefaultMQPushConsumer(PRODUCER_GROUP);
        consumer.setNamesrvAddr(NAMESRV_ADDR);
        consumer.setConsumeTimeout(10000);

        try {
            consumer.subscribe(TOPIC, "TagC");
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt msg : msgs) {
                            log.info("QueueId Received: " + msg.getQueueId() + " Message Received: " + new String(msg.getBody()));
                        }
                    } catch (Exception e) {
                        log.info(e.getMessage());
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

事务消息


https://www.jianshu.com/p/ed5c3c62a3c1

rocketmq顺序消息解决方案


将消息按照顺序放在同一个队列中就可以了。

rocketmq消息重复消费问题(幂等性问题)解决方案


rocketmq消息丢失问题解决方案


使用rocketmq提供的自身事务。

参考


https://gitee.com/apache/rocketmq/blob/master/docs/cn/client/java/API_Reference_DefaultMQProducer.md

https://blog.csdn.net/weixin_38880770/article/details/118447350

上一篇 下一篇

猜你喜欢

热点阅读