ActiveMQ SpringJMS

2019-04-20  本文已影响0人  dwwl

消息中间件的应用场景:

解耦,把耗时的操作(不需要返回结果的操作)移除运行主线程,提高响应速度

场景:商品审核时需要的服务

商家商品服务:需要返回结果,实时性高
广告内容服务:需要返回结果,实时性高

搜索服务:把SKU数据写入到solr,不需要返回结果,没必要必须在主线程完成,实时性低,用到消息中间件
页面静态化服务:根据SKU数据生成静态页面,不需要返回结果,没必要必须在主线程完成,实时性低,用到消息中间件

JMS消息传递类型:

点对点(PTP):生产者 消费者一一对应,即使消费者有多个,每一个生产者产生的消息只能被一个消费者消费,发生竞争
特点:如果没有消费者,消息对会存在destination
应用场景实例:
商品审核通过,把sku数据放到solr上,在分布式服务器上时,多个service即多个主机,都引用同一个solr,这时,用queue即可

发送/订阅(pub/sub):有多个消费者链接到消息队列时,生产者放在队列的消息,能被多个消费者接受。
特点:如果没有消费者,消息会丢失
应用场景实例:
商品审核通过时,将根据sku数据利用页面静态化技术生成页面,此时分布式时(每一台主机都有自己页面库),此时用topic,每台主机都会执行
![1553308107057.png](https://img.haomeiwen.com/i8781623/16b8b9adb8ade447.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

入门小Demo:

依赖:


QueryProducer.java:

public class QueueProducer {

    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new             ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
        //2.创建连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session(会话对象)  参数1:是否启动事务  参数2:消息确认方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息生产者对象
        MessageProducer producer = session.createProducer(queue);
        //7.创建消息对象(文本消息)
        TextMessage textMessage = session.createTextMessage("欢迎来到申请的品优购世界");
        //8.发送消息
        producer.send(textMessage);
        //9.关闭资源
        producer.close();
        session.close();
        connection.close();
    }
}

QueueConsumer.java

public class QueueConsumer {

    public static void main(String[] args) throws JMSException, IOException {
        //1.创建连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
        //2.创建连接
        Connection connection = connectionFactory.createConnection();
        //3.启动连接
        connection.start();
        //4.获取session(会话对象)  参数1:是否启动事务  参数2:消息确认方式
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //5.创建队列对象
        Queue queue = session.createQueue("test-queue");
        //6.创建消息消费者对象
        MessageConsumer consumer = session.createConsumer(queue);
        //7.设置监听
        consumer.setMessageListener(new MessageListener() {
            
            public void onMessage(Message message) {
                TextMessage textMessage=(TextMessage)message;
                try {
                    System.out.println("提取的消息:"+ textMessage.getText() );
                } catch (JMSException e) {                  
                    e.printStackTrace();
                }
                
            }
        });
        //8.等待键盘输入
        System.in.read();
        
        //9.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

SpringJMS

依赖

  <dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>${spring.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.9</version>
    </dependency>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>5.13.4</version>
     </dependency>
  </dependencies>  

applicationContext-jms-producer.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd">
        
        
    <context:component-scan base-package="cn.itcast.demo"></context:component-scan>     
    
       
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://192.168.25.135:61616"/>  
    </bean>
       
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
    <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
    </bean>  
           
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->  
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->  
        <property name="connectionFactory" ref="connectionFactory"/>  
    </bean>      
    <!--这个是队列目的地,点对点的  文本信息-->  
    <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
        <constructor-arg value="queue_text"/>  
    </bean>    
    
    <!--这个是订阅模式  文本信息-->  
    <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">  
        <constructor-arg value="topic_text"/>  
    </bean>  
    
</beans>

QueueProducer.java

注意Destination的注入方式 ,属性名称和xml相同:

@Component
public class QueueProducer {

    @Autowired
    private JmsTemplate jmsTemplate;
    
    @Autowired
    private Destination queueTextDestination;
    
    /**
     * 发送文本消息
     * @param text
     */
    public void sendTextMessage(final String text){
        jmsTemplate.send(queueTextDestination, new MessageCreator() {
            
            public Message createMessage(Session session) throws JMSException {
                
                return session.createTextMessage(text);
            }
        });
        
    }
    
}

消费模块

applicationContext-jms-consumer-queue.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:context="http://www.springframework.org/schema/context" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context.xsd">
    
    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->  
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
        <property name="brokerURL" value="tcp://192.168.25.135:61616"/>  
    </bean>
       
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->  
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
    <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->  
        <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
    </bean>  
    
    <!--这个是队列目的地,点对点的  文本信息-->  
    <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">  
        <constructor-arg value="queue_text"/>  
    </bean>    
    
    <!-- 我的监听类 -->
    <bean id="myMessageListener" class="cn.itcast.demo.MyMessageListener"></bean>
    
    
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueTextDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
    
</beans>

MyMessageListener.java


public class MyMessageListener implements MessageListener {

    public void onMessage(Message message) {
        
        TextMessage textMessage=(TextMessage)message;
        try {
            System.out.println("接收到消息:"+textMessage.getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        

    }

}

上一篇下一篇

猜你喜欢

热点阅读