集群代码实现

2019-10-22  本文已影响0人  长孙俊明

amqp协议版本

import org.apache.qpid.jms.JmsConnectionFactory;

import javax.jms.*;

public class Producer {

    public static void main(String[] args) {
        String protocol = "amqp://120.25.242.46:5672";
        protocol = "failover:(amqp://120.24.52.100:5672,amqp://120.79.71.22:5672,amqp://47.106.142.44:5672)";
        new ProducerThread(protocol, "queue1").start();
    }

    static class ProducerThread extends Thread {
        String brokerUrl;
        String destinationUrl;
        public ProducerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }

        public void run() {
            JmsConnectionFactory connectionFactory;
            Connection conn;
            Session session;
            try {
                // 1 创建连接工厂
                connectionFactory = new JmsConnectionFactory(null, null, brokerUrl);
                // 2 创建连接
                conn = connectionFactory.createConnection();
                conn.start();
                // 3 创建会话
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // 4 创建消息发送目标
                Destination destination = session.createQueue(destinationUrl);
                // 5 用亩的地创建消息生产者
                MessageProducer producer = session.createProducer(destination);
                // 6 设置递送模式
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                producer.setPriority(7);
                // 7 通过producer 发送消息
                TextMessage textMessage = session.createTextMessage("11111111");
                producer.send(textMessage);
                session.close();
                conn.close();
            } catch(Exception e) {
                e.printStackTrace();
            }
        }
    }
}
import org.apache.qpid.jms.JmsConnectionFactory;

import javax.jms.*;

/**
 * 简单消费者
 */
// http://activemq.apache.org/consumer-features.html
public class Consumer {
    public static void main(String[] args) {
        String protocol = "amqp://120.25.242.46:5672";
        protocol = "failover:(amqp://120.24.52.100:5672,amqp://120.79.71.22:5672,amqp://47.106.142.44:5672)";
        new ConsumerThread(protocol, "queue1").start();
    }
}

class ConsumerThread extends Thread {

    String brokerUrl;
    String destinationUrl;

    public ConsumerThread(String brokerUrl, String destinationUrl) {
        this.brokerUrl = brokerUrl;
        this.destinationUrl = destinationUrl;
    }

    @Override
    public void run() {
        JmsConnectionFactory connectionFactory;
        Connection conn;
        Session session;
        MessageConsumer consumer;

        try {
            // brokerURL
            // http://activemq.apache.org/connection-configuration-uri.html
            // 1、创建连接工厂
            connectionFactory = new JmsConnectionFactory(null, null, this.brokerUrl);

            // 2、创建连接对象
            conn = connectionFactory.createConnection();
            conn.start(); // 一定要启动

            // 3、创建会话(可以创建一个或者多个session)
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 4、创建消息消费目标(Topic or Queue)
            Destination destination = session.createQueue(destinationUrl);

            // 5、创建消息消费者 http://activemq.apache.org/destination-options.html
            consumer = session.createConsumer(destination);

            // 6、异步接收消息
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            System.out.println("Time: " + System.currentTimeMillis() + " 收到文本消息:"
                                    + ((TextMessage) message).getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                    else {
                        System.out.println("message=" + message);
                    }
                }
            });
            try {
                // 担心收不到消息就关闭了,先睡眠一秒
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            consumer.close();
             session.close();
             conn.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
<dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.37.0</version>
        </dependency>
        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.12</version>
        </dependency>

tcp协议版本

package com.example.activemq.cluster_queue.tcp;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 简单生产者
 */
public class Producer {
    public static void main(String[] args) {
        String protocol = "failover:(tcp://120.24.52.100:61616,tcp://120.79.71.22:61616,tcp://47.106.142.44:61616)";
        new ProducerThread(protocol, "queue3").start();
    }

    static class ProducerThread extends Thread {
        String brokerUrl;
        String destinationUrl;

        public ProducerThread(String brokerUrl, String destinationUrl) {
            this.brokerUrl = brokerUrl;
            this.destinationUrl = destinationUrl;
        }

        @Override
        public void run() {
            ActiveMQConnectionFactory connectionFactory;
            Connection conn;
            Session session;

            try {
                // 1、创建连接工厂
                connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

                // 2、创建连接
                conn = connectionFactory.createConnection();
                conn.start(); // 一定要start

                // 3、创建会话(可以创建一个或者多个session)
                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // 4、创建消息发送目标 (Topic or Queue)
                Destination destination = session.createQueue(destinationUrl);

                // 5、用目的地创建消息生产者
                MessageProducer producer = session.createProducer(destination);
                // 设置递送模式(持久化 / 不持久化)
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setPriority(7);

                // 6、创建一条文本消息
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : "
                        + System.currentTimeMillis();
                TextMessage message = session.createTextMessage(text);

                // 7、通过producer 发送消息
                System.out.println("Sent message: " + text);
                producer.send(message);

                // 8、 清理、关闭连接
                session.close();
                conn.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 简单消费者
 */
// http://activemq.apache.org/failover-transport-reference.html
public class FailoverConsumer {
    public static void main(String[] args) throws Exception {
        String protocol = "failover:(tcp://120.24.52.100:61616,tcp://120.79.71.22:61616,tcp://47.106.142.44:61616)";
        new ConsumerThread(protocol, "queue3").start();
        System.in.read();
    }
}

class ConsumerThread extends Thread {

    String brokerUrl;
    String destinationUrl;

    public ConsumerThread(String brokerUrl, String destinationUrl) {
        this.brokerUrl = brokerUrl;
        this.destinationUrl = destinationUrl;
    }

    @Override
    public void run() {
        ActiveMQConnectionFactory connectionFactory;
        Connection conn;
        Session session;
        MessageConsumer consumer;

        try {
            // brokerURL
            // http://activemq.apache.org/connection-configuration-uri.html
            // 1、创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);

            // 2、创建连接对象
            conn = connectionFactory.createConnection();
            conn.start(); // 一定要启动

            // 3、创建会话(可以创建一个或者多个session)
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 4、创建消息消费目标(Topic or Queue)
            Destination destination = session.createQueue(destinationUrl);

            // 5、创建消息消费者 http://activemq.apache.org/destination-options.html
            consumer = session.createConsumer(destination);

            // 6、异步接收消息
            consumer.setMessageListener(new MessageListener() {

                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        try {
                            System.out.println(
                                    Thread.currentThread().getName() + " 收到文本消息:" + ((TextMessage) message).getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    } else {
                        System.out.println(message);
                    }
                }
            });

            // consumer.close();
            // session.close();
            // conn.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
<dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.37.0</version>
        </dependency>
        <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.12</version>
        </dependency>
上一篇下一篇

猜你喜欢

热点阅读