11.task任务投递

2019-07-07  本文已影响0人  一个人的北京_
<?php
//tcp协议
$server=new Swoole\Server("0.0.0.0",9800);   //创建server对象

//task_ipc_mode设置Task进程与Worker进程之间通信的方式。
//1, 使用Unix Socket通信,默认模式
//2, 使用消息队列通信
//3, 使用消息队列通信,并设置为争抢模式
//参考连接:https://wiki.swoole.com/wiki/page/296.html

//指定消息队列的key,指定之后,当服务器关闭再拉起之后,会自动根据key值继续消费队列。未消费的队列数据存储在服务器tmp临时文件夹中,
$key=ftok(__DIR__,1);
echo $key;

$server->set([
    'worker_num'=>1, //设置进程
    'task_worker_num'=>1,  //task进程数
    'task_ipc_mode'=>2,
    'message_queue_key'=>$key,
]);

$server->on('start',function (){

});


$server->on('Shutdown',function (){
    echo "正常关闭";
});

$server->on('workerStart',function ($server,$fd){
    //include 'index.php';
    if($server->taskworker){
        echo 'task_worker:'.$server->worker_id.PHP_EOL;
    }else{
        echo 'worker:'.$server->worker_id.PHP_EOL;
    }

});


//监听事件,连接事件
$server->on('connect',function ($server,$fd){

    //echo "新的连接进入xxx:{$fd}".PHP_EOL;
});


//消息发送过来
$server->on('receive',function (swoole_server $server, int $fd, int $reactor_id, string $data){
    //var_dump("消息发送过来:".$data);
    //不需要立刻马上得到结果的适合task
    $data=['tid'=>time()];
    //Task传递数据的大小
    //数据小于8k直接通过管道传递,数据大于8k写入服务器tmp临时文件传递
    //onTask会读取这个文件,把他读出来
   //这里为了测试方便,我们写了1MB数据
    $data=str_repeat("a",1*1024*1024);

    $server->task($data); //投递到taskWorker进程组,后面加taskwork_id表示指定投递到某个task进程,范围是0 - (serv->task_worker_num -1),参考连接:https://wiki.swoole.com/wiki/page/134.html
    echo '异步非阻塞'.PHP_EOL;
    //服务端
});

//ontask事件回调
$server->on('task',function ($server,$task_id,$form_id,$data){
    var_dump($server->worker_id);
    echo "任务来自于:$form_id".",任务id为{$task_id}".PHP_EOL;

    try{

    }catch (\Exception $e){
        //$server->sendMessage();
    }

    sleep(10);
    $server->finish("执行完毕");
});

$server->on('finish',function ($server,$task_id,$data){
    //echo "任务{$task_id}执行完毕:{$data}".PHP_EOL;
    // var_dump(posix_getpid());

    //$server->send($data['fd'], "任务执行完毕");
});



//消息关闭
$server->on('close',function (){
    //echo "消息关闭".PHP_EOL;
});

//服务器开启
$server->start();

查看消息队列的命令:ipcs -q
上一篇下一篇

猜你喜欢

热点阅读