11.task任务投递
2019-07-07 本文已影响0人
一个人的北京_
<?php
//tcp协议
$server=new Swoole\Server("0.0.0.0",9800); //创建server对象
//task_ipc_mode设置Task进程与Worker进程之间通信的方式。
//1, 使用Unix Socket通信,默认模式
//2, 使用消息队列通信
//3, 使用消息队列通信,并设置为争抢模式
//参考连接:https://wiki.swoole.com/wiki/page/296.html
//指定消息队列的key,指定之后,当服务器关闭再拉起之后,会自动根据key值继续消费队列。未消费的队列数据存储在服务器tmp临时文件夹中,
$key=ftok(__DIR__,1);
echo $key;
$server->set([
'worker_num'=>1, //设置进程
'task_worker_num'=>1, //task进程数
'task_ipc_mode'=>2,
'message_queue_key'=>$key,
]);
$server->on('start',function (){
});
$server->on('Shutdown',function (){
echo "正常关闭";
});
$server->on('workerStart',function ($server,$fd){
//include 'index.php';
if($server->taskworker){
echo 'task_worker:'.$server->worker_id.PHP_EOL;
}else{
echo 'worker:'.$server->worker_id.PHP_EOL;
}
});
//监听事件,连接事件
$server->on('connect',function ($server,$fd){
//echo "新的连接进入xxx:{$fd}".PHP_EOL;
});
//消息发送过来
$server->on('receive',function (swoole_server $server, int $fd, int $reactor_id, string $data){
//var_dump("消息发送过来:".$data);
//不需要立刻马上得到结果的适合task
$data=['tid'=>time()];
//Task传递数据的大小
//数据小于8k直接通过管道传递,数据大于8k写入服务器tmp临时文件传递
//onTask会读取这个文件,把他读出来
//这里为了测试方便,我们写了1MB数据
$data=str_repeat("a",1*1024*1024);
$server->task($data); //投递到taskWorker进程组,后面加taskwork_id表示指定投递到某个task进程,范围是0 - (serv->task_worker_num -1),参考连接:https://wiki.swoole.com/wiki/page/134.html
echo '异步非阻塞'.PHP_EOL;
//服务端
});
//ontask事件回调
$server->on('task',function ($server,$task_id,$form_id,$data){
var_dump($server->worker_id);
echo "任务来自于:$form_id".",任务id为{$task_id}".PHP_EOL;
try{
}catch (\Exception $e){
//$server->sendMessage();
}
sleep(10);
$server->finish("执行完毕");
});
$server->on('finish',function ($server,$task_id,$data){
//echo "任务{$task_id}执行完毕:{$data}".PHP_EOL;
// var_dump(posix_getpid());
//$server->send($data['fd'], "任务执行完毕");
});
//消息关闭
$server->on('close',function (){
//echo "消息关闭".PHP_EOL;
});
//服务器开启
$server->start();
查看消息队列的命令:ipcs -q