activemq Artemis笔记
参考:https://activemq.apache.org/components/artemis/documentation/
activemq 是什么
ActiveMQ是开源的,支持多种协议(CORE,AMQP,MQTT,JMS...),
基于java的消息系统(或消息中间件)。
它支持业界标准协议,这样有利于客户端的选择(从c,c++,python,.net等)
当期有2个版本的ActiveMQ
- ActiveMQ 5 "Classic"
- ActiveMQ Artemis
因为Artemis是下一代的ActiveMQ,所以后面介绍基于Artemis。
ActiveMQ Artemis
下一代高性能,非阻塞架构,基于事件驱动的消息系统
特性:
- 提供实现JMS 1.1 & 2.0 的客户端,包含JNDI
- 通过共享存储和网络复制提供高可用
- 简单&强大的协议无感知的寻址模型(addressing model)
- 灵活的集群用于分布式负载
- 基于日志的低延迟持久化
- 方便从ActiveMQ 5迁移
使用
下载Artemis,并解压
//目录结构如下
|___ bin //二进制和脚本
|
|___ examples //各自demo
| |___ common
| |___ features
| |___ perf
| |___ protocols
|
|___ lib //jar和libraries
| |___ client
|
|___ schema //用于校验Artemis配置
|
|___ web //web上下文
|___ api //api文档
|___ hacking-guide
|___ migration-guide //迁移指南
|___ user-manual //用户手册
创建Broker Instance
执行 ${ARTEMIS_HOME}/bin/artemis create mybroker
${ARTEMIS_HOME}下载解压的目录
mybroker名字自定义
image.png
可以看到生成了一个目录mybroker,里面有各种文件。所有Broker Instance实际是一个包含所有配置文件和运行时数据的目录。
- bin: 可执行脚本
- etc: 配置
- data: 消息持久化保存
- log: 日志
- tmp: 临时文件,可以安全删除
特别注意的是2个文件
//启动配置
etc/bootstrap.xml
//核心配置
etc/broker.xml
bootstrap.xml 内部包含了broker.xml,几乎所有的配置都在broker.xml进行,Artemis提供了大部分默认配置
//启动服务(使用了默认的etc/bootstrap.xml配置)
mybroker/bin/artemis run
//指定配置启动服务(使用自定义bootstrap.xml配置)
mybroker/bin/artemis run -- xml:path/to/bootstrap.xml
//关闭服务
mybroker/bin/artemis stop
//生产者
public class Producer {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.98:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST.QUEUE");
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i= 0; i < 100; i++) {
TextMessage message = session.createTextMessage("hello world! " + i);
producer.send(message);
System.out.println(message);
}
producer.close();
}
}
//消费者
public class Consumer {
public static void main(String[] args) throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.2.98:61616");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("TEST.QUEUE");
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
System.out.println(message);
}
});
connection.start();
}
}
架构
image.pngProtocal
前面说过activemq支持多协议,(CORE,AMQP,MQTT,JMS...),对应图中Core Protocol,ProtocolA...,ProtocolA,B,C可以是MQTT等对应的协议。比较特殊的JMS Protocol 从图中看是基于Core Protocol,所以Core Protocol,MQTT等协议是直接和protocalManager交互,而JMS protocol是通过Core Protocol和protocalManager交互的。
ProtocalManager
ProtocalManager 负责协议到Artemis Server 核心模型的转换,即不管外部是什么协议,到Artemis Server都统一成一个概念。这样增加协议不会涉及Artemis Server的核心改动。这个统一的概念(核心模型),叫Addressing Model
Addressing Model
Addressing Model是Artemis 特有的,强大,灵活,高性能。它包含三个核心概念
- Address
- Queue
- Routing Types
Address,Queue,Routing Types 三者的概念和联系代表了Artemis 消息流转的核心。
Address
Address 表示一个消息的终端(messaging endpoint),类似其它系统的topic概念,Address可以绑定0个或多个queue 和 一个routing type.
Queue
Queue 是和Address关联的,Address对Queue是一对多的关系,当对应于一个Address的消息到来时,消息讲发送给Address的一个或多个Queue(取决于Routing Types的配置),Queue同时可以配置成自动创建和删除。
比如当Producter给一个Address发送消息时,如果Address不存在,Artemis会自动创建该Address。
Routing Types
Routing Types 决定了消息是如何发送给Address的Queue。
Routing Types 有2种类型。
- Anycast:发送给单个queue,point-to-point
- Multicast:发送给每个queue publish-subscribe
注意:可以为一个address指定多个Routing Type,但不建议这么做。
介绍几种常见的Address配置
1:Point-to-Point Messaging
image.pngaddress.foo(Address)的Routing Types是Anycast
消息{1,2,3,4,5}依次发送到address.foo的Q1,Consumer 1 & Consumer 2 共同消息Q1的一部分消息,Consumer 1 & Consumer 2之间的负载取决于Artemis的负责策略。
配置:
<!--/etc/broker.xml-->
<addresses>
<address name="address.foo">
<anycast>
<queue name="anycast"/>
</anycast>
</address>
</addresses>
2:Publish-Subscribe Messaging
image.png消息会同时发给pubsub.foo的2个Queue,Consumer 1 & Consumer 2消费相同消息
配置:
<!--/etc/broker.xml-->
<addresses>
<address name="topic.foo">
<multicast/>
</address>
</addresses>
下面这种配置没有必要,因为artemis可以自动创建queue
<!--/etc/broker.xml-->
<addresses>
<address name="topic.foo">
<multicast>
<queue name="client123.topic.foo"/>
<queue name="client456.topic.foo"/>
</multicast>
</address>
</addresses>
3:Point-to-Point Address multiple Queues
image.png配置:
<!--/etc/broker.xml-->
<addresses>
<address name="address.foo">
<anycast>
<queue name="q1"/>
<queue name="q2"/>
</anycast>
</address>
</addresses>
4:Point-to-Point Address multiple Queues
image.png配置:
<!--/etc/broker.xml-->
<addresses>
<address name="foo.orders">
<anycast>
<queue name="orders"/>
</anycast>
<multicast/>
</address>
</addresses>
Persistence(持久化)
消息系统高可用,要在crash或者restart的情况下消息不丢失。Artemis 提供了2种持久化方式,以便系统恢复后消息重新投递。
- 文件日志
- JDBC
文件日志
磁盘写操作包含了移动磁头以定位到要写的位置,散落在磁盘的不同位置的文件意味着频繁的磁头移动,导致性能很差,所以磁盘的顺序写可以最大的利用磁盘的性能。类似于kafka的文件日志也是基于此。
Artemis的文件日志是只追加的,原因如上所述。Artemis的文件日志由多个文件组成,每个文件都是固定大小,预先创建。
add message, update message, delete message都会记录到文件日志,当一个文件满了之后,记录到下一个日志文件。
随着删除记录被添加到日志中 Artemis有一个复杂的文件垃圾收集算法,它可以确定是否还需要某个特定的日志文件——也就是说,它的所有数据已被删除,如果是,则可以回收和重用文件
Artemis同时有个压缩算法用于压缩日志文件(删除无用的数据,并压缩)
文件日志的写入方式
- Java NIO
- Linux Asynchronous IO
- Memory mapped
日志分类
- bindings journal
- message journal
- large message data
- duplicate id caches
- paging data
bindings journal
保存了绑定相关数据:包含了queue的信息和属性,id序列计数器,始终是Java NIO方式,因为比起message 日志,bindings日志量级低
message journal
保存了消息相关数据:包含消息本身,重复Id缓存。
默认使用 AIO,如果不支持 AIO 回退到 Java NIO
零持久化(Zero Persistence)
意味着以下数据都没有持久化
- bindings data
- message data
- large message data
- duplicate id caches
- paging data
//表示不做持久化,如下设置
persistence-enabled in broker.xml to false.
传输
Artemis 的底层传输涉及2个重要概念
- Acceptors
- Connectors
Acceptors
我们从配置看起
<!--broker.xml-->
<acceptors>
<acceptor name="netty">tcp://localhost:61617</acceptor>
<!--可以有多个acceptor-->
<acceptor>...</acceptor>
</acceptors>
acceptor表示Artemis 服务的一个接收器,用于接收客户端请求。
上面的配置定义了一个接收器,底层使用了netty,端口是61617。
可以为acceptor指定一些配置,方法是在url后面设置
<acceptor name="netty">tcp://localhost:61617?sslEnabled=true&keyStorePath=/path</acceptor>
Connectors
如同Acceptors用于服务器如何接收请求,Connectors是用于指定如何请求服务器
<!--broker.xml-->
<connector name="netty">tcp://localhost:61617</connector>
上面的配置主要用于:
- 当服务bridged另一个服务(涉及bridged概念)
- 当服务是cluster的一部分(涉及cluster的概念)
客户端直连服务器
服务器端无需配置Connectors
ServerLocator locator = ActiveMQClient.createServerLocator("tcp://localhost:61617");
ClientSessionFactory sessionFactory = locator.createClientSessionFactory();
ClientSession session = sessionFactory.createSession(...);
Netty传输配置
<!--指定协议-->
<acceptor name="netty">tcp://localhost:61617?protocols=CORE,AMQP</acceptor>
发送&提交的保证
事务消息
当commit 或 rollback 请求发送到服务器时,客户端阻塞直到服务器执行后返回结果。
服务器会把commit 或 rollback 提交到文件日志中。
journal-sync-transactional配置影响这个过程。
当journal-sync-transactional 为 false时,服务器在响应客户端后某个时间写入。如果设置为true服务器会确保写入文件日志后再返回可客户端。
非事务消息发送
blockOnDurableSend 配置
如果设置为true,当发送持久化的非事务消息时,客户端会阻塞直到消息到达服务,并返回响应。默认是true
blockOnNonDurableSend 配置
如果设置为true,当发送非持久化非事务消息时,客户端会阻塞直到消息到达服务,并返回响应。默认是false
设置block on sends 为true 会降低系统性能,因为发送响应后下个发送才能执行。意味着系统的性能被网络往返时间限制,而不是带宽。
为了更好的性能,建议使用事务消息批量发送 或者 使用
Artemis 推荐的 ‘asynchronous send acknowledgements’ 特性
journal-sync-non-transactional设置
如果设置为true,当系统受到持久化非事务消息时,消息会routed 给 至少一个queue,然后系统持久化该消息,之后才发送响应给客户端。
BlockOnAcknowledge 设置
If this is set to true then all calls to acknowledge on non transacted sessions will block until the acknowledge has reached the server, and a response has been sent back. You might want to set this to true if you want to implement a strict at most once delivery policy. The default value is false
这里文档描述的意思不是特别理解,下面是从源代码注释看该设置表达的意思
//org.apache.activemq.artemis.api.core.client.ClientSession
/**
* Returns whether the ClientConsumer created by the session will <em>block</em> when they acknowledge a message.
*
* @return <code>true</code> if the session's ClientConsumer block when they acknowledge a message, <code>false</code> else
*/
boolean isBlockOnAcknowledge();
Asynchronous Send Acknowledgements
如果使用非事务session,又要保证消息发送,就如‘非务消息’发送里说的,需要牺牲性能。
作为改进,Artemis 提供了新的特性:'Asynchronous Send Acknowledgements',通过这个特性,Artemis 可以配置为发送消息无需阻塞等待,而是异步接收消息的acknowledgement 响应。确保消息到达的同时提高系统吞吐量。
Session.setSendAcknowledgementHandler(new MySendAcknowledgementsHandler());
class MySendAcknowledgementsHandler implements SendAcknowledgementHandler {
int count = 0;
@Override
public void sendAcknowledged(final Message message) {
System.out.println("Received send acknowledgement for message " + count++);
}
}
Artemis Plugin Support
Artemis 被设计成支持插件,所有插件在同一个时间注册,并连接在一起,按照注册的时间顺序依次执行。
实现插件步骤
- 实现ActiveMQServerPlugin接口
- 确保插件在classPath中
- 通过配置或者代码注册
<broker-plugins>
<broker-plugin class-name="some.plugin.UserPlugin">
<property key="property1" value="val_1" />
<property key="property2" value="val_2" />
</broker-plugin>
</broker-plugins>
...
Configuration config = new ConfigurationImpl();
...
config.registerBrokerPlugin(new UserPlugin());
Libaio Native Libraries
Thread management
Server-Side Thread Management
Artemis 内部维护一个标准线程用于一般性的任务,另一个scheduled 线程池用于scheduled 任务