PHP中RabbitMQ之amqp扩展实现
PHP中RabbitMQ之amqp扩展实现
参考地址:https://blog.csdn.net/yeyun666/article/details/85112742
在安装完成后我们就可以开始我们的RabbitMQ之旅了,本Demo示例只创建了一个直连交换机,共有四个文件Consum.php (消费者),Publish.php (生产者) ,RabbitMqParernt.php (自己封装的RabbitMQ的方法) ,以及test.php (测试数据)
RabbitMqParernt.php如下所
<?php
abstract class RabbitMqParernt
{
//rabbitMQ配置信息(默认配置)
public $config = array(
'host'=>'127.0.0.1', //host
'port'=>5672, //端口
'username'=>'guest', //账号
'password'=>'guest', //密码
'vhost'=>'/' //虚拟主机
);
public $exchangeName = ''; //交换机
public $queueName = ''; //队列名
public $routeKey = ''; //路由键
public $exchangeType = ''; //交换机类型
public $channel; //信道
public $connection; //连接
public $exchange; //交换机
public $queue; //队列
//初始化RabbitMQ($config数组是用来修改rabbitMQ的配置信息的)
public function __construct($exchangeName, $queueName, $routeKey, $exchangeType = '',$config = array())
{
$this->exchangeName = $exchangeName;
$this->queueName = $queueName;
$this->routeKey = $routeKey;
$this->exchangeType = $exchangeType;
if(!empty($config))
{
$this->setConfig($config);
}
$this->createConnet();
}
//对RabbitMQ的配置重新进行配置
public function setConfig($config)
{
if (!is_array($config))
{
throw new Exception('config不是一个数组');
}
foreach($config as $key => $value)
{
$this->config[$key] = $value;
}
}
//创建连接与信道
public function createConnet()
{
//创建连接
$this->connection = new AMQPConnection($this->config);
if(!$this->connection->connect())
{
throw new Exception('RabbitMQ创建连接失败');
}
//创建信道
$this->channel = new AMQPChannel($this->connection);
//创建交换机
$this->createExchange();
//生产时不需要队列,故队列名为空,只有消费时需要队列名
if(!empty($this->queueName))
{
$this->createQueue();
}
}
//创建交换机
public function createExchange()
{
$this->exchange = new AMQPExchange($this->channel);
$this->exchange->setName($this->exchangeName);
$this->exchange->setType(AMQP_EX_TYPE_DIRECT);
$this->exchange->setFlags(AMQP_DURABLE);
}
//创建队列,绑定交换机
public function createQueue()
{
$this->queue = new AMQPQueue($this->channel);
$this->queue->setName($this->queueName);
$this->queue->setFlags(AMQP_DURABLE);
$this->queue->bind($this->exchangeName, $this->routeKey);
}
public function dealMq($flag)
{
if($flag)
{
$this->queue->consume(function($envelope){$this->getMsg($envelope, $this->queue);},AMQP_AUTOACK);//自动ACK应答
}
else
{
$this->queue->consume(function($envelope){$this->processMessage($envelope, $this->queue);});
}
}
public function getMsg($envelope, $queue)
{
$msg = $envelope->getBody();
$this->doProcess($msg);
}
public function processMessage($envelope, $queue)
{
$msg = $envelope->getBody();
$this->doProcess($msg);
$queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}
//处理消息的真正函数,在消费者里使用
abstract public function doProcess($msg);
//发送消息
public function sendMessage($message)
{
$this->exchange->publish($message, $this->routeKey);
}
//关闭连接
public function closeConnect()
{
$this->channel->close();
$this->connection->disconnect();
}
}
Consum.php 如下所示
<?php
include_once('RabbitMqParernt.php');
class Consum extends RabbitMqParernt
{
public function __construct()
{
parent::__construct('exchange', 'queue', 'routeKey');
}
public function doProcess($msg)
{
echo $msg;
}
}
$consum = new Consum();
//$consum->dealMq(false);
$consum->dealMq(true);
Publish.php如下所示
<?php
include_once('RabbitMqParernt.php');
class Publish extends RabbitMqParernt
{
public function __construct()
{
parent::__construct('exchange', '', 'routeKey');
}
public function doProcess($msg)
{
}
}
---------------------