4、简单封装RabbitMQ的发布与订阅模式
生产者类:Publisher.class.php
/**
 * Minimal publisher wrapper around the PHP AMQP extension.
 *
 * A constructor cannot signal failure through its return value, so callers
 * must inspect the public $is_ready flag after `new Publisher(...)`.
 */
class Publisher {
    /** @var array List of connection-credential arrays (shuffled on construction). */
    private $config = array();
    /** @var AMQPConnection|null Active connection, or null when none could be opened. */
    private $conn = null;
    /** @var AMQPChannel|null Channel opened on the active connection. */
    private $channel = null;
    /** @var AMQPExchange|null Exchange handle used by send(). */
    private $exchange = null;
    /** @var bool True once connection, channel and exchange handle are set up. */
    public $is_ready = false;

    /**
     * Open a connection and prepare the exchange handle.
     *
     * On invalid arguments or when no server can be reached the object is
     * left with $is_ready === false.
     *
     * @param array  $config List of RabbitMQ server credential arrays
     * @param string $e_name Exchange name
     */
    public function __construct($config, $e_name) {
        // shuffle() needs a real array; anything else is treated as "no config".
        if (!is_array($config) || !$config || !$e_name) {
            return;
        }
        // Randomise server order so that the first entry is not always tried first.
        shuffle($config);
        $this->config = $config;
        if (!$this->connect()) {
            return;
        }
        $this->channel = new AMQPChannel($this->conn);
        $this->establishExchange($e_name);
        $this->is_ready = true;
    }

    /**
     * Publish a message.
     *
     * The message is cast to string and trimmed; an empty message, an empty
     * routing key or a publisher that is not ready all yield false.
     *
     * @param string $msg     Message body
     * @param string $k_route Routing key
     * @return bool Result of AMQPExchange::publish(), or false when nothing was sent
     */
    public function send($msg, $k_route) {
        $msg = trim(strval($msg));
        if (!$this->exchange || $msg === '' || !$k_route) {
            return false;
        }
        return $this->exchange->publish($msg, $k_route);
    }

    /**
     * Try the configured servers in order, starting at index $i, until one accepts.
     *
     * Original author's note: an unreachable IP costs about 5 seconds per attempt.
     *
     * @param int $i Index of the first configuration entry to try
     * @return bool True when a connection was established
     */
    private function connect($i = 0) {
        for (; array_key_exists($i, $this->config); $i++) {
            try {
                $candidate = new AMQPConnection($this->config[$i]);
                $candidate->connect();
                $this->conn = $candidate;
                return true;
            } catch (AMQPConnectionException $e) {
                // This server is unavailable; move on to the next entry.
            }
        }
        // Never keep a handle to a connection that failed to open.
        $this->conn = null;
        return false;
    }

    /**
     * Create the exchange handle used for publishing.
     *
     * Only the name is set; the exchange is not declared here.
     * NOTE(review): presumably the exchange must already exist on the broker — confirm.
     *
     * @param string $name Exchange name
     * @return void
     */
    private function establishExchange($name) {
        $this->exchange = new AMQPExchange($this->channel);
        $this->exchange->setName($name);
    }

    /**
     * Close the connection when the publisher is destroyed.
     */
    public function __destruct() {
        if ($this->conn) {
            $this->conn->disconnect();
        }
    }
}
消费者类:Consumer.class.php
/**
 * Minimal consumer wrapper around the PHP AMQP extension.
 *
 * A constructor cannot signal failure through its return value, so callers
 * must inspect the public $is_ready flag after `new Consumer(...)`.
 */
class Consumer {
    /** @var array List of connection-credential arrays (shuffled on construction). */
    private $config = array();
    /** @var bool Whether the queue is declared durable. */
    private $durable = true;
    /** @var bool Whether the 'x-ha-policy' argument is set on the queue. */
    private $mirror = false;
    /** @var bool Whether the queue is declared auto-delete. */
    private $autodelete = false;
    /** @var AMQPConnection|null Active connection, or null when none could be opened. */
    private $conn = null;
    /** @var AMQPChannel|null Channel opened on the active connection. */
    private $channel = null;
    /** @var AMQPQueue|null Declared and bound queue. */
    private $queue = null;
    /** @var bool True once connection, channel and queue are set up. */
    public $is_ready = false;

    /**
     * Open a connection, then declare the queue and bind it to the exchange.
     *
     * On invalid arguments or when no server can be reached the object is
     * left with $is_ready === false.
     *
     * @param array  $config     List of RabbitMQ server credential arrays
     * @param string $e_name     Exchange name
     * @param string $k_route    Routing key
     * @param string $q_name     Queue name
     * @param bool   $durable    Declare the queue as durable
     * @param bool   $mirror     Set the 'x-ha-policy' queue argument to 'all'
     * @param bool   $autodelete Declare the queue as auto-delete
     */
    public function __construct($config, $e_name, $k_route, $q_name, $durable = true, $mirror = false, $autodelete = false) {
        // shuffle() needs a real array; anything else is treated as "no config".
        if (!is_array($config) || !$config || !$e_name || !$q_name || !$k_route) {
            return;
        }
        // Randomise server order so that the first entry is not always tried first.
        shuffle($config);
        $this->config = $config;
        if (!$this->connect()) {
            return;
        }
        $this->channel = new AMQPChannel($this->conn);
        $this->durable = (bool)$durable;
        $this->mirror = (bool)$mirror;
        $this->autodelete = (bool)$autodelete;
        $this->establishExchange($e_name);
        $this->establishQueue($q_name, $e_name, $k_route);
        $this->is_ready = true;
    }

    /**
     * Receive messages in a blocking loop.
     *
     * consume() is re-entered every time it returns, so this method only
     * returns (false) when it cannot start at all.
     *
     * @param string $fun_name Name of the callback function handling each message
     * @param bool   $autoack  Auto-acknowledge messages; when false the callback must ack itself
     * @return bool False when the callback name is empty or the queue is not set up
     */
    public function run($fun_name, $autoack = true) {
        $fun_name = strval($fun_name);
        if (!$fun_name || !$this->queue) {
            return false;
        }
        while (true) {
            if ($autoack) {
                $this->queue->consume($fun_name, AMQP_AUTOACK);
            } else {
                $this->queue->consume($fun_name);
            }
        }
    }

    /**
     * Try the configured servers in order, starting at index $i, until one accepts.
     *
     * Original author's note: an unreachable IP costs about 5 seconds per attempt.
     *
     * @param int $i Index of the first configuration entry to try
     * @return bool True when a connection was established
     */
    private function connect($i = 0) {
        for (; array_key_exists($i, $this->config); $i++) {
            try {
                $candidate = new AMQPConnection($this->config[$i]);
                $candidate->connect();
                $this->conn = $candidate;
                return true;
            } catch (AMQPConnectionException $e) {
                // This server is unavailable; move on to the next entry.
            }
        }
        // Never keep a handle to a connection that failed to open.
        $this->conn = null;
        return false;
    }

    /**
     * Configure a direct exchange object for the given name.
     *
     * The declareExchange() call is disabled in this version, so nothing is
     * sent to the broker and the configured object is discarded.
     * NOTE(review): presumably the exchange must already exist, otherwise the
     * later queue bind would fail — confirm.
     *
     * @param string $name Exchange name
     * @return bool Always true
     */
    private function establishExchange($name) {
        $ex = new AMQPExchange($this->channel);
        $ex->setName($name);
        $ex->setType(AMQP_EX_TYPE_DIRECT); // direct exchange
        if ($this->durable) {
            $ex->setFlags(AMQP_DURABLE); // durable
        }
        return true;
    }

    /**
     * Declare the queue and bind it to the exchange with the routing key.
     *
     * @param string $name    Queue name
     * @param string $e_name  Exchange name
     * @param string $k_route Routing key
     * @return mixed Result of AMQPQueue::bind()
     */
    private function establishQueue($name, $e_name, $k_route) {
        $this->queue = new AMQPQueue($this->channel);
        $this->queue->setName($name);

        // setFlags() replaces the whole bitmask, so the options are combined
        // into one value; two separate calls would drop the durable flag.
        // The mask is always applied so that the result follows the
        // constructor arguments rather than the extension's defaults.
        $flags = AMQP_NOPARAM;
        if ($this->durable) {
            $flags |= AMQP_DURABLE;
        }
        if ($this->autodelete) {
            $flags |= AMQP_AUTODELETE;
        }
        $this->queue->setFlags($flags);

        if ($this->mirror) {
            $this->queue->setArgument('x-ha-policy', 'all'); // mirrored queue
        }
        $this->queue->declareQueue();
        return $this->queue->bind($e_name, $k_route);
    }

    /**
     * Close the connection when the consumer is destroyed.
     */
    public function __destruct() {
        if ($this->conn) {
            $this->conn->disconnect();
        }
    }
}
demoC.php
include_once'./Consumer.class.php';
/**
 * Append a timestamped entry to log.txt in the current working directory.
 *
 * The write takes an exclusive lock so concurrent writers do not interleave
 * entries. If the file cannot be written, file_put_contents() emits a
 * warning and the entry is dropped; no exception is raised.
 *
 * @param string $word Text to record
 * @return void
 */
function logResult($word = '') {
    // date() replaces strftime(), which is deprecated as of PHP 8.1; the format is the same.
    $entry = "执行日期:" . date('Y-m-d H:i:s') . "\n" . $word . "\n";
    file_put_contents("log.txt", $entry, FILE_APPEND | LOCK_EX);
}
// Candidate RabbitMQ servers; the consumer tries them until one accepts the connection.
$config = array(
    array(
        'host' => '127.0.0.1',
        'port' => '5672',
        'login' => 'ybl',
        'password' => 'ybl',
        'vhost' => '/'
    ),
    array(
        'host' => '127.0.0.2',
        'port' => '5672',
        'login' => 'ybl',
        'password' => 'ybl',
        'vhost' => '/'
    )
);
$e_name = 'demo';   // exchange name
$q_name = 'ybl';    // queue name
$k_route = 'hello'; // routing key

$cs = new Consumer($config, $e_name, $k_route, $q_name);
// `new` always yields an object, so setup failure has to be read from is_ready.
if (!$cs->is_ready) {
    exit("error");
}
// The second argument defaults to true (automatic ACK). dealMessage() sends
// the ACK itself, so automatic ACK is switched off to avoid acknowledging
// every message twice.
$cs->run('dealMessage', false);
//消费回调函数,处理消息
/**
 * Consumer callback: log the body of a delivered message, then acknowledge it.
 *
 * @param object $envelope Delivered message; getBody() and getDeliveryTag() are read from it
 * @param object $queue    Queue the message came from; used to send the ACK
 * @return void
 */
function dealMessage($envelope, $queue) {
    // Write the payload to the log first.
    logResult($envelope->getBody());

    // Then acknowledge the delivery manually.
    $deliveryTag = $envelope->getDeliveryTag();
    $queue->ack($deliveryTag);
}
demoP.php
include_once'./Publisher.class.php';
// Candidate RabbitMQ servers; the publisher tries them until one accepts the connection.
$config = array(
    array(
        'host' => '127.0.0.1',
        'port' => '5672',
        'login' => 'ybl',
        'password' => 'ybl',
        'vhost' => '/'
    ),
    array(
        'host' => '127.0.0.2',
        'port' => '5672',
        'login' => 'ybl',
        'password' => 'ybl',
        'vhost' => '/'
    )
);
$e_name = 'demo';   // exchange name
$k_route = 'hello'; // routing key

$conn = new Publisher($config, $e_name);
// `new` always yields an object, so setup failure has to be read from is_ready.
if (!$conn->is_ready) {
    echo 'error';
    exit;
}
$msg = 'hello RabbitMQ';
for ($i = 0; $i < 10; $i++) {
    $res = $conn->send($msg, $k_route);
    // Print first, then flush, so each result shows up in the iteration that produced it.
    echo $res;
    // ob_flush() raises a notice when no output buffer is active.
    if (ob_get_level() > 0) {
        ob_flush();
    }
    flush();
    sleep(1);
}
分别运行脚本 demoC.php(消费者)和 demoP.php(生产者),然后查看 log.txt 中记录的消息。