4、简单封装RabbitMQ的发布与订阅模式

2017-04-06  本文已影响0人  Uzero

生产者类:Publisher.class.php

classPublisher{

          private $config=array();

          private $conn=Null;

          private $channel=Null;

          private $exchange=Null;

          public $is_ready=False;

         /**

          * 创建连接,并指定交换机

          * @paramarray $config RabbitMQ服务器信息

          * @paramstring $e_name交换机名称

          * @returnvoid

          */

          public function__construct($config,$e_name){

                  if(!($config&&$e_name)) {

                           return False;

                  }

                  shuffle($config);

                  $this->config=$config;

                  if(!self::connect()) {

                          return False;

                  }

                  $this->channel=newAMQPChannel($this->conn);

                  $this->establishExchange($e_name);

                  $this->is_ready=True;

          }

          /**

           * 发送消息

           * @paramstring $msg消息体

           * @paramstring $k_route路由键

           * @returnint / False

           */

           public functionsend($msg,$k_route){

                   $msg=trim(strval($msg));

                   if(!$this->exchange||$msg===''|| !$k_route) return False;

                   $ret=$this->exchange->publish($msg,$k_route);

                   return $ret;

           }

           /**

            * 创建链接

            * 无法链接时则会自动选择下一个配置项(IP不通的情况下会有5秒等待)

            * @paramint $i配置项索引

            * @returnbool

            */

            private functionconnect($i=0){

                    if(array_key_exists($i,$this->config)){

                             try{

                                            $this->conn=newAMQPConnection($this->config[$i]);

                                            $this->conn->connect();

                                            $ret=True;

                              }catch(AMQPConnectionException$e){

                                            $ret=$this->connect(++$i);

                              }

                     }else{

                             $ret=False;

                     }

                     return$ret;

            }

            /**

             * 创建交换机

             * @paramstring $name名称

             * @returnvoid

             */

             private functionestablishExchange($name){

                       $this->exchange=newAMQPExchange($this->channel);

                       $this->exchange->setName($name);

              }

              public function__destruct(){

                       if($this->conn){

                                    $this->conn->disconnect();

                        }

               }

}

消费者类:Consumer.class.php

class Consumer {

           private$config=array();

           private$durable=True;

           private$mirror=False;

           private$autodelete=False;

           private$conn=Null;

           private$channel=Null;

           private$queue=Null;

           public$is_ready=False;

           /**

            * 创建连接、交换机、队列,并绑定

            * @paramarray $config RabbitMQ服务器信息

            * @paramstring $e_name交换机名称

            * @paramstring $k_route路由键

            * @paramstring $q_name队列名称

            * @parambool $durable队列是否持久化

            * @parambool $mirror队列是否镜像

            * @returnvoid

            */

            public function__construct($config,$e_name,$k_route,$q_name,$durable=True,$mirror=False,$autodelete=False){

                         if(!($config&&$e_name&&$q_name&&$k_route)){

                                       return False;

                          }

                          shuffle($config);

                          $this->config=$config;

                          if(!self::connect()){

                                      return False;

                          }

                         $this->channel=newAMQPChannel($this->conn);

                         $this->durable= (bool)$durable;

                         $this->mirror= (bool)$mirror;

                         $this->autodelete= (bool)$autodelete;

                        $this->establishExchange($e_name);

                        $this->establishQueue($q_name,$e_name,$k_route);

                        $this->is_ready=True;

             }

            /**

             * 循环阻塞方式接收消息

             * @paramstring $fun_name自定义处理函数的函数名

              * @parambool $autoack是否自动发送ACK应答,否则需要在自定义处理函数中手动发送

              * @returnbool

              */

              public functionrun($fun_name,$autoack=True){

                         $fun_name=strval($fun_name);

                         if(!$fun_name|| !$this->queue)return False;

                         while(True){

                                    if($autoack)$this->queue->consume($fun_name,AMQP_AUTOACK);

                                    else$this->queue->consume($fun_name);

                         }

                }

                /**

                 * 创建链接

                 * 无法链接时则会自动选择下一个配置项(IP不通的情况下会有5秒等待)

                 * @paramint $i配置项索引

                 * @returnbool

                 */

                 private functionconnect($i=0){

                            if(array_key_exists($i,$this->config)){

                                     try{

                                                   $this->conn=newAMQPConnection($this->config[$i]);

                                                   $this->conn->connect();

                                                   $ret=True;

                                       }catch(AMQPConnectionException$e){

                                                    $ret=$this->connect(++$i);

                                       }

                           }else{

                                       $ret=False;

                           }

                          return $ret;

                }

               /**

                * 创建交换机

                * @paramstring $name名称

                * @returnint

                */

                private functionestablishExchange($name){

                           $ex=newAMQPExchange($this->channel);

                           $ex->setName($name);

                           $ex->setType(AMQP_EX_TYPE_DIRECT);//direct类型

                           if($this->durable) $ex->setFlags(AMQP_DURABLE);//持久化

                           //return $ex->declareExchange();

                           return true;

                }

                /**

                 * 创建队列

                 * @paramstring $name名称

                 * @paramstring $e_name交换机名称

                 * @paramstring $k_route路由键

                 * @returnint

                 */

                 private functionestablishQueue($name,$e_name,$k_route){

                             $this->queue=newAMQPQueue($this->channel);

                             $this->queue->setName($name);

                             if($this->durable)$this->queue->setFlags(AMQP_DURABLE);//持久化

                             if($this->mirror)$this->queue->setArgument('x-ha-policy','all');//镜像

                              if($this->autodelete)$this->queue->setFlags(AMQP_AUTODELETE);//auto-delete

                              $this->queue->declareQueue();

                              $ret=$this->queue->bind($e_name,$k_route);

                              return$ret;

                  }

                  public function__destruct(){

                            if($this->conn){

                                           $this->conn->disconnect();

                             }

                   }

}

demoC.php

include_once'./Consumer.class.php';

functionlogResult($word='') {

           $fp=fopen("log.txt","a");

           flock($fp,LOCK_EX) ;

           fwrite($fp,"执行日期:".strftime("%Y-%m-%d %H:%M:%S",time())."\n".$word."\n");

           flock($fp,LOCK_UN);

           fclose($fp);

}

$config=array(

         array(

                         'host'=>'127.0.0.1',

                         'port'=>'5672',

                         'login'=>'ybl',

                         'password'=>'ybl',

                         'vhost'=>'/'

         ),

         array(

                         'host'=>'127.0.0.2',

                         'port'=>'5672',

                         'login'=>'ybl',

                         'password'=>'ybl',

                         'vhost'=>'/'

          )

);

$e_name='demo';//交换机名

$q_name='ybl';//队列名

$k_route='hello';//路由key

if(!$cs=newConsumer($config,$e_name,$k_route,$q_name)){

             exit("error");

}

//第二个参数默认为true,自动发送ACK应答

$cs->run('dealMessage');

//消费回调函数,处理消息

functiondealMessage($envelope,$queue) {

$msg=$envelope->getBody();

//记录log日记

logResult($msg);

$queue->ack($envelope->getDeliveryTag());//手动发送ACK应答

}

demoP.php

include_once'./Publisher.class.php';

$config=array(

       array(

               'host'=>'127.0.0.1',

               'port'=>'5672',

               'login'=>'ybl',

               'password'=>'ybl',

               'vhost'=>'/'

      ),

      array(

              'host'=>'127.0.0.2',

              'port'=>'5672',

              'login'=>'ybl',

              'password'=>'ybl',

              'vhost'=>'/'

         )

);

$e_name='demo';//交换机名

$k_route='hello';//路由key

if(!$conn=newPublisher($config,$e_name)){

           echo'error';

           exit;

}

$msg='hello RabbitMQ';

for($i=0;$i<10;$i++){

           $res=$conn->send($msg,$k_route);

           ob_flush();

           flush();

           echo $res;

           sleep(1);

}

运行脚本demoP.php   demoC.php查看

上一篇 下一篇

猜你喜欢

热点阅读