RocketMQ SpringBoot 示例
2018-09-25 本文已影响406人
SlowGO
目标
做一个 RocketMQ + SpringBoot 的 helloworld 示例,来熟悉他们的整合用法。
项目启动后准备好 Producer 和 Consumer,在 controller 中调用 producer 发送消息,consumer 拿到消息后打印到后台。
创建项目
使用 https://start.spring.io 创建一个springboot项目,修改 pom.xml 为:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.rocketmq</groupId>
<artifactId>spring-rocketmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-rocketmq-demo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.16.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-common -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
src/main/resources/application.propreties
apache.rocketmq.consumer.PushConsumer=PushConsumer
apache.rocketmq.producer.producerGroup=Producer
apache.rocketmq.namesrvAddr=localhost:9876
Producer
src/main/java/com/example/rocketmq/springrocketmqdemo/ProducerService.java
package com.example.rocketmq.springrocketmqdemo;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class ProducerService {
@Value("${apache.rocketmq.producer.producerGroup}")
private String producerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
private DefaultMQProducer producer;
@PostConstruct
public void initProducer() {
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
producer.setRetryTimesWhenSendFailed(3);
try {
producer.start();
System.out.println("[Producer 已启动]");
} catch (Exception e) {
e.printStackTrace();
}
}
public String send(String topic, String tags, String msg) {
SendResult result = null;
try {
Message message = new Message(topic, tags, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
result = producer.send(message);
System.out.println("[Producer] msgID(" + result.getMsgId() + ") " + result.getSendStatus());
} catch (Exception e) {
e.printStackTrace();
}
return "{\"MsgId\":\"" + result.getMsgId() + "\"}";
}
@PreDestroy
public void shutDownProducer() {
if (producer != null) {
producer.shutdown();
}
}
}
Consumer
src/main/java/com/example/rocketmq/springrocketmqdemo/ProducerService.java
package com.example.rocketmq.springrocketmqdemo;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class Consumer {
@Value("${apache.rocketmq.consumer.PushConsumer}")
private String consumerGroup;
@Value("${apache.rocketmq.namesrvAddr}")
private String namesrvAddr;
@PostConstruct
public void defaultMQPushConsumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
try {
consumer.subscribe("TopicTest", "push");
// 如果是第一次启动,从队列头部开始消费
// 如果不是第一次启动,从上次消费的位置继续消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
try {
for (MessageExt messageExt : list) {
String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("[Consumer] msgID(" + messageExt.getMsgId() + ") msgBody : " + messageBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("[Consumer 已启动]");
} catch (Exception e) {
e.printStackTrace();
}
}
}
Controller
src/main/java/com/example/rocketmq/springrocketmqdemo/ProducerService.java
package com.example.rocketmq.springrocketmqdemo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class TestController {
@Autowired
private ProducerService producer;
@RequestMapping("/push")
public String pushMsg(String msg) {
return producer.send("TopicTest", "push", msg);
}
}
运行测试
启动 rockermq :
$ bin/mqnamesrv
# 另一个终端执行
$ bin/mqbroker -n localhost:9876
运行项目:
# 另一个终端执行
$ mvn spring-boot:run
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v1.5.16.RELEASE)
......
[Consumer 已启动]
[Producer 已启动]
......
2018-09-25 15:08:30.725 INFO 57641 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started
2018-09-25 15:08:30.740 INFO 57641 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 14 ms
访问 controller:
http://localhost:8080/push?msg=hi
查看控制台:
[Producer] msgID(C0A80072E12937741D1F7ED864B60000) SEND_OK
[Consumer] msgID(C0A80072E12937741D1F7ED864B60000) msgBody : hi