RabbitMQ入门和基本模型

2021-10-09  本文已影响0人  Raral

RabbitMQ

安装

  1. 一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。根据操作系统不同官网提供了相应的安装说明:Windows、Debian / Ubuntu、RPM-based Linux、Mac

如果是Mac 用户,个人推荐使用 HomeBrew 来安装,安装前要先更新 brew:

brew update
brew install rabbitmq
  1. 启动
    启动很简单,找到安装后的 RabbitMQ 所在目录下的 sbin 目录,可以看到该目录下有6个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 即可,下面将 RabbitMQ 的安装位置以 . 代替,启动命令就是:
./sbin/rabbitmq-server

#如果电脑已安装查看目录
[root@VM-0-2-centos mq]# find / -name rabbitmq-server
/etc/logrotate.d/rabbitmq-server
/usr/sbin/rabbitmq-server
/usr/lib/rabbitmq/bin/rabbitmq-server
/usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/sbin/rabbitmq-server
/usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server

#后台启动
./sbin/rabbitmq-server -detached

  1. 浏览器查看 http://ip:15672/ 登录即可 默认账户密码:guest
  2. 设置用户和密码(https://www.cnblogs.com/whs123/p/14184317.html)
[root@VM-0-2-centos sbin]# ./rabbitmqctl add_user xx admin123
Adding user "xxx" ...
[root@VM-0-2-centos sbin]# ./rabbitmqctl set_user_tags xxxadministrator
Setting tags for user "xxx" to [administrator] ...

rabbitmq使用

六种消费模型:

  1. P - Q - C(基本消息模型)
    P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.2</version>
</dependency>
public class RabbitMqUtils {
    private static ConnectionFactory connectionFactory;
    static {
        //重量级类常见,在类加载创建一次
         connectionFactory = new ConnectionFactory();

    }

    // 定义创建连接
   public static Connection getConnection() {


       //获取连接对象
       try {
           //创建mq连接工厂对象
           connectionFactory.setHost("x.y.z.j");
           connectionFactory.setPort(5672);
           connectionFactory.setVirtualHost("/gzsz");//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
           connectionFactory.setUsername("lg123");
           connectionFactory.setPassword("lg123");
           Connection connection = connectionFactory.newConnection();
           return connection;
       } catch (IOException e) {
           e.printStackTrace();
       } catch (TimeoutException e) {
           e.printStackTrace();
       }
       return null;
   }

   //关闭通道,关闭连接
    public static void closeChannel(Channel channel, Connection connection) {
        try {
            channel.close();
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    //
}

public class Provider {

    private final static String QUEUE_NAME = "simple_queue";

    public void sendMessage() throws IOException, TimeoutException {
        // 1. 获取链接
        Connection connection = RabbitMqUtils.getConnection();
        // 2. 获取通道对象
        Channel channel = connection.createChannel();
        // 3. 通道绑到对应消息队列
        //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);

        // 4.通过通道向指定的队列发布消息
        //参数:String exchange, String routingKey, BasicProperties props, byte[] body
        /**
         * 参数明细:
         * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
         * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
         * 3、props,消息的属性
         * 4、body,消息内容
         */
        String msg = "hello rabbitmq";
        channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
        System.out.println("发送消息:" + msg.getBytes());
        // 5. 关闭通道和链接
        RabbitMqUtils.closeChannel(channel,connection);
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Provider provider = new Provider();
        provider.sendMessage();
    }
}
public class Customer {
    private final static String QUEUE_NAME = "simple_queue";

    public void acceptMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        //消费消息
        //参数1: 队列名称;参数2:开启消息自动确认机制
        //参数3: 消费完成回调接口
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body =" + new String(body));
            }

        });

//        channel.close();
//        connection.close();

    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Customer customer = new Customer();
        customer.acceptMessage();
    }

}

消息确认机制(ACK)
通过刚才的案例可以看出,消息一旦被消费者接收,队列中的消息就会被删除。

那么问题来了:RabbitMQ怎么知道消息被接收了呢?

如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是RabbitMQ无从得知,这样消息就丢失了!

因此,RabbitMQ有一个ACK机制。当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收。不过这种回执ACK分两种情况:

自动ACK:消息一旦被接收,消费者自动发送ACK

手动ACK:消息接收后,不会发送ACK,需要手动调用

大家觉得哪种更好呢?

这需要看消息的重要性:

如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便

如果消息非常重要,不容丢失。那么最好在消费完成后手动ACK,否则接收消息后就自动ACK,RabbitMQ就会把消息从队列中删除。如果此时消费者宕机,那么消息就丢失了。

我们之前的测试都是自动ACK的,如果要手动ACK,需要改动我们的代码:
消费者:

public class Customer {
    private final static String QUEUE_NAME = "simple_queue";

    public void acceptMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();

        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);


        //消费消息
        //参数1: 队列名称;参数2:开启消息自动确认机制
        //参数3: 消费完成回调接口
        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者: body =" + new String(body));
                // 手动进行ACK
                /*
                 *  void basicAck(long deliveryTag, boolean multiple) throws IOException;
                 *  deliveryTag:用来标识消息的id
                 *  multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
                 */

                channel.basicAck(envelope.getDeliveryTag(), false);
            }

        });

//        channel.close();
//        connection.close();
    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Customer customer = new Customer();
        customer.acceptMessage();
    }

}
  1. p -q- [c1, c2 ] (work消息模型,竞争消息模型)
    P:生产者:任务的发布者

C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)

C2:消费者2:领取任务并且完成任务,假设完成速度较快

public class Provider {
    private final static String QUEUE_NAME = "work_queue";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        //通道声明队列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //发布消息
        for (int i = 1; i <= 10; i++) {
            channel.basicPublish("",QUEUE_NAME, null, ("task..." + i).getBytes());
            System.out.println("生产者-发送消息:"+ "task..." + i);
        }
        RabbitMqUtils.closeChannel(channel,connection);
    }
}

public class Customer1 {
    private final static String QUEUE_NAME = "work_queue";
    public void acceptMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();

        Channel channel = connection.createChannel();
//        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }

        });

//        channel.close();
//        connection.close();

    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Customer1 customer1 = new Customer1();
        customer1.acceptMessage();
        System.out.println("消费者1已启动");
    }
}

public class Customer2 {
    private final static String QUEUE_NAME = "work_queue";
    public void acceptMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();

        Channel channel = connection.createChannel();
        // 不平均分配
//        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);


        //参数2:消息自动确认
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("消费者2:" + new String(body));
//                channel.basicAck(envelope.getDeliveryTag(), false);
            }

        });

//        channel.close();
//        connection.close();

    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Customer2 customer2 = new Customer2();
        customer2.acceptMessage();
        System.out.println("消费者2已启动");
    }
}

能者多劳
刚才的实现有问题吗?

消费者1比消费者2的效率要低,一次任务的耗时较长

然而两人最终消费的消息数量是一样的

消费者1大量时间处于空闲状态,消费者2一直忙碌

现在的状态属于是把任务平均分配,正确的做法应该是消费越快的人,消费的越多。
怎么实现呢?

通过 BasicQos 方法设置prefetchCount = 1。这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理1个Message。换句话说,在接收到该Consumer的ack前,他它不会将新的Message分发给它。相反,它会将其分派给不是仍然忙碌的下一个Consumer。

值得注意的是:prefetchCount在手动ack的情况下才生效,自动ack不生效。

public class Customer1 {
    private final static String QUEUE_NAME = "work_queue";
    public void acceptMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();

        Channel channel = connection.createChannel();
        //prefetchCount在手动ack的情况下才生效,自动ack不生效
        channel.basicQos(1);
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);

        channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1:" + new String(body));
                //手动确认
                channel.basicAck(envelope.getDeliveryTag(), false);
            }

        });

//        channel.close();
//        connection.close();

    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Customer1 customer1 = new Customer1();
        customer1.acceptMessage();
        System.out.println("消费者1已启动");
    }
}

订阅模型分类
说明下:

1、一个生产者多个消费者
2、每个消费者都有一个自己的队列
3、生产者没有将消息直接发送给队列,而是发送给exchange(交换机、转发器)
4、每个队列都需要绑定到交换机上
5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者消费
例子:注册->发邮件、发短信

X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

Exchange类型有以下几种:

Fanout:广播,将消息交给所有绑定到交换机的队列

Direct:定向,把消息交给符合指定routing key 的队列

Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

Header:header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

Header模式不展开了,感兴趣可以参考这篇文章https://blog.csdn.net/zhu_tianwei/article/details/40923131
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

  1. P-X-[q1,q2]-[c1,c2] (Publish/subscribe(交换机类型:Fanout,也称为广播 ))
    和前面两种模式不同:

1) 声明Exchange,不再声明Queue

2) 发送消息到Exchange,不再发送到Queue

public class Provider {
    private final static String EXCHANGE_NAME = "test_fanout_exchange";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
        // 通道声明到交互机
        //参数1: 交互机名称; 参数2: 交换机类型: fanout
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        String msg = "注册成功";
        channel.basicPublish(EXCHANGE_NAME,"", null, msg.getBytes());

        //发布消息
//        for (int i = 1; i <= 10; i++) {
//            channel.basicPublish("logs","", null, (i + "fanout").getBytes());
//
//        }
        System.out.println("生产者:发送消息:" + msg.getBytes());
        RabbitMqUtils.closeChannel(channel,connection);
    }
}

public class Customer1 {
    private final static String EXCHANGE_NAME = "test_fanout_exchange";// 交换机
    private final static String QUEUE_NAME = "fanout_exchange_queue_sms";// 发送短信队列
    public void acceptMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
//        channel.exchangeDeclare("logs","fanout");
//        String queueName = channel.queueDeclare().getQueue();
        //声明队列名称
        channel.queueDeclare(QUEUE_NAME, false,false,false,null);
        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1【短信服务】:" + new String(body));
            }
        });

    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Customer1 customer1 = new Customer1();
        customer1.acceptMessage();
    }
}


public class Customer2 {
    private final static String EXCHANGE_NAME = "test_fanout_exchange";// 交换机
    private final static String QUEUE_NAME = "fanout_exchange_queue_email";// 发送邮件队列
    public void acceptMessage() throws IOException, TimeoutException {
        Connection connection = RabbitMqUtils.getConnection();
        Channel channel = connection.createChannel();
//        channel.exchangeDeclare("logs","fanout");
//        String queueName = channel.queueDeclare().getQueue();
        //声明队列名称
        channel.queueDeclare(QUEUE_NAME, false,false,false,null);
        //绑定队列到交换机
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2【邮件服务】:" + new String(body));
            }
        });

    }

    public static void main(String[] args) throws IOException, TimeoutException {
        Customer2 customer1 = new Customer2();
        customer1.acceptMessage();
    }
}


思考
1、publish/subscribe与work queues有什么区别。

区别:

1)work queues不用定义交换机,而publish/subscribe需要定义交换机。

2)publish/subscribe的生产方是面向交换机发送消息,work queues的生产方是面向队列发送消息(底层使用默认交换机)。

3)publish/subscribe需要设置队列和交换机的绑定,work queues不需要设置,实际上work queues会将队列绑定到默认的交换机 。

相同点:

所以两者实现的发布/订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
2、实际工作用 publish/subscribe还是work queues。

建议使用 publish/subscribe,发布订阅模式比工作队列模式更强大(也可以做到同一队列竞争),并且发布订阅模式可以指定自己专用的交换机。

分享两道面试题
面试题:

避免消息堆积?

1) 采用workqueue,多个消费者监听同一队列。

2)接收到消息以后,而是通过线程池,异步消费。

如何避免消息丢失?

1) 消费者的ACK机制。可以防止消费者丢失消息。

但是,如果在消费者消费之前,MQ就宕机了,消息就没了?

2)可以将消息进行持久化。要将消息持久化,前提是:队列、Exchange都持久化

接下来几种模式参考

上一篇 下一篇

猜你喜欢

热点阅读