Windows 协程测试

2019-06-10  本文已影响0人  help_youself

 代码只是一个 demo，主要是为了熟悉协程。
main.cc

#include <iostream>
#include <pthread.h>
#include <stdio.h>
#include "sche.h"
#include "corou.h"
#include "time.h"
using namespace std;
// Demo task 1: prints "a 1", yields back to the scheduler once, and after
// being resumed prints "a 2". `arg` is unused.
void test_cor_run1(void *arg){
    int counter=1;
    std::cout<<"a "<<counter<<std::endl;
    // Look up the coroutine we are running in and suspend it.
    get_scheduler()->CurrentTask()->CorYield();
    ++counter;
    std::cout<<"a "<<counter<<std::endl;
}
// Destroy callback for demo task 1: announces that the coroutine is being
// destroyed. `arg` is unused.
void test_cor_dtor1(void *arg){
    static_cast<void>(arg);
    std::cout<<"coroutine 1";
    std::cout<<std::endl;
}
void test_cor_run3(void *arg);
void test_cor_dtor3(void *arg);
void test_cor_run2(void *arg){
    int b=1;
    std::cout<<"b "<<b<<std::endl;
    Schedule *sche=get_scheduler();
    Coroutine *c=sche->CurrentTask();
    printf("in 2 %p\n",c);
    c->CorYield();
    b++;
    std::cout<<"b "<<b<<std::endl;
    sche->CreateTask(test_cor_run3,test_cor_dtor3,nullptr);
}
// Destroy callback for demo task 2: announces that the coroutine is being
// destroyed. `arg` is unused.
void test_cor_dtor2(void *arg){
    static_cast<void>(arg);
    std::cout<<"coroutine 2";
    std::cout<<std::endl;
}
void test_cor_run3(void *arg){
    int z=1;
    std::cout<<"z "<<z<<std::endl;
    Schedule *sche=get_scheduler();
    Coroutine *c=sche->CurrentTask();
    printf("in 2 %p\n",c);
    c->CorYield();
    z++;
    std::cout<<"z "<<z<<std::endl;
}
// Destroy callback for demo task 3: announces that the coroutine is being
// destroyed. `arg` is unused.
void test_cor_dtor3(void *arg){
    static_cast<void>(arg);
    std::cout<<"coroutine 3";
    std::cout<<std::endl;
}

int main(){
    Schedule *sche=Schedule::Create();
    sche->CreateTask(test_cor_run1,test_cor_dtor1,nullptr);
    sche->CreateTask(test_cor_run2,test_cor_dtor2,nullptr);
    int64_t delta=500;
    int64_t last=TimeMillis();
    int64_t stop=last+delta;
    int64_t now=0;
    while(true){
        now=TimeMillis();
        sche->ScheduleProcess();
        if(now>stop){
            break;
        }
    }
    int32_t elapse=now-last;
    printf("time %d\n",elapse);
    return 0;
}

sche.h

#pragma once
#include <pthread.h>
#include <map>
#include <deque>
#include <list>
#include "corou.h"
// Cooperative coroutine scheduler.
//
// Holds a FIFO queue of runnable coroutines and a list of finished coroutines
// that are waiting to be deleted. Coroutines are stored as raw pointers that
// the scheduler deletes, so the class must never be copied.
class Schedule{
public:
    Schedule();
    ~Schedule();
    // Copying would make two schedulers delete the same Coroutine objects.
    Schedule(const Schedule&)=delete;
    Schedule& operator=(const Schedule&)=delete;
    // Returns the calling thread's scheduler, creating it on first use.
    static Schedule *Create();
    // Allocates a coroutine that will call run(arg) and appends it to the
    // ready queue; destroy(arg) is invoked when the coroutine is deleted.
    void CreateTask(task_run run,task_destory destroy,void *arg);
    // Deletes all finished coroutines, then resumes at most one ready coroutine.
    void ScheduleProcess();
    // Hands out the next coroutine id (post-incrementing counter starting at 0).
    uint32_t AllocCoroutineId(){
        return coroutine_id_++;
    }
    // Coroutine currently being resumed by ScheduleProcess(), or nullptr when
    // no coroutine is running.
    Coroutine* CurrentTask(){
        return current_task_;
    }
    // Clears the running flag.
    // NOTE(review): nothing visible in this file reads running_; the loop that
    // would use it in ScheduleProcess() is commented out.
    void Join(){
        running_=false;
    }
    // Appends c to the back of the ready queue.
    void CorAddToReady(Coroutine* c);
    // Queues c for deletion on the next ScheduleProcess() call (or in the
    // destructor).
    void RemoveCor(Coroutine* c);
    // Fiber context of the scheduler itself; coroutines switch to it to yield.
    void *get_main_ctx(){return main_ctx_;}
private:
    Coroutine *current_task_{nullptr};
    pthread_t pid_;
    bool running_{true};
    void *main_ctx_{nullptr};
    uint32_t coroutine_id_{0};
    std::deque<Coroutine*> ready_tasks_;
    // NOTE(review): not populated by any code visible here.
    std::map<uint32_t,Coroutine*> sleep_tasks_;
    std::list<Coroutine*> destroy_tasks_;
};
Schedule *get_scheduler();

sche.cc

#include "sche.h"
#include <iostream>
#include <stdlib.h>
#include <stdio.h>
#include <vector>
#include <functional>
#include <assert.h>
// Returns the process-wide list of callbacks to run at exit. The list is
// heap-allocated on first use and never freed, so it stays valid while the
// exit handlers run.
std::vector<std::function<void()>>* ExitList()
{
    using Callbacks = std::vector<std::function<void()>>;
    static Callbacks *const callbacks = new Callbacks();
    return callbacks;
}
// atexit handler: runs every registered exit callback in registration order,
// then empties the list so a repeated invocation does nothing.
static void MyOnExit(){
    auto vec = ExitList();
    // Iterate by const reference: copying each std::function just to call it
    // is unnecessary work.
    for (const auto& fn : *vec) {
        fn();
    }
    vec->clear();
}

// Registers MyOnExit with atexit(). The result of atexit() is not checked;
// the function always returns 0.
static int InitOnExit() {
    (void)atexit(MyOnExit);
    return 0;
}
// Thread-specific-data key under which each thread's Schedule* is stored
// (written in Schedule::Create, read in get_scheduler).
pthread_key_t thread_sched_key;
// Ensures thread_key_create runs only once, via pthread_once in Schedule::Create.
static pthread_once_t key_once = PTHREAD_ONCE_INIT;
// Destructor for thread_sched_key values; intentionally does nothing.
//
// The stored value is a Schedule created with `new` in Schedule::Create, which
// also registers an exit callback that deletes it. Calling free() here, as the
// code used to, mixed `new` with free(), skipped ~Schedule, and left the exit
// callback to delete an already released object.
static void thread_key_destructor(void *data)
{
    (void)data;
}
// One-time initialiser (run through pthread_once): creates thread_sched_key
// and clears the calling thread's slot.
static void thread_key_create(void)
{
    // Make the calls outside assert(): with NDEBUG defined the assert
    // expression is not evaluated, so the key would never be created.
    const int create_rc = pthread_key_create(&thread_sched_key,
        thread_key_destructor);
    assert(create_rc == 0);
    const int set_rc = pthread_setspecific(thread_sched_key, NULL);
    assert(set_rc == 0);
    // Silence unused-variable warnings in NDEBUG builds.
    (void)create_rc;
    (void)set_rc;
}
// Returns the scheduler stored for the calling thread under thread_sched_key,
// i.e. whatever pthread_getspecific yields for that key.
Schedule *get_scheduler(){
    void *const slot=pthread_getspecific(thread_sched_key);
    return static_cast<Schedule*>(slot);
}
// Converts the calling thread into a fiber and keeps the resulting fiber
// address as the scheduler's own ("main") context.
Schedule::Schedule()
:main_ctx_(ConvertThreadToFiber(nullptr)){
}
// Deletes every coroutine still owned by the scheduler (finished ones first,
// then those still waiting in the ready queue), converts the fiber back into
// a thread and logs "dtor".
Schedule::~Schedule(){
    while(!destroy_tasks_.empty()){
        Coroutine *c=destroy_tasks_.front();
        destroy_tasks_.pop_front();
        delete c;
    }
    while(!ready_tasks_.empty()){
        Coroutine *c=ready_tasks_.front();
        // Pop from the queue being drained. This used to pop destroy_tasks_,
        // which is already empty here, so ready_tasks_ never shrank and its
        // front element was deleted again and again.
        ready_tasks_.pop_front();
        delete c;
    }
    ConvertFiberToThread();
    main_ctx_=nullptr;
    std::cout<<"dtor"<<std::endl;
}
// Returns the calling thread's scheduler, creating it on first use.
//
// A new scheduler is stored in thread-specific data under thread_sched_key
// and an exit callback is registered that deletes it at process exit.
Schedule *Schedule::Create(){
    // Call pthread_once outside assert(): with NDEBUG defined the assert
    // expression is not evaluated and the key would never be initialised.
    const int once_rc=pthread_once(&key_once, thread_key_create);
    assert(once_rc==0);
    (void)once_rc;
    Schedule *sche=get_scheduler();
    if(!sche){
        // Install the atexit hook once per process, not once per scheduler.
        static const int exit_hook_installed=InitOnExit();
        (void)exit_hook_installed;
        sche=new Schedule();
        ExitList()->push_back([sche]{
                       delete sche;
                       });
        pthread_setspecific(thread_sched_key,sche);
    }
    return sche;
}
// Allocates a coroutine bound to this scheduler that will execute run(arg),
// and appends it to the ready queue. destroy(arg) is called when the
// coroutine is deleted.
void Schedule::CreateTask(task_run run,task_destory destroy,void *arg){
    ready_tasks_.push_back(new Coroutine(this,run,destroy,arg));
}
// Puts c at the back of the ready queue so that a later ScheduleProcess()
// call resumes it.
void Schedule::CorAddToReady(Coroutine* c){
    ready_tasks_.insert(ready_tasks_.end(),c);
}
// Marks c as finished: it is appended to the list of coroutines that the
// next ScheduleProcess() call (or the destructor) deletes.
void Schedule::RemoveCor(Coroutine* c){
    destroy_tasks_.insert(destroy_tasks_.end(),c);
}
void Schedule::ScheduleProcess(){
    /*while(running_)*/{
    while(!destroy_tasks_.empty()){
        Coroutine *c=destroy_tasks_.front();
        destroy_tasks_.pop_front();
        delete c;
    }
    if(!ready_tasks_.empty()){
        Coroutine *c=ready_tasks_.front();
        ready_tasks_.pop_front();
        current_task_=c;
        c->context.SwapIn();
        current_task_=nullptr;
    }
}
}

corou.h

#pragma once
#include <WinSock2.h>
#include <windows.h>
#include <stdint.h>
class Schedule;
//# define FCONTEXT_CALL __stdcall
typedef void (WINAPI *fn_t)(void *);
// Owns one Windows fiber created with CreateFiber and releases it with
// DeleteFiber. Non-copyable: a copy would delete the same fiber twice.
class Context{
public:
    // Creates a fiber that will run fn(vp) the first time it is switched to.
    // A stack size of 0 is passed to CreateFiber. ctx_ is NULL if creation
    // failed.
    Context(fn_t fn, void* vp)
    :ctx_(CreateFiber(0,fn,vp)){
    }
    ~Context(){
        // Skip DeleteFiber when CreateFiber failed and left ctx_ NULL.
        if(ctx_!=nullptr){
            DeleteFiber(ctx_);
        }
    }
    Context(const Context&)=delete;
    Context& operator=(const Context&)=delete;
    // Switches execution to this fiber.
    void SwapIn(){
        SwitchToFiber(ctx_);
    }
    // Switches execution to the fiber `main` (the scheduler's context).
    void SwapOut(void* main){
        SwitchToFiber(main);
    }
private:
    void* ctx_;
};
typedef void (*task_run)(void *arg);
typedef void (*task_destory)(void *arg);
// One cooperative task: a user entry point plus the fiber it runs on.
// Non-copyable, because the destructor invokes the user's destroy callback
// and releases the fiber; a copy would do both a second time.
class Coroutine{
public:
    // Binds the task to `sche`; `process(user_data)` is the task body and
    // `dtor(user_data)` is called from the destructor when it is non-null.
    Coroutine(Schedule *sche,task_run process,task_destory dtor,void *user_data);
    ~Coroutine();
    Coroutine(const Coroutine&)=delete;
    Coroutine& operator=(const Coroutine&)=delete;
    // Re-queues this coroutine as ready and switches back to the scheduler.
    void CorYield();
public:
    uint32_t id;            // assigned from the scheduler's id counter in the constructor
    Schedule *scheduler;    // scheduler this coroutine belongs to
    task_run run;           // task body
    task_destory destroy;   // optional cleanup callback
    void *arg;              // opaque pointer passed to run and destroy
    Context context;        // fiber the task executes on
};
void WINAPI fiberProc(void *fiber);

corou.cc

#include "corou.h"
#include <assert.h>
#include "sche.h"
Coroutine::Coroutine(Schedule *sche,task_run process,task_destory dtor,void *user_data)
:scheduler(sche)
,run(process)
,destroy(dtor)
,arg(user_data)
,context(fiberProc,this){
    id=scheduler->AllocCoroutineId();
}
// Runs the user's cleanup callback with the task argument, if one was given.
Coroutine::~Coroutine(){
    if(destroy==nullptr){
        return;
    }
    destroy(arg);
}
void Coroutine::CorYield(){
    scheduler->CorAddToReady(this);
    context.SwapOut(scheduler->get_main_ctx());
}
// Fiber entry point shared by all coroutines. `fiber` is the Coroutine
// passed to CreateFiber. Runs the task body, then hands the coroutine to the
// scheduler for deletion and switches back to the scheduler's main fiber.
void WINAPI fiberProc(void *fiber){
    Coroutine *const self=static_cast<Coroutine*>(fiber);
    assert(self);
    self->run(self->arg);
    // The task body has finished; queue this coroutine for deletion.
    Schedule *const owner=self->scheduler;
    owner->RemoveCor(self);
    SwitchToFiber(owner->get_main_ctx());
}

time.h

#pragma once
#include <stdint.h>
int64_t TimeMillis();
int64_t TimeMicro();

time.cc

#include "time.h"
#define WIN_32
#if defined (WIN_32)
#include <windows.h>
#include <time.h>
#else
#include <unistd.h>
#include <time.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sys/types.h>
#endif

// Reads the current wall-clock time as whole seconds plus microseconds.
//
// sec  - if non-null, receives the seconds part.
// usec - if non-null, receives the microseconds part (0..999999).
//
// With WIN_32 defined (this file defines it unconditionally) the time comes
// from the performance counter, offset on the first call so that the seconds
// line up with time(NULL). Otherwise gettimeofday() is used.
static inline void itimeofday(long *sec, long *usec){
    #if defined (WIN_32)
    // The first-call initialisation below is not synchronised.
    static long mode = 0, addsec = 0;
    static int64_t freq = 1;
    // Use real LARGE_INTEGER objects instead of casting int64_t* to
    // LARGE_INTEGER*, and start from zero so a failed query cannot leave
    // garbage behind.
    LARGE_INTEGER counter;
    counter.QuadPart = 0;
    if (mode == 0) {
        LARGE_INTEGER frequency;
        frequency.QuadPart = 0;
        QueryPerformanceFrequency(&frequency);
        // Guard the divisions below against a zero frequency.
        freq = (frequency.QuadPart == 0)? 1 : frequency.QuadPart;
        QueryPerformanceCounter(&counter);
        addsec = (long)time(NULL);
        addsec = addsec - (long)((counter.QuadPart / freq) & 0x7fffffff);
        mode = 1;
    }
    QueryPerformanceCounter(&counter);
    const int64_t qpc = counter.QuadPart;
    if (sec) *sec = (long)(qpc / freq) + addsec;
    if (usec) *usec = (long)((qpc % freq) * 1000000 / freq);
    #else
    struct timeval tv;
    gettimeofday(&tv, NULL);
    if (sec) *sec = tv.tv_sec;
    if (usec) *usec = tv.tv_usec;
    #endif
}
// Current time in milliseconds as a 64-bit value: the seconds from
// itimeofday() times 1000, plus the microseconds divided by 1000.
static int64_t base_clock64(void){
    long seconds;
    long micros;
    itimeofday(&seconds, &micros);
    return static_cast<int64_t>(seconds) * 1000 + (micros / 1000);
}
// Millisecond clock masked with 0xffffffff and converted to int32_t.
static int32_t base_clock32(void){
    const int64_t now_ms = base_clock64();
    return (int32_t)(now_ms & 0xfffffffful);
}

// Current time in milliseconds, as produced by base_clock64().
int64_t TimeMillis(){
    const int64_t millis = base_clock64();
    return millis;
}
int64_t TimeMicro(){
    return base_clock64()*1000;
}

reference
[1] lthread https://github.com/halayli/lthread
[2] libgo https://github.com/yyzybb537/libgo
[3] kcp https://github.com/skywind3000/kcp
[4] Linux 中模拟 pthread 接口的用户态线程库 GNU Pth

上一篇下一篇

猜你喜欢

热点阅读