同步队列生产与消费实现

2019-10-13  本文已影响0人  长孙俊明
package com.example.activemq.syncqueue;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal JMS producer: sends a single persistent text message to an ActiveMQ queue.
 */
public class Producer {
    public static void main(String[] args) {
        new ProducerThread("tcp://47.106.100.211:61616", "sync-queue").start();
    }

    /**
     * Thread that connects to a broker, sends one text message to a queue and disconnects.
     */
    static class ProducerThread extends Thread {
        String brokerUrl;
        String destinationUrl;

        /**
         * @param brokerUrl      broker connection URL handed to {@link ActiveMQConnectionFactory}
         * @param destinationUrl name of the queue the message is sent to
         */
        public ProducerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }

        /**
         * Opens a connection, sends one message and always releases the connection,
         * even when a step in between fails.
         */
        @Override
        public void run() {
            Connection conn = null;

            try {
                // 1. Create the connection factory.
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

                // 2. Create and start the connection.
                conn = connectionFactory.createConnection();
                conn.start();

                // 3. Create a non-transacted, auto-acknowledge session
                //    (a connection may host one or more sessions).
                Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // 4. Create the destination (a queue here; a topic would also be possible).
                Destination destination = session.createQueue(destinationUrl);

                // 5. Create a producer bound to that destination.
                MessageProducer producer = session.createProducer(destination);
                // Delivery mode: persistent (as opposed to non-persistent).
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                producer.setPriority(7);

                // 6. Build the text message.
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : "
                        + System.currentTimeMillis();
                TextMessage message = session.createTextMessage(text);

                // 7. Send it; report success only after send() returned without throwing.
                producer.send(message);
                System.out.println("Sent message: " + text);
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                // 8. Clean up. Closing the connection also closes its sessions and producers,
                //    and doing it here covers the failure paths as well.
                closeConnection(conn);
            }
        }

        /** Closes the connection if it was opened, reporting (not propagating) close failures. */
        private static void closeConnection(Connection conn) {
            if (conn == null) {
                return;
            }
            try {
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

package com.example.activemq.syncqueue;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * Minimal consumer entry point: starts one {@code ConsumerThread} against the
 * hard-coded broker URL and queue name.
 */
// Consumer features reference: http://activemq.apache.org/consumer-features.html
public class Consumer {
    public static void main(String[] args) {
        String brokerUrl = "tcp://47.106.100.211:61616";
        String queueName = "sync-queue";

        ConsumerThread worker = new ConsumerThread(brokerUrl, queueName);
        worker.start();
    }
}

/**
 * Thread that connects to a broker, synchronously receives one message from a queue,
 * prints it and disconnects.
 */
class ConsumerThread extends Thread {

    String brokerUrl;
    String destinationUrl;

    /**
     * @param brokerUrl      broker connection URL handed to {@link ActiveMQConnectionFactory}
     * @param destinationUrl name of the queue to consume from
     */
    public ConsumerThread(String brokerUrl, String destinationUrl) {
        this.brokerUrl = brokerUrl;
        this.destinationUrl = destinationUrl;
    }

    /**
     * Opens a connection, blocks until one message arrives, prints it and always
     * releases the connection, even when a step in between fails.
     */
    @Override
    public void run() {
        Connection conn = null;

        try {
            // Broker URL options: http://activemq.apache.org/connection-configuration-uri.html
            // 1. Create the connection factory.
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);

            // 2. Create the connection; it must be started before messages are delivered.
            conn = connectionFactory.createConnection();
            conn.start();

            // 3. Create a non-transacted, auto-acknowledge session
            //    (a connection may host one or more sessions).
            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 4. Create the destination to consume from (a queue here; a topic would also be possible).
            Destination destination = session.createQueue(destinationUrl);

            // 5. Create the consumer. Destination options: http://activemq.apache.org/destination-options.html
            MessageConsumer consumer = session.createConsumer(destination);

            // 6. Receive one message; receive() without a timeout waits until one is available.
            Message message = consumer.receive();

            if (message instanceof TextMessage) {
                System.out.println("收到文本消息:" + ((TextMessage) message).getText());
            } else {
                System.out.println(message);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // Closing the connection also closes its sessions and consumers,
            // and doing it here covers the failure paths as well.
            closeConnection(conn);
        }
    }

    /** Closes the connection if it was opened, reporting (not propagating) close failures. */
    private static void closeConnection(Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

上一篇 下一篇

猜你喜欢

热点阅读