消息中间件

【ActiveMQ】主从数据共享部署

2019-06-23  本文已影响0人  佐蓝Gogoing

主从共享部署分为Shared FileSystem Master-SlaveShared DatabaseMaster-Slave,数据源分别为文件系统(如 NAS)和数据库,这里以 MySQL 数据库为例。

1. 添加数据库驱动

为了让 ActiveMQ 支持数据库,先要在${ACTICEMQ_HOME}/lib/extra添加数据库的驱动包

[root@localhost extra]# pwd
/var/activemq/lib/extra
[root@localhost extra]# ls
mqtt-client-1.15.jar  mysql-connector-java-5.1.47.jar

2. 修改配置文件

开打 ${ACTICEMQ_HOME}/conf/activemq.xml
在 broker 节点 添加 persistent="true" 使其支持持久化

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" persistent="true" dataDirectory="${activemq.data}" schedulerSupport="true">

修改持久化适配器为数据库

<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds" useDatabaseLock="false" transactionIsolation="4"/>
</persistenceAdapter>

在 broker 节点后添加数据源

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://192.168.0.100:3306/test_activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root123"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>

3. 启动多个 ActiveMQ

启动多个配置文件相同的 ActiveMQ,多个 ActiveMQ 开启时,同一时间只有一个服务器可以拿到 MySQL 的连接。

4. 代码

消费者

public class ConsumerFailover {
    public static void main(String[] args) throws InterruptedException {
        // 非failover的公共参数配置通过nested.*,例如 failover:(...)?nested.wireFormat.maxInactivityDuration=1000
        // ?randomize=false 随机选择,默认是顺序
        // 指定优先切换 failover:(tcp://host1:61616,tcp://host2:61616,tcp://host3:61616)?priorityBackup=true&priorityURIs=tcp://local1:61616,tcp://local2:61616
        // maxReconnectDelay重连的最大间隔时间(毫秒)
        String brokerUrl = "failover:(tcp://192.168.0.102:61616,tcp://192.168.0.106:61616)?initialReconnectDelay=100";
        ConsumerThread queue1 = new ConsumerThread(brokerUrl, "queue1");
        queue1.start();
        queue1.join();
    }
}

class ConsumerThread extends Thread {

    String brokerUrl;
    String destinationUrl;

    public ConsumerThread(String brokerUrl, String destinationUrl) {
        this.brokerUrl = brokerUrl;
        this.destinationUrl = destinationUrl;
    }

    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory;
        Connection conn;
        Session session;
        MessageConsumer consumer;

        try {
            // brokerURL http://activemq.apache.org/connection-configuration-uri.html
            // 1、创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
            // 2、创建连接对象
            conn = connectionFactory.createConnection();
            conn.start(); // 一定要启动
            // 3、创建会话(可以创建一个或者多个session)
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 4、创建点对点接收的目标,queue - 点对点
            Destination destination = session.createQueue(destinationUrl);

            // 5、创建消费者消息 http://activemq.apache.org/destination-options.html
            consumer = session.createConsumer(destination);

            // 6、接收消息
            consumer.setMessageListener(message -> {
                try {
                    if (message instanceof TextMessage) {
                        System.out.println("收到文本消息:" + ((TextMessage) message).getText());
                    } else {
                        System.out.println(message);
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

启动后客户端会按顺序去连接服务器,连接上了第一个

 INFO | Successfully connected to tcp://192.168.0.106:61616

生产者

public class Producer {
    public static void main(String[] args) {
        // 添加两个中间件,可以实现故障转移
        String brokerUrl = "failover:(tcp://192.168.0.102:61616,tcp://192.168.0.106:61616)?initialReconnectDelay=100";
        new ProducerThread(brokerUrl, "queue1").start();
    }

    static class ProducerThread extends Thread {
        String brokerUrl;
        String destinationUrl;

        public ProducerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }

        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory;
            Connection conn;
            Session session;

            try {
                // 1、创建连接工厂
                connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
                // 2、创建连接对象md
                conn = connectionFactory.createConnection();
                conn.start();
                // 3、创建会话
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 4、创建点对点发送的目标
                 Destination destination = session.createQueue(destinationUrl);
                // 5、创建生产者消息
                MessageProducer producer = session.createProducer(destination);
                // 设置生产者的模式,有两种可选 持久化 / 不持久化
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                // 6、创建一条文本消息
                String text = "Hello world!";
                TextMessage message = session.createTextMessage(text);
                for (int i = 0; i < 1; i++) {
                    // 7、发送消息
                    producer.send(message);
                }
                // 8、 关闭连接
                session.close();
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

运行以下生产者的代码,可以看到消费者接收到了

 INFO | Successfully connected to tcp://192.168.0.106:61616
收到文本消息:Hello world!

5. 模拟故障

通过手动关闭 ActiveMQ 模拟故障场景,消费者断开后连接到第二个服务器

java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
    at java.lang.Thread.run(Thread.java:748)
 INFO | Successfully connected to tcp://192.168.0.106:61616
上一篇下一篇

猜你喜欢

热点阅读