swoole进程池中使用beanstalkd,异步处理任务

2020-03-01  本文已影响0人  骑蚂蚁上高速_jun

beanstalkd的优缺点:

优点:

1.beanstalkd是基于内存的任务队列,性能较高。每个job有多种状态,状态之间可以相互转换。这些状态为job的使用者提供了使用的方便。在启动的时候, 带上 -b 参数也可将任务保存在文件中,防止服务器宕机后任务丢失。
2.在网络事件驱动方面,使用异步,高效的epoll作为事件驱动框架,但使用的是单线程模式。

缺点:

1.保存数据格式比较单一,只有List 链表一种数据结构。可以考虑加上 hash数据类型,可供查询功能。
2.不适合做任务量超高的大型系统,也不建议在分布式系统中使用。大系统可考虑使用RabbitMq
3.只能单独删除一个个的任务,不能一次删除整个管道(队列) 。 当管道中的任务全部删除后,beanstalk会自动将该管道删除{严重吐槽这个设计}
我的踩坑:1当生产者未产生消息时; 2管道中无可用任务(删除的不算)时; 3消费者未启动的情况下。无法使用$p->statsTube(tube); 检查项目中的管道状态 。 因为这三个条件全部满足的时候,该管道已经被beanstalk自动删除了。。连管道都不存在了,肯定无法查看管道的状态
4.beanstalkd 在消费端守护进程使用的时候,不能像redis那样 通过 redis.setOption(Redis::OPT_READ_TIMEOUT,-1);和 redis.brPop(keys, timeout ); 两个方法实现 没有任务进入时而永久阻塞[不报超时错误]的方法。
当时我在使用的时候踩过坑,不过也可在代码上规避这个问题。特此做下记录

消费端使用

环境说明: php7.2 , swoole 扩展; composer包 "pda/pheanstalk": "^4.0"

require "./vendor/autoload.php";
require "./sms.php";
use Pheanstalk\Pheanstalk;
use Swoole\Process;
use Swoole\Process\Pool;


class swoole
{
    public $daemon=false;
    private $events=[
        "workerStart","workerStop"
    ];
    /*
     * 进程池入口方法
     */
    public function main(){
        // 使用swoole 的进程池 做消费端
        $pool = new Pool(20,0,0,true);
        foreach ($this->events as $event){
            $pool->on($event,[$this,$event]);
        }
        $this->daemon &&  Process::daemon(); // 是否守护进程
        $pool->start();
    }
    public function workerStart(Pool $pool,int $workerId){

        set_error_handler(function ($errno,$errstr,$errfile,$errline)  {
            //....
        });

        swoole_set_process_name("swoole:{$workerId}");
        ini_set('default_socket_timeout', -1);
        $socketFactory =new SocketFactory("127.0.0.1", 11300);
        $run = true;

        try{
            $socketFactory->create();
            $b = Pheanstalk::createWithFactory($socketFactory);
        }catch(Exception $e){
            // 连接失败
            echo "connect falied.\n";
            $run = false;
            sleep(12);// 连接失败,退出当前子进程,重新启动新的进程,重新连接
            return false;
        }
        $logPath="/youPath/log.txt";
        while($run){
            /*echo "wait task\n";*/
            try{
                // 在此一定必须设置阻塞时间,不能设置<=0的无限制阻塞,切记。
 /*
踩坑:
我当时在此踩过坑 我设置-1让其无限制阻塞等待任务进入导致消费端bug。
因为当生产者长时间未生产消息时,消费端一直阻塞导致假死的情况[出现僵死进程]。
这应该是beanstalk本身的问题。 一旦产生假死,有新任务进入的时候。消费端无法消费。
解决方案:
在 reserveWithTimeout(int); 方法中设置一个阻塞时间;当没有任务进入时,消费者达到阻塞时间也会向下执行而返回 null值。
我只需要在下面代码判断一下,而跳出当次循环重新阻塞int 秒钟即可。用以保证benstalk消费端的活性(类似线程检测),而不会出现僵死进程
*/
                $job = $p->watch("tubeName")->ignore("default")->reserveWithTimeout(60);// 阻塞时间
                if(!$job){
                    echo "超时跳出一次 \n";
                    continue;
                }
            }catch(Throwable $e){
                if($e instanceof  \Pheanstalk\Exception\ConnectionException){
                    // 与服务器连接中断异常,退出循环.退出了当前子进程,由父进程重新创建子进程进行检测beanstalk
                    $run = false; // 
                }
                // 1. beanstalk 因为异常原因断开连接的时候会触发异常,跳出当前循环即可
                // 2 . 当beanstalk 链接正常后, 该消费端会自动进行连接正常。 $p->watch() 会正常消费的
                echo "超时跳出一次";
                continue;
            }
            // 接收到客户端投递的任务
            $std=json_decode($job->getData());
            $std->coenv=true;
            $sms = new sms();
            $r= $sms->yunsmsSendSMS($std); // 执行业务,发送短信
            if($r=="100"){
                $p->delete($job); // 发送成功,删除任务
                $log = "发送成功\n";
            }else {
                $p->bury($job); // 发送失败,将任务预留。以供维护者查看原因和在此将其变为 ready 状态进行消费
                $log = "发送失败\n";
            }
        }
    }

    public function workerStop(Pool $pool,int $workerId){
        echo "重新启动\{$workerId}\n";
    }
}
$swoole = new swoole();
$swoole ->daemon = ($argv[1] == "daemon") ? true : false;
$swoole->main();
服务启动说明
$ php swoole.php #调试启动
$ nohup php swoole.php daemon &   # 守护进程启动
// 启动之后建议前往运维端查看 消费者数量与 swoole进程池数量是否一致,
// 不一致则需要检查消费端启动的异常情况了
# 以下通过简单的运维代码,检查beanstalk 的运行状况和 该管道消费端的启动状况
require "./vendor/autoload.php";
$p=Pheanstalk::create("ip",11300);
try{
    var_dump($p->statsTube("tubeName")->getArrayCopy()); // 成功返回该管道信息 是一个数组
// 如果执行成功,重点 查看以下2项,重点查看以下2项的值与  swoole进程池数量是否一致
//不一致则需要检查消费端启动的异常情况了
    [
        current-watching=> 20,//当前消费者数量,该值应与swoole进程池数量一致
        current-waiting=> 20 // 空闲消费者在等待的数量 = 消费者数量 - 当前reserved(正在处理任务进程数)值。
                                      //  当前没有任务在处理的时候,等于  current-watching 数量                                    
    ]
}catch(Throwable $e){
   // 当以下3种情况都出现时,该管道就被beanstalk系统自动删除了,会出现错误
    1当生产者未产生消息时; 
    2管道中无可用任务(删除的不算)时; 
    3消费者未启动的情况下。
    var_dump($e->getMessage()); // 错误信息提示管道不存在
}

生产者
require "./vendor/autoload.php";
$p=Pheanstalk::create("ip",11300);
$job=$p->useTube("tubeName")->put(json_encode([
        "mobile"=>"13012345678",
        "content"=>"XXXXXXXXXX",
    ],JSON_UNESCAPED_UNICODE));
 var_dump($job);
上一篇下一篇

猜你喜欢

热点阅读