协程coroutine的PHP与Golang实现

2021-05-23  本文已影响0人  Separes

1、概念

单进程OS.png 多进程OS.png 将单线程拆分为thread和coroutine 协程的调度模型 go的GMP模型,源自 https://www.bilibili.com/video/BV1gf4y1r79E?p=26

2、实现

2.1、 原生PHP实现简单的协程

一个调度器demo(源自 https://segmentfault.com/a/1190000012457145

/**
 * Class Scheduler
 */
Class Scheduler
{
    /**
     * @var SplQueue
     */
    protected $taskQueue;
    /**
     * @var int
     */
    protected $tid = 0;

    /**
     * Scheduler constructor.
     */
    public function __construct()
    {
        /* 原理就是维护了一个队列,
         * 前面说过,从编程角度上看,协程的思想本质上就是控制流的主动让出(yield)和恢复(resume)机制
         * */
        $this->taskQueue = new SplQueue();
    }

    /**
     * 增加一个任务
     *
     * @param Generator $task
     * @return int
     */
    public function addTask(Generator $task)
    {
        $tid = $this->tid;
        $task = new Task($tid, $task);
        $this->taskQueue->enqueue($task);
        $this->tid++;
        return $tid;
    }

    /**
     * 把任务进入队列
     *
     * @param Task $task
     */
    public function schedule(Task $task)
    {
        $this->taskQueue->enqueue($task);
    }

    /**
     * 运行调度器
     */
    public function run()
    {
        while (!$this->taskQueue->isEmpty()) {
            // 任务出队
            $task = $this->taskQueue->dequeue();
            $res = $task->run(); // 运行任务直到 yield

            if (!$task->isFinished()) {
                $this->schedule($task); // 任务如果还没完全执行完毕,入队等下次执行
            }
        }
    }
}

调度器与协程的使用

<?php
function task1() {
    for ($i = 1; $i <= 10; ++$i) {
        echo "This is task 1 iteration $i.\n";
        yield; // 主动让出CPU的执行权
    }
}
 
function task2() {
    for ($i = 1; $i <= 5; ++$i) {
        echo "This is task 2 iteration $i.\n";
        yield; // 主动让出CPU的执行权
    }
}
 
$scheduler = new Scheduler; // 实例化一个调度器
$scheduler->addTask(task1()); // 添加不同的闭包函数作为任务
$scheduler->addTask(task2());
$scheduler->run();

2.2 PHP-Swoole

PHP版本要求:>= 7.0
基于Server、Http\Server、WebSocket\Server进行开发,底层在onRequet, onReceive, onConnect等事件回调之前自动创建一个协程,在回调函数中使用协程API
使用Coroutine::create或go方法创建协程,在创建的协程中使用协程API

function Swoole\Coroutine::create(callable $function, ...$args) : int|false;
或
function go(callable $function, ...$args) : int|false; // 短名API

创建失败返回false
创建成功返回协程的ID
由于底层会优先执行子协程的代码,因此只有子协程挂起时,Coroutine::create才会返回,继续执行当前协程的代码。

在一个协程中使用go嵌套创建新的协程。因为Swoole的协程是单线程模型,因此:
使用go创建的子协程会优先执行,子协程执行完毕或挂起时,将重新回到父协程向下执行代码
如果子协程挂起后,父协程退出,不影响子协程的执行

go(function() {
    go(function () {
        co::sleep(3.0);
        go(function () {
            co::sleep(2.0);
            echo "co[3] end\n";
        });
        echo "co[2] end\n";
    });

    co::sleep(1.0);
    echo "co[1] end\n";
});

协程需要创建单独的内存栈,在PHP-7.2版本中底层会分配8K的stack来存储协程的变量,zval的尺寸为16字节,因此8K的stack最大可以保存512个变量。协程栈内存占用超过8K后ZendVM会自动扩容。
协程退出时会释放申请的stack内存。
PHP-7.1、PHP-7.0默认会分配256K栈内存
可调用Co::set(['stack_size' => 4096])修改默认的栈内存尺寸

2.3 Golang


package main

import (
    "fmt"
    "time"
)

//子goroutine
func newTask() {
    i := 0
    for {
        i++
        fmt.Printf("new Goroutine : i = %d\n", i)
        time.Sleep(1 * time.Second)
    }
}

//主goroutine
func main() {
    //创建一个go程 去执行newTask() 流程
    go newTask()

    fmt.Println("main goroutine exit")

    /*
        i := 0
        for {
            i++
            fmt.Printf("main goroutine: i = %d\n", i)
            time.Sleep(1 * time.Second)
        }
    */
}


3、对比

Go
func test() {
    db := new(database)
    close := db.connect()

    go func(db) {
        db.query(sql);
    } (db);

    go func(db) {
        db.query(sql);
    } (db);
}

Go是允许这样操作的,实际上这个可能会存在严重问题。socket读写操作产生并发,可能产生数据包错乱。

Swoole中禁止了这种行为。不允许多个协程同时读取同一个Socket。否则会产生致命错误。

PHP
function test() {
    $db = new Database;
    $db->connect('127.0.0.1', 6379);

    go(function () use ($db) {
        $db->query($sql);
    });

    go(function () use ($db) {
        $db->query($sql);
    });
}

以上代码中有2个协程同时操作$db对象,可能会产生严重错误。底层会直接抛出致命错误,错误信息为:

"%s has already been bound to another coroutine#%ld,
reading or writing of the same socket in multiple coroutines at the same time is not allowed."
错误码:SW_ERROR_CO_HAS_BEEN_BOUND

使用Channel或SplQueue实现连接池,管理资源对象,就可以很好地解决此问题。


上一篇 下一篇

猜你喜欢

热点阅读