Java技术升华ActiveMQ消息中间件生活杂谈

消息中间件-ActiveMQ详解

2018-12-25  本文已影响190人  AKyS佐毅

1、消息中间件之JMS规范

什么是Java消息服务

MOM

JMS的概念和规范

为什么需要JMS

JMS五种不同的消息正文格式

JMS的优势

JMS消息传送模型

点对点消息传送模型

发布/订阅消息传递模型

2、JMS编码接口之间的关系

image

JMS的可靠性机制

事务性会话

Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

设置为true的时候,消息会在session.commit以后自动签收

非事务性会话

Session session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);

在该模式下,消息何时被确认取决于创建会话时的应答模式

AUTO_ACKNOWLEDGE

CLIENT_ACKNOWLEDGE

DUPS_OK_ACKNOWLEDGE

本地事务

3、ActiveMQ的应用场景

3.1、安装

3.2、代码实践

点对点模型通信

Message生产者

public class JmsSender {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("" +
                "tcp://192.168.11.140:61616");
        Connection connection=null;
        try {
            //创建连接
            connection=connectionFactory.createConnection();
            connection.start();

            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

            //创建队列(如果队列已经存在则不会创建, first-queue是队列名称)
            //destination表示目的地
            Destination destination=session.createQueue("first-queue");
            //创建消息发送者
            MessageProducer producer=session.createProducer(destination);

            TextMessage textMessage=session.createTextMessage("hello, 菲菲,我是帅帅的mic");
            producer.send(textMessage);
            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Message消费者

public class JmsReceiver {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("" +
                "tcp://192.168.11.140:61616");
        Connection connection = null;
        try {
            //创建连接
            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

            //创建队列(如果队列已经存在则不会创建, first-queue是队列名称)
            //destination表示目的地
            Destination destination = session.createQueue("first-queue");
            //创建消息接收者
            MessageConsumer consumer = session.createConsumer(destination);

            TextMessage textMessage = (TextMessage) consumer.receive();
            System.out.println(textMessage.getText());
            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试发布/订阅(Pub/Sub)模型通信

Topic生产者

public class JmsTopicSender {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("" +
                "tcp://192.168.11.140:61616");
        Connection connection=null;
        try {
            //创建连接
            connection=connectionFactory.createConnection();
            connection.start();

            Session session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

            //创建队列(如果队列已经存在则不会创建, first-queue是队列名称)
            //destination表示目的地
            Destination destination=session.createTopic("first-topic");
            //创建消息发送者
            MessageProducer producer=session.createProducer(destination);
            TextMessage textMessage=session.createTextMessage("今天心情,晴转多云");
            producer.send(textMessage);
            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }finally {
            if(connection!=null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Topic消费者

public class JmsTopicReceiver {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("" +
                "tcp://192.168.11.140:61616");
        Connection connection = null;
        try {
            //创建连接
            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

            //创建队列(如果队列已经存在则不会创建, first-queue是队列名称)
            //destination表示目的地
            Destination destination = session.createTopic("first-topic");
            //创建消息接收者
            MessageConsumer consumer = session.createConsumer(destination);
            TextMessage textMessage = (TextMessage) consumer.receive();
            System.out.println(textMessage.getText());
            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

持久化存储

public class JmsTopicPersistenteReceiver {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("" +
                "tcp://192.168.11.140:61616");
        Connection connection = null;
        try {
            //创建连接
            connection = connectionFactory.createConnection();
            connection.setClientID("DUBBO-ORDER"); //设置持久订阅
            connection.start();

            Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

            //创建队列(如果队列已经存在则不会创建, first-queue是队列名称)
            //destination表示目的地
            Topic topic = session.createTopic("first-topic");
            //创建消息接收者
//            MessageConsumer consumer = session.createConsumer(destination);
            MessageConsumer consumer = session.createDurableSubscriber(topic,"DUBBO-ORDER");
            TextMessage textMessage = (TextMessage) consumer.receive();
            System.out.println(textMessage.getText());
            session.commit();
            session.close();
        } catch (JMSException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

点对点通信和发布订阅通信模式的区别就是创建生产者和消费者对象时提供的Destination对象不同,如果是点对点通信创建的Destination对象是Queue,发布订阅通信模式通信则是Topic。

3.3、整合Spring使用

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>4.2.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>4.2.7.RELEASE</version>
</dependency>

比如我们在我们的系统中现在有两个服务,第一个服务发送消息,第二个服务接收消息,我们下面看看这是如何实现的

发送消息的配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.155:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!-- 配置生产者 -->
    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
</beans>

发送消息的测试方法:

@Test
public void testSpringActiveMq() throws Exception {
    //初始化spring容器
    ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
    //从spring容器中获得JmsTemplate对象
    JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
    //从spring容器中取Destination对象
    Destination destination = (Destination) applicationContext.getBean("queueDestination");
    //使用JmsTemplate对象发送消息。
    jmsTemplate.send(destination, new MessageCreator() {
        
        @Override
        public Message createMessage(Session session) throws JMSException {
            //创建一个消息对象并返回
            TextMessage textMessage = session.createTextMessage("spring activemq queue message");
            return textMessage;
        }
    });
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://192.168.25.168:61616" />
    </bean>
    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory"
        class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    </bean>
    <!--这个是队列目的地,点对点的 -->
    <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg>
            <value>spring-queue</value>
        </constructor-arg>
    </bean>
    <!--这个是主题目的地,一对多的 -->
    <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <constructor-arg value="topic" />
    </bean>
    <!-- 接收消息 -->
    <!-- 配置监听器 -->
    <bean id="myMessageListener" class="cn.e3mall.search.listener.MyMessageListener" />
    <!-- 消息监听容器 -->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="myMessageListener" />
    </bean>
</beans>

创建一个MessageListener的实现类

public class MyMessageListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        
        try {
            TextMessage textMessage = (TextMessage) message;
            //取消息内容
            String text = textMessage.getText();
            System.out.println(text);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

}

测试接收消息的代码

@Test
public void testQueueConsumer() throws Exception {
    //初始化spring容器
    ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
    //等待
    System.in.read();
}

4、消息的发送策略

持久化消息

connectionFactory.setProducerWindowSize();
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENCE);

非持久化消息模式下,默认就是异步发送过程,如果需要对非持久化消息的每次发送的消息都获得broker的回执的话

connectionFactory.setAlwaysSyncSend();

5、consumer获取消息是pull还是(broker的主动 push)

Queue

topic

假如prefetchSize=0 . 此时对于consumer来说,就是一个pull模式

6、acknowledge为什么能够在第5次主动执行ack以后,把前面的消息都确认掉

deliveredMessages 表示已经被consumer接收但是未被确认的消息

public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;

protected ConsumerId consumerId;
protected ActiveMQDestination destination;
protected Message message;
protected int redeliveryCounter;

protected transient long deliverySequenceId;
protected transient Object consumer;
protected transient Runnable transmitCallback;
protected transient Throwable rollbackCause;
 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
        md.setDeliverySequenceId(session.getNextDeliveryId());
        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
        if (!isAutoAcknowledgeBatch()) {
            synchronized(deliveredMessages) {
                deliveredMessages.addFirst(md);
            }
            if (session.getTransacted()) {
                if (transactedIndividualAck) {
                    immediateIndividualTransactedAck(md);
                } else {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
        }
}
private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
            stats.getExpiredMessageCount().increment();
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    synchronized (deliveredMessages) {
                        if (!deliveredMessages.isEmpty()) {
                            if (optimizeAcknowledge) {
                                ackCounter++;

                                // AMQ-3956 evaluate both expired and normal msgs as
                                // otherwise consumer may get stalled
                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                    if (ack != null) {
                                        deliveredMessages.clear();
                                        ackCounter = 0;
                                        session.sendAck(ack);
                                        optimizeAckTimestamp = System.currentTimeMillis();
                                    }
                                    // AMQ-3956 - as further optimization send
                                    // ack for expired msgs when there are any.
                                    // This resets the deliveredCounter to 0 so that
                                    // we won't sent standard acks with every msg just
                                    // because the deliveredCounter just below
                                    // 0.5 * prefetch as used in ackLater()
                                    if (pendingAck != null && deliveredCounter > 0) {
                                        session.sendAck(pendingAck);
                                        pendingAck = null;
                                        deliveredCounter = 0;
                                    }
                                }
                            } else {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack!=null) {
                                    deliveredMessages.clear();
                                    session.sendAck(ack);
                                }
                            }
                        }
                    }
                    deliveryingAcknowledgements.set(false);
                }
            } else if (isAutoAcknowledgeBatch()) {
                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
                boolean messageUnackedByConsumer = false;
                synchronized (deliveredMessages) {
                    messageUnackedByConsumer = deliveredMessages.contains(md);
                }
                if (messageUnackedByConsumer) {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
            else {
                throw new IllegalStateException("Invalid session state.");
            }
        }
}

7、消息确认

8、ActiveMQ支持的传输协议

9、ActiveMQ持久化存储

ActiveMQ提供了插件式的消息存储,主要有有如下几种:

1、kahaDB 默认的存储方式

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

可用的属性有:

2、AMQ 基于文件的存储方式

AMQ Message Store是ActiveMQ5.0缺省的持久化存储,它是一个基于文件、事务存储设计为快速消息存储的一个结构,该结构是以流的形式来进行消息交互的。

这种方式中,Messages被保存到data logs中,同时被reference store进行索引以提高存取速度。Data logs由一些简单的data log文件组成,缺省的文件大小是32M,如果某个消息的大小超过了data log文件的大小,那么可以修改配置以增加data log文件的大小。如果某个data log文件中所有的消息都被成功消费了,那么这个data log文件将会被标记,以便在下一轮的清理中被删除或者归档。

AMQ Message Store配置示例:

3、JDBC 基于数据库的存储

配置如下:

image.jpeg image.jpeg

JDBC Message Store with ActiveMQ Journal

这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。
配置如下:

<beans>
  <broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
    <persistenceFactory>
    <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768"
      useJournal="true" useQuickJournal="true" dataSource="#mysql_ds" dataDirectory="activemq-data">
    </persistenceFactory>
  </broker>
</beans>

JDBC Store和JDBC Message Store with ActiveMQ Journal的区别:

生成的数据库表

1.消息表,缺省表明为ACTIVEMQ_MSGS, queue和topic都存储在里面,结构如下:


image.png

2.ACTIVEMQ_ACKS表存储持久订阅的信息和最后一个持久订阅接收的消息ID,结构如下:


image.png

3.锁定表,缺省表名为ACTIVEMQ_LOCK,用来确保在某一时刻,只能有一个ActiveMQ broker实例来访问数据库,结构如下:


image.png

4、Memory Message Store

<beans>
  <broker brokerName="test_broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">
    <transportConnectors>
    <transportConnector uri="tcp://localhost:61635">
     </transportConnectors>
</beans>

更多信息,关注java架构

WechatIMG136.jpeg
上一篇 下一篇

猜你喜欢

热点阅读