技术干货

ActiveMQ(一)-简单入门

2018-01-27  本文已影响16人  隔壁老王的隔壁啊

一、目录分析

ActiveMQ目录

从上面看出一般的项目目录类似,简单分析下其作用:

二、启动ActiveMQ

启动脚本
在win下直接点击如下脚本即可启动,访问目录:http://localhost:8161 activemq登录成功

用户名密码可在jetty-realm.properties文件中查看。

# 用户名: ,密码, [,角色 ...]
admin: admin, admin
user: user, user

三、 HelloWorld

基于P2P的简单例子

//参数1:是否启用事务;参数2:签收模式:
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
简单说,就是消费者接受到消息后,需要告诉消息服务器,
我收到消息了。当消息服务器收到回执后,本条消息将失效。
因此签收将对PTP模式产生很大影响。如果消费者收到消息后,
并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉!
package com.lw.activemq.p2p;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.j2ee.statistics.JMSProducerStats;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * @author liwen
 * @date:2017年12月13日 下午2:27:15
 * @Function: 生产者
 * @version 1.0
 */
public class JMSProducter {

    private static ActiveMQConnectionFactory connectionFactory;
    private static Connection conn;
    private static Queue destination;
    private static Session session;
    private static MessageProducer producer;

    public static void main(String[] args) {

        try {
            // 创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
            // 获取连接
            conn = connectionFactory.createConnection();
            
            conn.start();
            // 创建session
            //参数1:是否启用事务;参数2:签收模式:
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 创建消息队列
            destination = session.createQueue("helloworld");
            // 创建生产者
            producer = session.createProducer(destination);
            // 设置(非)持久化特性,如果非持久化,则意味MQ消息重启后会导致消息丢失
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 发送消息
            sendMessage(session, producer);
            // 提交
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 0; i < 10; i++) {
            // 创建文本消息
            TextMessage msg = session.createTextMessage("activemq 消息:" + i);
            System.out.println("Producter 消息:" + i );
            producer.send(msg);
        }
    }
}

package com.lw.activemq.p2p;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * @author liwen
 * @date:2017年12月13日 下午2:27:24
 * @Function: 消费者
 * @version 1.0
 */
public class JMSConsumer {

    private static org.apache.activemq.ActiveMQConnectionFactory connectionFactory;
    private static Connection conn;
    private static Session session;
    private static Queue destination;
    private static MessageConsumer consumer;

    public static void main(String[] args) {
        try {
            connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, ActiveMQConnection.DEFAULT_BROKER_URL);
            conn = connectionFactory.createConnection();
            conn.start();
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // 创建队列,要么是Queue,要么是Topic
            destination = session.createQueue("helloworld");
            consumer = session.createConsumer(destination);
            // 接收消息
            /*
             * while (true) { TextMessage text = (TextMessage) consumer.receive(); if (text
             * != null) { System.out.println("consumer收到的消息:" + text.getText()); } else {
             * break; } }
             */
            consumer.setMessageListener(new MessageListener() {

                public void onMessage(Message message) {
                    TextMessage text = (TextMessage) message;
                    try {
                        String msg = text.getText();
                        System.out.println("consumer收到消息:" + msg);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

上面只是演示消息为字符串的例子,实际项目中可能为java类的比较多。ActiveMQ当然也支持。

ActiveMQ支持的消息类型

从上图可以看出,ActiveMQ可支持流、Map、java类(需要序列化)等消息格式。

四、 保证消息的成功处理

使用CLIENT_ACKNOWLEDGE模式解决,写在接收端。如果接收端不确认消息,那么activemq将会把这条消息一直保留,直到有一个接收端确认了消息的接收。即调用acknowledge方法。


[ 保证消息的成功处理]
上一篇 下一篇

猜你喜欢

热点阅读