原生 ActiveMQ

2017-11-05  本文已影响0人  getthrough

ActiveMq是一个apche开源的,基于生产者(producer)-消费者(consumer)模型的消息中间件,通常用于系统间的消息传递。生产者产生消息,将消息发送至消息服务器;消费者通过监听消息服务器中指定的消息进行消费(获取并使用)。

它支持一对一(point-to-point)队列式的消息和一对多(publish/subscribe)广播式的消息。参见下图:

producer_consumer.PNG
    <dependencies>
        <!--ActiveMq-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.11.2</version>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
@SuppressWarnings({"Duplicates", "UnusedAssignment"})
public class Producer {

    @Test
    public void produceMessage() {

        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            // 创建连接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.25.128:61616");
            // 获得连接
            connection = factory.createConnection();
            // 开启连接
            connection.start();
            // 创建会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建队列(点对点),指定队列名称;当消费者监听该队列,消息才能被消费
            Queue raw_mq_queue = session.createQueue("RAW_MQ_QUEUE");
            // 创建提供者
            producer = session.createProducer(raw_mq_queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置消息持久化
            // 创建消息,指定消息内容
            // TextMessage textMessage = session.createTextMessage("the textMessage content");
            TextMessage textMessage = session.createTextMessage();
            textMessage.setText("message test");
            // 发送消息
            producer.send(textMessage);
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            // 释放资源(producer,session,connection),略
            
        }

    }

}
@SuppressWarnings({"Duplicates", "UnusedAssignment"})
public class Consumer {

    @Test
    public void consumeMessage() {

        Connection connection = null;
        Session session = null;
        MessageConsumer messageConsumer = null;

        try {
            // 创建连接工厂
            String username = "admin";
            String password = "admin";
            String url = "tcp://192.168.25.128:61616";
            ConnectionFactory factory = new ActiveMQConnectionFactory(url);
            // 创建连接
            connection = factory.createConnection(username, password);
            // 开启连接
            connection.start();
            // 获得会话
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建目的地,Destination是Queue和Topic的父接口
            Destination destination = session.createQueue("RAW_MQ_QUEUE");// 指定监听的队列
            // 创建消费者
            messageConsumer = session.createConsumer(destination);
            // 设置消息监听
            /**不使用匿名内部类也可以单独创建一个类,实现MessageListener接口,重写onMessage方法*/
            messageConsumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        String text = textMessage.getText();
                        System.out.println("consumer has recieved the message:" + text);

                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            System.in.read();// 只是为了不让消费线程死亡,可以持续监听消息。
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            // 释放资源
          
        }

    }
}

至此,简单的案例就完成了。

代码地址 : https://github.com/Getthrough/ActiveMq_Raw_DemoCode

上一篇 下一篇

猜你喜欢

热点阅读