php实现Promise.all和Promise.race

2023-01-15  本文已影响0人  mrpzx001
$promise1 = function () {
    msleep(500);
    return 'one';
};
$promise2 = function () {
    msleep(100);
    return 'two';
};
$promise3 = function () {
    msleep(50);
    throw new \Exception('Reject');
};
var_dump(promise_all([$promise1, $promise2]));
var_dump(promise_race([$promise1, $promise2]));
var_dump(promise_race([$promise1, $promise2, $promise3]));
# php promise.php
array(2) {
  [1]=>
  string(3) "two"
  [0]=>
  string(3) "one"
}
string(3) "two"
object(Exception)#15 (7) {}
<?php

declare(strict_types=1);

use Swow\Coroutine;
use Swow\Channel;
use Swow\Sync\WaitGroup;
use function Swow\defer;

/**
 * @param array $callbacks 
 * @param int $parallel 并发数量
 * @return array 
 */
function promise_all(array $callbacks, int $parallel = -1)
{
    $wg = new WaitGroup();
    $channel = new Channel($parallel);
    $results = [];
    foreach ($callbacks as $key => $callback) {
        $wg->add();
        $channel->push(true);
        Coroutine::run(static function () use ($wg, $channel, $callback, $key, &$results) {
            try {
                $results[$key] = $callback();
            } catch (\Throwable) {
            } finally {
                $channel->pop();
                $wg->done();
            }
        });
    }
    $wg->wait();
    return $results;
}

/**
 * @param array $callbacks
 * @param int $timeout 超时
 * @param bool $throw 是否抛出异常
 * @return mixed
 */
function promise_race(array $callbacks, int $timeout = -1, bool $throw = true): mixed
{
    $coroutines = [];
    defer(static function () use (&$coroutines) {
        foreach ($coroutines as $coroutine) {
            if ($coroutine->isExecuting()) {
                $coroutine->kill();
            }
        }
    });
    $channel = new Channel();
    foreach ($callbacks as $callback) {
        $coroutines[] = Coroutine::run(static function () use ($channel, $callback) {
            try {
                $channel->push($callback());
            } catch (\Throwable $e) {
                $channel->push($e);
            }
        });
    }
    try {
        return $channel->pop($timeout);
    } catch (\Throwable $e) {
        if ($throw) {
            throw $e;
        }
    }
    return false;
}
上一篇 下一篇

猜你喜欢

热点阅读