微服务容器云MQ

activemq Artemis笔记

2019-08-21  本文已影响0人  shoyu666

参考:https://activemq.apache.org/components/artemis/documentation/

activemq 是什么

ActiveMQ是开源的,支持多种协议(CORE,AMQP,MQTT,JMS...),
基于java的消息系统(或消息中间件)。
它支持业界标准协议,这样有利于客户端的选择(从c,c++,python,.net等)

当期有2个版本的ActiveMQ

因为Artemis是下一代的ActiveMQ,所以后面介绍基于Artemis。

ActiveMQ Artemis

下一代高性能,非阻塞架构,基于事件驱动的消息系统

特性:

使用

下载Artemis,并解压

//目录结构如下
         |___ bin //二进制和脚本
         |
         |___ examples //各自demo
         |      |___ common
         |      |___ features
         |      |___ perf
         |      |___ protocols
         |
         |___ lib //jar和libraries 
         |      |___ client
         |
         |___ schema //用于校验Artemis配置
         |
         |___ web //web上下文
                |___ api //api文档
                |___ hacking-guide
                |___ migration-guide //迁移指南
                |___ user-manual //用户手册

创建Broker Instance

执行  ${ARTEMIS_HOME}/bin/artemis create mybroker
${ARTEMIS_HOME}下载解压的目录
mybroker名字自定义
image.png

可以看到生成了一个目录mybroker,里面有各种文件。所有Broker Instance实际是一个包含所有配置文件和运行时数据的目录。

特别注意的是2个文件
//启动配置
etc/bootstrap.xml
//核心配置
etc/broker.xml

bootstrap.xml 内部包含了broker.xml,几乎所有的配置都在broker.xml进行,Artemis提供了大部分默认配置


//启动服务(使用了默认的etc/bootstrap.xml配置)
mybroker/bin/artemis run 

//指定配置启动服务(使用自定义bootstrap.xml配置)
mybroker/bin/artemis run -- xml:path/to/bootstrap.xml


//关闭服务
mybroker/bin/artemis stop
//生产者
public class Producer {

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.98:61616");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("TEST.QUEUE");
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i= 0; i < 100; i++) {
            TextMessage message = session.createTextMessage("hello world! " + i);
            producer.send(message);
            System.out.println(message);
        }
        producer.close();
    }
}
//消费者
public class Consumer {

    public static void main(String[] args) throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.98:61616");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue("TEST.QUEUE");
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                System.out.println(message);
            }
        });

        connection.start();
    }
}

架构

image.png

Protocal

前面说过activemq支持多协议,(CORE,AMQP,MQTT,JMS...),对应图中Core Protocol,ProtocolA...,ProtocolA,B,C可以是MQTT等对应的协议。比较特殊的JMS Protocol 从图中看是基于Core Protocol,所以Core Protocol,MQTT等协议是直接和protocalManager交互,而JMS protocol是通过Core Protocol和protocalManager交互的。

ProtocalManager

ProtocalManager 负责协议到Artemis Server 核心模型的转换,即不管外部是什么协议,到Artemis Server都统一成一个概念。这样增加协议不会涉及Artemis Server的核心改动。这个统一的概念(核心模型),叫Addressing Model

Addressing Model

Addressing Model是Artemis 特有的,强大,灵活,高性能。它包含三个核心概念

Address,Queue,Routing Types 三者的概念和联系代表了Artemis 消息流转的核心。

Address

Address 表示一个消息的终端(messaging endpoint),类似其它系统的topic概念,Address可以绑定0个或多个queue 和 一个routing type.

Queue

Queue 是和Address关联的,Address对Queue是一对多的关系,当对应于一个Address的消息到来时,消息讲发送给Address的一个或多个Queue(取决于Routing Types的配置),Queue同时可以配置成自动创建和删除。
比如当Producter给一个Address发送消息时,如果Address不存在,Artemis会自动创建该Address。

Routing Types

Routing Types 决定了消息是如何发送给Address的Queue。
Routing Types 有2种类型。

注意:可以为一个address指定多个Routing Type,但不建议这么做。

介绍几种常见的Address配置

1:Point-to-Point Messaging
image.png

address.foo(Address)的Routing Types是Anycast
消息{1,2,3,4,5}依次发送到address.foo的Q1,Consumer 1 & Consumer 2 共同消息Q1的一部分消息,Consumer 1 & Consumer 2之间的负载取决于Artemis的负责策略。

配置:
<!--/etc/broker.xml-->
<addresses>
   <address name="address.foo">
      <anycast>
         <queue name="anycast"/>
      </anycast>
   </address>
</addresses>
2:Publish-Subscribe Messaging
image.png

消息会同时发给pubsub.foo的2个Queue,Consumer 1 & Consumer 2消费相同消息

配置:
<!--/etc/broker.xml-->
<addresses>
   <address name="topic.foo">
      <multicast/>
   </address>
</addresses>

下面这种配置没有必要,因为artemis可以自动创建queue

<!--/etc/broker.xml-->
<addresses>
   <address name="topic.foo">
      <multicast>
         <queue name="client123.topic.foo"/>
         <queue name="client456.topic.foo"/>
      </multicast>
   </address>
</addresses>
3:Point-to-Point Address multiple Queues
image.png
配置:
<!--/etc/broker.xml-->
<addresses>
   <address name="address.foo">
      <anycast>
         <queue name="q1"/>
         <queue name="q2"/>
      </anycast>
   </address>
</addresses>
4:Point-to-Point Address multiple Queues
image.png
配置:
<!--/etc/broker.xml-->
<addresses>
   <address name="foo.orders">
      <anycast>
         <queue name="orders"/>
      </anycast>
      <multicast/>
   </address>
</addresses>

Persistence(持久化)

消息系统高可用,要在crash或者restart的情况下消息不丢失。Artemis 提供了2种持久化方式,以便系统恢复后消息重新投递。

文件日志

磁盘写操作包含了移动磁头以定位到要写的位置,散落在磁盘的不同位置的文件意味着频繁的磁头移动,导致性能很差,所以磁盘的顺序写可以最大的利用磁盘的性能。类似于kafka的文件日志也是基于此。

Artemis的文件日志是只追加的,原因如上所述。Artemis的文件日志由多个文件组成,每个文件都是固定大小,预先创建。

add message, update message, delete message都会记录到文件日志,当一个文件满了之后,记录到下一个日志文件。

随着删除记录被添加到日志中 Artemis有一个复杂的文件垃圾收集算法,它可以确定是否还需要某个特定的日志文件——也就是说,它的所有数据已被删除,如果是,则可以回收和重用文件

Artemis同时有个压缩算法用于压缩日志文件(删除无用的数据,并压缩)

文件日志的写入方式

日志分类

bindings journal

保存了绑定相关数据:包含了queue的信息和属性,id序列计数器,始终是Java NIO方式,因为比起message 日志,bindings日志量级低

message journal

保存了消息相关数据:包含消息本身,重复Id缓存。
默认使用 AIO,如果不支持 AIO 回退到 Java NIO

零持久化(Zero Persistence)

意味着以下数据都没有持久化

//表示不做持久化,如下设置
persistence-enabled in broker.xml to false.

传输

Artemis 的底层传输涉及2个重要概念

Acceptors

我们从配置看起

<!--broker.xml-->
<acceptors>
    <acceptor name="netty">tcp://localhost:61617</acceptor>
    <!--可以有多个acceptor-->
    <acceptor>...</acceptor>
</acceptors>

acceptor表示Artemis 服务的一个接收器,用于接收客户端请求。
上面的配置定义了一个接收器,底层使用了netty,端口是61617。

可以为acceptor指定一些配置,方法是在url后面设置

<acceptor name="netty">tcp://localhost:61617?sslEnabled=true&keyStorePath=/path</acceptor>

Connectors

如同Acceptors用于服务器如何接收请求,Connectors是用于指定如何请求服务器

<!--broker.xml-->
<connector name="netty">tcp://localhost:61617</connector>

上面的配置主要用于:

客户端直连服务器

服务器端无需配置Connectors

ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61617");

ClientSessionFactory sessionFactory = locator.createClientSessionFactory();

ClientSession session = sessionFactory.createSession(...);

Netty传输配置

<!--指定协议-->
<acceptor name="netty">tcp://localhost:61617?protocols=CORE,AMQP</acceptor>

发送&提交的保证

事务消息

当commit 或 rollback 请求发送到服务器时,客户端阻塞直到服务器执行后返回结果。

服务器会把commit 或 rollback 提交到文件日志中。
journal-sync-transactional配置影响这个过程。
当journal-sync-transactional 为 false时,服务器在响应客户端后某个时间写入。如果设置为true服务器会确保写入文件日志后再返回可客户端。

非事务消息发送

blockOnDurableSend 配置

如果设置为true,当发送持久化的非事务消息时,客户端会阻塞直到消息到达服务,并返回响应。默认是true

blockOnNonDurableSend 配置

如果设置为true,当发送非持久化非事务消息时,客户端会阻塞直到消息到达服务,并返回响应。默认是false

设置block on sends 为true 会降低系统性能,因为发送响应后下个发送才能执行。意味着系统的性能被网络往返时间限制,而不是带宽。
为了更好的性能,建议使用事务消息批量发送 或者 使用
Artemis 推荐的 ‘asynchronous send acknowledgements’ 特性

journal-sync-non-transactional设置

如果设置为true,当系统受到持久化非事务消息时,消息会routed 给 至少一个queue,然后系统持久化该消息,之后才发送响应给客户端。

BlockOnAcknowledge 设置

If this is set to true then all calls to acknowledge on non transacted sessions will block until the acknowledge has reached the server, and a response has been sent back. You might want to set this to true if you want to implement a strict at most once delivery policy. The default value is false
这里文档描述的意思不是特别理解,下面是从源代码注释看该设置表达的意思

//org.apache.activemq.artemis.api.core.client.ClientSession
/**
    * Returns whether the ClientConsumer created by the session will <em>block</em> when they acknowledge a message.
    *
    * @return <code>true</code> if the session's ClientConsumer block when they acknowledge a message, <code>false</code> else
    */
   boolean isBlockOnAcknowledge();

Asynchronous Send Acknowledgements

如果使用非事务session,又要保证消息发送,就如‘非务消息’发送里说的,需要牺牲性能。

作为改进,Artemis 提供了新的特性:'Asynchronous Send Acknowledgements',通过这个特性,Artemis 可以配置为发送消息无需阻塞等待,而是异步接收消息的acknowledgement 响应。确保消息到达的同时提高系统吞吐量。

Session.setSendAcknowledgementHandler(new MySendAcknowledgementsHandler());
class MySendAcknowledgementsHandler implements SendAcknowledgementHandler {

            int count = 0;

            @Override
            public void sendAcknowledged(final Message message) {
               System.out.println("Received send acknowledgement for message " + count++);
            }
}

Artemis Plugin Support

Artemis 被设计成支持插件,所有插件在同一个时间注册,并连接在一起,按照注册的时间顺序依次执行。

实现插件步骤

<broker-plugins>
   <broker-plugin class-name="some.plugin.UserPlugin">
      <property key="property1" value="val_1" />
      <property key="property2" value="val_2" />
   </broker-plugin>
</broker-plugins>
...

Configuration config = new ConfigurationImpl();
...

config.registerBrokerPlugin(new UserPlugin());

Libaio Native Libraries

Thread management

Server-Side Thread Management

Artemis 内部维护一个标准线程用于一般性的任务,另一个scheduled 线程池用于scheduled 任务

code read

send_deliver

image.png
上一篇下一篇

猜你喜欢

热点阅读