rabbitmq 工作队列

2024-04-18  本文已影响0人  该死的金箍

//生产者 代码【在构造函数中进行了连接操作 析构函数使用关闭连接操作】

public function taskQueue()
{
    //durable  true 开启队列持久化 第三个参数
    $this->channel->queue_declare('task_queue', false, true, false, false);
    $argv = ['a', 'b', 'c', 'd'];
    $data =implode(' ', array_slice($argv, 1));
    if (empty($data)) {
        $data ="Hello World!";
    }
    $msg =new AMQPMessage(
        $data,
        array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
    );
    $this->channel->basic_publish($msg, '', 'task_queue');
    echo ' [x] Sent ', $data, "\n";
}

//消费者代码  我这里使用的TP6框架的console

protected function execute(Input$input, Output$output)
{
    $this->channel->queue_declare('task_queue', false, true, false, false); //durable  true 开启队列持久化 第三个参数
    echo " [*] Waiting for messages. To exit press CTRL+C\n";
    $callback =function ($msg) {
        echo ' [x] Received ', $msg->getBody(), "\n";
        //这里用延迟执行 表示程序正在执行中
        sleep(substr_count($msg->getBody(), ' '));
        echo " [x] Done\n";
        $msg->ack();//手动确认ack
    };
    //限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
    $this->channel->basic_qos(null, 1, null);
    $this->channel->basic_consume('task_queue', '', false, false, false, false, $callback);//第四个参数为false 手动ack  true 自动ack
    try {
        $this->channel->consume();
    }catch (\Throwable$exception) {
        echo $exception->getMessage();
    }
    $this->channel->close();
    $this->connection->close();
}

这里消费者投递多个队列

可以使用多个窗口进行生产 发现每个窗口都会拿到 消息
通过sleep()延时 能清晰看到区别
加了下面这句代码 和没有加上这句代码的区别
 $this->channel->basic_qos(null, 1, null);

消息持久化

   $this->channel->queue_declare('task_queue', false, true, false, false); //durable  true 开启队列持久化 第三个参数
    并且 $msg =new AMQPMessage($data,        array('delivery_mode' => 2)    );消息持久化

当生产者投递了消息  然后没有生产者进行消费 进行关闭rabbitmq服务
重启后 会发现消息依旧存在  

手动ACK机制

 $this->channel->basic_consume('task_queue', '', false, false, false, false, $callback);//第四个参数为false 手动ack  true 自动ack
当生产者正在处理队列中的消息 中途突然断开连接  然后并没有执行到
 $msg->ack();//手动确认ack
未处理的消息会依旧存在   【正常流程取出就会删除掉】

上一篇下一篇

猜你喜欢

热点阅读