消息队列之RabbitMQ-安装与配置

2020-07-15  本文已影响0人  冰河winner

1、安装

RabbitMQ安装好以后,进入路径/usr/local/Cellar/rabbitmq/3.8.3/sbin,里面有很多命令:

> $  cd /usr/local/Cellar/rabbitmq/3.8.3/sbin
> $  ls

cuttlefish           
rabbitmq-diagnostics 
rabbitmq-plugins     
rabbitmq-server      
rabbitmqadmin
rabbitmq-defaults    
rabbitmq-env         
rabbitmq-queues      
rabbitmq-upgrade     
rabbitmqctl

例如,使用rabbitmq-plugins开启控制台插件:

> $ ./rabbitmq-plugins enable rabbitmq_management ##开启控制台插件

Enabling plugins on node rabbit@localhost:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_amqp1_0
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_mqtt
  rabbitmq_stomp
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@localhost...
Plugin configuration unchanged.

使用rabbitmqctl添加账号:

> $ ./rabbitmqctl add_user admin admin ## 添加账号
Adding user "admin" ...

> $ ./rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" ## 添加访问权限
Setting permissions for user "admin" in vhost "/" ...

> $ ./rabbitmqctl set_user_tags admin administrator ## 设置超级权限
Setting tags for user "admin" to [administrator] ...

使用rabbitmq-server命令启动:

> $ ./rabbitmq-server ## 启动RabbitMQ

  ##  ##      RabbitMQ 3.8.3
  ##  ##
  ##########  Copyright (c) 2007-2020 Pivotal Software, Inc.
  ######  ##
  ##########  Licensed under the MPL 1.1. Website: https://rabbitmq.com

  Doc guides: https://rabbitmq.com/documentation.html
  Support:    https://rabbitmq.com/contact.html
  Tutorials:  https://rabbitmq.com/getstarted.html
  Monitoring: https://rabbitmq.com/monitoring.html

  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
        /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

  Config file(s): (none)

  Starting broker... completed with 6 plugins.

然后打开控制台'http://localhost:15672/',默认的账号密码为'guest/guest'。登录后的页面如下:

1.png

2、代码实例

   //消息队列名称
    private final static String QUEUE_NAME = "hello";

    @Test
    public void send () throws java.io.IOException, TimeoutException {

        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
      
        //创建连接
        Connection connection = factory.newConnection();

        //创建消息通道
        Channel channel = connection.createChannel();

        //生成一个消息队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        for (int i = 0; i < 10; i++) {
            String message = "Hello World RabbitMQ count: " + i;

            //发布消息,第一个参数表示路由(Exchange名称),为空则表示使用默认消息路由
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

            System.out.println(" [x] Sent '" + message + "'");
        }

        //关闭消息通道和连接
        channel.close();
        connection.close();

    }

    @Test
    public void consumer () throws java.io.IOException, java.lang.InterruptedException, TimeoutException {

        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");

        //创建连接
        Connection connection = factory.newConnection();

        //创建消息信道
        Channel channel = connection.createChannel();

        //消息队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("[*] Waiting for message. To exist press CTRL+C");

        AtomicInteger count = new AtomicInteger(0);

        //消费者用于获取消息信道绑定的消息队列中的信息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                try {
                    System.out.println(" [x] Received '" + message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(QUEUE_NAME, false, consumer);

        Thread.sleep(1000 * 60);
    }

3、参数

3.1 队列参数

声明队列的方法有四个参数:

queueDeclare(String queue, 
            boolean durable, 
            boolean exclusive, 
            Map<String, Object> arguments);

3.2 生产者参数

在生产者通过channel的basicPublish方法发布消息时,通常有几个参数需要设置,为此我们有必要了解清楚这些参数代表的具体含义及其作用,查看Channel接口,会发现存在3个重载的basicPublish方法:

void basicPublish (String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

void basicPublish (String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;

void basicPublish (String exchange, String routingKey, boolean mandatory, boolean immediate,
                   BasicProperties props, byte[] body) throws IOException;

参数分别是:

3.3 消费者参数

向信道的每次投递都带有一个投递标签(Delivery Tag),该投递标签是一个64位长的值,从1开始每次增加1,用于唯一标识信道的每次投递。

channel.basicAck()方法的第一个为参数位投递标签,用于标识对哪次消息投递进行确认,第二个参数表示是否进行消息的批量确认。若确认消息时开启批量确认,则投递标签小于当前消息投递标签的所有消息也都会进行确认。

使用批量确认,可起到减少网络流量的作用。

若投递的消息数目已经超过消费者的处理能力,继续投递消息将会导致消息的积压。此时消费者可选择拒绝:

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

当有多个消费者同时监听一个队列时,RabbitMQ默认将消息逐一顺序分配给各消费者,该消息分配机制称为轮询(Round-Robin)。

消息转发到队列后,分配是提前一次性完成的,称为预取,即RabbitMQ尽可能快速地将消息推送至客户端,由客户端缓存本地,而并非在消息消费时才逐一确定。

消息的轮询分配机制和尽可能快速推送消息的机制给实际使用带来困难。实际情况下,每个消费者处理消息的能力、每个消息处理所需时间可能都是不同的,若只是机械化地顺次分配,可能造成一个消费者由于处理的消息的业务复杂、处理能力低而积压消息,另一个消费者早早处理完所有的消息,处于空闲状态,造成系统的处理能力的浪费。且无法加入新的消费者以提高系统的处理能力。

希望达到的效果是每个消费者都根据自身处理能力合理分配消息处理任务,既无挤压也无空闲,新加入的消费者也能分担消息处理任务,使系统的处理能力能够平行扩展。

上一篇 下一篇

猜你喜欢

热点阅读