java

Maven配置rabbitMQ实际使用utils工具类

2021-09-09  本文已影响0人  yichen_china

maven配置文件
settings.xml 新增仓库路径

   <!-- rabbitmq仓库 -->
    <mirror>
        <id>rabbitmq</id>
        <mirrorOf>central</mirrorOf>
        <name>aliyun maven</name>
        <url>https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/</url>
    </mirror>
    <!-- 中央仓库1 -->

项目pom文件 新增

    <dependencies>
        <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.9</version>
        </dependency>
    </dependencies>
log4j.rootLogger=DEBUG,A1 log4j.logger.com.taotao = DEBUG log4j.logger.org.mybatis = DEBUGlog4j.appender.A1=org.apache.log4j.ConsoleAppenderlog4j.appender.A1.layout=org.apache.log4j.PatternLayoutlog4j.appender.A1.layout.ConversionPattern=%-d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c]-[%p] %m%n

java创建MQ连接帮助类

package com.beiyao.common.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionRabbitmq {
    public static Connection getConnection() throws IOException, TimeoutException {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2.在工厂对象中设置MQ的连接信息(ip,port,virtualhost,username,password)
        factory.setHost("47.96.11.185");
        factory.setPort(5672);
        factory.setVirtualHost("host1");
        factory.setUsername("ytao");
        factory.setPassword("admin123");
        //3.通过工厂对象获取与MQ的链接
        Connection connection = factory.newConnection();
        return connection;
    }

}

消息生产者发送消息

package com.qfedu.mq.service;

import com.qfedu.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SendMsg {

    public static void main(String[] args) throws Exception{

        String msg = "Hello HuangDaoJun!";
        Connection connection = ConnectionUtil.getConnection();   
        Channel channel = connection.createChannel();    

        //定义队列(使用Java代码在MQ中新建一个队列)
        //参数1:定义的队列名称
        //参数2:队列中的数据是否持久化(如果选择了持久化)
        //参数3: 是否排外(当前队列是否为当前连接私有)
        //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
        //参数5:设置当前队列的参数
        //channel.queueDeclare("queue7",false,false,false,null);

        //参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
        //参数2:目标队列名称
        //参数3:设置当前这条消息的属性(设置过期时间 10)
        //参数4:消息的内容
        channel.basicPublish("","queue7",null,msg.getBytes());
        System.out.println("发送:" + msg);

        channel.close();
        connection.close();
    }

}

消息生产者发送消息

package com.qfedu.mq.service;

import com.qfedu.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SendMsg {

    public static void main(String[] args) throws Exception{

        String msg = "Hello HuangDaoJun!";
        Connection connection = ConnectionUtil.getConnection();   
        Channel channel = connection.createChannel();    

        //定义队列(使用Java代码在MQ中新建一个队列)
        //参数1:定义的队列名称
        //参数2:队列中的数据是否持久化(如果选择了持久化)
        //参数3: 是否排外(当前队列是否为当前连接私有)
        //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
        //参数5:设置当前队列的参数
        //channel.queueDeclare("queue7",false,false,false,null);

        //参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
        //参数2:目标队列名称
        //参数3:设置当前这条消息的属性(设置过期时间 10)
        //参数4:消息的内容
        channel.basicPublish("","queue7",null,msg.getBytes());
        System.out.println("发送:" + msg);

        channel.close();
        connection.close();
    }

}

消息生产者发送消息

package com.qfedu.mq.service;

import com.qfedu.mq.utils.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SendMsg {

    public static void main(String[] args) throws Exception{

        String msg = "Hello HuangDaoJun!";
        Connection connection = ConnectionUtil.getConnection();   
        Channel channel = connection.createChannel();    

        //定义队列(使用Java代码在MQ中新建一个队列)
        //参数1:定义的队列名称
        //参数2:队列中的数据是否持久化(如果选择了持久化)
        //参数3: 是否排外(当前队列是否为当前连接私有)
        //参数4:自动删除(当此队列的连接数为0时,此队列会销毁(无论队列中是否还有数据))
        //参数5:设置当前队列的参数
        //channel.queueDeclare("queue7",false,false,false,null);

        //参数1:交换机名称,如果直接发送信息到队列,则交换机名称为""
        //参数2:目标队列名称
        //参数3:设置当前这条消息的属性(设置过期时间 10)
        //参数4:消息的内容
        channel.basicPublish("","queue7",null,msg.getBytes());
        System.out.println("发送:" + msg);

        channel.close();
        connection.close();
    }

}
5.1.2 消息消费者
package com.qfedu.mq.service;

import com.qfedu.mq.utils.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ReceiveMsg {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, 
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("接收:"+msg);
            }
        };

        channel.basicConsume("queue1",true,consumer);
    }
}

5.2 工作模式---- 一个生产者多个消费者
5.2.1 发送者

public class SendMsg {

    public static void main(String[] args) throws Exception{
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            channel.basicPublish("","queue2",null,msg.getBytes());
            System.out.println("发送:" + msg);

            channel.close();
            connection.close();
        }
    }

}

5.2.2 消费者1

public class ReceiveMsg {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
                if("wait".equals(msg)){
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        channel.basicConsume("queue2",true,consumer);
    }
}

5.2.3 消费者2

public class ReceiveMsg {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };

        channel.basicConsume("queue2",true,consumer);
    }
}

5.3 订阅模式
5.3.1 发送者 发送消息到交换机

public class SendMsg {

    public static void main(String[] args) throws Exception{
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            channel.basicPublish("ex1","",null,msg.getBytes());
            System.out.println("发送:" + msg);

            channel.close();
            connection.close();
        }
    }

}

5.3.2 消费者1

public class ReceiveMsg1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
                if("wait".equals(msg)){
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        channel.basicConsume("queue3",true,consumer);
    }
}

5.3.3 消费者2

public class ReceiveMsg2 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };

        channel.basicConsume("queue4",true,consumer);
    }
}

5.4 路由模式
5.4.1 发送者 发送消息到交换机

public class SendMsg {

    public static void main(String[] args) throws Exception{
        System.out.println("请输入消息:");
        Scanner scanner = new Scanner(System.in);
        String msg = null;
        while(!"quit".equals(msg = scanner.nextLine())){
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            if(msg.startsWith("a")){
                channel.basicPublish("ex2","a",null,msg.getBytes());
            }else if(msg.startsWith("b")){
                channel.basicPublish("ex2","b",null,msg.getBytes());
            }
            System.out.println("发送:" + msg);

            channel.close();
            connection.close();
        }
    }

}

5.4.2 消费者1

public class ReceiveMsg1 {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer1接收:"+msg);
                if("wait".equals(msg)){
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        channel.basicConsume("queue5",true,consumer);
    }
}

5.4.3 消费者2

public class ReceiveMsg2 {

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //body就是从队列中获取的数据
                String msg = new String(body);
                System.out.println("Consumer2接收:"+msg);
            }
        };

        channel.basicConsume("queue6",true,consumer);
    }
}
上一篇下一篇

猜你喜欢

热点阅读