jms--消息推送规范(代码书写)

2020-06-08  本文已影响0人  李霖神谷

1.书写生产者消息生产的代码:

package com.shuai;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 向mq推送消息步奏
 */
public class JmsActivemq {
    public static final String ACTIVEURL = "tcp://192.168.29.128:61616";
    public static final String QUEUE1 = "queue1";

    public static void main(String[] args) throws JMSException {
//        1.创建连接工厂
        ActiveMQConnectionFactory activeMqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEURL);
//        2.生产连接,并开启连接
        Connection connection = activeMqConnectionFactory.createConnection();
        connection.start();
//        3.创建session会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//        4.创建目的地
        Queue queue = session.createQueue(QUEUE1);
//        5.创建生产者
        MessageProducer producer = session.createProducer(queue);
//        6.创建三个消息
        for (int i = 1; i <= 3; i++) {
            //        7.生产者发送消息到创建好的队列里面
            TextMessage textMessage = session.createTextMessage("刘德华" + i);
            producer.send(textMessage);
        }
//        8.关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("over");
    }
}

mq效果:


image.png

第一框是发送的消息个数,第二个框是等待的消息个数
2.书写消费者接收消息的代码:
(1)正常的消费者消费消息:

package com.shuai;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class JmsActiveCustemer {
    public static final String ACTIVEURL = "tcp://192.168.29.128:61616";
    public static final String QUEUE1 = "queue1";

    public static void main(String[] args) throws JMSException {
        //1.创建连接工厂
        ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory(ACTIVEURL);
//        2.创建连接
        Connection connection = mqConnectionFactory.createConnection();
        connection.start();
//        3.创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//        4.创建目的地
        Queue queue = session.createQueue(QUEUE1);
//        5.创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
//        6.接受消息打印
        while (true) {
            TextMessage message = (TextMessage) consumer.receive();
            if (null != message) {
                System.out.println(message.getText());
            } else {
                break;
            }
        }
//        7.关闭资源
        consumer.close();
        session.close();
        connection.close();
    }

}
image.png

(2)使用监听器消费消息,生产者生产一条消息,这边就会监听一条消息, 使用监听器需要添加System.in.read();这条语句保证控制台不关闭。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null!=message && message instanceof TextMessage){
TextMessage textMessage=(TextMessage)message;
System.out.println(textMessage);
}
}
});
// 保证控制台不关闭
System.in.read();
当你开启两个消费者的时候,生产6个消息,每一个消费者消费3个消息
目的地一共有两种,这一种是队列还有一种是主题,当生产者生产6个消息的时候对于每一个主题都会接收到6个消息。这里要注意先启动消费者在启动生产,如果先启动生产没有订阅的话,生产的消息就是费消息。
3.MQ的可靠性:
(1).持久化内容:
// 持久化操作如果MQ挂了之前生产的消息还是不会丢失,如果设置为非持久化时服务挂之前的消息就会丢失。默认的是持久化操作。主题持久化操作需要在创建主题之后再将连接开启。
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

(2)创建session所带参数事务与签收:
// 3.创建session会话
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
第一个参数是是否使用事务,设置为true如果出现错误的话会回滚。设置之后需要在关闭连接之前进行提交。签收的话可以根据自己的需求设置为手动或者自动签收。

上一篇下一篇

猜你喜欢

热点阅读