php 消息队列rabbitMQ
2020-08-22 本文已影响0人
王宣成
一、安装
使用docker安装rabbitMQ
docker pull rabbitmq:3.7.7-management
docker images
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin 镜像id
打开浏览器访问 账号: admin 密码: admin
http://127.0.0.1:15672
windows安装扩展,根据自己的php选择下载
https://pecl.php.net/package/amqp/
php_amqp.dll 文件放到php目录的ext文件夹下
php.ini里面添加
extension=php_amqp.dll
rabbitmq.4.dll 放到php根目录下
重启查看phpinfo 是否成功显示amqp
二、使用
新建文件消费者 consumer.php
vhost 查看
image.png<?php
//声明连接参数
$config = array(
'host' => '127.0.0.1',
'vhost' => 'my_vhost',
'port' => 5672,
'login' => 'admin',
'password' => 'admin'
);
//连接broker
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
//在连接内创建一个通道
$ch = new AMQPChannel($cnn);
//创建一个交换机
$ex = new AMQPExchange($ch);
//声明路由键
$routingKey = 'key_1';
//声明交换机名称
$exchangeName = 'exchange_1';
//声明队列名称
$queueName = 'queue_1';
//设置交换机名称
$ex->setName($exchangeName);
//设置交换机类型
//AMQP_EX_TYPE_DIRECT:直连交换机
//AMQP_EX_TYPE_FANOUT:扇形交换机
//AMQP_EX_TYPE_HEADERS:头交换机
//AMQP_EX_TYPE_TOPIC:主题交换机
$ex->setType(AMQP_EX_TYPE_DIRECT);
//设置交换机持久
$ex->setFlags(AMQP_DURABLE);
//声明交换机
$ex->declareExchange();
//创建一个消息队列
$q = new AMQPQueue($ch);
//设置队列名称
$q->setName($queueName);
//设置队列持久
$q->setFlags(AMQP_DURABLE);
//声明消息队列
$q->declareQueue();
//交换机和队列通过$routingKey进行绑定
$q->bind($ex->getName(), $routingKey);
//接收消息并进行处理的回调方法
function receive($envelope, $queue)
{
//休眠两秒,
sleep(2);
//echo消息内容
echo $envelope->getBody() . "\n";
//显式确认,队列收到消费者显式确认后,会删除该消息
$queue->ack($envelope->getDeliveryTag());
}
//设置消息队列消费者回调方法,并进行阻塞
$q->consume("receive");
//$q->consume("receive", AMQP_AUTOACK);//隐式确认,不推荐
新建文件生产者 send.php
<?php
$config = array(
'host' => 'localhost',
'vhost' => 'my_vhost',
'port' => 5672,
'login' => 'admin',
'password' => 'admin'
);
$cnn = new AMQPConnection($config);
if (!$cnn->connect()) {
echo "Cannot connect to the broker";
exit();
}
$ch = new AMQPChannel($cnn);
$ex = new AMQPExchange($ch);
//消息的路由键,一定要和消费者端一致
$routingKey = 'key_1';
//交换机名称,一定要和消费者端一致,
$exchangeName = 'exchange_1';
$ex->setName($exchangeName);
$ex->setType(AMQP_EX_TYPE_DIRECT);
$ex->setFlags(AMQP_DURABLE);
$ex->declareExchange();
//创建10个消息
for ($i = 1; $i <= 10; $i++) {
//消息内容
$msg = array(
'data' => 'message_' . $i,
'hello' => 'world',
);
//发送消息到交换机,并返回发送结果
//delivery_mode:2声明消息持久,持久的队列+持久的消息在RabbitMQ重启后才不会丢失
echo "<pre>";
print_r("Send Message:" . $ex->publish(json_encode($msg), $routingKey, AMQP_NOPARAM, array('delivery_mode' => 2)) . "\n");
}
服务器上命令模式下执行 consumer.php 消费测试
php consumer.php
推荐使用supervisor守护进程工具执行 consumer.php
客户端访问 send.php 生产
http://localhost/rabbitmq/send.php
php代码可以使用 php-amqplib 库
https://github.com/php-amqplib/php-amqplib
新建composer.json
{
"require": {
"php-amqplib/php-amqplib":"2.8.*"
}
}
composer安装
composer install
send.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin','my_vhost');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
$channel->close();
$connection->close();
consumer.php
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'admin', 'admin', 'my_vhost');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function ($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();