Spring-BootSpring Boot

spring整合activemq

2018-05-16  本文已影响48人  z七夜

1.linux安装activemq

本例没有手动安装 ActiveMQ,而是直接使用 docker pull 拉取的 ActiveMQ 镜像来运行。
容器启动之后通过 8161 端口访问,输入用户名和密码(均为 admin),即可进入 ActiveMQ 的管理界面。


image.png

2.新建一个maven项目

这是一个ssm项目。pom如下

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>jk.zmn</groupId>
  <artifactId>spring-activemq</artifactId>
  <packaging>war</packaging>
  <version>0.0.1-SNAPSHOT</version>
  <name>spring-activemq Maven Webapp</name>
  <url>http://maven.apache.org</url>
  
    <properties>
    <spring.version>4.0.5.RELEASE</spring.version>
    <mybatis.version>3.2.1</mybatis.version>
    <slf4j.version>1.6.6</slf4j.version>
    <log4j.version>1.2.12</log4j.version>
    <mysql.version>5.1.35</mysql.version>
    <jackjson.version>2.8.8</jackjson.version>
    <activemq.version>5.11.2</activemq.version>
  </properties>
  <dependencies>
  <!-- Spring core modules -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aspects</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- Declared once only; the original listed spring-web a second time next to spring-webmvc -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-web</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <!-- Spring test support -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <version>${spring.version}</version>
            <scope>test</scope>
        </dependency>
 
  <!-- Spring Web MVC -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>${spring.version}</version>
        </dependency>
  
  <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>${mysql.version}</version>
    </dependency>
    
    <!-- Alibaba Druid data source -->
     <dependency>
         <groupId>com.alibaba</groupId>
         <artifactId>druid</artifactId>
         <version>0.2.23</version>
     </dependency>
     
    
     <!-- Logging: log4j, slf4j API and logback -->
  <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.logback-extensions</groupId>
        <artifactId>logback-ext-spring</artifactId>
        <version>0.1.1</version>
    </dependency>

    
     
    <!-- MyBatis -->
    <dependency>
        <groupId>org.mybatis</groupId>
        <artifactId>mybatis</artifactId>
        <version>${mybatis.version}</version>
    </dependency>

    <!-- MyBatis / Spring integration -->
    <dependency>
        <groupId>org.mybatis</groupId>
        <artifactId>mybatis-spring</artifactId>
        <version>1.2.0</version>
    </dependency>
  <!-- Servlet 3.0 and JSP APIs: supplied by the servlet container at runtime,
       so they are marked "provided" to keep them out of WEB-INF/lib -->
          <dependency>
              <groupId>javax.servlet</groupId>
              <artifactId>javax.servlet-api</artifactId>
              <version>3.0.1</version>
              <scope>provided</scope>
          </dependency>
          <dependency>
              <groupId>javax.servlet.jsp</groupId>
              <artifactId>javax.servlet.jsp-api</artifactId>
              <version>2.3.2-b01</version>
              <scope>provided</scope>
          </dependency>
          <!-- JSTL -->
          <dependency>
              <groupId>javax.servlet</groupId>
              <artifactId>jstl</artifactId>
              <version>1.2</version>
          </dependency>
    <!-- Unit testing. The test code uses the @Test annotation, which only exists
         in JUnit 4; JUnit 3.8.1 does not provide org.junit.Test. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
    </dependency>
    
    <!-- MyBatis pagination plugin -->
    <dependency>
         <groupId>com.github.pagehelper</groupId>
         <artifactId>pagehelper</artifactId>
         <version>4.1.4</version>
     </dependency>

    <!-- Jackson JSON -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-core</artifactId>
       <version>${jackjson.version}</version>
    </dependency>
    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
       <version>${jackjson.version}</version>
    </dependency>
    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
       <version>${jackjson.version}</version>
    </dependency>
    
    <!-- ActiveMQ client and Spring JMS support.
         spring-context-support is already declared with the core Spring modules above,
         so the duplicate declaration that used to follow spring-jms has been dropped. -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>${activemq.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
  </dependencies>
  
  
  
  <build>
    <finalName>spring-activemq</finalName>
  </build>
</project>

1.非整合spring的单机版

1.queue形式

内容提供者

    /**
     * Sends a single text message to the queue {@code testQueue} using the plain JMS API.
     *
     * @throws Exception if connecting to the broker or sending the message fails
     */
    @Test
    public void testQueueMqProducter() throws Exception{
        // 1. Create the connection factory for the broker.
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        // 2. Create the connection.
        Connection connection = factory.createConnection();
        try {
            // 3. Start the connection.
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; when it is, the
            //    second argument is ignored. Second argument: acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination.
            Queue testQueue = session.createQueue("testQueue");
            // 6. Create the producer.
            MessageProducer producer = session.createProducer(testQueue);
            // 7. Send the message. The session builds the message so the test does not
            //    depend on a provider-specific message class.
            TextMessage message = session.createTextMessage("我是队列信息,要减库存了");
            producer.send(message);
            // 8. Release the session.
            session.close();
        } finally {
            // Always release the connection, even when a step above throws;
            // closing the connection also closes any session still open on it.
            connection.close();
        }
    }

消费者


    /**
     * Receives a single text message from the queue {@code testQueue} and prints its text.
     * Blocks until a message is available.
     *
     * @throws Exception if connecting to the broker or receiving the message fails
     */
    @Test
    public void testQueueMqConsumer() throws Exception{
        // 1. Create the connection factory for the broker.
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        // 2. Create the connection.
        Connection connection = factory.createConnection();
        try {
            // 3. Start the connection; message delivery only begins once it is started.
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; when it is, the
            //    second argument is ignored. Second argument: acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination.
            Queue testQueue = session.createQueue("testQueue");
            // 6. Create the consumer.
            MessageConsumer consumer = session.createConsumer(testQueue);
            // 7. Receive one message (blocking call) and print it.
            TextMessage received = (TextMessage) consumer.receive();
            System.out.println(received.getText());
            // 8. Release the session.
            session.close();
        } finally {
            // Always release the connection, even when a step above throws;
            // closing the connection also closes any session still open on it.
            connection.close();
        }
    }

效果如下

image.png

2. 发布订阅模式

发布者

/**
 * Publishes a single text message to the topic {@code testTopic} using the plain JMS API.
 *
 * @throws Exception if connecting to the broker or publishing the message fails
 */
@Test
    public void testTopicMqProducter() throws Exception{
        // 1. Create the connection factory for the broker.
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        // 2. Create the connection.
        Connection connection = factory.createConnection();
        try {
            // 3. Start the connection.
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; when it is, the
            //    second argument is ignored. Second argument: acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination.
            Topic topic = session.createTopic("testTopic");
            // 6. Create the producer (publisher).
            MessageProducer producer = session.createProducer(topic);
            // 7. Publish the message. The session builds the message so the test does
            //    not depend on a provider-specific message class.
            TextMessage message = session.createTextMessage("我是发布信息,张三抢到了手机11,rsad");
            producer.send(message);
            // 8. Release the session.
            session.close();
        } finally {
            // Always release the connection, even when a step above throws;
            // closing the connection also closes any session still open on it.
            connection.close();
        }
    }

订阅者

/**
 * Subscribes to the topic {@code testTopic} and prints the text of every message
 * delivered until a byte is read from standard input.
 *
 * @throws Exception if connecting to the broker or subscribing fails
 */
@Test
    public void testTopicMqConsumer() throws Exception{
        // 1. Create the connection factory for the broker.
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        // 2. Create the connection.
        Connection connection = factory.createConnection();
        try {
            // 3. Start the connection; message delivery only begins once it is started.
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; when it is, the
            //    second argument is ignored. Second argument: acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination.
            Topic topic = session.createTopic("testTopic");
            // 6. Create the consumer (subscriber).
            MessageConsumer consumer = session.createConsumer(topic);

            // 7. Receive messages asynchronously through a listener.
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    // Guard the cast: other message types may be published to the topic.
                    if (!(message instanceof TextMessage)) {
                        System.err.println("Ignoring non-text message: " + message);
                        return;
                    }
                    try {
                        System.out.println(((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("消费者1启动");
            // Keep the test alive so the listener can receive messages.
            System.in.read();
            // 8. Release the session.
            session.close();
        } finally {
            // Always release the connection, even when a step above throws;
            // closing the connection also closes any session still open on it.
            connection.close();
        }
    }
    
    /**
     * Second subscriber on the topic {@code testTopic}; prints the text of every message
     * delivered until a byte is read from standard input.
     *
     * @throws Exception if connecting to the broker or subscribing fails
     */
    @Test
    public void testTopicMqConsumer2() throws Exception{
        // 1. Create the connection factory for the broker.
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://www.itzmn.com:61616");
        // 2. Create the connection.
        Connection connection = factory.createConnection();
        try {
            // 3. Start the connection; message delivery only begins once it is started.
            connection.start();
            // 4. Create a session.
            //    First argument: whether the session is transacted; when it is, the
            //    second argument is ignored. Second argument: acknowledgement mode.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5. Create the destination.
            Topic topic = session.createTopic("testTopic");
            // 6. Create the consumer (subscriber).
            MessageConsumer consumer = session.createConsumer(topic);

            // 7. Receive messages asynchronously through a listener.
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    // Guard the cast: other message types may be published to the topic.
                    if (!(message instanceof TextMessage)) {
                        System.err.println("Ignoring non-text message: " + message);
                        return;
                    }
                    try {
                        System.out.println(((TextMessage) message).getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("消费者2启动");
            // Keep the test alive so the listener can receive messages.
            System.in.read();
            // 8. Release the session.
            session.close();
        } finally {
            // Always release the connection, even when a step above throws;
            // closing the connection also closes any session still open on it.
            connection.close();
        }
    }
    

效果如下


image.png

2 整合spring

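applicationContext-activemq.xml(Spring 与 ActiveMQ 的整合配置文件,文件名需与下文测试代码中加载的 classpath:applicationContext-activemq.xml 保持一致)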

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring / ActiveMQ wiring: connection factories, a JmsTemplate for sending,
    one queue and one topic destination, and three listener containers.

    The schemaLocation entries use the versionless XSD names so that the schemas
    are resolved from whichever Spring jars are on the classpath, instead of being
    pinned to a release that may not match the Spring version in use.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
    
    <!-- The real ConnectionFactory that creates connections; supplied by the JMS provider (ActiveMQ) -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://www.itzmn.com:61616" />
    </bean>
    <!-- Spring ConnectionFactory that wraps and manages the real ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- The target is the provider ConnectionFactory that actually creates JMS connections -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    
    <!-- Producer side -->
    <!-- JmsTemplate: Spring helper used to send and receive messages -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- Refers to the Spring-managed ConnectionFactory defined above -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    
    
    <!-- Queue destination (point-to-point) -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!-- Topic destination (publish/subscribe, one-to-many) -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="springTopic" />
    </bean>
    <!-- Listener for the queue -->
    <bean id="myMessageListener" class="jk.zmn.activemq.listener.MyMessageListener"/>
    <!-- Listener container binding the queue listener to the queue -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
    
    <!-- First topic listener -->
    <bean id="myTopicMessageListener" class="jk.zmn.activemq.listener.MyTopicMessageListener"/>
    <!-- Listener container binding the first topic listener to the topic -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="myTopicMessageListener" />
    </bean>
    
    <!-- Second topic listener -->
    <bean id="myTopicMessageListener2" class="jk.zmn.activemq.listener.MyTopicMessageListener2"/>
    <!-- Listener container binding the second topic listener to the topic -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="myTopicMessageListener2" />
    </bean>
    
    
    
</beans>

queue模式

内容提供者

/**
 * Loads the Spring context from {@code applicationContext-activemq.xml} and sends one
 * text message to the bean named {@code queueDestination} through {@link JmsTemplate}.
 *
 * NOTE(review): the application context is never closed here; presumably it is left
 * open so that listeners declared in the same context can consume the message - confirm.
 */
@Test
    public void testSpringActiveMqProducter() {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
        JmsTemplate template = context.getBean(JmsTemplate.class);
        Destination queue = (Destination) context.getBean("queueDestination");

        // Builds the message body on the session supplied by JmsTemplate.
        MessageCreator orderMessage = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("生意来了,张三购买商品");
            }
        };

        template.send(queue, orderMessage);
    }

消费者需要实现 MessageListener 接口

/**
 * Queue listener that prints the text of every {@link TextMessage} it receives.
 * Messages of any other type are reported on standard error and skipped.
 */
public class MyMessageListener implements MessageListener{

    /**
     * Prints the text body of the delivered message.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast so an unexpected message type does not raise ClassCastException.
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring non-text message: " + message);
            return;
        }
        try {
            System.out.println(((TextMessage) message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

发布订阅模式

发布者

/**
 * Loads the Spring context from {@code applicationContext-activemq.xml} and publishes one
 * text message to the bean named {@code topicDestination} through {@link JmsTemplate}.
 *
 * NOTE(review): the application context is never closed here; presumably it is left
 * open so that listeners declared in the same context can consume the message - confirm.
 */
@Test
    public void testSpringActiveMqTopicProducter() {
        ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext-activemq.xml");
        JmsTemplate template = context.getBean(JmsTemplate.class);
        Destination topic = (Destination) context.getBean("topicDestination");

        // Builds the message body on the session supplied by JmsTemplate.
        MessageCreator orderMessage = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage("又来生意啦,李四要购买手机");
            }
        };

        template.send(topic, orderMessage);
    }

订阅者

/**
 * Topic subscriber that prints the text of every {@link TextMessage} it receives,
 * followed by a fixed status line. Messages of any other type are reported on
 * standard error and skipped.
 */
public class MyTopicMessageListener implements MessageListener{

    /**
     * Prints the text body of the delivered message and then the status line.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast so an unexpected message type does not raise ClassCastException.
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring non-text message: " + message);
            return;
        }
        try {
            System.out.println(((TextMessage) message).getText());
            System.out.println("我去减库存");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}
/**
 * Second topic subscriber; prints the text of every {@link TextMessage} it receives,
 * followed by a fixed status line. Messages of any other type are reported on
 * standard error and skipped.
 */
public class MyTopicMessageListener2 implements MessageListener{

    /**
     * Prints the text body of the delivered message and then the status line.
     *
     * @param message the delivered JMS message
     */
    @Override
    public void onMessage(Message message) {
        // Guard the cast so an unexpected message type does not raise ClassCastException.
        if (!(message instanceof TextMessage)) {
            System.err.println("Ignoring non-text message: " + message);
            return;
        }
        try {
            System.out.println(((TextMessage) message).getText());
            System.out.println("我去生成订单");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

文中代码比较零散,如需动手实验,请到码云下载完整项目。

群号:552113611

上一篇下一篇

猜你喜欢

热点阅读