2020-12-21

2020-12-21  本文已影响0人  风雨无阻zql

ActiveMq队列spring实现,案列如下

(1)pom.xml引入相关jar

<!-- activeMQ相关 begin-->

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-all</artifactId>

<version>5.11.1</version></dependency>

<dependency>

<groupId>org.springframework</groupId>

<artifactId>spring-jms</artifactId>

<version>4.1.4.RELEASE</version>

</dependency>

(2)添加生产者配置activemq-sender.xml

<description>JMS发布者应用配置</description>

    <!-- CachingConnectionFactory 连接工厂 (有缓存功能)-->

    <bean id="cachingConnectionFactory"

        class="org.springframework.jms.connection.CachingConnectionFactory">

        <!-- Session缓存数量 -->

        <property name="sessionCacheSize" value="20" />

        <property name="targetConnectionFactory"> 

            <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 

                <!-- MQ地址 账户名 密码--> 

                <property name="brokerURL" value="tcp://192.168.56.129:61616" />

                <property name="userName" value="parry" />

                <property name="password" value="parry123" />

                <!-- 是否异步发送 -->

                <property name="useAsyncSend" value="true"/>

            </bean> 

        </property> 

    </bean>

    <!-- 接收消息的目的地(一个主题)点对点队列 -->

    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">

        <!-- 设置消息主题的名字 -->

        <constructor-arg index="0" value="messages" />

    </bean>

    <!-- 接收配置JMS模版 -->

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

        <property name="connectionFactory" ref="cachingConnectionFactory" />

        <property name="defaultDestination" ref="destination" />

        <!-- value为true为发布/订阅模式; value为false为点对点模式-->

        <property name="pubSubDomain" value="false"/>

    </bean>

(3)添加消费者配置activemq-consumer.xml

<description>JMS订阅者应用配置</description>

    <!-- CachingConnectionFactory 连接工厂 (有缓存功能)-->

    <bean id="cachingConnectionFactory"

        class="org.springframework.jms.connection.CachingConnectionFactory">

        <!-- Session缓存数量 -->

        <property name="sessionCacheSize" value="20" />

        <property name="targetConnectionFactory"> 

            <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 

                <!-- MQ地址 账户名 密码--> 

                <property name="brokerURL" value="tcp://192.168.56.129:61616" />

                <property name="userName" value="parry" />

                <property name="password" value="parry123" />

                <!-- 是否异步发送 -->

                <property name="useAsyncSend" value="true"/>

            </bean> 

        </property> 

    </bean>

    <!-- 接收消息的目的地(一个主题)点对点队列 -->

    <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">

        <!-- 设置消息主题的名字 -->

        <constructor-arg index="0" value="messages" />

    </bean>

    <!-- 消费者配置 (自己定义) -->

    <bean id="consumer" class="com.parry.MQ.funcion.Listener" />

    <!-- 消息监听容器 -->

    <bean id="myListenerContainer"

        class="org.springframework.jms.listener.DefaultMessageListenerContainer">

        <property name="connectionFactory" ref="cachingConnectionFactory" />

        <property name="destination" ref="destination" />

        <property name="messageListener" ref="consumer" />

        <!-- 如果消息的接收速率,大于消息处理的速率时,可以采取线程池方式 -->

        <property name="taskExecutor" ref="queueMessageExecutor"/>

        <!-- 设置固定的线程数 -->

        <property name="concurrentConsumers" value="30"/>

        <!-- 设置动态的线程数 -->

        <property name="concurrency" value="20-50"/>

        <!-- 设置最大的线程数 -->

        <property name="maxConcurrentConsumers" value="80"/>

    </bean>

    <bean id="queueMessageExecutor"

        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

        <property name="corePoolSize" value="30" />

        <property name="maxPoolSize" value="80" />

        <property name="daemon" value="true" />

        <property name="keepAliveSeconds" value="120" />

    </bean>

 (4)新建一个发送消息的方法

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.jms.core.JmsTemplate;

import org.springframework.jms.core.MessageCreator;

import org.springframework.stereotype.Component;

/**

* 发送消息

* @author Administrator

*

*/

@Component

public class QueueSender {

    @Autowired

    private JmsTemplate myJmsTemplate;

    /**

    * 发送一条消息到指定的队列(目标)

    *

    * @param queueName

    *            队列名称

    * @param message

    *            消息内容

    */

    public void send(String queueName, final String message) {

        myJmsTemplate.send(queueName, new MessageCreator() {

            public Message createMessage(Session session) throws JMSException {

                return session.createTextMessage(message);

            }

        });

    }

}

 (5)添加监听器

package com.parry.MQ.funcion;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.TextMessage;

/**

* 接收者监听类

* @author Administrator

*

*/

public class Listener implements MessageListener {

    public void onMessage(Message message) {

        // 业务处理

        try {

            TextMessage message2 = (TextMessage) message;

            System.out.println("接收到信息:" + message2.getText());

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

}

(6)写个一请求测试一下

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Controller;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.ResponseBody;

import com.parry.MQ.funcion.QueueSender;

@Controller

public class App {

    @Autowired

    private QueueSender sender;

    @RequestMapping("test")

    @ResponseBody

    public String Test() {

        sender.send("messages", "你好,这是我的第一条消息!");

        return "Hello world";

    }

}

(7)开启服务,访问路径测试


上一篇 下一篇

猜你喜欢

热点阅读