中间件

springboot整合rocketMq示例

2020-12-22  本文已影响0人  haiyong6

简介

rocketMq消息队列以前是阿里开发的后来捐赠给Apache开源,先进先出,可以用来推送消息,也可以一对多发布消息(广播订阅),redis也能实现这些功能,但是对比起redis,rocketMq的可靠性要更好,比如生产者发布了消息,如果消费者那边程序挂了没收到消息,消息不会消失,等消费者程序恢复正常后,消息还能被接收到。

producer:生产者,主动发消息的那一端。
comsumer:消费者,被动接收消息的那一端(通过监听器触发pull动作)。
生产者可以一对一的发消息,也可以一对多的发消息,多个comsumer都能收到相同的消息。生产者发送消息的过程为广播动作,消费者接收消息的过程为订阅动作。

利用rocketmq接收消息而不是直接发送消息给数据库,可以大大缓解在高并发下数据库承受的压力,按照先进先出的规则先存入rocketmq,再一个一个的被消费者获取操作数据库,这个过程为削峰。

安装

可以去这个网址https://archive.apache.org/dist/rocketmq下载安装,jdk最好用1.8,用jdk11跑不起来,用的公司安装好的集群或自己电脑上安装的单机都是可以运行的。
以4.5.1为例

wget https://archive.apache.org/dist/rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
unzip rocketmq-all-4.5.1-bin-release.zip

解压之后先把conf文件夹下的broker.conf文件后面加入自己电脑的ip

brokerIP1=192.168.137.18

为啥要加这个呢,这个程序不太智能,如果我电脑上开了docker,docker的ip排在第一个,它默认就把docker的ip作为我本机的ip了,所以要指定。
将bin文件夹下的runbroker.sh的第一个JAVA_OPT改成

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"

将runserver.sh的第一个JAVA_OPT改成

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

因为默认的内存占用太大了,如果是8g的电脑跑不起来

启动namesrv
nohup sh bin/mqnamesrv &

查看启动日志

tail -f ~/logs/rocketmqlogs/namesrv.log
指定ip端口和修改后的配置文件启动broker
nohup sh bin/mqbroker -n 192.168.137:9876 -c ~/myspace/profiles/rocketmq-all-4.5.1-bin-release/conf/broker.conf &

查看启动日志

tail -f ~/logs/rocketmqlogs/broker.log 

这样rocketmq就算启动好了
停止命令:

sh bin/mqshutdown namesrv
sh bin/mqshutdown broker

springboot整合rocketMq

maven引入(可以去maven仓库下载最新版,截止目前是2.1.1最新):

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

application.properties加入:

rocketmq.name-server:172.26.xxx.24:9876;172.26.xxx.25:9876
rocketmq.producer.group=producerGroup1
rocketmq.producer.retry-times-when-send-failed=2
rocketmq.producer.retry-times-when-send-async-failed=0
rocketmq.producer.send-message-timeout=300000
rocketmq.producer.max-message-size=4194304
rocketmq.producer.retry-next-server=false

上面的server如果是集群则是多个地址分号分隔,如果是单机只填一个就好了,rocketmq.producer.group是生产者组名称,名字可以自己取

新建生产者类Producer.java

package com.zhaohy.app.rocketMq;

import java.util.HashMap;
import java.util.Map;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class Producer {
    @Autowired
    private RocketMQTemplate mqTemplate;
    
    public void send() {
        Map<String, Object> resultMap = new HashMap<String, Object>();
        resultMap.put("name", "testName");
        resultMap.put("id", "1");
        resultMap.put("sex", "1");
        //发送消息
        mqTemplate.convertAndSend("Topic1:TagA", resultMap);

        //发送spring的Message
        mqTemplate.send("Topic1:TagA", MessageBuilder.withPayload(resultMap).build());

        //发送异步消息
        mqTemplate.asyncSend("Topic1:TagA", resultMap, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送失败");
            }
        });

        //发送顺序消息
        mqTemplate.syncSendOrderly("Topic1", "1,创建", "3");
        mqTemplate.syncSendOrderly("Topic1", "2,支付", "2");
        mqTemplate.syncSendOrderly("Topic1", "3,完成", "1");
    }
}

在上面的send方法中有三种发送方式,可以发送String也可以发送实体类。
新建消费者Consumer.java和Consumer2.java

package com.zhaohy.app.rocketMq;

import java.util.Map;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "consumerGroup1")
public class Consumer implements RocketMQListener<Map<String, Object>> {
    @Override
    public void onMessage(Map<String, Object> paramsMap) {
        System.out.println("1收到: "+paramsMap.get("name"));
    }

}
package com.zhaohy.app.rocketMq;

import java.util.Map;

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(topic = "Topic1", consumerGroup = "consumerGroup2")
public class Consumer2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("2收到: "+s);
    }

}

消费者类中注解里的topic即是在生产者定义的topic,多个消费者之间consumerGroup的名称不能重复。

controller里新建测试方法:

@Autowired
Producer producer;

@RequestMapping("/test/producerSend.do")
    public void rocketMqProducerTest(HttpServletRequest request) {
        producer.send();
    }

运行这个测试接口:控制台输出如下:

发送成功
2收到: {"sex":"1","name":"testName","id":"1"}
2收到: 1,创建
2收到: {"sex":"1","name":"testName","id":"1"}
2收到: {"sex":"1","name":"testName","id":"1"}
2收到: 3,完成
2收到: 2,支付
1收到: testName
1收到: testName
1收到: testName

可以看到consumerGroup1里用Map<String, Object>接收的,只能拿到同是Map<String, Object>类型发送的生产者发送的信息,consumerGroup2里用String接收的信息能全部拿到,所以可以用String来接收,然后再把json转换成对象处理。

至此就成功运行了,用rocketmq可以做并发量比较大的场景,如点赞,秒杀等场景,之前写过一篇是用redis做的点赞:java实现点赞功能示例,需要结合定时任务,如果是用mq,就不用定时也可以实现了,会简单不少。

参考:https://www.cnblogs.com/zpKang/p/13717258.html

上一篇 下一篇

猜你喜欢

热点阅读