[干货]springboot集成rocketmq
2018-08-28 本文已影响38人
麦克劳林
1、pom.xml依赖
<!-- Rocketmq -->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.2.6</version>
</dependency>
2、配置文件application.yml
配置
这种写,方便增加队列。
3、生产者Producer
package com.sheng.RocketMQ;
import java.util.List;
import com.shengtong.RocketMQ.service.IProducerService;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.MessageQueueSelector;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
/**
* .Description:生产者
* .@Date:2018/7/5
* .Author:
*/
@Service(value = "producerService")
public class ProducerImpl implements IProducerService{
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;//NameServer 地址
@Value("${apache.rocketmq.producer.groupName}")
private String producerGroup;//生产者的组名
@Value("${apache.rocketmq.producer.topic}")
private String topic;
//@PostConstruct //@PostContruct是spring框架的注解,在方法上加该注解会在项目启动的时候执行该方法,也可以理解为在spring容器初始化的时候执行该方法。
public String defaultMQProducer(String messageInfo) {
//生产者的组名
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
//指定NameServer地址,多个地址以 ; 隔开
producer.setNamesrvAddr(namesrvAddr);
String resultMsg = "";
try {
/**
* Producer对象在使用之前必须要调用start初始化,初始化一次即可
* 注意:切记不可以在每次发送消息时,都调用start方法
*/
producer.start();
//创建一个消息实例,包含 topic、tag 和 消息体
//如下:topic 为 "TopicTest",tag 为 "push"
Message message = new Message(topic, "", messageInfo.getBytes());
StopWatch stop = new StopWatch();
stop.start();
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, 1);
System.out.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
stop.stop();
resultMsg = result.getSendStatus().toString();
System.out.println("----------------发送1条消息耗时:" + stop.getTotalTimeMillis());
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
return resultMsg;
}
}
4、消费者Consumer
package com.sheng.RocketMQ;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.rocketmq.client.consumer.listener.*;
import com.shengtong.RocketMQ.service.IConsumerService;
import com.shengtong.RocketMQ.service.IMSGHandler;
import com.shengtong.utils.SysData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* .Description:消费者
* .@Date:2018/7/5
* .Author:
*/
@Service(value = "consumerService")
public class ConsumerImpl implements IConsumerService{
@Autowired
private IMSGHandler msgHandler;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;//NameServer 地址
@Value("${apache.rocketmq.consumer1.groupName}")
private String consumerGroup1;//消费者的组名
@Value("${apache.rocketmq.consumer1.topic}")
private String consumerTopic1;//消费者的组名
@Value("${apache.rocketmq.consumer2.groupName}")
private String consumerGroup2;//消费者的组名
@Value("${apache.rocketmq.consumer2.topic}")
private String consumerTopic2;//消费者的组名
/**
* .Description:网关/设备在离线消息格式 Topic:gateway_node_heartbeat
* .Author:QiongWu
* .@Date: 2018/7/6
* .@PostContruct是spring框架的注解,在方法上加该注解会在项目启动的时候执行该方法,
* .也可以理解为在spring容器初始化的时候执行该方法。
*/
@PostConstruct
public void defaultMQPushConsumer1() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup1);
consumer.setNamesrvAddr(namesrvAddr);
try {
consumer.subscribe(consumerTopic1, "");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
consumeOrderlyContext.setAutoCommit(true);
for (MessageExt msg : list) {
String msgContent = new String(msg.getBody());
System.out.println("######### MSG Content start ##########");
System.out.println(msgContent);
System.out.println("######### END ##########");
parseMsgContent(msgContent);
try {
TimeUnit.SECONDS.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Cosumer started.");
} catch (Exception e) {
e.printStackTrace();
}
}
//解析消息内容
private void parseMsgContent(String msgContent) {
JSONObject msg = JSONObject.parseObject(msgContent);
String className = msg.getString("className");
if (className.equals(SysData.REAL_TIME_MSG_TYPE)) {
//解析实时回路信息
msgHandler.handleMsg(msgContent);
} else if (className.equals(SysData.TERMINAL_ONLINE_MSG_TYPE)) {
//解析终端在离线状态
msgHandler.handleOnlineTerminallist(msgContent);
}else if (className.equals(SysData.TERMINAL_SET_MSG_TYPE)){
//解析设置终端应答
msgHandler.handleSetTerminalReceived(msgContent);
}
return;
}
}