2、JMS开发步骤(队列)

2020-03-20  本文已影响0人  金石_832e

队列一对一

image.png
image.png image.png
观察两者是不是有些似曾相识
都是一个套路

依赖

        <!--activemq-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>

ActiveMQ下的哪个版本,就装哪个<version>Linux中下载的版本号</version>


不废话直接上代码(生产者)

package com.yd.telnet.modular.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author 
 * @Date 2020/3/19
 */
public class JmsProduce {
    //为什么是tcp,看源码!
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        // destination目的地(queue队列、topic主题)
        //Destination destination = session.createQueue(QUEUE_NAME);
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        // 7、通过使用producer产生三条消息发送到队列里面
        for(int i = 0;i<3;i++){
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("msg------------" + i);
            // 通过producer发送给mq
            producer.send(textMessage);
        }
        // 8、关闭资源(先进后出,同jdbc)
        producer.close();
        session.close();
        connection.close();
    }
}

看效果

image.png
再运行一次,且删除掉其中的三条记录看结果(可以将删除的三条当做已消费)
image.png
image.png

总结
当有一个消息进入这个队列是,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
再来一条消息时,等待消费的消息是1,进入队列的消息就是2(只增不减)


不废话直接上代码(消费者)

package com.yd.telnet.modular.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author 
 * @Date 2020/3/19
 */
public class JmsConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        // destination目的地(queue队列、topic主题)
        //Destination destination = session.createQueue(QUEUE_NAME);
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while(true){
            TextMessage textMessage = (TextMessage) consumer.receive();
            if(null != textMessage){
                System.out.println("消费者处理消息"+ textMessage.getText());
            }else{
                break;
            }
        }
        // 7、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

运行代码后

image.png
image.png

由于consumer.receive()这个方法是一个不离不弃,啥时候activemq停止啥时候结束,所以产生receive的第二种写法


image.png

receive属于同步运行阻塞,所以又有了监听的方式

package com.yd.telnet.modular.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

/**
 * @author 张思博
 * @Date 2020/3/19
 */
public class JmsConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
    public static final String QUEUE_NAME = "queue01";
    public static void main(String[] args) throws JMSException, IOException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        // destination目的地(queue队列、topic主题)
        //Destination destination = session.createQueue(QUEUE_NAME);
        Queue queue = session.createQueue(QUEUE_NAME);
        // 6、创建消息消费者
        MessageConsumer consumer = session.createConsumer(queue);
        // 7、通过监听的方式来消费消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听消费者消费消息"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // 8、保持控制台不灭
        System.in.read();
        // 9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

该种方式:有消息就消费,没消息就等待

消息的消费者接收消息可以采用两种方式:
1.consumer.receive()或 consumer.receive(int timeout);(同步阻塞)
2.注册一个MessageListener.
采用第一种方式,消息的接收者会一直等待下去,直到有消息到达或者超时。后一种方式会注册一个监听器,当有消息到达的时候,会调用它的onMessage()方法。(异步非阻塞)
可以用监听的方式启动两个消费者,生产的消息会被平均分配(自己试)


image.png
上一篇下一篇

猜你喜欢

热点阅读