消息中间件

ActiveMQ-API(四)

2018-06-01  本文已影响14人  Airycode

创建临时消息
ActiveMQ通过createTemporaryQueue和createTemporaryTopic创建临时目标,这些目标持续到创建它的Connection关闭,只有创建临时目标的Connection所创建的客户端才可以从临时目标中接收消息,但是任何的生产者都可以向临时目标中发送消息。如果关闭了创建此目标的Connection,那么临时目标被关闭,内容也将消失。
TemporaryQueue createTemporaryQueue ();
TemporaryTopic createTemporaryTopic();

发布订阅模式代码实现:

package bhz.mq.pb;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publish {

    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
    
    //3:Session对象
    private Session session;
    
    //4:生产者
    private MessageProducer messageProducer;
    
    public Publish(){
        try {
            
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.messageProducer = this.session.createProducer(null);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public void sendMessage() throws Exception{
        Destination destination = this.session.createTopic("topic1");
        TextMessage t = session.createTextMessage("我是内容");
        this.messageProducer.send(destination, t);
    }
    
    public static void main(String[] args) throws Exception {
        Publish p = new Publish();
        p.sendMessage();
    }
    
}

package bhz.mq.pb;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer1 {

    
    
    //1:连接工厂
    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
        
    //3:Session对象
    private Session session;
    
    //4:消费者
    private MessageConsumer messageConsumer;
    
    //5:目标地址
    private Destination destination;
    
    public Consumer1(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            this.destination=this.session.createTopic("topic1");
            this.messageConsumer = this.session.createConsumer(this.destination);
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void receiver(){
        try {
            
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    class Listener implements MessageListener{

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    System.out.println("c1收到消息");
                    TextMessage m = (TextMessage) message;
                    System.out.println(m.getText());
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
    
    public static void main(String[] args) {
        Consumer1 c = new Consumer1();
        c.receiver();
    }
    
}

package bhz.mq.pb;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer2 {

    
    
    //1:连接工厂
    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
        
    //3:Session对象
    private Session session;
    
    //4:消费者
    private MessageConsumer messageConsumer;
    
    //5:目标地址
    private Destination destination;
    
    public Consumer2(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void receiver(){
        try {
            this.destination=this.session.createTopic("topic1");
            this.messageConsumer = this.session.createConsumer(this.destination);
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    class Listener implements MessageListener{

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    System.out.println("c2收到消息");
                    TextMessage m = (TextMessage) message;
                    System.out.println(m.getText());
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
    
    public static void main(String[] args) {
        Consumer2 c = new Consumer2();
        c.receiver();
    }
    
}

package bhz.mq.pb;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer3 {

    
    
    //1:连接工厂
    private ConnectionFactory connectionFactory;
    //2:连接对象
    private Connection connection;
        
    //3:Session对象
    private Session session;
    
    //4:消费者
    private MessageConsumer messageConsumer;
    
    //5:目标地址
    private Destination destination;
    
    public Consumer3(){
        try {
            this.connectionFactory = new ActiveMQConnectionFactory("bhz","bhz",
                    "tcp://localhost:61616");
            this.connection = this.connectionFactory.createConnection();
            this.connection.start();
            this.session = this.connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            
            
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public void receiver(){
        try {
            this.destination=this.session.createTopic("topic1");
            this.messageConsumer = this.session.createConsumer(this.destination);
            this.messageConsumer.setMessageListener(new Listener());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    class Listener implements MessageListener{

        @Override
        public void onMessage(Message message) {
            try {
                if (message instanceof TextMessage) {
                    System.out.println("c3收到消息");
                    TextMessage m = (TextMessage) message;
                    System.out.println(m.getText());
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
        
    }
    
    public static void main(String[] args) {
        Consumer3 c = new Consumer3();
        c.receiver();
    }
    
}

上一篇 下一篇

猜你喜欢

热点阅读