spring boot

RocketMQ SpringBoot starter 示例

2018-09-25  本文已影响178人  SlowGO

目标

实现一个 RocketMQ + Springboot-starter 版本的 helloworld,用来熟悉它们的整合用法。

spring-boot-rocketmq-starter 目前没有官方版本,但已经有不少实现,这里我们选择 rhwayfun 版本的实现。

创建项目

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the RocketMQ + Spring Boot starter demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.rocketmq</groupId>
    <artifactId>springboot-starter-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-starter-demo</name>
    <description>Demo project for Spring Boot</description>

    <!-- Dependencies below that omit <version> rely on this parent to supply one. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.16.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Spring MVC support for the REST controller that publishes messages. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Community RocketMQ starter (rhwayfun implementation); the demo's producer
             and consumer base classes are imported from its packages. -->
        <dependency>
            <groupId>io.github.rhwayfun</groupId>
            <artifactId>spring-boot-rocketmq-starter</artifactId>
            <version>0.0.3.RELEASE</version>
        </dependency>
        <!-- Lombok is declared here, but none of the demo classes shown use it. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>
        <!-- NOTE(review): presumably needed for JSON (de)serialization of message
             content; confirm it is not already supplied transitively by the starter. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Provides the spring-boot:run goal used to start the demo. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

src/main/resources/application.properties

# Address (host:port) of the RocketMQ name server; the same address is passed to mqbroker via -n.
spring.rocketmq.nameServer=localhost:9876
# Producer group name (presumably applied to the starter's auto-configured producer; confirm).
spring.rocketmq.producer-group-name=spring-boot-test-producer-group

基础类

需要实现自定义的 Topic 类和 Content 类:

src/main/java/com/example/rocketmq/springbootstarterdemo/MyMqTopic.java

package com.example.rocketmq.springbootstarterdemo;

import io.github.rhwayfun.springboot.rocketmq.starter.constants.RocketMqTopic;

/**
 * Topic descriptor for the demo; used as the topic type parameter of {@code MyConsumer}.
 */
public class MyMqTopic implements RocketMqTopic {
    /**
     * Returns the name of the topic used by this demo.
     *
     * @return the topic name, {@code "TopicTest"}
     */
    @Override
    public String getTopic() {
        // Must match the topic that TestController publishes to and MyConsumer
        // subscribes to; the previous value "testTopic" named a topic nothing used.
        return "TopicTest";
    }
}

src/main/java/com/example/rocketmq/springbootstarterdemo/MyMqContent.java

package com.example.rocketmq.springbootstarterdemo;

import io.github.rhwayfun.springboot.rocketmq.starter.constants.RocketMqContent;

/**
 * Message payload for the demo: an identifier plus a text body.
 */
public class MyMqContent extends RocketMqContent {

    /** Identifier of the message. */
    private String id;

    /** Text body of the message. */
    private String cnt;

    /** @return the message identifier */
    public String getId() {
        return this.id;
    }

    /** @param id the message identifier to store */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the text body of the message */
    public String getCnt() {
        return this.cnt;
    }

    /** @param cnt the text body to store */
    public void setCnt(String cnt) {
        this.cnt = cnt;
    }
}

Consumer

src/main/java/com/example/rocketmq/springbootstarterdemo/MyConsumer.java

package com.example.rocketmq.springbootstarterdemo;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import io.github.rhwayfun.springboot.rocketmq.starter.common.AbstractRocketMqConsumer;

/**
 * Demo consumer that prints every message received on {@code TopicTest} with tag
 * {@code test-tag}.
 */
@Component
public class MyConsumer extends AbstractRocketMqConsumer<MyMqTopic, MyMqContent> {
    /** Topic subscribed to; TestController publishes to the same topic. */
    private static final String TOPIC = "TopicTest";
    /** Tag subscribed to; TestController publishes with the same tag. */
    private static final String TAG = "test-tag";
    /** Consumer group reported by {@link #getConsumerGroup()}. */
    private static final String CONSUMER_GROUP = "testConsumerGroup";

    /**
     * Handles one message by printing its content to standard output.
     *
     * @param content the message content
     * @param msg     the raw RocketMQ message
     * @return {@code true}, because printing the content is all the handling required
     */
    @Override
    public boolean consumeMsg(MyMqContent content, MessageExt msg) {
        System.out.println("[MyConsumer] " + content);
        // Report success. NOTE(review): the starter is presumed to treat a false
        // return as a failed consumption and ask the broker to redeliver, which
        // would print the same message repeatedly; confirm against
        // AbstractRocketMqConsumer.
        return true;
    }

    /**
     * @return the consumer group name, {@code "testConsumerGroup"}
     */
    @Override
    public String getConsumerGroup() {
        return CONSUMER_GROUP;
    }

    /**
     * Declares the subscription: a single topic mapped to a single tag.
     *
     * @return a new mutable map from topic name to the set of tags subscribed on it
     */
    @Override
    public Map<String, Set<String>> subscribeTopicTags() {
        Set<String> tags = new HashSet<>();
        tags.add(TAG);
        Map<String, Set<String>> subscriptions = new HashMap<>();
        subscriptions.put(TOPIC, tags);
        return subscriptions;
    }
}

Controller

src/main/java/com/example/rocketmq/springbootstarterdemo/TestController.java

package com.example.rocketmq.springbootstarterdemo;

import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import io.github.rhwayfun.springboot.rocketmq.starter.common.DefaultRocketMqProducer;

/**
 * REST endpoint that publishes a test message to RocketMQ.
 */
@RestController
public class TestController {
    @Autowired
    private DefaultRocketMqProducer producer;

    /**
     * Wraps {@code msg} in a {@code MyMqContent} and sends it to topic
     * {@code TopicTest} with tag {@code test-tag}.
     *
     * @param msg the text to send, bound from the {@code msg} request parameter
     * @return a string reporting the boolean result returned by the producer
     */
    @RequestMapping("/push")
    public String pushMsg(String msg) {
        System.out.println("---push:" + msg);
        MyMqContent cnt = new MyMqContent();
        // The current timestamp serves as the message id.
        cnt.setId(String.valueOf(System.currentTimeMillis()));
        cnt.setCnt("测试消息 " + msg);
        // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
        // charset, which can garble the non-ASCII payload on hosts whose default is
        // not UTF-8.
        byte[] body = cnt.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        Message message = new Message("TopicTest", "test-tag", body);
        boolean sendResult = producer.sendMsg(message);
        return "[/push] sendResult:" + sendResult;
    }
}

运行测试

启动 RocketMQ:

# Start the name server
$ bin/mqnamesrv

# In another terminal, start the broker, passing the name server address via -n
$ bin/mqbroker -n localhost:9876

# In another terminal, start the project
$ mvn spring-boot:run

访问 controller:

http://localhost:8080/push?msg=hello

控制台会显示 Consumer 的打印信息:

[MyConsumer] {"cnt":"测试消息 hello","id":"1537864589562"}
上一篇 下一篇

猜你喜欢

热点阅读