Asio无栈协程

2021-02-09  本文已影响0人  chnmagnus

对于Asio无栈协程做一个比较全面的导引,从使用、原理和实现三个方面进行叙述。

官方文档:Asio Stackless Coroutine

1 引入

Asio Stackless Coroutine不同于一般依赖编译器实现的无栈协程,其实现方式很巧妙,完全依赖c++本身的函数机制和语言特性实现(Duff's Device / switch)。

其初衷是 无缝结合asio的async api,同时屏蔽掉异步编程的复杂性,通过引入极少几个宏,提供一种简单的同步组织代码的方式。

使用传统的异步+回调,逻辑代码会被迫拆分成多段,通过多次回调才能完成一次逻辑,比如要发起一次http访问,代码可能像这样:

void DoHttpCall(string host) {
    // 解析域名,并注册下一步回调函数
    _resolver.async_resolve(host, "http", HandleResolve);
}
// 建立连接
void HandleResolve(const asio::error_code& ec,
    const tcp::resolver::results_type& endpoints){
    if (!ec){
      asio::async_connect(_socket, endpoints, HandleConnect);
    }
}
// 发送请求
void HandleConnect(const asio::error_code& ec){
    if (!ec){
        asio::async_write(_socket, _request, HandleWrite);
    }
}
// 读取响应并解析处理
void HandleWrite(const asio::error_code& ec) {
    // 通过async_read系列函数读取响应并解析
    // 后面应该还会有 反复多次 HandleReadLine 逐行解析http头部
    // 然后是 HandleContent 读取http body 等等调用
    // 此处略
}

如果使用asio无栈协程,则可以把所有逻辑流畅得放在一个函数里,代码会像这样(略去了错误处理):

void Do(std::shared_ptr<Context> client_ctx, asio::coroutine coro,
        asio::error_code ec, std::size_t length,
        asio::ip::tcp::resolver::results_type results) {

    auto gen_io_func = [&]() {
        return std::bind(&Do, client_ctx, coro, std::placeholders::_1,
                         std::placeholders::_2,
                         asio::ip::tcp::resolver::results_type());
    };

    reenter(coro) {
        // 解析域名
        yield {
            auto result_func =
                std::bind(&Do, client_ctx, coro, std::placeholders::_1, 0,
                          std::placeholders::_2);
            client_ctx->_resolver.async_resolve(client_ctx->_request._host,
                                                client_ctx->_request._port,
                                                result_func);
        }
        // 建立连接
        yield {
            auto conn_func =
                std::bind(&Do, client_ctx, coro, std::placeholders::_1, 0,
                          asio::ip::tcp::resolver::results_type());
            client_ctx->_socket.async_connect(*results, conn_func);
        }
        // 发送请求
        yield async_write(client_ctx->_socket, client_ctx->_request.ToBuffers(),
                          gen_io_func());

        client_ctx->_buffer.fill(0);
        // 读取并解析响应
        do {
            yield client_ctx->_socket.async_read_some(
                asio::buffer(client_ctx->_buffer), gen_io_func());
            std::tie(client_ctx->_valid_reply, std::ignore) =
                client_ctx->_reply_parser.Parse(
                    client_ctx->_reply, client_ctx->_buffer.data(),
                    client_ctx->_buffer.data() + length);
        } while (client_ctx->_valid_reply ==
                 ParseHelper::Result::indeterminate);
        // 处理响应
        // ...
    }
}

可以看到,使用了协程之后,所有的逻辑都被以同步的方式放在了一个函数中处理,其可维护性和可读性都有很大提升。
完整代码见这里.

2 使用

本章结合官方文档对协程所涉及的语法进行描述。对于官方文档中已经提及的内容,不会再重复赘述,更多是一些补充和扩展。

2.1 语法

基于asio::coroutine实现协程,每一个协程必定包含一个coroutine实例,一个包含reenter、yield的逻辑函数,一般如下:

class Session : asio::coroutine {
public:
    void operator()(error_code ec, std::size_t length = 0) {
        // 这里的逻辑,每次重新进入协程都会执行
        // 比如,可以在这里放置一些通用的错误处理逻辑
        if (!err) {
            // error handle
            return ;
        }

        // 协程内的具体业务逻辑要放在reenter内
        reenter(this) {
            // 记录当前位置,发起async_accept调用,并切出协程
            yield _acceptor->async_accept(_socket, *this);
            // 如果有连接进来,触发回调,回到此处继续执行

            // 记录当前位置,发起async_read调用,并切出协程
            yield _socket->async_read_some(asio::buffer(*_buffer), *this);
            // 如果有数据可读,触发回调,回到此处继续执行
        }

        // 这里的逻辑,每次切出协程均会执行
    }
};

下面对使用协程所涉及的各个关键字进行描述。

class coroutine

每个协程都需要存储其本身的状态,为此,有了class coroutine。coroutine类可以拷贝构造、可以赋值,其数据成员只有一个int,消耗很少。

class coroutine {
public:
  coroutine() : value_(0) {}
  /// Returns true if the coroutine is the child of a fork.
  bool is_child() const { return value_ < 0; }
  /// Returns true if the coroutine is the parent of a fork.
  bool is_parent() const { return !is_child(); }
  /// Returns true if the coroutine has reached its terminal state.
  bool is_complete() const { return value_ == -1; }
private:
  friend class detail::coroutine_ref;
  int value_;
};

coroutine一般有两种用法。一种是作为基类使用:

class session : coroutine {
    void operator()(error err);
    ...
};

或者作为类的一个成员变量:

class session {
    void operator()(error err);
    ...
    coroutine coro_;
};

具体的协程逻辑则在类的operator()运算符中实现。

这里思路可以放开一点,可以直接抛弃class,直接将其作为函数的一个参数,结合std::bind进行传递(可以参见上一节的代码片段)。只要在协程的存在期间,能够维持coroutine对象的拷贝即可,具体的方式并不重要。

void RunCoro(coroutine coro, error err);

reenter

reenter用来定义协程的主体,协程的主要逻辑要放在reenter限定的作用域内。

协程采用继承coroutine方式实现时,你的代码会像这样:

reenter (this) {
    // coroutine body
}

如果采用成员变量或者函数形式实现,则像这样:

reenter (_coro) {
    // coroutine body
}

后文提及的关键字(yield、fork)只能在reenter限定的范围内使用;这样才能保证协程重新切回后,能够继续之前的位置执行。另外,要注意,reenter是基于switch实现,所以在协程体内,有着和switch一样的限制,要时刻留意临时变量的定义问题。

yield

yield关键字的用法,官方文档描述的很清晰,不在此赘述。

一个小的窍门是用yield {}来规避switch的局部变量限制:

yield {
    auto conn_func =
        std::bind(&Do, client_ctx, coro, std::placeholders::_1, 0,
                    asio::ip::tcp::resolver::results_type());
    client_ctx->_socket.async_connect(*results, conn_func);
}

要注意一个非常容易踩且比较隐晦的坑:

class downloader : asio::coroutine {
    using socket_t = boost::asio::ip::tcp::socket;
    using resolver_t = boost::asio::ip::tcp::resolver;

    std::shared_ptr<socket_t> socket_;
    std::shared_ptr<resolver_t> resolver_;

    const std::string file_;
    const std::string host_;
    size_t fileSize_{};

public:
    void operator()(error_code ec = error_code(), std::size_t length = 0,
                  const resolver_t::results_type &results = {}) {

        // Check if the last yield resulted in an error.
        if (ec &7 ec != eof) {
            throw std::runtime_error(ec.message());
        }

        // Jump to after the previous yield.
        reenter(this) {
            // 问题根源在这里
            yield {
                resolver_t::query query{host_, "80"};

                // Use bind to skip the length parameter not provided by async_resolve
                auto result_func = std::bind(&downloader::operator(), this, _1, 0, _2);

                resolver_->async_resolve(query, result_func);
            }

            // 这里会偶发段错误
            yield socket_->async_connect(*results, *this);

            // 略去其他逻辑
            //......
        }
    }

上面这段代码,将asio::coroutine作为基类来使用,然后重写了operator()运算符,通过传递*this拷贝类本身来传递coroutine实例,一切看起来很正确?
问题在于,这种基于基类的方式,和std::bind进行结合时,bind产生的可调用体中仅包含了this指针的拷贝,而不是类对象的拷贝。当类对象被释放后,这个this指针会变成野指针,在协程重新切回之后,下一次的async_connect(*results, *this),对一个野指针进行寻址访问,会发生什么,不必多说。

完整的错误示范可以见stackoverflow这个问题的accepted answer

要注意,这个回答是错的,是错的,是错的。

fork

fork关键字的用法,官方文档描述的很清晰,不在此赘述。

fork一般会用在服务端,比如accept到新的连接,启动一个子协程去处理。

2.2 示例

这里给出两个示例,基本涵盖了asio::coroutine的所有用法。

基于asio::coroutine实现的http client
基于asio::coroutine实现的http server

3 原理

3.1 Duff设备与switch case的本质

Duff's Device

在上世纪,计算机单核性能较低,人们经常使用一种被称为“循环展开”的方式优化代码,使其在多核处理器和流水线处理器上性能更好。

简单来说,这样一段循环1000w次的代码:

int result = 0;
for (int i = 0; i < 10000000; ++i) {
    sum += i; 
}

可以修改成这样,只需要循环200w次即可:

int result = 0;
for (int i = 0; i < 2000000; i += 5) {
    result += i;
    result += i+1;
    result += i+2;
    result += i+3;
    result += i+4;
}

Duff在编写一个数组拷贝函数时(将from指针开始count个元素的内容拷贝到to数组),使用到了上述技术。但是,数组长度往往无法被循环展开的次数整除(count%8 != 0),这意味着除了一个while循环之外,还需要一个额外的switch来将n无法整除的部分单独执行。代码如下:

void copy(char *to, char *from, int count) {
    int n = (count + 7) / 8;
    switch (count % 8) {
        case 0: *to = *from++;
        case 7: *to = *from++;
        case 6: *to = *from++;
        case 5: *to = *from++;
        case 4: *to = *from++;
        case 3: *to = *from++;
        case 2: *to = *from++;
        case 1: *to = *from++;
    }
    while (--n > 0) {
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
        *to = *from++;
    }
}

Duff稍加思索,将switch和while结合到一起,写出了如下的代码。

当函数开始执行时,会根据count%8算出的余数,跳转到case 0-7对应的代码执行,因为没有显式break,switch不会跳出,而是会继续执行跳转位置后方case的内容,这会完成无法整除部分的拷贝。直到执行到while语句,被循环捕获,进入while循环,这会完成整除部分的拷贝。

void copy(char *to, char *from, int count) {
    int n = (count + 7) / 8;
    switch (count % 8) {
    case 0: do { *to = *from++;
    case 7:      *to = *from++;
    case 6:      *to = *from++;
    case 5:      *to = *from++;
    case 4:      *to = *from++;
    case 3:      *to = *from++;
    case 2:      *to = *from++;
    case 1:      *to = *from++;
            } while (--n > 0);
    }
}

switch case的本质

Duff's Device向我们揭示了这样一个事实:switch case后的语句,并没有我们想象中诸如代码块or作用域之类的限制,case的角色本质上是一种特殊的标签,可以在代码的任何地方放置case,且不会影响其他代码的执行。swtich的角色则是限定switch case标签的作用域,并且维护somthing like jump table,用于case匹配时的跳转。
如果你打开c标准Labeled statements一节,你会发现 标签、case、default 一同被归为标签语句。

可以试试这个hello world,体会一下switch case的行为。

#include <iostream>

using std::cout;
using std::endl;

int main(int argc, char *argv[]) {
    // int a(0);
    int a(1);
    switch (a) case(0):for(;;) {
        cout<<"hello"<<endl;
        case (1):
        cout<<"world"<<endl;
        return 0;
    }
}

3.2 基于switch case实现协程

asio::coroutine正是受Duff's Device的启发,基于switch case这一特性实现的,没有涉及任何汇编。

协程的切换方式其实和函数很像,但其对比函数的区别是,协程的重要特性是可以返回并继续,当协程被重新切回后,应该能从其切走时的位置继续执行。
如果要把一个函数改造为协程,一种朴素的想法是,给这个函数增加一个静态变量state,在所有函数要恢复执行的位置,放置一个label。并在函数切走之前,设置state的值标识切回后,函数继续执行的位置。比如这样:

假定这是一个需要循环对某个server发起心跳探活的函数实现:

void coro_func(error_code ec) {
    static int state = 0;
    switch(state) {
        case 0: goto LABEL0;
        case 1: goto LABEL1;
        case 2: goto LABEL2;
    }
LABEL0:
    for(;;) {
        // 发起连接
        state = 1;
        asio::async_connect(socket, endpoints, &coro_func);
        return;
LABEL1:
        // 写入数据
        state = 2;
        asio::async_write(socket, request, &coro_func);
        return;
LABEL2:
        // 读取响应并处理,略
        // sleep一段时间,略
    }
}

这样的“协程”稍显简陋,主要的问题在于,所有的label都要手动设置,并时刻需要维护switch与label的一致性。
现在是Duff's Device大展身手的时候了:

void coro_func(error_code ec) {
    static int state = 0;
    switch(state) {
        case 0:
        for(;;) {
            // 发起连接
            state = 1;
            asio::async_connect(socket, endpoints, &coro_func);
            return;
        case 1:
            // 写入数据
            state = 2;
            asio::async_write(socket, request, &coro_func);
            return;
        case 2:
            // 读取响应并处理,略
            // sleep一段时间,略
        }
    }
}

现在只剩下两个问题:

  1. 将函数开头用于保存恢复位置的静态变量state用其他方式实现(多线程成精局部静态变量是非线程安全的),可以考虑使用类成员变量,或者函数参数等方式记录状态
  2. 现在我们仍然需要手动编写switch case,更好的方式是基于__LINE__封装一组宏,自动实现跳转恢复的逻辑,以保证代码的可读性和易用性

来看看asio::coroutine是如何实现的。

4 实现

asio::coroutine代码非常少,全部代码不过100行,下面贴出代码,并在每个代码片段之前以及代码中的注释中给出解释。

在看代码之前,你要理清几点:

  1. 基于Duff's Device的协程,其本质上是可以恢复并继续执行的函数,每一次切出,都是正常的函数return
  2. 协程的恢复(切回),都是正常的函数调用,都会从协程函数的开头开始执行,执行到reenter时,通过reenter宏定义的switch,基于你传入的class corourine对象中的value_跳转到具体的case继续执行
  3. 函数的恢复(切回)这一行为的触发,asio::coroutine没有定义,如果结合asio使用,切回的实现,是将协程函数注册为async_xxx系列函数的回调,然后在事件ready后,回调触发协程恢复,这可以让coroutine和异步接口无缝衔接

class coroutine用于替代上一节中 静态变量 state 的作用,主要作用是通过value_成员记录协程恢复后继续执行的位置。
除此之外,还通过一些特殊的取值,比如-1来标识协程退出等状态(其实这些完全可以单独搞一个bool,放在一起是为了节省内存使用和拷贝的消耗)。

// 协程上下文
class coroutine {
public:
  /// Constructs a coroutine in its initial state.
  coroutine() : value_(0) {}

  // 用于在fork关键字后,区分当前是子协程还是父协程
  // 要注意的是,is_child只有在子协程第一次进入,没有执行过yield进行切出前有效
  // 一旦你进行了切出,子协程的value_也会被重置为正值
  // Returns true if the coroutine is the child of a fork.
  bool is_child() const { return value_ < 0; }
  // Returns true if the coroutine is the parent of a fork.
  bool is_parent() const { return !is_child(); }

  // value_如果等于-1 代表协程已经结束
  // 详见 ~coroutine_ref() 注释
  bool is_complete() const { return value_ == -1; }

private:
  friend class detail::coroutine_ref;
  int value_;
};

class coroutine 对象的生命周期贯穿整个协程,而coroutine_ref在每次进入协程时创建,在每次切出协程后释放。
同时协程中所有对class coroutine中value_的修改,都是通过coroutine_ref进行的。

// 一个对于class coroutine的引用类
class coroutine_ref {
public:
  // 从class coroutine对象构造
  coroutine_ref(coroutine& c) : value_(c.value_), modified_(false) {}
  coroutine_ref(coroutine* c) : value_(c->value_), modified_(false) {}

  // 如果是通过yield临时切出协程,则一定会修改value_,即modified_一定为true
  // 只有通过return、抛出异常,或者函数正常执行到末尾退出协程函数,modified_才会是false
  // 即,当modifed_为false时,代表协程已经结束,此时将value_设为-1,表示协程已经complete
  ~coroutine_ref() { if (!modified_) value_ = -1; }

  // 如果对value_进行修改,同步将modified_设为true
  int& operator=(int v) { modified_ = true; return value_ = v; }

  operator int() const { return value_; }
private:
  void operator=(const coroutine_ref&);
  int& value_;
  bool modified_;
};

reenter 是所有协程都要包含的一个宏,用于开始协程体的定义,定义switch,以及一些特殊的case。

case 0后的逻辑在协程第一次进入时跳转执行。

如果是协程重新切回,则reenter中的switch会将其跳转到切出前的位置继续执行。

case -1后的逻辑,一方面,用于处理协程已经退出,但又被重新调度执行的情况;另一方面,可由其他位置的goto语句(比如yield)进入,执行协程切换时的退出逻辑。
terminate_coroutine 标签后的代码 定义了终止协程要做的逻辑。
bail_out_of_coroutine 标签后的代码 定义了退出协程要做的逻辑。

// reenter的实现
#define ASIO_CORO_REENTER(c) \
  switch (::asio::detail::coroutine_ref _coro_value = c) \
    case -1: if (_coro_value) \
    { \
      goto terminate_coroutine; \
      terminate_coroutine: \
      _coro_value = -1; \
      goto bail_out_of_coroutine; \
      bail_out_of_coroutine: \
      break; \
    } \
    else /* fall-through */ case 0:

#ifndef reenter
# define reenter(c) ASIO_CORO_REENTER(c)
#endif

yield 在需要临时切出当前协程,或者要终止协程时调用。

for (_coro_value = (n);;) 此类写法起到赋值并延展作用域的作用。

if (_coro_value == 0) 此判断永远为false,主动调用yield不会进入该分支,此分支判断中的代码,仅在协程切回,通过switch case jump进入执行。执行逻辑仅一个break,效果是跳出一开始的for作用域,直接继续执行yield语句后方的代码。

else switch (_coro_value ? 0 : 1)后的代码比较复杂,主要是为了实现yield的多种功能,在看这里的代码之前,先回顾下yield的几种用法。yield共有五种用法,在这里,我们把它分成两类:

第一类,yield后跟一个用户自定义的语句或者空语句,此时yield的行为是:
记录当前的执行位置 -> 执行用户定义的语句 -> 跳出reenter限定的作用域执行函数末尾的代码然后返回
此后,协程如果被恢复执行,会接着yield下一行代码继续执行。

第二类,yield接break,此时的yield的行为是:
终止协程的运行(将value_设为-1) -> 跳出reenter限定的作用域执行函数末尾的代码然后返回
此后,协程完全终止,不再可以被恢复执行。

回到代码,else switch (_coro_value ? 0 : 1)中的_coro_value恒大于0,所以一定会跳到末尾的case 0执行,此时执行用户定义的语句。

// yield的实现
#define ASIO_CORO_YIELD_IMPL(n) \
  for (_coro_value = (n);;) \
    if (_coro_value == 0) \
    { \
      case (n): ; \
      break; \
    } \
    else \
      switch (_coro_value ? 0 : 1) \
        for (;;) \
          /* fall-through */ case -1: if (_coro_value) \
            goto terminate_coroutine; \
          else for (;;) \
            /* fall-through */ case 1: if (_coro_value) \
              goto bail_out_of_coroutine; \
            else /* fall-through */ case 0:

# define ASIO_CORO_YIELD ASIO_CORO_YIELD_IMPL(__LINE__)

#ifndef yield
# define yield ASIO_CORO_YIELD
#endif

fork 在创建子协程时调用。

仔细看代码,你会发现,这里没有做任何与fork相关的操作,创建和启动子协程的操作是由用户编写的。

这里做的仅仅是在执行用户提供的子协程代码之前,将coro_value改为负值;
用户的子协程构造代码,会把 class coroutine 拷贝一份,拷贝构造出的class coroutine中的value
即为负值。

在用户子协程函数返回之后,再把_coro_value改回正值,并继续后面的代码执行。

// fork的实现
#define ASIO_CORO_FORK_IMPL(n) \
  for (_coro_value = -(n);; _coro_value = (n)) \
    if (_coro_value == (n)) \
    { \
      case -(n): ; \
      break; \
    } \
    else

# define ASIO_CORO_FORK ASIO_CORO_FORK_IMPL(__LINE__)

#ifndef fork
# define fork ASIO_CORO_FORK
#endif
上一篇下一篇

猜你喜欢

热点阅读