PHP 实现 Promise.all 和 Promise.race
2023-01-15 本文已影响0人
mrpzx001
- 测试
// Three sample tasks: two that return a value after a pause and one that fails.
// msleep() is not defined in this snippet; presumably it pauses the current
// coroutine for the given number of milliseconds (confirm against the runtime).
$promise1 = static function () {
    msleep(500);

    return 'one';
};

$promise2 = static function () {
    msleep(100);

    return 'two';
};

// Always fails, after the shortest pause of the three tasks.
$promise3 = static function () {
    msleep(50);

    throw new \Exception('Reject');
};

// Wait for both successful tasks and dump everything they returned.
$allResults = promise_all([$promise1, $promise2]);
var_dump($allResults);

// Race the two successful tasks and dump what promise_race() returns.
$raceWinner = promise_race([$promise1, $promise2]);
var_dump($raceWinner);

// Same race with the failing task included.
$raceWithFailure = promise_race([$promise1, $promise2, $promise3]);
var_dump($raceWithFailure);
- 结果
# php promise.php
array(2) {
[1]=>
string(3) "two"
[0]=>
string(3) "one"
}
string(3) "two"
object(Exception)#15 (7) {}
- 实现
<?php
declare(strict_types=1);
use Swow\Coroutine;
use Swow\Channel;
use Swow\Sync\WaitGroup;
use function Swow\defer;
/**
 * Runs every callback in its own coroutine and collects the return values.
 *
 * Each result is stored under the key of the callback that produced it. Entries
 * are inserted as callbacks finish, so the iteration order of the returned array
 * follows completion order, not the order of $callbacks. A callback that throws
 * is skipped: its key is absent from the result and the exception is discarded.
 *
 * @param array $callbacks Callables to run; keys are preserved in the result.
 * @param int $parallel Concurrency limit, used as the capacity of the throttling
 *                      channel. The default -1 is passed through unchanged
 *                      (presumably "unbounded" in Swow; confirm against the
 *                      Channel documentation).
 * @return array Return values of the callbacks that completed without throwing.
 * @throws \ValueError If $parallel is 0.
 */
function promise_all(array $callbacks, int $parallel = -1): array
{
    // A capacity of 0 yields an unbuffered channel: the push below would block
    // before any worker coroutine exists to pop, so the call could never finish.
    if ($parallel === 0) {
        throw new \ValueError('promise_all(): Argument #2 ($parallel) must not be 0');
    }

    $wg = new WaitGroup();
    // The channel acts as a counting semaphore: one slot per running callback.
    $channel = new Channel($parallel);
    $results = [];

    foreach ($callbacks as $key => $callback) {
        $wg->add();
        // Take a slot; blocks here while the channel is full.
        $channel->push(true);
        Coroutine::run(static function () use ($wg, $channel, $callback, $key, &$results) {
            try {
                $results[$key] = $callback();
            } catch (\Throwable) {
                // Deliberately best-effort: a failing callback contributes no entry.
            } finally {
                // Release the slot and signal completion, whether or not it failed.
                $channel->pop();
                $wg->done();
            }
        });
    }

    // Block until every launched coroutine has called done().
    $wg->wait();

    return $results;
}
/**
 * Runs every callback in its own coroutine and returns the first outcome.
 *
 * The outcome is whatever reaches the channel first: either a callback's return
 * value or the Throwable a callback threw. Such a Throwable is returned to the
 * caller as a value; it is not rethrown. When the function exits, any coroutine
 * that is still executing is killed.
 *
 * @param array $callbacks Callables to race against each other.
 * @param int $timeout Timeout handed to Channel::pop() (unit presumably
 *                     milliseconds, -1 presumably "wait forever"; confirm
 *                     against the Swow documentation).
 * @param bool $throw Whether an exception raised while waiting on the channel
 *                    is rethrown (true) or turned into a false return (false).
 * @return mixed The first outcome, or false when waiting failed and $throw is false.
 */
function promise_race(array $callbacks, int $timeout = -1, bool $throw = true): mixed
{
    $racers = [];

    // Registered before any coroutine starts; the list is captured by reference,
    // so the cleanup sees every coroutine added below.
    defer(static function () use (&$racers) {
        foreach ($racers as $racer) {
            if (!$racer->isExecuting()) {
                continue;
            }
            $racer->kill();
        }
    });

    $finishLine = new Channel();

    foreach ($callbacks as $callback) {
        $racers[] = Coroutine::run(static function () use ($finishLine, $callback) {
            try {
                $finishLine->push($callback());
            } catch (\Throwable $error) {
                // A failure counts as an outcome too: hand the exception over as a value.
                $finishLine->push($error);
            }
        });
    }

    try {
        $winner = $finishLine->pop($timeout);
    } catch (\Throwable $failure) {
        if (!$throw) {
            return false;
        }

        throw $failure;
    }

    return $winner;
}