集群代码实现
2019-10-22 本文已影响0人
长孙俊明
amqp协议版本
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.*;
public class Producer {
public static void main(String[] args) {
String protocol = "amqp://120.25.242.46:5672";
protocol = "failover:(amqp://120.24.52.100:5672,amqp://120.79.71.22:5672,amqp://47.106.142.44:5672)";
new ProducerThread(protocol, "queue1").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
public void run() {
JmsConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1 创建连接工厂
connectionFactory = new JmsConnectionFactory(null, null, brokerUrl);
// 2 创建连接
conn = connectionFactory.createConnection();
conn.start();
// 3 创建会话
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建消息发送目标
Destination destination = session.createQueue(destinationUrl);
// 5 用亩的地创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 6 设置递送模式
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
producer.setPriority(7);
// 7 通过producer 发送消息
TextMessage textMessage = session.createTextMessage("11111111");
producer.send(textMessage);
session.close();
conn.close();
} catch(Exception e) {
e.printStackTrace();
}
}
}
}
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.*;
/**
* 简单消费者
*/
// http://activemq.apache.org/consumer-features.html
public class Consumer {
public static void main(String[] args) {
String protocol = "amqp://120.25.242.46:5672";
protocol = "failover:(amqp://120.24.52.100:5672,amqp://120.79.71.22:5672,amqp://47.106.142.44:5672)";
new ConsumerThread(protocol, "queue1").start();
}
}
class ConsumerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ConsumerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
JmsConnectionFactory connectionFactory;
Connection conn;
Session session;
MessageConsumer consumer;
try {
// brokerURL
// http://activemq.apache.org/connection-configuration-uri.html
// 1、创建连接工厂
connectionFactory = new JmsConnectionFactory(null, null, this.brokerUrl);
// 2、创建连接对象
conn = connectionFactory.createConnection();
conn.start(); // 一定要启动
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息消费目标(Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、创建消息消费者 http://activemq.apache.org/destination-options.html
consumer = session.createConsumer(destination);
// 6、异步接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println("Time: " + System.currentTimeMillis() + " 收到文本消息:"
+ ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
else {
System.out.println("message=" + message);
}
}
});
try {
// 担心收不到消息就关闭了,先睡眠一秒
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
consumer.close();
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.37.0</version>
</dependency>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.12</version>
</dependency>
tcp协议版本
package com.example.activemq.cluster_queue.tcp;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 简单生产者
*/
public class Producer {
public static void main(String[] args) {
String protocol = "failover:(tcp://120.24.52.100:61616,tcp://120.79.71.22:61616,tcp://47.106.142.44:61616)";
new ProducerThread(protocol, "queue3").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
// 2、创建连接
conn = connectionFactory.createConnection();
conn.start(); // 一定要start
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息发送目标 (Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、用目的地创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置递送模式(持久化 / 不持久化)
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.setPriority(7);
// 6、创建一条文本消息
String text = "Hello world! From: " + Thread.currentThread().getName() + " : "
+ System.currentTimeMillis();
TextMessage message = session.createTextMessage(text);
// 7、通过producer 发送消息
System.out.println("Sent message: " + text);
producer.send(message);
// 8、 清理、关闭连接
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 简单消费者
*/
// http://activemq.apache.org/failover-transport-reference.html
public class FailoverConsumer {
public static void main(String[] args) throws Exception {
String protocol = "failover:(tcp://120.24.52.100:61616,tcp://120.79.71.22:61616,tcp://47.106.142.44:61616)";
new ConsumerThread(protocol, "queue3").start();
System.in.read();
}
}
class ConsumerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ConsumerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
MessageConsumer consumer;
try {
// brokerURL
// http://activemq.apache.org/connection-configuration-uri.html
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、创建连接对象
conn = connectionFactory.createConnection();
conn.start(); // 一定要启动
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息消费目标(Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、创建消息消费者 http://activemq.apache.org/destination-options.html
consumer = session.createConsumer(destination);
// 6、异步接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println(
Thread.currentThread().getName() + " 收到文本消息:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.out.println(message);
}
}
});
// consumer.close();
// session.close();
// conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.37.0</version>
</dependency>
<dependency>
<groupId>org.fusesource.mqtt-client</groupId>
<artifactId>mqtt-client</artifactId>
<version>1.12</version>
</dependency>