RocketMQ SpringBoot starter 示例
2018-09-25 本文已影响178人
SlowGO
目标
实现一个 RocketMQ + Springboot-starter 版本的 helloworld,用来熟悉他们的整合用法。
spring-boot-rocketmq-starter 目前没有官方版本,但已经有不少实现,这里我们选择 rhwayfun 版本的实现。
创建项目
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.rocketmq</groupId>
<artifactId>springboot-starter-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>springboot-starter-demo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.16.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.rhwayfun</groupId>
<artifactId>spring-boot-rocketmq-starter</artifactId>
<version>0.0.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.49</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
src/main/resources/application.properties
spring.rocketmq.nameServer=localhost:9876
spring.rocketmq.producer-group-name=spring-boot-test-producer-group
基础类
需要实现自定义的 Topic, Content。
src/main/java/com/example/rocketmq/springbootstarterdemo/MyMqTopic.java
package com.example.rocketmq.springbootstarterdemo;
import io.github.rhwayfun.springboot.rocketmq.starter.constants.RocketMqTopic;
public class MyMqTopic implements RocketMqTopic {
@Override
public String getTopic() {
return "testTopic";
}
}
src/main/java/com/example/rocketmq/springbootstarterdemo/MyMqContent.java
package com.example.rocketmq.springbootstarterdemo;
import io.github.rhwayfun.springboot.rocketmq.starter.constants.RocketMqContent;
public class MyMqContent extends RocketMqContent {
private String id;
private String cnt;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getCnt() {
return cnt;
}
public void setCnt(String cnt) {
this.cnt = cnt;
}
}
Consumer
src/main/java/com/example/rocketmq/springbootstarterdemo/MyConsumer.java
package com.example.rocketmq.springbootstarterdemo;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import io.github.rhwayfun.springboot.rocketmq.starter.common.AbstractRocketMqConsumer;
@Component
public class MyConsumer extends AbstractRocketMqConsumer<MyMqTopic, MyMqContent> {
@Override
public boolean consumeMsg(MyMqContent content, MessageExt msg) {
System.out.println("[MyConsumer] " + content);
return false;
}
@Override
public String getConsumerGroup() {
return "testConsumerGroup";
}
@Override
public Map<String, Set<String>> subscribeTopicTags() {
Map<String, Set<String>> map = new HashMap<>();
Set<String> tags = new HashSet<>();
tags.add("test-tag");
map.put("TopicTest", tags);
return map;
}
}
Controller
src/main/java/com/example/rocketmq/springbootstarterdemo/TestController.java
package com.example.rocketmq.springbootstarterdemo;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import io.github.rhwayfun.springboot.rocketmq.starter.common.DefaultRocketMqProducer;
@RestController
public class TestController {
@Autowired
private DefaultRocketMqProducer producer;
@RequestMapping("/push")
public String pushMsg(String msg) {
System.out.println("---push:" + msg);
MyMqContent cnt = new MyMqContent();
cnt.setId(""+System.currentTimeMillis());
cnt.setCnt("测试消息 " + msg);
Message message = new Message("TopicTest", "test-tag", cnt.toString().getBytes());
boolean sendResult = producer.sendMsg(message);
return "[/push] sendResult:" + sendResult;
}
}
运行测试
启动 RocketMQ:
$ bin/mqnamesrv
# 另一个终端执行
$ bin/mqbroker -n localhost:9876
# 另一个终端启动项目
$ mvn spring-boot:run
访问 controller:
http://localhost:8080/push?msg=hello
控制台会显示 Consumer 的打印信息:
[MyConsumer] {"cnt":"测试消息 hello","id":"1537864589562"}