消息队列RabbitMQ的PHP使用

2021-04-10  本文已影响0人  非资深技术人

附言:我当时的环境是win10宝塔PHP7.4和Nginx,使用模块amqp 1.10.2 for Windows 非线程安全版本。注:线程安全版本似乎无法载入dll,路径正确可能是环境问题
amqp模块:https://pecl.php.net/package/amqp/1.10.2/windows

RabbitMQ简要概括

1、AMQP:Advanced Message Queuing Protocol,是一个提供统一消息服务的应用层标准协议。

2、IPC(单一系统进程间通信) -> socket(不同机器间进程通信) -> AMQP(解决大型系统模块与组件间通信)

3、RabbitMQ 基于 Erlang 开发,是 AMQP 的一个开源实现。

4、RabbitMQ 系统架构图:

5、名词术语:

还有几个隐式的概念:

此外,Exchanges 分三种类型:

RabbitMQ安装使用

第一步:下载并安装erlang

原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。

下载地址:http://www.erlang.org/downloads

image

根据本机位数选择erlang下载版本。

下载完是这么个东西:

image

双击,点next就可以。

image

选择一个自己想保存的地方,然后next、finish就可以。

image

安装完事儿后要记得配置一下系统的环境变量。

此电脑-->鼠标右键“属性”-->高级系统设置-->环境变量-->“新建”系统环境变量

image

变量名:ERLANG_HOME

变量值就是刚才erlang的安装地址,点击确定。

然后双击系统变量path

image

点击“新建”,将%ERLANG_HOME%\bin加入到path中。

image

最后windows键+R键,输入cmd,再输入erl,看到版本号就说明erlang安装成功了。

image

第二步:下载并安装RabbitMQ

下载地址:http://www.rabbitmq.com/download.html

image

双击下载后的.exe文件,安装过程与erlang的安装过程相同。

RabbitMQ安装好后接下来安装RabbitMQ-Plugins。打开命令行cd,输入RabbitMQ的sbin目录。

我的目录是:D:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.3\sbin

然后在后面输入rabbitmq-plugins enable rabbitmq_management命令进行安装

image

打开sbin目录,双击rabbitmq-server.bat

image

等几秒钟看到这个界面后,访问http://localhost:15672

然后可以看到如下界面

image

默认用户名和密码都是guest

登陆即可。

安装php扩展amqp

先查看自己的php版本

image

**记住版本 至于这个线程安全问题 这里引用了别人的自己看看吧 **http://blog.csdn.net/aoyoo111/article/details/19021295

接下来下载dll文件 地址http://pecl.php.net/package/amqp

image

下载稳定版的,点击DLL

image

php版本 ,X86 和X64 根据自己情况 , NTS 和 TS 就是那个thread safty 的状态 这个大家都会看吧 就不多说了

下载解压

image

将php_amqp.dll和php_amqp.pdb文件放到php目录的ext文件夹下 见下图:

image

将rabbitmq.4.dll和rabbitmq.4.pdb文件放到php根目录Nginx 忽略这一步 见下图:

image

php.ini里面添加

image

apache 修改http.conf 文件 添加Nginx 忽略这一步

image

之后重启apache

image

php 中使用Rabbitmq实现实现消息发送和接收

1,建立一个send.php文件用来发送消息

2,建立一个 receive.php 文件用来接收消息

代码如下

send.php

<?php
/**
* 发送消息
 */
$exchangeName = 'demo';
$routeKey = 'hello';
$message = 'Hello World!';
// 建立TCP连接
$connection = new AMQPConnection([
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to the broker!\n");
try {
    $channel = new AMQPChannel($connection);

    $exchange = new AMQPExchange($channel);
    $exchange->setName($exchangeName);
    $exchange->setType(AMQP_EX_TYPE_DIRECT);
    $exchange->declareExchange();

    echo 'Send Message: ' . $exchange->publish($message, $routeKey) . "\n";
    echo "Message Is Sent: " . $message . "\n";
} catch (AMQPConnectionException $e) {
    var_dump($e);
}
$connection->disconnect();// 断开连接

receive.php

<?php
/**
 * 接收消息
 */
$exchangeName = 'demo';
$queueName = 'hello';
$routeKey = 'hello';
// 建立TCP连接
$connection = new AMQPConnection([
    'host' => 'localhost',
    'port' => '5672',
    'vhost' => '/',
    'login' => 'guest',
    'password' => 'guest'
]);
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
echo 'Exchange Status: ' . $exchange->declareExchange() . "\n";
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
echo 'Message Total: ' . $queue->declareQueue() . "\n";
echo 'Queue Bind: ' . $queue->bind($exchangeName, $routeKey) . "\n";
var_dump("Waiting for message...");
// 消费队列消息
while(TRUE) {
    $queue->consume('processMessage');
}

// 断开连接
$connection->disconnect();
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    var_dump("Received: " . $msg);
    $queue->ack($envelope->getDeliveryTag()); // 手动发送ACK应答
}

测试:

打开两个终端,先运行接收者脚本监听消息发送:

php receive.php

在另一个终端中运行消息发送脚本:

php send.php

上一篇 下一篇

猜你喜欢

热点阅读