ActiveMQ
2018-10-26 本文已影响0人
lfffasd
1.pom依赖
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-web</artifactId>
<version>5.11.1</version>
</dependency>
</dependencies>
2.启动ActiveMQ
启动后,在浏览器中访问 http://localhost:8161/admin/ 进入其 Web 管理界面
其中 Queues 是点对点模式(每条消息只会被一个消费者消费),Topics 是发布/订阅模式(每条消息会发送给所有订阅者)
image.png
3、activeMQ的生产者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Minimal ActiveMQ producer example: sends {@code SENDNUM} text messages to
 * the queue {@code "FirstQueue1"} inside a single JMS transaction.
 *
 * Created by peaimage on 2018/10/26.
 */
public class JMSProducer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;         // default connection user
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;     // default connection password
    private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;  // default broker URL
    private static final int SENDNUM = 10;                                          // number of messages per run

    /**
     * Connects to the broker, sends the whole batch in one transaction and
     * commits it. If any step fails, the commit is skipped and the connection
     * is closed, so a partially sent batch is never committed.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Instantiate the connection factory.
        ConnectionFactory connectionFactory =
                new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKERURL);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection(); // obtain a connection from the factory
            connection.start();                                // start the connection
            // Transacted session: the acknowledge-mode argument is ignored for
            // transacted sessions, so pass SESSION_TRANSACTED to say what we mean.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue("FirstQueue1"); // point-to-point queue
            // destination = session.createTopic("FirstTopic1"); // publish/subscribe topic instead
            MessageProducer messageProducer = session.createProducer(destination);
            // Fail fast: an exception from any send jumps to the catch block
            // below, so commit() is only reached when every message was sent.
            for (int i = 0; i < JMSProducer.SENDNUM; i++) {
                sendOne(session, messageProducer, i);
            }
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Closing the connection also closes its sessions; per the JMS
            // Connection.close() contract, an uncommitted transaction is rolled back.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Best-effort batch send: tries to send {@code SENDNUM} messages and keeps
     * going after a failure, printing the stack trace of each failed send.
     * The caller is responsible for committing the session if it is transacted.
     *
     * @param session         session used to create the text messages
     * @param messageProducer producer the messages are sent through
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) {
        for (int i = 0; i < JMSProducer.SENDNUM; i++) {
            try {
                sendOne(session, messageProducer, i);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Creates and sends the text message with the given index, then logs it to stdout. */
    private static void sendOne(Session session, MessageProducer messageProducer, int index)
            throws JMSException {
        TextMessage message = session.createTextMessage("ActiveMQ发送的消息" + index);
        messageProducer.send(message);
        System.out.println("发送消息" + index);
    }
}
4、activeMQ的消费者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
 * Minimal ActiveMQ consumer example: registers an asynchronous
 * {@link Listenr} on the queue {@code "FirstQueue1"}.
 *
 * Created by peaimage on 2018/10/26.
 */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;         // default connection user
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;     // default connection password
    private static final String BROKERURL = ActiveMQConnection.DEFAULT_BROKER_URL;  // default broker URL

    /**
     * Connects to the broker and attaches the message listener. On the success
     * path the connection is not closed here, because closing it would stop
     * delivery to the listener. If setup fails, the connection is closed so it
     * is not leaked.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Connection connection = null;
        try {
            ConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKERURL);
            connection = connectionFactory.createConnection();
            // Non-transacted session; received messages are acknowledged automatically.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("FirstQueue1"); // point-to-point queue
            // destination = session.createTopic("FirstTopic1"); // publish/subscribe topic instead
            MessageConsumer messageConsumer = session.createConsumer(destination);

            // Synchronous alternative: poll with receive() until it times out.
            // while (true) {
            //     TextMessage textMessage = (TextMessage) messageConsumer.receive(1000000);
            //     if (textMessage != null) {
            //         System.out.println(textMessage.getText());
            //     } else {
            //         break;
            //     }
            // }

            messageConsumer.setMessageListener(new Listenr()); // asynchronous delivery
            // Start delivery only once the consumer and its listener are in place.
            connection.start();
        } catch (Exception e) {
            e.printStackTrace();
            // Setup failed part-way: release the connection instead of leaking it.
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    }
}
5、消息监听
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
/**
* Created by peaimage on 2018/10/26.
*/
public class Listenr implements MessageListener {
public void onMessage(Message message) {
try {
System.out.println("收到的消息"+((TextMessage)message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}