php 消息队列rabbitMQ

2020-08-22  本文已影响0人  王宣成

一、安装

使用docker安装rabbitMQ
docker pull rabbitmq:3.7.7-management

docker images

docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 镜像id

打开浏览器访问 账号: admin 密码: admin
http://127.0.0.1:15672
windows安装扩展,根据自己的php选择下载
https://pecl.php.net/package/amqp/
php_amqp.dll 文件放到php目录的ext文件夹下
php.ini里面添加
extension=php_amqp.dll
rabbitmq.4.dll 放到php根目录下
重启查看phpinfo 是否成功显示amqp

二、使用

新建文件消费者 consumer.php
vhost 查看
image.png
<?php

//声明连接参数
$config = array(
    'host' => '127.0.0.1',
    'vhost' => 'my_vhost', 
    'port' => 5672,
    'login' => 'admin',
    'password' => 'admin'
);

//连接broker
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}

//在连接内创建一个通道
$ch = new AMQPChannel($cnn);

//创建一个交换机
$ex = new AMQPExchange($ch);

//声明路由键
$routingKey = 'key_1';

//声明交换机名称
$exchangeName = 'exchange_1';

//声明队列名称
$queueName = 'queue_1';

//设置交换机名称
$ex->setName($exchangeName);

//设置交换机类型
//AMQP_EX_TYPE_DIRECT:直连交换机
//AMQP_EX_TYPE_FANOUT:扇形交换机
//AMQP_EX_TYPE_HEADERS:头交换机
//AMQP_EX_TYPE_TOPIC:主题交换机
$ex->setType(AMQP_EX_TYPE_DIRECT);

//设置交换机持久
$ex->setFlags(AMQP_DURABLE);

//声明交换机
$ex->declareExchange();

//创建一个消息队列
$q = new AMQPQueue($ch);

//设置队列名称
$q->setName($queueName);

//设置队列持久
$q->setFlags(AMQP_DURABLE);

//声明消息队列
$q->declareQueue();

//交换机和队列通过$routingKey进行绑定
$q->bind($ex->getName(), $routingKey);

//接收消息并进行处理的回调方法
function receive($envelope, $queue)
{
    //休眠两秒,
    sleep(2);
    //echo消息内容
    echo $envelope->getBody() . "\n";
    //显式确认,队列收到消费者显式确认后,会删除该消息
    $queue->ack($envelope->getDeliveryTag());
}

//设置消息队列消费者回调方法,并进行阻塞
$q->consume("receive");
//$q->consume("receive", AMQP_AUTOACK);//隐式确认,不推荐
新建文件生产者 send.php
<?php

$config = array(
    'host' => 'localhost',
    'vhost' => 'my_vhost',
    'port' => 5672,
    'login' => 'admin',
    'password' => 'admin'
);

$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
    echo "Cannot connect to the broker";
    exit();
}

$ch = new AMQPChannel($cnn);
$ex = new AMQPExchange($ch);

//消息的路由键,一定要和消费者端一致
$routingKey = 'key_1';

//交换机名称,一定要和消费者端一致,
$exchangeName = 'exchange_1';

$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();

//创建10个消息
for ($i = 1; $i <= 10; $i++) {

    //消息内容
    $msg = array(
        'data'  => 'message_' . $i,
        'hello' => 'world',
    );

    //发送消息到交换机,并返回发送结果
    //delivery_mode:2声明消息持久,持久的队列+持久的消息在RabbitMQ重启后才不会丢失
    echo "<pre>";
    print_r("Send Message:" . $ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2)) . "\n");

}

服务器上命令模式下执行 consumer.php 消费测试
php consumer.php
推荐使用supervisor守护进程工具执行 consumer.php
客户端访问 send.php 生产

http://localhost/rabbitmq/send.php


php代码可以使用 php-amqplib 库

https://github.com/php-amqplib/php-amqplib

新建composer.json
{
    "require": {
        "php-amqplib/php-amqplib":"2.8.*"
    }
}
composer安装
composer install 
send.php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin','my_vhost');

$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";

$channel->close();
$connection->close();

consumer.php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin', 'my_vhost');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function ($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

上一篇下一篇

猜你喜欢

热点阅读