ActiveMQ的发布和订阅

2019-07-15  本文已影响0人  HRADPX

  ActiveMQ的安装下载见:ActiveMQ的下载安装,ActiveMQ的点对点模型也在其中讲过。

1 订阅者Subscriber

public class Subscriber1 {
    // 默认的连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    // 默认的连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    // 默认的连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; 

    public static void main(String[] args) {

        //连接工厂
        ConnectionFactory connectionFactory;
        Connection connection = null;//连接
        Session session;//会话
        Destination destination;//消息目的地
        MessageConsumer consumer;//消息消费者

        connectionFactory = 
                new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL);

        try {
            //通过工厂获取连接
            connection = connectionFactory.createConnection();
            connection.start();//启动连接
            //创建session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //这里的名字要和ActiveMQ中的队列名子一致
            destination = session.createTopic("短信发送");
            consumer = session.createConsumer(destination);//创建消息消费者
            // 写MQ的监听器
            consumer.setMessageListener(new MyMessageListener1());
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
* 消息监听器
*/
class MyMessageListener1 implements MessageListener{

    @Override
    public void onMessage(Message message) {
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println("Consumer1从MQ队列中接收消息:"+msg.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

Subscriber2和Subscriber1一样,这里略去。

2 发布者

public class JMSPublisher {

    // 默认的连接用户名
    private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;
    // 默认的连接密码
    private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;
    // 默认的连接地址
    private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; 

    public static void main(String[] args) {

        //连接工厂
        ConnectionFactory connectionFactory;
        Connection connection = null;//连接
        Session session;//会话
        Destination destination;//消息目的地
        MessageProducer messageProducer;//消息生产者

        connectionFactory = 
                new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);

        try {
            //通过工厂获取连接
            connection = connectionFactory.createConnection();
            connection.start();//启动连接
            //创建session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic("短信发送");
            //创建消息生产者
            messageProducer = session.createProducer(destination);

            //向对列中发送10条消息
            for(int i = 0;i <= 9;i++){
                String message = i+"123456789";
                TextMessage msg = session.createTextMessage(message);
                messageProducer.send(destination,msg);
            }
            session.commit();

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            // 关闭连接
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }
}

3 测试

先启动两个订阅者,然后在开启发布者,每次发布者发布消息时,两个订阅者的窗口都会收到发布者发布的消息。

相对于点对点模型,发布订阅模式下,订阅者是被动接收MQ推送的消息,而点对点模型需要消费者主动去队列中取消息。

本文完

上一篇 下一篇

猜你喜欢

热点阅读