ActiveMQ持久化方案

2020-12-30  本文已影响0人  爱健身的兔子

1 ActiveMQ持久化模型

1.1 PTP

Queue的存储是很简单的,就是一个FIFO的Queue

1.2 PUB/SUB

对于持久化订阅主题,每一个消费者将获得一个消息的复制。

2 ActiveMQ持久化流程

当ActiveMQ接收到PERSISTENT Message消息后就需要借助持久化方案来完成PERSISTENT Message的存储。这个介质可以是磁盘文件系统、可以是ActiveMQ的内置数据库,还可以是某种外部提供的关系型数据库。

注意:

3 ActiveMQ内存介绍

ActiveMQ的内存配置在activemq.xml中,如下所示:

<systemUsage>
    <systemUsage>
        <memoryUsage>
            <memoryUsage percentOfJvmHeap="70" />
        </memoryUsage>
        <storeUsage>
            <storeUsage limit="100 gb"/>
        </storeUsage>
        <tempUsage>
            <tempUsage limit="50 gb"/>
        </tempUsage>
    </systemUsage>
</systemUsage>

注意

storeUsage和tempUsage并不是“最大可用空间”,而是一个阀值。

4 ActiveMQ持久化方案

ActiveMQ提供了一个插件式的消息存储,主要实现了如下几种:

  1. AMQ消息存储-基于文件的存储,以前默认的存储方式

  2. KahaDB消息存储-提供了容量的提升和恢复能力,现在的默认方式

  3. JDBC消息存储-消息基于JDBC存储

  4. Memory消息存储-基于内存的消息存储

  5. LevelDB消息存储,新推出的高效存储器,但官网不推荐用。

5 KahaDB消息存储器

KahaDB主要元素包括:一个内存Metadata Cache用来在内存中检索消息的存储位置、若干用于记录消息内容的Data log文件、一个在磁盘上检索消息存储位置的Metadata Store、还有一个用于在系统异常关闭后恢复Btree结构的redo文件。如下图所示:

5.1 kahaDB的配置

由于在ActiveMQ V5.4+的版本中,KahaDB是默认的持久化存储方案。所以即使您不配置任何的KahaDB参数信息,ActiveMQ也会启动KahaDB。在activemq的安装目录下的:conf/activemq.xml中有如下配置:(默认配置)

......
<broker xmlns="http://activemq.apache.org/schema/core"  brokerName="localhost" dataDirectory="${activemq.data}">
    ......
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>
    ......
</broker>
......
5.2 kahaDB的配置属性

以下表格展示了KahaDB中所有的配置选项和其含义(加“*”部分为重要的配置选项):

property name default value Comments
*directory activemq-data 消息文件和日志的存储目录
*indexWriteBatchSize 1000 当Metadata cache区域和Metadata store区域不同的索引数量达到这个值后,Metadata cache将会发起checkpoint同步
*indexCacheSize 10000 内存中,索引的页大小。超过这个大小Metadata cache将会发起checkpoint同步
*enableIndexWriteAsync false 索引是否异步写到消息文件中,将以不要设置为true
*journalMaxFileLength 32mb 一个消息文件的大小
*enableJournalDiskSyncs true 如果为true,保证使用同步写入的方式持久化消息到journal文件中
*cleanupInterval 30000 清除(清除或归档)不再使用的db-*.log文件的时间周期(毫秒)。
*checkpointInterval 5000 写入索引信息到metadata store中的时间周期(毫秒)
ignoreMissingJournalfiles false 是否忽略丢失的journal文件。如果为false,当丢失了journal文件时,broker启动时会抛异常并关闭
checkForCorruptJournalFiles false 检查消息文件是否损坏,true,检查发现损坏会尝试修复
checksumJournalFiles false 产生一个checksum,以便能够检测journal文件是否损坏。
property name default value Comments
*archiveDataLogs false 当为true时,归档的消息文件被移到directoryArchive,而不是直接删除
*directoryArchive null 存储被归档的消息文件目录
databaseLockedWaitDelay 10000 在使用负载时,等待获得文件锁的延迟时间,单位ms
maxAsyncJobs 10000 等待写入journal文件的任务队列的最大数量。应该大于或等于最大并发producer的数量。配合并行存储转发属性使用。
concurrentStoreAndDispatchTopics false 如果为true,转发消息的时候同时提交事务
concurrentStoreAndDispatchQueues true 如果为true,转发Topic消息的时候同时存储消息的message store中
property name default value Comments
archiveCorruptedIndex false 是否归档错误的索引到Archive文件夹下
property name default value Comments
IndexDirectory 单独设置KahaDB中,db.data文件的存储位置。如果不进行设置,db.data文件的存储位置还是将以directory属性设置的值为准

6 JDBC消息存储器

ActiveMQ支持使用JDBC来持久化消息,我们只需要配置JDBC驱动即可,至于表结构activemq会自动帮我们建好表结构。

6.1 JDBC消息存储器配置
  1. 拷贝mysql的驱动包到activeMQ的 \lib\optional\ 目录下。

  2. 在activemq.xml配置文件里配置数据源

 <bean id="mysql_ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
        <property name="driverClassName">
            <value>com.mysql.jdbc.Driver</value>
        </property>
        <property name="url">
            <value>jdbc:mysql://localhost:3306/activemq</value>
        </property>
        <property name="username">
            <value>root</value>
        </property>
        <property name="password">
            <value>123456</value>
        </property>
    </bean>
  1. 修改broker的persistenceAdapter持久化方式
 <persistenceAdapter>
            <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql_ds"/>
  </persistenceAdapter>
6.2 JDBC表结构
*************************** 1\. row ***************************
       Table: activemq_acks
Create Table: CREATE TABLE `activemq_acks` (
  `CONTAINER` varchar(250) NOT NULL,
  `SUB_DEST` varchar(250) DEFAULT NULL,
  `CLIENT_ID` varchar(250) NOT NULL,
  `SUB_NAME` varchar(250) NOT NULL,
  `SELECTOR` varchar(250) DEFAULT NULL,
  `LAST_ACKED_ID` bigint(20) DEFAULT NULL,
  `PRIORITY` bigint(20) NOT NULL DEFAULT '5',
  `XID` varchar(250) DEFAULT NULL,
  PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`),
  KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

*************************** 1\. row ***************************
       Table: activemq_msgs
Create Table: CREATE TABLE `activemq_msgs` (
  `ID` bigint(20) NOT NULL,
  `CONTAINER` varchar(250) NOT NULL,
  `MSGID_PROD` varchar(250) DEFAULT NULL,
  `MSGID_SEQ` bigint(20) DEFAULT NULL,
  `EXPIRATION` bigint(20) DEFAULT NULL,
  `MSG` longblob,
  `PRIORITY` bigint(20) DEFAULT NULL,
  `XID` varchar(250) DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
  KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
  KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
  KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
  KEY `ACTIVEMQ_MSGS_XIDX` (`XID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

*************************** 1\. row ***************************
       Table: activemq_lock
Create Table: CREATE TABLE `activemq_lock` (
  `ID` bigint(20) NOT NULL,
  `TIME` bigint(20) DEFAULT NULL,
  `BROKER_NAME` varchar(250) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

解释上面三张表:

注意:

当消费者消费了对了的消息的时候,队列里面的数据就会被删除。持久化topic的消息不管是否被消费,是否有消费者,产生的数据永远都存在,且只存储一条。所以持久化的topic大量数据后可能导致性能下降。这里就像公总号一样,消费者消费完后,消息还会保留。

6.3 JDBC Message Store with ActiveMQ Journal====优化版的JDBC存储

这种方式克服了JDBC Store的不足,使用快速的缓存写入技术,大大提高了性能。JDBC 配合其自带的 high performance journal。根据官方说法,它内置的高性能journal的工作类似于在缓存层工作,消息会优先写入到journal,后台的定时任务会每隔一段时间间隔同步到数据库。

<persistenceFactory>
            <journalPersistenceAdapterFactory
                 journalLogFiles="4"
                 journalLogFileSize="32768" 
                 useJournal="true" 
                 useQuickJournal="true" 
                 dataSource="#mysql_ds" 
                 dataDirectory="activemq-data"/>
        </persistenceFactory>

JDBC Store和JDBC Message Store with ActiveMQ Journal的区别

  1. JDBC with journal的性能优于jdbc。

  2. JDBC用于master/slave模式的数据库分享。

  3. JDBC with journal不能用于master/slave模式。

  4. 一般情况下,推荐使用jdbc with journal。

7 Memory消息存储

内存消息存储主要是存储所有的持久化的消息在内存中。这里没有动态的缓存存在,所以你必须注意设置你的broker所在的JVM和内存限制。这种方式的持久化消息只在当前JVM内有效,当重启JVM之后会丢失持久化的消息。配置如下:只需要将 persistent 属性设为false即可。

<broker xmlns="http://activemq.apache.org/schema/core" persistent="false" brokerName="brokerName" dataDirectory="${activemq.data}">

ActiveMQ的持久消息存储方案_u014066037的博客-CSDN博客

上一篇 下一篇

猜你喜欢

热点阅读