程序员协程

基于ucontext.h的轻量级协程库

2018-08-01  本文已影响66人  neilzwshen

本文主要是对自己学习协程并实现轻量级协程过程的一个记录, 语言略显啰嗦, 各位见谅. 水平有限, 如有疏漏, 欢迎各位指正.

一 了解协程

优点

缺点



二 了解ucontext

数据结构

函数

  1. int getcontext(ucontext_t * ucp);

    • 获取当前上下文, 初始化ucp结构体, 将当前上下文保存到ucp中
    • 成功返回0; 失败返回-1, 并设置errno
  2. void makecontext(ucontext_t *ucp, void(*func)(), int argc, ...);

    • 创建一个目标上下文 创建方式: (1) getcontext, (2) 指定分配给上下文的栈uc_stack.ss_sp, (3) 指定这块栈的大小uc_stack.ss_size, (4) 指定uc_stack.ss_flags, (5) 指定后继上下文uc_link
    • 协程运行时使用主协程划分的栈空间,而协程切回主线程时需要将该部分栈空间的内容copy到每个协程各自的一个空间缓存起来,因为主协程中划分的栈空间并不是只用于一个协程,而是会用于多个协程
    • makecontext可以修改通过getcontext初始化得到的上下文, (必须先调用getcontext), 然后为ucp指定一个栈空间ucp->stack, 设置后继的上下文ucp->uc_link
    • 当上下文通过setcontext或者swapcontext激活后, 执行func函数(argc为后续的参数个数, 可变参数). 当func执行返回后, 继承的上下文被激活(ucp->uc_link), 如果为NULL, 则线程退出
    ucontext_t tar_ctx;
    ucontext_t next_ctx;
    char stack[100];
    getcontext(&tar_ctx);
    tar_ctx.uc_stack.ss_sp = stack;
    tar_ctx.uc_stack.ss_sp = sizeof(stack);
    tar_ctx.uc_stack.ss_flags = 0;
    tar_ctx.uc_link = &next_ctx;
    
  3. int setcontext(const ucontext_t *ucp)

    • 设置当前的上下文为ucp(激活ucp)
    • ucp来自getcontext, 那么上下文恢复至ucp
    • ucp来自makecontext, 那么将会调用makecontext函数的第二个参数指向的函数func, 如果func返回, 则恢复至ucp->uc_link指定的后继上下文, 如果该ucp中的uc_link为NULL, 那么线程退出
    • 成功不返回, 失败返回-1, 设置errno
  4. int swapcontext(ucontext_t *oucp, ucontext_t *ucp)

    • 切换上下文
    • 保存当前上下文至oucp, 激活ucp上下文(先执行makecontext指定的ucp入口函数, 而后会执行ucp->uc_link指向的后继上下文)
    • 成功不返回, 失败返回-1, 设置errno


三 轻量级协程实现

名词说明


思路


1 用户协程类 Coroutine

数据成员

主要成员函数


2 协程调度类

数据成员

成员函数


注意点

  1. 所有的用户协程都使用调度器的栈空间, 每个用户协程自身的buffer只不过用来作缓存
  2. SaveStack和ReloadStack函数的实现需要注意, 如何缓存协程栈
    • 协程栈是由用户分配的, 如代码中stack数组, 由于该数组的目的是用作栈空间, 而进程中栈是预分配的, 即首先确定栈的高地址, 从高地址开始往低使用, 根据这一点我们可以确定需要被缓存的栈空间大小.
    char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();    // 获取到栈底, 即高地址
    char dumy = 0;                                                       // 最后创建的变量, 必然分配在栈顶
    assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);                     // 被栈缓存不能大于栈空间
    if (cap<stackBottom-&dumy){                                          // cap 代表当前栈缓存大小, 如果不够需要重分配
        if(buffer){                                                      // 释放当前栈缓存
            delete [] buffer;
        }
        cap = stackBottom-&dumy;
        buffer = new char[cap];
    }
    stack_size = stackBottom-&dumy;                                      // 栈空间大小
    memcpy(buffer, &dumy, stack_size);                                   // 缓存
    

代码实现

https://github.com/trioZwShen/my_code_rep/tree/master/My_Coroutine

1 用户协程

/**
 * @file    : Coroutine.h
 * @author  : neilzwshen
 * @time    : 2018-7-31
 * @version : 3.0
 * @remark  : 用户协程类
 */

#ifndef COROUTINE_H_
#define COROUTINE_H_
#define DEFAULT_STACK_SIZE (1024*1024)
#include <stdio.h>
#include <string.h>
#include <ucontext.h>


enum CoState {FREE = 0, RUNNING = 1, SUSPEND = 2};

class Coroutine
{
public:
    Coroutine();
    virtual ~Coroutine();

    /**
     * 用户协程入口函数
     */
    virtual void CoProcess();
    
    /**
     * 用户协程恢复函数
     */
    void Resume();

    /**
     * 获取协程id
     * @return [返回id]
     */
    int GetId()const {
        return id;
    }

    /**
     * 设置协程id
     */
    void SetId(int _id) {
        id = _id;
    }

    /**
     * 获取协程状态
     * @return [返回协程状态]
     */
    int GetState()const {
        return state;
    }

    /**
     * 设置协程状态
     */
    void SetState(CoState _state) {
        state = _state;
    }

protected:
    /**
     * 用户协程挂起函数
     */
    void Yield();

    /**
     * 堆栈缓存
     */
    void SaveStack();

    /**
     * 堆栈恢复
     */
    void ReloadStack();
    
public:
    char *buffer;       // 缓存协程堆栈
    ucontext_t ctx;

private:
    int stack_size;
    int cap;
    int id;
    CoState state;
};

#endif
#include <assert.h>
#include "Coroutine.h"
#include "Schedule.h"

Coroutine::Coroutine()
        :id(0),state(FREE),cap(0),stack_size(0),buffer(nullptr)
{

}

Coroutine::~Coroutine()
{
    delete [] buffer;
}

void Coroutine::CoProcess()
{

}

void Coroutine::Resume()
{
    if(state==SUSPEND){
        ReloadStack();
        state = RUNNING;
        swapcontext(&(SingleSchedule::GetInst()->mainCtx), &ctx);
    }
}

void Coroutine::Yield()
{
    if (state == RUNNING){
        SaveStack();
        state = SUSPEND;
        swapcontext(&ctx, &(SingleSchedule::GetInst()->mainCtx));
    }
}

void Coroutine::SaveStack()
{
    char * stackBottom = SingleSchedule::GetInst()->GetStackBottom();
    char dumy = 0;

    assert(stackBottom-&dumy <= DEFAULT_STACK_SIZE);
    if (cap<stackBottom-&dumy){
        if(buffer){
            delete [] buffer;
        }
        cap = stackBottom-&dumy;
        buffer = new char[cap];
    }
    stack_size = stackBottom-&dumy;
    memcpy(buffer, &dumy, stack_size);
}

void Coroutine::ReloadStack()
{
    memcpy(SingleSchedule::GetInst()->GetStackBottom()-stack_size, buffer, stack_size);
}

2 单例模板

/**
 * @file    : Singleton.h
 * @author  : neilzwshen
 * @time    : 2018-7-30
 * @version : 1.0
 * @remark  : 单例模板, 只要将对象作为T, 就可以获取到一个单例对象, 构造函数不能传参
 */

#ifndef SINGLETON_H_
#define SINGLETON_H_

template<class T>
class Singleton {
public:
    /**
     * 单例获取
     * @return [返回T的单例对象]
     */
    static T* GetInst(){
        if (!flag_instance){
            flag_instance = new Singleton();
        }
        return &flag_instance->_instance;
    }

protected:
    /**
     * 单例构造
     */
    Singleton(){}

private:
    /**
     * T对象实例
     */
    T _instance;
    /**
     * 单例模板实例,
     */
    static Singleton<T> * flag_instance;
};

template<class T>
Singleton<T> * Singleton<T>::flag_instance = 0;

#endif

3 协程调度器

/**
 * @file    : Schedule.h
 * @author  : neilzwshen
 * @time    : 2018-7-31
 * @version : 3.0
 * @remark  : 协程调度类
 */

#ifndef SCHEDULE_H_
#define SCHEDULE_H_
#include <stdio.h>
#include <map>
#include <ucontext.h>
#include "Coroutine.h"
#include "Singleton.h"


typedef std::map<int, Coroutine*> CrtMap;

class Schedule
{
public:
    Schedule();
    virtual ~Schedule();

    /**
     * 用户协程入口函数
     */
    static void CoroutineEntry(void * crt);

    /**
     * 将协程crt加入协程池, 并开启
     * @param  crt [协程指针]
     */
    void CoroutineNew(Coroutine * crt);

    /**
     * 恢复用户协程
     * @param id [description]
     */
    void Resume(int id);

    /**
     * 判断协程池中是否还有未完成的协程, 并将已经终止的协程删除
     * @return [返回协程数]
     */
    int HasCoroutine();

    /**
     * 根据协程id删除协程
     * @param id [协程id]
     */
    void Remove(int id);

    /**
     * 获取到栈底
     * @return [返回栈底地址]
     */
    char* GetStackBottom(){
        return stack + DEFAULT_STACK_SIZE;
    }

public:
    ucontext_t mainCtx;
    char stack[DEFAULT_STACK_SIZE];     // 运行协程堆栈

private:
    CrtMap crtPool;
};

typedef Singleton<Schedule> SingleSchedule;

#endif

#include <assert.h>
#include "Schedule.h"

Schedule::Schedule()
{

}

Schedule::~Schedule()
{

}

void Schedule::CoroutineEntry(void * crt) {
    ((Coroutine *)crt)->SetState(RUNNING);
    ((Coroutine *)crt)->CoProcess();
    ((Coroutine *)crt)->SetState(FREE);
}

void Schedule::CoroutineNew(Coroutine * crt) {
    
    int id = crt->GetId();
    CoState state = CoState(crt->GetState());
    assert(id != 0);
    assert(state == FREE);
    //printf("--%d,%d--\n",id, state);

    if (crtPool[id] != nullptr) {
        CrtMap::iterator it = crtPool.find(id);
        crtPool.erase(it);
    }
    
    // 构建用户协程上下文
    getcontext(&(crt->ctx));
    //memset(stack, 0, DEFAULT_STACK_SIZE);
    crt->ctx.uc_stack.ss_sp = stack;
    crt->ctx.uc_stack.ss_size = DEFAULT_STACK_SIZE;
    crt->ctx.uc_stack.ss_flags = 0;
    crt->ctx.uc_link = &mainCtx;
    crtPool[id] = crt;
    
    makecontext(&crt->ctx, (void(*)(void))CoroutineEntry, 1, (void *)crt);
    swapcontext(&mainCtx, &crt->ctx);
}

void Schedule::Resume(int id){
    if (crtPool[id] != nullptr) {
        crtPool[id]->Resume();
    }
}

int Schedule::HasCoroutine() {
    int count = 0;
    CrtMap::iterator it;
    for (it = crtPool.begin(); it != crtPool.end(); it++) {
        if (it->second->GetState() != FREE) {
            count++;
        }else{
            it=crtPool.erase(it);
            it--;
        }
    }
    return count;
}

void Schedule::Remove(int id) {
    if (crtPool[id] != nullptr) {
        crtPool.erase(crtPool.find(id));
    }
}

4 示例

#include <stdio.h>
#include <memory>
#include "Coroutine.h"
#include "Schedule.h"


class Logic1 : public Coroutine{
    void CoProcess(){
        puts("1");
        Yield();
        puts("4");
        Yield();
        puts("7");
    }
};

class Logic2 : public Coroutine{
    void CoProcess(){
        puts("2");
        Yield();
        puts("5");
        Yield();
        puts("8");
    }
};

class Logic3 : public Coroutine{
    void CoProcess(){
        puts("3");
        Yield();
        puts("6");
        Yield();
        puts("9");
    }
};

int main() {

    std::shared_ptr<Coroutine> ct1(new Logic1());
    std::shared_ptr<Coroutine> ct2(new Logic2());
    std::shared_ptr<Coroutine> ct3(new Logic3());

    ct1->SetId(1);
    ct2->SetId(2);
    ct3->SetId(3);

    SingleSchedule::GetInst()->CoroutineNew(ct1.get());
    SingleSchedule::GetInst()->CoroutineNew(ct2.get());
    SingleSchedule::GetInst()->CoroutineNew(ct3.get());

    SingleSchedule::GetInst()->Resume(1);
    SingleSchedule::GetInst()->Resume(2);
    SingleSchedule::GetInst()->Resume(3);
    SingleSchedule::GetInst()->Resume(1);
    SingleSchedule::GetInst()->Resume(2);
    SingleSchedule::GetInst()->Resume(3);


    //SingleSchedule::GetInst()->Remove(1);
    //SingleSchedule::GetInst()->Remove(2);
    //SingleSchedule::GetInst()->Remove(3);

    int count = SingleSchedule::GetInst()->HasCoroutine();
    printf("%d\n", count);

    return 0;
}

5 执行结果

szw@szw-VirtualBox:~/code/coroutine/temp/My_Coroutine_3_0$ ./main 
1
2
3
4
5
6
7
8
9
0
上一篇 下一篇

猜你喜欢

热点阅读