RabbitMQ的基本概念

2020-09-06  本文已影响0人  pingwazi

有问题请联系我QQ:273206491

RabbitMQ是基于Erlang语言(俗称:二郎神)对AMQP协议的实现。

1、各个模块之间的一览图

image.png

2、连接

这里以Java的客户端进行说明,客户端与RabbitMQ服务器之间是基于TCP连接的,而TCP连接的创建和销毁都非常耗费资源,因此RabbitMQ使用连接复用模式,也就是我们常用的Channel,一个TCP连接可以创建多个Channel,不同Channel之间是互相独立的,一个线程使用一个Channel是安全的,不会出现多线程共享同一个连接的问题(线程共享一个资源很容易出现线程安全的问题)。

2.1、连接创建示例

//连接工厂的初始化
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("rabbitmq服务器的地址");
connectionFactory.setUsername("用户名");
connectionFactory.setPassword("密码");
//通过连接工厂与rabbitmq之间建立一个tcp连接
Connection connection =connectionFactory.newConnection();
//通过连接创建信道,多个线程之间不要共享同一个信道
Channel channel = connection.createChannel();

2.2、资源释放问题

在连接不使用的时候及时关闭连接是非常重要的步骤,可能你在本地开发时不释放连接不会有什么问题,但一旦程序上线后,连接不释放很有很多导致RabbitMQ的连接因耗尽而无法接受新的连接请求或者其他什么问题。
释放连接的方式也非常简单,直接调用Connection对象的close方法可以释放一个连接,同时也会释放这个连接下的所有Channel资源。

3、生产者

用于生产消息,使用basicPublish向RabbitMQ发送消息

upChannel.basicPublish("msgExchange", "message", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("utf-8"));
///下面这两种方式是等价的
MessageProperties.PERSISTENT_TEXT_PLAIN
或者
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder();
properties.deliveryMode(2);
properties.contentType("text/plain");
properties.headers(null);//用于设置自己的header属性,交换器的类型中就有一种为headers(但不推荐使用),当我们设置的headers属性值和交换器绑定的值一致是就能够路由到响应队列中

4、交换器

交换器可以认为是一个消息中转站,他通过和队列进行绑定,我们把消息发送到交换器中,交换器根据路由key再决定将消息投递给哪个队列(交换器的类型为fanout时,路由key是无效的)。

4.1、类型

4.2、创建

//声明一个交换器,如果存在就不创建,如果存在的交换器参数与声明交换器参数不匹配就会报错,如果不存在就会创建
//这方式什么是同步的
upChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
//这种方式声明式异步的,但是不推荐使用,因为可能你在调用了之后就去使用的时候RabbitMQ服务器还没有创建好。
upChannel.exchangeDeclareNoWait();

4.3、判断交换器是否存在

//判断指定交换器是否存在,如果不存在就会报404异常。
upChannel.exchangeDeclarePassive();

4.4、删除交换器

// 同步删除
upChannel.exchangeDelete("",true);
//异步删除(不需要等待删除完成)
upChannel.exchangeDeleteNoWait("",true);

5、绑定路由key

//将交换器与队列进行绑定通过message进行绑定
upChannel.queueBind("msgQueue","msgExchange","message");
//将交换器与队列进行解绑
upChannel.queueUnbind("","","")
//将交换器与交换器绑定在一起
upChannel.exchangeBind( "","","")
//将交换器与交换器解绑
upChannel.exchangeUnbind( "","","")

绑定路由key是在绑定交换器和队列时指定的一个key,其中message就是masQueue队列与msgExchange交换器的绑定路由key。

路由key是发送消息的时候指定的key,交换器类型为direct或者topic时,路由key与绑定路由key后才会将消息发送到对应的队列中。

6、队列

队列是RabbitMQ实际存储消息的地方

//声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
upChannel.queueDeclare("msgQueue", true, false, false, null);
//异步声明一个队列,不推荐使用,原因同交换器一样
upChannel.queueDeclareNoWait();

6.1、判断队列是否存在

//判断队列是否存在
upChannel.queueDeclarePassive();

6.2、删除队列

//同步删除
upChannel.queueDelete("",true,true);
//异步删除
upChannel.queueDeleteNoWait("",true,true);

7、消费者

消息的消费者有两种常用的模式,即推模式和拉模式。两种模式的实现方式完全不同,推模式是只要队列中有消息了就会推送给消费者(当然了也要受未确认消息数的限制),而拉模式则是消费者需要的时候再去RabbitMQ中获取,而他一次也只能获取获取一条消息。

7.1、拉模式

image.png

从图中可以看出,在单线程的情况下,消息的处理速度是比较慢的,当然了这里也可以使用多线程不断的从Rabbitmq中去获取,但这样就需要手动实现获取算法了。不废话,先上代码!

private void receiveGetMessage()
    {
        try
        {
            Channel downChannel=connection.createChannel();
            //交换器类型:fanout、direct、topic
            //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
            downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
            //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
            downChannel.queueDeclare("msgQueue", true, false, false, null);
            //将交换器与队列进行绑定通过message进行绑定
            downChannel.queueBind("msgQueue","msgExchange","message");
            //消息未确认消息的数量
            downChannel.basicQos(1);//在非自动确认的模式下,限制最多允许未确认的消息数量
            boolean isBreak=false;
            while (!isBreak)
            {
                //消费消息
                GetResponse msgData = downChannel.basicGet("", false);
                String msgBody=new String(msgData.getBody(), "utf-8");
                System.out.println(Thread.currentThread().getId()+"RabbitMQ拉模式消费者收到消息: " + msgBody);
                //回复确认消息
                downChannel.basicAck(msgData.getEnvelope().getDeliveryTag(),false);
                if(StringUtils.isEmpty(msgBody))
                    isBreak=true;
            }
            downChannel.close();
        }
        catch (ShutdownSignalException ex)
        {
            //连接异常关闭了,这里要进行检查,并尝试重新建立连接
            ex.printStackTrace();
        }
        catch (IOException ex)
        {
            //发生io异常需要进行处理,对应channel可能关闭了
            ex.printStackTrace();
        } catch (TimeoutException e) {
            //信道资源释放超时,可能对应的channel关闭了
            e.printStackTrace();
        }
    }

以上代码我是通过但线程循环的方式从RabbitMQ中拉取代码,这种模式处理速度较慢,在不是用多线程进行处理的情况下,这中模式适合用于处理单个消息比较耗时的场景。

7.2、推模式

image.png

通过运行如下代码可以看得出,推模式实际上是使用了多线程的在进行处理的。但是他的吞吐量是默认拉模式的好几倍,这中模式适合于处理每个消息的时间比较短的场景。

private void receivePushMessage()
    {
        try
        {
            Channel downChannel=connection.createChannel();
            //交换器类型:fanout、direct、topic
            //声明一个名字为msgExchange、类型为direct并且持久化的交换器,如果交换器已经存在就不再创建
            downChannel.exchangeDeclare("msgExchange",BuiltinExchangeType.DIRECT,false);
            //声明一个名字为msgQueue、持久化、不排他、不自动删除的队列,如果此队列已经存在就不再创建
            downChannel.queueDeclare("msgQueue", true, false, false, null);
            //将交换器与队列进行绑定通过message进行绑定
            downChannel.queueBind("msgQueue","msgExchange","message");
            //消息未确认消息的数量
            downChannel.basicQos(10000);//在非自动确认的模式下,限制最多允许未确认的消息数量
            //消费消息
            downChannel.basicConsume("msgQueue",createConsumer(downChannel));
            System.out.println("RabbitMQ消费者正在运行中...");
            //不能释放信道资源!!!
            //因为这里的消费者是用的推模式,如果关闭了信道,后面在进行消息消费的时候会报错
            //downChannel.close();
        }
        catch (ShutdownSignalException ex)
        {
            //连接异常关闭了,这里要进行检查,并尝试重新建立连接
            ex.printStackTrace();
        }
        catch (IOException ex)
        {
            //发生io异常需要进行处理,对应channel可能关闭了
            ex.printStackTrace();
        }
    }

    /**
     * 创建消费对象
     * @param channel
     * @return
     */
    private Consumer createConsumer(Channel channel)
    {
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String message = new String(body);
                System.out.println(Thread.currentThread().getId()+"RabbitMQ推模式消费者收到消息: " + message);
                // 消息确认
                try {
                    channel.basicAck(envelope.getDeliveryTag(), false);//手动确认消息
                } catch (IOException e) {
                    //发生io异常需要进行处理,对应channel可能关闭了
                    e.printStackTrace();
                }
            }
        };
        return consumer;
    }
上一篇 下一篇

猜你喜欢

热点阅读