Asio无栈协程
对于Asio无栈协程做一个比较全面的导引,从使用、原理和实现三个方面进行叙述。
1 引入
Asio Stackless Coroutine不同于一般依赖编译器实现的无栈协程,其实现方式很巧妙,完全依赖c++本身的函数机制和语言特性实现(Duff's Device / switch)。
其初衷是 无缝结合asio的async api,同时屏蔽掉异步编程的复杂性,通过引入极少几个宏,提供一种简单的同步组织代码的方式。
使用传统的异步+回调,逻辑代码会被迫拆分成多段,通过多次回调才能完成一次逻辑,比如要发起一次http访问,代码可能像这样:
void DoHttpCall(string host) {
// 解析域名,并注册下一步回调函数
_resolver.async_resolve(host, "http", HandleResolve);
}
// 建立连接
void HandleResolve(const asio::error_code& ec,
const tcp::resolver::results_type& endpoints){
if (!ec){
asio::async_connect(_socket, endpoints, HandleConnect);
}
}
// 发送请求
void HandleConnect(const asio::error_code& ec){
if (!ec){
asio::async_write(_socket, _request, HandleWrite);
}
}
// 读取响应并解析处理
void HandleWrite(const asio::error_code& ec) {
// 通过async_read系列函数读取响应并解析
// 后面应该还会有 反复多次 HandleReadLine 逐行解析http头部
// 然后是 HandleContent 读取http body 等等调用
// 此处略
}
如果使用asio无栈协程,则可以把所有逻辑流畅得放在一个函数里,代码会像这样(略去了错误处理):
void Do(std::shared_ptr<Context> client_ctx, asio::coroutine coro,
asio::error_code ec, std::size_t length,
asio::ip::tcp::resolver::results_type results) {
auto gen_io_func = [&]() {
return std::bind(&Do, client_ctx, coro, std::placeholders::_1,
std::placeholders::_2,
asio::ip::tcp::resolver::results_type());
};
reenter(coro) {
// 解析域名
yield {
auto result_func =
std::bind(&Do, client_ctx, coro, std::placeholders::_1, 0,
std::placeholders::_2);
client_ctx->_resolver.async_resolve(client_ctx->_request._host,
client_ctx->_request._port,
result_func);
}
// 建立连接
yield {
auto conn_func =
std::bind(&Do, client_ctx, coro, std::placeholders::_1, 0,
asio::ip::tcp::resolver::results_type());
client_ctx->_socket.async_connect(*results, conn_func);
}
// 发送请求
yield async_write(client_ctx->_socket, client_ctx->_request.ToBuffers(),
gen_io_func());
client_ctx->_buffer.fill(0);
// 读取并解析响应
do {
yield client_ctx->_socket.async_read_some(
asio::buffer(client_ctx->_buffer), gen_io_func());
std::tie(client_ctx->_valid_reply, std::ignore) =
client_ctx->_reply_parser.Parse(
client_ctx->_reply, client_ctx->_buffer.data(),
client_ctx->_buffer.data() + length);
} while (client_ctx->_valid_reply ==
ParseHelper::Result::indeterminate);
// 处理响应
// ...
}
}
可以看到,使用了协程之后,所有的逻辑都被以同步的方式放在了一个函数中处理,其可维护性和可读性都有很大提升。
完整代码见这里.
2 使用
本章结合官方文档对协程所涉及的语法进行描述。对于官方文档中已经提及的内容,不会再重复赘述,更多是一些补充和扩展。
2.1 语法
基于asio::coroutine实现协程,每一个协程必定包含一个coroutine实例,一个包含reenter、yield的逻辑函数,一般如下:
class Session : asio::coroutine {
public:
void operator()(error_code ec, std::size_t length = 0) {
// 这里的逻辑,每次重新进入协程都会执行
// 比如,可以在这里放置一些通用的错误处理逻辑
if (!err) {
// error handle
return ;
}
// 协程内的具体业务逻辑要放在reenter内
reenter(this) {
// 记录当前位置,发起async_accept调用,并切出协程
yield _acceptor->async_accept(_socket, *this);
// 如果有连接进来,触发回调,回到此处继续执行
// 记录当前位置,发起async_read调用,并切出协程
yield _socket->async_read_some(asio::buffer(*_buffer), *this);
// 如果有数据可读,触发回调,回到此处继续执行
}
// 这里的逻辑,每次切出协程均会执行
}
};
下面对使用协程所涉及的各个关键字进行描述。
class coroutine
每个协程都需要存储其本身的状态,为此,有了class coroutine。coroutine类可以拷贝构造、可以赋值,其数据成员只有一个int,消耗很少。
class coroutine {
public:
coroutine() : value_(0) {}
/// Returns true if the coroutine is the child of a fork.
bool is_child() const { return value_ < 0; }
/// Returns true if the coroutine is the parent of a fork.
bool is_parent() const { return !is_child(); }
/// Returns true if the coroutine has reached its terminal state.
bool is_complete() const { return value_ == -1; }
private:
friend class detail::coroutine_ref;
int value_;
};
coroutine一般有两种用法。一种是作为基类使用:
class session : coroutine {
void operator()(error err);
...
};
或者作为类的一个成员变量:
class session {
void operator()(error err);
...
coroutine coro_;
};
具体的协程逻辑则在类的operator()运算符中实现。
这里思路可以放开一点,可以直接抛弃class,直接将其作为函数的一个参数,结合std::bind进行传递(可以参见上一节的代码片段)。只要在协程的存在期间,能够维持coroutine对象的拷贝即可,具体的方式并不重要。
void RunCoro(coroutine coro, error err);
reenter
reenter用来定义协程的主体,协程的主要逻辑要放在reenter限定的作用域内。
协程采用继承coroutine方式实现时,你的代码会像这样:
reenter (this) {
// coroutine body
}
如果采用成员变量或者函数形式实现,则像这样:
reenter (_coro) {
// coroutine body
}
后文提及的关键字(yield、fork)只能在reenter限定的范围内使用;这样才能保证协程重新切回后,能够继续之前的位置执行。另外,要注意,reenter是基于switch实现,所以在协程体内,有着和switch一样的限制,要时刻留意临时变量的定义问题。
yield
yield关键字的用法,官方文档描述的很清晰,不在此赘述。
- yield statement ;
- yield { statements }
- yield return expression ;
- yield ;
- yield break ;
一个小的窍门是用yield {}来规避switch的局部变量限制:
yield {
auto conn_func =
std::bind(&Do, client_ctx, coro, std::placeholders::_1, 0,
asio::ip::tcp::resolver::results_type());
client_ctx->_socket.async_connect(*results, conn_func);
}
要注意一个非常容易踩且比较隐晦的坑:
class downloader : asio::coroutine {
using socket_t = boost::asio::ip::tcp::socket;
using resolver_t = boost::asio::ip::tcp::resolver;
std::shared_ptr<socket_t> socket_;
std::shared_ptr<resolver_t> resolver_;
const std::string file_;
const std::string host_;
size_t fileSize_{};
public:
void operator()(error_code ec = error_code(), std::size_t length = 0,
const resolver_t::results_type &results = {}) {
// Check if the last yield resulted in an error.
if (ec &7 ec != eof) {
throw std::runtime_error(ec.message());
}
// Jump to after the previous yield.
reenter(this) {
// 问题根源在这里
yield {
resolver_t::query query{host_, "80"};
// Use bind to skip the length parameter not provided by async_resolve
auto result_func = std::bind(&downloader::operator(), this, _1, 0, _2);
resolver_->async_resolve(query, result_func);
}
// 这里会偶发段错误
yield socket_->async_connect(*results, *this);
// 略去其他逻辑
//......
}
}
上面这段代码,将asio::coroutine作为基类来使用,然后重写了operator()运算符,通过传递*this拷贝类本身来传递coroutine实例,一切看起来很正确?
问题在于,这种基于基类的方式,和std::bind进行结合时,bind产生的可调用体中仅包含了this指针的拷贝,而不是类对象的拷贝。当类对象被释放后,这个this指针会变成野指针,在协程重新切回之后,下一次的async_connect(*results, *this)
,对一个野指针进行寻址访问,会发生什么,不必多说。
完整的错误示范可以见stackoverflow这个问题的accepted answer。
要注意,这个回答是错的,是错的,是错的。
fork
fork关键字的用法,官方文档描述的很清晰,不在此赘述。
- fork statement ;
fork一般会用在服务端,比如accept到新的连接,启动一个子协程去处理。
2.2 示例
这里给出两个示例,基本涵盖了asio::coroutine的所有用法。
基于asio::coroutine实现的http client
基于asio::coroutine实现的http server
3 原理
3.1 Duff设备与switch case的本质
Duff's Device
在上世纪,计算机单核性能较低,人们经常使用一种被称为“循环展开”的方式优化代码,使其在多核处理器和流水线处理器上性能更好。
简单来说,这样一段循环1000w次的代码:
int result = 0;
for (int i = 0; i < 10000000; ++i) {
sum += i;
}
可以修改成这样,只需要循环200w次即可:
int result = 0;
for (int i = 0; i < 2000000; i += 5) {
result += i;
result += i+1;
result += i+2;
result += i+3;
result += i+4;
}
Duff在编写一个数组拷贝函数时(将from指针开始count个元素的内容拷贝到to数组),使用到了上述技术。但是,数组长度往往无法被循环展开的次数整除(count%8 != 0),这意味着除了一个while循环之外,还需要一个额外的switch来将n无法整除的部分单独执行。代码如下:
void copy(char *to, char *from, int count) {
int n = (count + 7) / 8;
switch (count % 8) {
case 0: *to = *from++;
case 7: *to = *from++;
case 6: *to = *from++;
case 5: *to = *from++;
case 4: *to = *from++;
case 3: *to = *from++;
case 2: *to = *from++;
case 1: *to = *from++;
}
while (--n > 0) {
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
*to = *from++;
}
}
Duff稍加思索,将switch和while结合到一起,写出了如下的代码。
当函数开始执行时,会根据count%8算出的余数,跳转到case 0-7对应的代码执行,因为没有显式break,switch不会跳出,而是会继续执行跳转位置后方case的内容,这会完成无法整除部分的拷贝。直到执行到while语句,被循环捕获,进入while循环,这会完成整除部分的拷贝。
void copy(char *to, char *from, int count) {
int n = (count + 7) / 8;
switch (count % 8) {
case 0: do { *to = *from++;
case 7: *to = *from++;
case 6: *to = *from++;
case 5: *to = *from++;
case 4: *to = *from++;
case 3: *to = *from++;
case 2: *to = *from++;
case 1: *to = *from++;
} while (--n > 0);
}
}
switch case的本质
Duff's Device向我们揭示了这样一个事实:switch case后的语句,并没有我们想象中诸如代码块or作用域之类的限制,case的角色本质上是一种特殊的标签,可以在代码的任何地方放置case,且不会影响其他代码的执行。swtich的角色则是限定switch case标签的作用域,并且维护somthing like jump table
,用于case匹配时的跳转。
如果你打开c标准Labeled statements一节,你会发现 标签、case、default 一同被归为标签语句。
可以试试这个hello world,体会一下switch case的行为。
#include <iostream>
using std::cout;
using std::endl;
int main(int argc, char *argv[]) {
// int a(0);
int a(1);
switch (a) case(0):for(;;) {
cout<<"hello"<<endl;
case (1):
cout<<"world"<<endl;
return 0;
}
}
3.2 基于switch case实现协程
asio::coroutine正是受Duff's Device的启发,基于switch case这一特性实现的,没有涉及任何汇编。
协程的切换方式其实和函数很像,但其对比函数的区别是,协程的重要特性是可以返回并继续,当协程被重新切回后,应该能从其切走时的位置继续执行。
如果要把一个函数改造为协程,一种朴素的想法是,给这个函数增加一个静态变量state,在所有函数要恢复执行的位置,放置一个label。并在函数切走之前,设置state的值标识切回后,函数继续执行的位置。比如这样:
假定这是一个需要循环对某个server发起心跳探活的函数实现:
void coro_func(error_code ec) {
static int state = 0;
switch(state) {
case 0: goto LABEL0;
case 1: goto LABEL1;
case 2: goto LABEL2;
}
LABEL0:
for(;;) {
// 发起连接
state = 1;
asio::async_connect(socket, endpoints, &coro_func);
return;
LABEL1:
// 写入数据
state = 2;
asio::async_write(socket, request, &coro_func);
return;
LABEL2:
// 读取响应并处理,略
// sleep一段时间,略
}
}
这样的“协程”稍显简陋,主要的问题在于,所有的label都要手动设置,并时刻需要维护switch与label的一致性。
现在是Duff's Device大展身手的时候了:
void coro_func(error_code ec) {
static int state = 0;
switch(state) {
case 0:
for(;;) {
// 发起连接
state = 1;
asio::async_connect(socket, endpoints, &coro_func);
return;
case 1:
// 写入数据
state = 2;
asio::async_write(socket, request, &coro_func);
return;
case 2:
// 读取响应并处理,略
// sleep一段时间,略
}
}
}
现在只剩下两个问题:
- 将函数开头用于保存恢复位置的静态变量state用其他方式实现(多线程成精局部静态变量是非线程安全的),可以考虑使用类成员变量,或者函数参数等方式记录状态
- 现在我们仍然需要手动编写switch case,更好的方式是基于
__LINE__
封装一组宏,自动实现跳转恢复的逻辑,以保证代码的可读性和易用性
来看看asio::coroutine是如何实现的。
4 实现
asio::coroutine代码非常少,全部代码不过100行,下面贴出代码,并在每个代码片段之前以及代码中的注释中给出解释。
在看代码之前,你要理清几点:
- 基于Duff's Device的协程,其本质上是可以恢复并继续执行的函数,每一次切出,都是正常的函数return
- 协程的恢复(切回),都是正常的函数调用,都会从协程函数的开头开始执行,执行到reenter时,通过reenter宏定义的switch,基于你传入的class corourine对象中的value_跳转到具体的case继续执行
- 函数的恢复(切回)这一行为的触发,asio::coroutine没有定义,如果结合asio使用,切回的实现,是将协程函数注册为async_xxx系列函数的回调,然后在事件ready后,回调触发协程恢复,这可以让coroutine和异步接口无缝衔接
class coroutine用于替代上一节中 静态变量 state 的作用,主要作用是通过value_成员记录协程恢复后继续执行的位置。
除此之外,还通过一些特殊的取值,比如-1来标识协程退出等状态(其实这些完全可以单独搞一个bool,放在一起是为了节省内存使用和拷贝的消耗)。
// 协程上下文
class coroutine {
public:
/// Constructs a coroutine in its initial state.
coroutine() : value_(0) {}
// 用于在fork关键字后,区分当前是子协程还是父协程
// 要注意的是,is_child只有在子协程第一次进入,没有执行过yield进行切出前有效
// 一旦你进行了切出,子协程的value_也会被重置为正值
// Returns true if the coroutine is the child of a fork.
bool is_child() const { return value_ < 0; }
// Returns true if the coroutine is the parent of a fork.
bool is_parent() const { return !is_child(); }
// value_如果等于-1 代表协程已经结束
// 详见 ~coroutine_ref() 注释
bool is_complete() const { return value_ == -1; }
private:
friend class detail::coroutine_ref;
int value_;
};
class coroutine 对象的生命周期贯穿整个协程,而coroutine_ref在每次进入协程时创建,在每次切出协程后释放。
同时协程中所有对class coroutine中value_的修改,都是通过coroutine_ref进行的。
// 一个对于class coroutine的引用类
class coroutine_ref {
public:
// 从class coroutine对象构造
coroutine_ref(coroutine& c) : value_(c.value_), modified_(false) {}
coroutine_ref(coroutine* c) : value_(c->value_), modified_(false) {}
// 如果是通过yield临时切出协程,则一定会修改value_,即modified_一定为true
// 只有通过return、抛出异常,或者函数正常执行到末尾退出协程函数,modified_才会是false
// 即,当modifed_为false时,代表协程已经结束,此时将value_设为-1,表示协程已经complete
~coroutine_ref() { if (!modified_) value_ = -1; }
// 如果对value_进行修改,同步将modified_设为true
int& operator=(int v) { modified_ = true; return value_ = v; }
operator int() const { return value_; }
private:
void operator=(const coroutine_ref&);
int& value_;
bool modified_;
};
reenter 是所有协程都要包含的一个宏,用于开始协程体的定义,定义switch,以及一些特殊的case。
case 0后的逻辑在协程第一次进入时跳转执行。
如果是协程重新切回,则reenter中的switch会将其跳转到切出前的位置继续执行。
case -1后的逻辑,一方面,用于处理协程已经退出,但又被重新调度执行的情况;另一方面,可由其他位置的goto语句(比如yield)进入,执行协程切换时的退出逻辑。
terminate_coroutine 标签后的代码 定义了终止协程要做的逻辑。
bail_out_of_coroutine 标签后的代码 定义了退出协程要做的逻辑。
// reenter的实现
#define ASIO_CORO_REENTER(c) \
switch (::asio::detail::coroutine_ref _coro_value = c) \
case -1: if (_coro_value) \
{ \
goto terminate_coroutine; \
terminate_coroutine: \
_coro_value = -1; \
goto bail_out_of_coroutine; \
bail_out_of_coroutine: \
break; \
} \
else /* fall-through */ case 0:
#ifndef reenter
# define reenter(c) ASIO_CORO_REENTER(c)
#endif
yield 在需要临时切出当前协程,或者要终止协程时调用。
for (_coro_value = (n);;)
此类写法起到赋值并延展作用域的作用。
if (_coro_value == 0)
此判断永远为false,主动调用yield不会进入该分支,此分支判断中的代码,仅在协程切回,通过switch case jump进入执行。执行逻辑仅一个break,效果是跳出一开始的for作用域,直接继续执行yield语句后方的代码。
else switch (_coro_value ? 0 : 1)
后的代码比较复杂,主要是为了实现yield的多种功能,在看这里的代码之前,先回顾下yield的几种用法。yield共有五种用法,在这里,我们把它分成两类:
- 第一类
- yield statement ;
- yield { statements }
- yield return expression ;
- yield ;
- 第二类
- yield break ;
第一类,yield后跟一个用户自定义的语句或者空语句,此时yield的行为是:
记录当前的执行位置 -> 执行用户定义的语句 -> 跳出reenter限定的作用域执行函数末尾的代码然后返回
此后,协程如果被恢复执行,会接着yield下一行代码继续执行。
第二类,yield接break,此时的yield的行为是:
终止协程的运行(将value_设为-1) -> 跳出reenter限定的作用域执行函数末尾的代码然后返回
此后,协程完全终止,不再可以被恢复执行。
回到代码,else switch (_coro_value ? 0 : 1)
中的_coro_value恒大于0,所以一定会跳到末尾的case 0
执行,此时执行用户定义的语句。
- 如果用户的语句是
return
,则临时切出协程,并返回一个值。 - 如果用户的语句是
break
,则会跳出else for (;;) case 1:
所在的for循环,进入for(;;) case -1:
的循环,然后执行goto terminate_coroutine;
,将value_设为0,标识协程终止,并跳出reenter作用域,执行函数末尾的逻辑,然后返回。 - 如果用户的语句不是
break
和return
,则执行完用户的语句后,会进入for (;;) case 1:
所在for循环,然后执行goto bail_out_of_coroutine;
,跳出reenter作用域,执行函数末尾的逻辑,然后返回。
// yield的实现
#define ASIO_CORO_YIELD_IMPL(n) \
for (_coro_value = (n);;) \
if (_coro_value == 0) \
{ \
case (n): ; \
break; \
} \
else \
switch (_coro_value ? 0 : 1) \
for (;;) \
/* fall-through */ case -1: if (_coro_value) \
goto terminate_coroutine; \
else for (;;) \
/* fall-through */ case 1: if (_coro_value) \
goto bail_out_of_coroutine; \
else /* fall-through */ case 0:
# define ASIO_CORO_YIELD ASIO_CORO_YIELD_IMPL(__LINE__)
#ifndef yield
# define yield ASIO_CORO_YIELD
#endif
fork 在创建子协程时调用。
仔细看代码,你会发现,这里没有做任何与fork相关的操作,创建和启动子协程的操作是由用户编写的。
这里做的仅仅是在执行用户提供的子协程代码之前,将coro_value改为负值;
用户的子协程构造代码,会把 class coroutine 拷贝一份,拷贝构造出的class coroutine中的value即为负值。
在用户子协程函数返回之后,再把_coro_value改回正值,并继续后面的代码执行。
// fork的实现
#define ASIO_CORO_FORK_IMPL(n) \
for (_coro_value = -(n);; _coro_value = (n)) \
if (_coro_value == (n)) \
{ \
case -(n): ; \
break; \
} \
else
# define ASIO_CORO_FORK ASIO_CORO_FORK_IMPL(__LINE__)
#ifndef fork
# define fork ASIO_CORO_FORK
#endif