3、JMS开发步骤(主题)

2020-03-20  本文已影响0人  金石_832e

主题一对多

主题生产者

package com.yd.telnet.modular.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @author
 * @Date 2020/3/19
 */
public class JmsProduce {
    //为什么是tcp,看源码!
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
    public static final String TOPIC_NAME = "topic01";
    public static void main(String[] args) throws JMSException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        // destination目的地(queue队列、topic主题)
        //Destination destination = session.createQueue(TOPIC_NAME );
        Topic topic = session.createTopic(TOPIC_NAME );
        // 6、创建消息生产者
        MessageProducer producer = session.createProducer(topic);
        // 7、通过使用producer产生三条消息发送到队列里面
        for(int i = 0;i<3;i++){
            // 逐一创建消息
            TextMessage textMessage = session.createTextMessage("msg------------" + i);
            // 通过producer发送给mq
            producer.send(textMessage);
        }
        // 8、关闭资源
        producer.close();
        session.close();
        connection.close();
        System.out.println("主题生产完成!");
    }
}

主题消费者

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

/**
 * @author
 * @Date 2020/3/19
 */
public class JmsConsumer {
    public static final String DEFAULT_BROKER_URL = "tcp://192.168.0.105:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException, IOException {
        // 1、创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(DEFAULT_BROKER_URL);
        // 2、通过连接工厂,获取连接connection
        Connection connection = activeMQConnectionFactory.createConnection();
        // 3、启动连接
        connection.start();
        // 4、创建会话session
        // 两个参数,事务、签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 5、创建接收的对象(队列还是主题)
        // destination目的地(queue队列、topic主题)
        //Destination destination = session.createQueue(QUEUE_NAME);
        Topic topic = session.createTopic(TOPIC_NAME);
        // 6、创建消息消费者
        MessageConsumer consumer = session.createConsumer(topic);
        // 7、通过监听的方式来消费消息
        consumer.setMessageListener(message -> {
            if (null != message && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("监听消费者消费消息" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        // 8、保持控制台不灭
        System.in.read();
        // 9、关闭资源
        consumer.close();
        session.close();
        connection.close();
    }
}

可以尝试先启动生产者再启动消费者
image.png
清空记录,先启动消费者再启动生产者
image.png
对比一下,看看先有消费者才有生产者是不是符合第二种启动方式。
上一篇 下一篇

猜你喜欢

热点阅读