c语言扩展操作beanstalk在 swoole 进程池 Poo

2020-03-31  本文已影响0人  骑蚂蚁上高速_jun

直接上代码说明:

/**
 * 链接
*/
private function connectBeanstalk():Beanstalk{
    $b = new Beanstalk();
    $b->connect("127.0.0.1",11300);
    return $b;
}
/**
     * 守护进程运行
     * @param Pool $pool
     * @param int $workerId
     */
    public function workerStart(Pool $pool,int $workerId){
        swoole_set_process_name("ansyc:image:{$workerId}");
        $b = $this->connectBeanstalk();
        $run  =true;
        // 判断是否连接成功
        if(isset($b->connection) ? true :  false){
            $b->watch($this->tubeName); // 连接成功 监听 队列
        }else{
            echo "connect falied.\n";
            $run = false;
            sleep(12);// 连接失败,退出当前子进程,重新启动新的进程,重新连接
            return false;
        }
        pcntl_signal(SIGTERM, function ($signo) use(&$run,$pool){
            echo "pcntl 收到 SIGTERM 信号\n";
            $run=false;
        });
        ini_set("default_socket_timeout",-1);
        echo "进程 {$workerId} 启动\n";

        while($run){
            pcntl_signal_dispatch();
            // 检测连接是否有效
           if(!isset($bean->connection)){
                echo "重新生成进程 \n"; 
                $run = false; // 连接失败,重新fork 进程
            }
            echo "等待任务进入 \n";
             $job = $b->reserve(12);//没有任务的时候 阻塞 12s
              if($job == "TIMED_OUT\r\n"){
                // "心跳检测"; 超时啦
                continue;
            }
                if(($jobId = $job["id"] ?? 0) > 0){
                    echo "Pid:{$workerId} 存在下载任务 {$jobId} \n";
                    $data = $job["data"];
                    $std = @json_decode($data);
                    if($std){
                        echo "存在合法任务. \n";
                        // todo 处理任务 处理成功删除任务($b->delete($jobId)); 处理失败保存任务($b->bury($jobId))
                    }else{
                        echo "非法任务直接删除. \n";
                        $b->delete($jobId);
                    }
                }
        }
    }
上一篇下一篇

猜你喜欢

热点阅读