开发程序员首页投稿(暂停使用,暂停投稿)

Java消息服务 —ActiveMQ实战

2018-01-15  本文已影响386人  itcode

1.JMS—Java消息服务

Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
JMS消息服务的规范包含两种消息模式

常用的消息中间件

JMS API

消息的类型

2.windows环境下载安装ActiveMQ

• 下载 http://activemq.apache.org/download.html
• 解压文件夹,双击 home/bin/winXX/wrapper.exe 进行启动
• 浏览器中访问 http://localhost:8161
• 管理员账号和密码为 admin / admin

下载界面
apache-activemq-5.15.2解压后的文件

bin存放的是脚本文件
conf存放的是基本配置文件
data存放的是日志文件
docs存放的是说明文档
examples存放的是简单的实例
lib存放的是activemq所需jar包
webapps用于存放项目的目录

ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。 双击 home/bin/winXX/wrapper.exe 进行启动:


启动ActiveMQ

浏览器中访问 http://localhost:8161

image.png
管理员账号和密码为 admin / admin进行登录:
image.png
到这里为止,ActiveMQ 服务端就启动完毕了。

3.创建JMS-ActiveMQ工程

添加maven依赖:


maven依赖

3.1点对点消息

在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被概括为:
• 只有一个消费者将获得消息
• 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
• 每一个成功处理的消息都由接收者签收

点对点消息

①创建消息生产者

    public void producer() throws JMSException {
        //1.创建ConnectionFactory  连接工厂
        String brokerUrl = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        //2.创建Connection 连接对象
        Connection connection = connectionFactory.createConnection();
        //开启连接
        connection.start();
        //3.创建Session事务管理,通过参数设置事务级别
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        //4.创建Destination 目的地对象
        Destination destination = session.createQueue("text-Message");
        //5.创建消息的生产者
        MessageProducer messageProducer = session.createProducer(destination);
        //6.创建一条消息
        TextMessage textMessage = session.createTextMessage("测试-消息生产者");
        //7.发送消息
        messageProducer.send(textMessage);
        //提交事务
        session.commit();
        //8.释放资源
        messageProducer.close();
        session.close();
        connection.close();
    }

②创建消息消费者

    public void consumer() throws JMSException, IOException {
        //1.创建ConnectionFactory
        String brokerUrl = "tcp://localhost:61616";
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
        //2.创建Connection
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.创建Session
        Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
        //4.创建Destination 目的地对象
        Destination destination = session.createQueue("text-Message");
        //5.创建消费者
        MessageConsumer messageConsumer = session.createConsumer(destination);
        //6.消费消息,监听队列中的消息,若有新消息,会执行onMessage方法
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //避免单元测试停止
        System.in.read();
        //7.释放资源
        messageConsumer.close();
        session.close();
        connection.close();
    }

③运行ActiveMQ项目
生产者运行结果:

生产者
消费者运行结果:
消费者
3.2发布/订阅 消息

发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式被概括为:
• 多个消费者可以获得消息
• 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够购订阅。订阅者必须保持持续的活动状态以接收消息。

发布/订阅 消息

消息发布者:

//1.  创建连接工厂
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//2.  创建连接 并 开启
Connection connection = connectionFactory.createConnection();
connection.start();
//3.  创建 Session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//4.  创建 Topic 对象
Topic topic = session.createTopic("weixin-Topic");
//5.  创建生产者
MessageProducer producer = session.createProducer(topic);
//6.  发送消息
TextMessage textMessage = session.createTextMessage("Hello,Topic MQ");
producer.send(textMessage);
//7.  释放资源
producer.close();
session.close();
connection.close();

消息订阅者:

        //1.  创建连接工厂
        ConnectionFactory connectionFactory = new
                ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
        //2.  创建并启动连接
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //3.  创建 Session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.  创建目的地对象
        Topic topic = session.createTopic("weixin-Topic");
        //5.  创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //6.  获取消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println(textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
        //7.  释放资源
        consumer.close();
        session.close();
        connection.close();
上一篇 下一篇

猜你喜欢

热点阅读