SpringBoot整合activeMQ

2018-07-30  本文已影响0人  雨中独奏

ActiveMQ简介

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

ActiveMQ特性

1.多语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。
2.应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP。
3。完全支持JMS1.1和J2EE 1.4规范(持久化、XA消息,事物)。
4.对Spring的支持,ActiveMQ可以很容易嵌套到使用Spring的系统里面去,而且也支持Spring2.0的特征。
5.通过了常见的J2EE服务器(如Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5。
6.resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上。
7.支持多种传送协议: in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA。
8.支持通过JDBC和journal提供高速的消息持久化。
9.从设计上保证了高性能的集群,客户端-服务器,点对点。
10.支持Ajax。
11.支持和Axis的整合。
12.可以很容易的调用内嵌JMS provider,进行测试。

什么情况下使用ActiveMQ?

多个项目之间集成 ;跨平台; 多语言 ;多项目;降低系统间模块的耦合度,解耦 ;软件扩展性;系统前后端隔离 ;前后端隔离,屏蔽高安全区;

入门实例

引入依赖配置

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>

application.properties配置文件中新增配置

# ActiveMQ--------------------------------------------------------------------------------------------------------------
# Specify if the default broker URL should be in memory. Ignored if an explicit broker has been specified.
# 默认为true表示使用内存的activeMQ,不需要安装activeMQ server
#spring.activemq.in-memory=false
# URL of the ActiveMQ broker. Auto-generated by default. For instance `tcp://localhost:61616`
spring.activemq.broker-url=tcp://localhost:61616
# Login user of the broker.
spring.activemq.user=admin
# Login password of the broker.
spring.activemq.password=admin
# Trust all packages.
#spring.activemq.packages.trust-all=false
# Comma-separated list of specific packages to trust (when not trusting all packages).
#spring.activemq.packages.trusted=
# See PooledConnectionFactory.
#spring.activemq.pool.configuration.*=
# Whether a PooledConnectionFactory should be created instead of a regular ConnectionFactory.
spring.activemq.pool.enabled=true
# Maximum number of pooled connections.
spring.activemq.pool.max-connections=50
# Connection expiration timeout in milliseconds.
spring.activemq.pool.expiry-timeout=10000
# Connection idle timeout in milliseconds.
spring.activemq.pool.idle-timeout=30000
# 如果为True,则是Topic;如果是false或者默认,则是queue。
spring.jms.pub-sub-domain=false

新建消息生产者类

@Service("producer")
public class Producer {

    @Resource
    private JmsTemplate jmsTemplate;

    /**
     * 发送消息
     *
     * @param destination 发送到的队列
     * @param message     待发送的消息
     */
    public void convertAndSend(Destination destination, final String message) {
        jmsTemplate.convertAndSend(destination, message);
    }
}

新建消息消费者ABC

@Component
public class ConsumerA {

    /**
     * 使用JmsListener配置消费者监听的队列
     *
     * @param text 接收到的消息
     */
    @JmsListener(destination = "suimh_queue")
    public void receiveQueue(String text) {
        System.out.println("Consumer-A : 收到的报文为:" + text);
    }
}

@Component
public class ConsumerB {

    /**
     * 使用JmsListener配置消费者监听的队列
     *
     * @param text 接收到的消息
     */
    @JmsListener(destination = "suimh_queue")
    @SendTo("out.queue")
    public String receiveQueue(String text) {
        System.out.println("Consumer-B : 收到的报文为:" + text);
        return text;
    }
}

@Component
public class ConsumerC {

    /**
     * 使用JmsListener配置消费者监听的队列
     *
     * @param text 接收到的消息
     */
    @JmsListener(destination = "out.queue")
    public void consumerMessage(String text) {
        System.out.println("Consumer-C : 从out.queue队列收到的回复报文为:" + text);
    }
}

测试:

/**
     * activeMQ测试方法(queue队列模式)
     * 如果想用Topic发布订阅模式,需要新建ActiveMQTopic实例,并修改配置文件spring.jms.pub-sub-domain=true
     *
     * @return String
     */
    @GetMapping(value = "/activeMqSendMes")
    @ResponseBody
    public String activeMqSendMes() {
        int num = 10;
        try {
            Destination destinationQueue = new ActiveMQQueue("suimh_queue");
            for (int i = 1; i <= num; i++) {
                producer.convertAndSend(destinationQueue, "这是queueProducer发送的第" + i + "个消息!");
            }
            return "activeMQ生产成功!";
        } catch (Exception e) {
            return "activeMQ生产失败!";
        }
    }
上一篇下一篇

猜你喜欢

热点阅读