Multithreading

Chapter 4 Thread models - Programming wit

2022-05-21  my_passion

Chapter 4 Thread models: 3 kinds

TABLE 4.1 Thread programming models
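    Pipeline

        Each thread repeatedly performs the `same operation` on a sequence of data sets,

            then passes its result to another thread for the next stage

    Work crew

        The threads in a work crew
            work independently
            may perform the `same or different operations`

    Client/server

        A client "contracts" with an independent server for each job

    The 3 models can be combined arbitrarily, for example:
        one step of a Pipeline requests a service from a server
        the server uses a Work crew
        one or more workers of the Work crew use a Pipeline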
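
Of the three models in Table 4.1, the work crew is the only one these notes do not show in code. The following is a minimal sketch of the "same operation" variant, not code from the book: each worker independently sums its own slice of an array, and the caller joins the crew and combines the partial sums. CREW_SIZE, CrewTask, crew_worker and crew_sum are hypothetical names chosen for this illustration.

```c
#include <pthread.h>
#include <stddef.h>

#define CREW_SIZE 4   /* hypothetical crew size */

/* One unit of work: sum data[begin, end) into partial. */
typedef struct {
    const long *data;
    size_t begin, end;
    long partial;
} CrewTask;

/* Every worker runs the same operation on its own, independent slice. */
static void *crew_worker(void *arg)
{
    CrewTask *task = arg;
    task->partial = 0;
    for (size_t i = task->begin; i < task->end; i++)
        task->partial += task->data[i];
    return NULL;
}

/* Split data[0, n) across CREW_SIZE workers, join them, combine the results. */
long crew_sum(const long *data, size_t n)
{
    pthread_t threads[CREW_SIZE];
    CrewTask tasks[CREW_SIZE];
    int started[CREW_SIZE];
    size_t chunk = (n + CREW_SIZE - 1) / CREW_SIZE;
    long total = 0;

    for (int w = 0; w < CREW_SIZE; w++) {
        size_t begin = (size_t)w * chunk;
        size_t end = begin + chunk;
        if (begin > n) begin = n;
        if (end > n) end = n;
        tasks[w] = (CrewTask){ data, begin, end, 0 };
        started[w] = (pthread_create(&threads[w], NULL, crew_worker, &tasks[w]) == 0);
        if (!started[w])
            crew_worker(&tasks[w]);   /* fall back to doing the slice inline */
    }
    for (int w = 0; w < CREW_SIZE; w++) {
        if (started[w])
            pthread_join(threads[w], NULL);
        total += tasks[w].partial;
    }
    return total;
}
```

Because the slices do not overlap and each worker writes only its own CrewTask, no mutex is needed here; a crew whose workers pull jobs from a shared queue would need one.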

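4.1 Pipeline

A stream of "data items" is `processed serially` by an ordered set of threads

    Each thread performs a specific operation on each item in order, then passes the result to the next thread in the pipeline

pipe.c

    Each thread in the pipeline adds 1 to its input value and passes it to the next thread

    main reads a series of "command lines" from stdin

        1] a number: fed into the start of the pipeline

        2] the character "=": the program reads the next result from the end of the pipeline and prints it to stdout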

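Pointer to pointer (2-level pointer): changes the value of a pointer indirectly
    One variable, link2LevPtr, plays 2 roles
        1] link2LevPtr itself: can hold &pHead or &pItem->next
        2] *link2LevPtr = pNewItem: makes pHead point to pNewItem, or links pItem->next to pNewItem
    Common uses:
        [1] the caller wants the callee to modify its pointer argument ptr => pass &ptr; the parameter is a pointer to pointer
        [2] changing the link field of a list node, in order to
            1] chain in the new node: *link2LevPtr = pNewItem;
            2] update link2LevPtr   : link2LevPtr = &pNewItem->next;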
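
The two steps in [2] can be sketched as a small list builder. This is an illustration only, not code from pipe.c: the Item type and the helpers list_append, list_build and list_free are hypothetical names, while pHead, pNewItem and link2LevPtr follow the notes above.

```c
#include <stdlib.h>

typedef struct Item {
    long value;
    struct Item *next;
} Item;

/* Append a node through *link and return the address of the new node's
 * next field. If malloc fails, the list and link are left unchanged. */
Item **list_append(Item **link, long value)
{
    Item *pNewItem = malloc(sizeof *pNewItem);
    if (pNewItem == NULL)
        return link;
    pNewItem->value = value;
    pNewItem->next = NULL;
    *link = pNewItem;          /* 1] chain: pHead or the previous next field now points at the new node */
    return &pNewItem->next;    /* 2] update: the link moves to the new tail's next field */
}

/* Build the list 1, 2, ..., n without special-casing the empty list. */
Item *list_build(int n)
{
    Item *pHead = NULL;
    Item **link2LevPtr = &pHead;   /* starts at &pHead, later holds &pItem->next */
    for (int i = 1; i <= n; i++)
        link2LevPtr = list_append(link2LevPtr, i);
    return pHead;
}

void list_free(Item *p)
{
    while (p != NULL) {
        Item *next = p->next;
        free(p);
        p = next;
    }
}
```

The point of the double pointer is that the first insertion (which writes pHead) and every later insertion (which writes some node's next field) go through the same assignment, so the loop needs no "is the list empty" branch.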
(1) mainThread

    1) pipe_start (Pipe *pipe, long value) 

            calls pipe_send (pipe->head, value)

        Sends data to firstStage

            Transmission is not modeled, so sending <=> firstStage has received the data

(2) resultThread

    void *pipe_result (void *arg) 

    Takes the result from lastStage

      Intended rule: a result can be taken when activeDataNum > 0;
      this version waits on lastStage's dataCanBeProcessedCv in either case

(3) stageThread: one per stage except the last, so n stages have n-1 threads

    pipe_stage (void *arg) // arg: Stage*

    1) Wait until dataCanBeProcessed on currentStage

    2) Process

    3) Send to nextStage <=> nextStage has received the data:
        [1] wait until dataCanBeAccepted on nextStage <=> wait until currentStage's data can be sent
        [2] nextStage receives the data
        [3] signal dataCanBeProcessed on nextStage

        pipe_send (Stage *nextStage, long data) 

    4) Signal dataCanBeAccepted on currentStage

=> 

Concurrency:

Treat startThread (main) as the preThread of firstStage

Treat resultThread as the thread of lastStage

Every other stage has its own thread

1) curStageThread and preThread, forward direction

preThread finishes sending -> signals curStage's dataCanBeProcessedCv, waking curStageThread

2) curStageThread and preThread, backward direction

curStageThread finishes sending -> signals curStage's dataCanBeAcceptedCv, waking preThread blocked in pipe_send

stage->dataCanBeProcessed == 1 / 0 is the predicate that must hold
after waking from dataCanBeProcessedCv / dataCanBeAcceptedCv, respectively
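
The handshake between two neighbouring threads can be sketched in isolation as a one-slot handoff. This is a simplified illustration, assuming a single producer and a single consumer. The field names follow the Stage struct in pipe.c, but Slot, slot_put, slot_take, feeder and slot_demo_sum are hypothetical names. Unlike pipe_stage, slot_take frees the slot as soon as the value is read rather than after forwarding it.

```c
#include <pthread.h>

typedef struct {
    pthread_mutex_t mutex;
    pthread_cond_t dataCanBeProcessedCv;  /* the taker waits here */
    pthread_cond_t dataCanBeAcceptedCv;   /* the putter waits here */
    int dataCanBeProcessed;               /* predicate: 1 = slot full, 0 = slot empty */
    long data;
} Slot;

void slot_init(Slot *s)
{
    pthread_mutex_init(&s->mutex, NULL);
    pthread_cond_init(&s->dataCanBeProcessedCv, NULL);
    pthread_cond_init(&s->dataCanBeAcceptedCv, NULL);
    s->dataCanBeProcessed = 0;
    s->data = 0;
}

/* preThread side: wait until the slot is empty, fill it, wake the taker. */
void slot_put(Slot *s, long value)
{
    pthread_mutex_lock(&s->mutex);
    while (s->dataCanBeProcessed != 0)
        pthread_cond_wait(&s->dataCanBeAcceptedCv, &s->mutex);
    s->data = value;
    s->dataCanBeProcessed = 1;
    pthread_cond_signal(&s->dataCanBeProcessedCv);
    pthread_mutex_unlock(&s->mutex);
}

/* curStageThread side: wait until the slot is full, empty it, wake the putter. */
long slot_take(Slot *s)
{
    long value;
    pthread_mutex_lock(&s->mutex);
    while (s->dataCanBeProcessed != 1)
        pthread_cond_wait(&s->dataCanBeProcessedCv, &s->mutex);
    value = s->data;
    s->dataCanBeProcessed = 0;
    pthread_cond_signal(&s->dataCanBeAcceptedCv);
    pthread_mutex_unlock(&s->mutex);
    return value;
}

typedef struct { Slot *slot; int count; } FeederArg;

/* Producer thread: puts 1, 2, ..., count into the slot. */
static void *feeder(void *arg)
{
    FeederArg *f = arg;
    for (int i = 1; i <= f->count; i++)
        slot_put(f->slot, i);
    return NULL;
}

/* Demo: the caller takes count values and returns their sum, or -1 on error. */
long slot_demo_sum(int count)
{
    Slot slot;
    FeederArg arg = { &slot, count };
    pthread_t thread;
    long sum = 0;

    slot_init(&slot);
    if (pthread_create(&thread, NULL, feeder, &arg) != 0)
        return -1;
    for (int i = 0; i < count; i++)
        sum += slot_take(&slot);
    pthread_join(thread, NULL);
    pthread_cond_destroy(&slot.dataCanBeProcessedCv);
    pthread_cond_destroy(&slot.dataCanBeAcceptedCv);
    pthread_mutex_destroy(&slot.mutex);
    return sum;
}
```

Both waits sit in a while loop that re-tests the flag, because pthread_cond_wait may return spuriously; the flag, not the wakeup, decides whether a thread may proceed.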

// ====== pipe.c    
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h> // malloc 
#include <string.h>

typedef struct StageTag 
{
    pthread_mutex_t mutex;  // protect curStage's data
    long data;  // shared data of this stage: it can accept new input only after the current value has been processed (and sent on)
    pthread_cond_t dataCanBeProcessedCv;    
    pthread_cond_t dataCanBeAcceptedCv;     
    int dataCanBeProcessed; // flag: used for predicate
    pthread_t thread;       // Thread for stage
    struct StageTag *next;  // next field, to form a stageList
} Stage;

typedef struct pipeTag 
{
    pthread_mutex_t mutex;  // protect activeDataNum
    int activeDataNum;
    Stage *head;        
    Stage *tail;            
    int stageNum;       
} Pipe;

typedef struct RetTag
{
    Pipe* pipe;
    long result;
}Ret;

void pipe_send (Stage *nextStage, long data) 
{
    pthread_mutex_lock (&nextStage->mutex);                  // ===== lock
    
    // (1) wait on cv until nextStage can accept data (its previous value has been processed and sent on)
    while (! (nextStage->dataCanBeProcessed == 0) )
        pthread_cond_wait (&nextStage->dataCanBeAcceptedCv, &nextStage->mutex);  

    // (2) the sender (start/preStage) stores data into nextStage
    nextStage->data = data; 
    
    // (3) set the predicate & notify nextStage's thread that data can be processed
    if(nextStage->next == NULL)
    {
        printf("signal tail\n");
    }
    nextStage->dataCanBeProcessed = 1;                                  
    pthread_cond_signal (&nextStage->dataCanBeProcessedCv);     
    
    pthread_mutex_unlock ( &nextStage->mutex);              // ===== unlock
}

void *pipe_stage (void *arg) 
{
    // (0) get pCurStage & extract pNextStage 
    Stage *stage = (Stage*)arg;                     
    Stage *nextStage = stage->next; 

    pthread_mutex_lock (&stage->mutex);     // ===== lock
    
    while (1) 
    {
        printf("=== stage waiting...\n");
        // (1) wait until dataCanBeProcessed
        while (! (stage->dataCanBeProcessed == 1) )             
            pthread_cond_wait (&stage->dataCanBeProcessedCv, &stage->mutex);  
        
        printf("=== stage process %ld start...\n", stage->data);
        // (2) process & send to nextStage
        pipe_send (nextStage, stage->data + 1);                     
        printf("=== stage process&send %ld end...\n", stage->data + 1); 
        
        // (3) clear the predicate & notify preThread that this stage can accept data
        stage->dataCanBeProcessed = 0;          
        pthread_cond_signal (&stage->dataCanBeAcceptedCv); 
    }

    // Note: loop never terminates, no need to unlock mutex explicitly
}

int pipe_create (Pipe *pipe, int stageNum)
{
    Stage **stageList2LevPtr = &pipe->head, *new_stage = NULL, *stage = NULL;

    pthread_mutex_init (&pipe->mutex, NULL);
    pipe->stageNum = stageNum;
    pipe->activeDataNum = 0;

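    // (1) construct stageList: stageNum + 1 stage records; the extra last one only holds the final result
    for (int i = 0; i <= stageNum; i++) 
    {
        new_stage = (Stage*)malloc (sizeof (Stage) );
        if (new_stage == NULL)
        {
            perror ("malloc");
            exit (EXIT_FAILURE);
        }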
        
        pthread_mutex_init (&new_stage->mutex, NULL);
        
        pthread_cond_init (&new_stage->dataCanBeProcessedCv, NULL);
        pthread_cond_init (&new_stage->dataCanBeAcceptedCv, NULL);
        
        new_stage->dataCanBeProcessed = 0;
        
        *stageList2LevPtr = new_stage;          
        stageList2LevPtr = &new_stage->next;
    }
    
    *stageList2LevPtr = (Stage*)NULL;  
    pipe->tail = new_stage; 

    // (2) a stage a thread created, except the last stage doesn't get a thread
    for ( stage = pipe->head; stage->next != NULL; stage = stage->next) 
        pthread_create (
            &stage->thread, NULL, 
            pipe_stage, (void*)stage);
            
    return 0;
}

int pipe_start (Pipe *pipe, long value)
{
    // (1) pipe->activeDataNum++
    pthread_mutex_lock (&pipe->mutex);      // ===== lock
    pipe->activeDataNum++;                                  
    pthread_mutex_unlock (&pipe->mutex);    // =====unlock
    
    // (2) act as the preStage of the head stage: wait until it can accept, then send value
    pipe_send (pipe->head, value);                      
    
    return 0;
}

// wait until the pipeline output a result -> write to caller's memory
// int pipe_result (Pipe *pipe, long *result)
void *pipe_result (void *arg) 
{
    pthread_detach( pthread_self() ); 
    
    Pipe *pipe = ((Ret*)arg)->pipe;
    printf("result init: %ld\n", ((Ret*)arg)->result);
    long *result = &((Ret*)arg)->result;
    
    Stage *tail = pipe->tail;
    int empty = 0;
    
    // (1) check pipe->activeDataNum and update it
    pthread_mutex_lock (&pipe->mutex);  // ===
    
    printf("activeDataNum: %d\n", pipe->activeDataNum); 
    
    if (pipe->activeDataNum <= 0)
        empty = 1;
    else
        pipe->activeDataNum--;
    
    pthread_mutex_unlock (&pipe->mutex); // ===

    pthread_mutex_lock ( &tail->mutex);
    
    printf("pipe_result waiting...\n");
    // (2) wait until dataCanBeProcessed // Note: the result/main thread first waits on dataCanBeProcessedCv
    while (! (tail->dataCanBeProcessed == 1))   
        pthread_cond_wait (&tail->dataCanBeProcessedCv, &tail->mutex);
    
    printf("pipe_result process...\n");
    // (3) process / store data to caller's storage
    *result = tail->data;
    
    // (4) clear the predicate & notify the sender that the tail stage can accept data
    tail->dataCanBeProcessed = 0;
    pthread_cond_signal (&tail->dataCanBeAcceptedCv);
    
    pthread_mutex_unlock (&tail->mutex);
    
    return NULL;
}

int main (int argc, char *argv[])
{
    long value, result;
    char line[128];
    int stageNum = 3;
    pthread_t myTid;
    
    // (1) create stageNum stage threads
    Pipe my_pipe;
    pipe_create (&my_pipe, stageNum);
    
    printf ("Enter integer values, or \"=\" for next result\n");
    
    // (2) create the result thread: it first waits on dataCanBeProcessedCv
    Ret* res = (Ret*)malloc(sizeof(Ret) );
    res->pipe = &my_pipe;
    res->result = 0;
    pthread_create(&myTid, NULL, pipe_result, (void*)res );
    
    while (1) 
    {
        printf ("Data> ");
        
        // (3) line includes the trailing "\n" produced by Enter; stop at EOF or read error
        if (fgets (line, sizeof (line), stdin) == NULL)
            break;
        
        printf("line %s len: %d\n", line, (int)strlen (line));
        if ( strlen (line) <= 1 ) 
            continue;
    
        if (strlen (line) <= 2 && line[0] == '=') 
        {
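            // (4) print the latest result; the result thread writes res->result
            //     while holding the tail mutex, so read it under the same mutex.
            //     res is not freed here: the result thread may still use it,
            //     and "=" may be entered more than once.
            pthread_mutex_lock (&my_pipe.tail->mutex);
            printf("result: %ld\n", res->result); 
            pthread_mutex_unlock (&my_pipe.tail->mutex);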
            /*
            if (pipe_result(&my_pipe, &result) ) 
                printf ("Result is %ld\n", result);
            else
                printf ("Pipe is empty\n");
            */
        }
        else 
        {   
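            // (5) reject input that is not an integer instead of sending an uninitialized value
            if (sscanf (line, "%ld", &value) < 1)
            {
                fprintf (stderr, "Enter an integer value\n");
                continue;
            }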
            printf("value put to pipeline: %ld\n", value);
            pipe_start (&my_pipe, value);   
        } 
    }
}

// print
=== stage waiting...
Enter integer values, or "=" for next result
Data> result init: 0
=== stage waiting...
=== stage waiting...
activeDataNum: 0
pipe_result waiting...
2 // Note: first input
line 2
 len: 2
value put to pipeline: 2
Data> === stage process 2 start...
=== stage process&send 3 end...
=== stage waiting...
=== stage process 3 start...
=== stage process&send 4 end...
=== stage waiting...
=== stage process 4 start...
signal tail
=== stage process&send 5 end...
=== stage waiting...
pipe_result process...
= // Note: second input
line =
 len: 2
result: 5
Data> 

4.2 Client/server (C/S)

0   data 
    see the code 

1   main 

(1) create n client threads
(2) wait on clientsDone until all client threads have finished
(3) quit request: terminates the (detached) server thread

2   void *clientRoutine (void *arg) // clientThraedIndex

(1) loop

    1) send a read request to the server 
        synchronous: one line is read from stdin into the client's string
        when the server reads an empty line (just "Enter"), break to end the clientThread 

        fgets (request->text, 128, stdin)
            -> the trailing '\n' of request->text is replaced with '\0' 
            -> copied to the client's string

    2) format string into formattedStr: prefix it with threadIndex etc.  

    3) send a write request to the server: formattedStr is written to stdout

(2) decrement the client thread count; if it reaches 0, notify (wake) the main thread 
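
Step (2), where the last thread to finish wakes main, can be sketched on its own as follows. This is a minimal illustration under assumptions, not the code of cs.c: WORKERS, countMutex, allDone, remaining, finished and run_workers_and_wait are hypothetical names, and the workers are created detached here, whereas cs.c creates its client threads with default attributes.

```c
#include <pthread.h>

#define WORKERS 3   /* hypothetical number of worker threads */

static pthread_mutex_t countMutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t allDone = PTHREAD_COND_INITIALIZER;
static int remaining;   /* workers still running; predicate for allDone */
static int finished;    /* workers that ran to completion */

static void *worker(void *arg)
{
    (void)arg;
    /* ... the worker's real job would go here ... */
    pthread_mutex_lock(&countMutex);
    finished++;
    if (--remaining <= 0)
        pthread_cond_signal(&allDone);   /* last one out wakes the waiter */
    pthread_mutex_unlock(&countMutex);
    return NULL;
}

/* Start WORKERS detached threads, wait for all of them, return how many finished. */
int run_workers_and_wait(void)
{
    pthread_attr_t attr;
    pthread_t thread;
    int result;

    pthread_mutex_lock(&countMutex);
    remaining = WORKERS;
    finished = 0;
    pthread_mutex_unlock(&countMutex);

    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    for (int i = 0; i < WORKERS; i++) {
        if (pthread_create(&thread, &attr, worker, NULL) != 0) {
            /* a thread that never started must not be waited for */
            pthread_mutex_lock(&countMutex);
            remaining--;
            pthread_mutex_unlock(&countMutex);
        }
    }
    pthread_attr_destroy(&attr);

    pthread_mutex_lock(&countMutex);
    while (remaining > 0)
        pthread_cond_wait(&allDone, &countMutex);
    result = finished;
    pthread_mutex_unlock(&countMutex);
    return result;
}
```

Compared with calling pthread_join on every thread, the counter plus condition variable lets the waiter avoid keeping thread IDs, at the cost of one shared mutex.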
    
3   void requestToServer (
        int         operationCode,
        int         sync,
        const char  *promptForRead,
        char        *string) // Note: output parameter for a read request, input parameter for a write request
    
=== client side
Note: called by the client. Transmission between client and server (e.g. a socket) is not modeled => requestToServer() <=> serverRespond()
(1) only the first call (by any client) creates the single serverThread & sets running = 1
(2) request structure: allocate & fill 
(3) add the request to the list 
(4) signal the server on requestListNotEmptyCv to wake up & process the request 
(5) if the request is synchronous -> wait until request->done
    if it is a read, copy request->text to the client's string 

4   void *serverRoutine (void *arg)
=== server side
Loops forever until it receives main's quit request
(1) cv wait until requestList is not empty
(2) take the first request off the list
(3) process the request according to its operationCode

    1) read request 
        -> read a line from stdin with fgets into request->text 
        -> replace the line's newline ("Enter") with '\0' 
           Note: an empty text (empty line, or fgets returning NULL at EOF) ends the client thread

    2) write request
        print request->text (filled in by requestToServer()) to stdout    

(4) if the request is synchronous, set request->done and signal the client on request->doneCv
    else, free the request's memory 
(5) if the request is a quit request, the server leaves the loop      
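
The synchronous request/response handshake described in sections 3 and 4 can be reduced to the following sketch. It is a simplified illustration, not cs.c itself: every request is synchronous, the hypothetical "service" just doubles a number instead of reading or writing text, and Req, srv, server, request_sync and demo_double are made-up names.

```c
#include <pthread.h>
#include <stdlib.h>

typedef struct Req {
    struct Req *next;
    int quit;                /* 1: ask the server to exit */
    int done;                /* predicate for doneCv */
    pthread_cond_t doneCv;   /* used with the server's mutex */
    long value;              /* in: operand, out: result */
} Req;

static struct {
    Req *first, *last;       /* FIFO request list */
    pthread_mutex_t mutex;
    pthread_cond_t notEmptyCv;
} srv = { NULL, NULL, PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };

static void *server(void *arg)
{
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&srv.mutex);
        while (srv.first == NULL)                    /* (1) wait for a request */
            pthread_cond_wait(&srv.notEmptyCv, &srv.mutex);
        Req *r = srv.first;                          /* (2) dequeue the first one */
        srv.first = r->next;
        if (srv.first == NULL)
            srv.last = NULL;
        pthread_mutex_unlock(&srv.mutex);

        int quit = r->quit;   /* copy first: the client frees r once it is done */
        if (!quit)
            r->value *= 2;                           /* (3) process outside the lock */

        pthread_mutex_lock(&srv.mutex);              /* (4) report completion */
        r->done = 1;
        pthread_cond_signal(&r->doneCv);
        pthread_mutex_unlock(&srv.mutex);
        if (quit)                                    /* (5) leave the loop */
            break;
    }
    return NULL;
}

/* Client side: enqueue a request, wake the server, wait for done.
 * Returns the result, or -1 if the request could not be allocated. */
long request_sync(long value, int quit)
{
    Req *r = malloc(sizeof *r);
    long result;
    if (r == NULL)
        return -1;
    r->next = NULL;
    r->quit = quit;
    r->done = 0;
    r->value = value;
    pthread_cond_init(&r->doneCv, NULL);

    pthread_mutex_lock(&srv.mutex);
    if (srv.first == NULL)
        srv.first = r;
    else
        srv.last->next = r;
    srv.last = r;
    pthread_cond_signal(&srv.notEmptyCv);
    while (!r->done)
        pthread_cond_wait(&r->doneCv, &srv.mutex);
    result = r->value;
    pthread_mutex_unlock(&srv.mutex);

    pthread_cond_destroy(&r->doneCv);
    free(r);
    return result;
}

/* Demo: start a server, send one request, then a quit request, and join. */
long demo_double(long value)
{
    pthread_t thread;
    if (pthread_create(&thread, NULL, server, NULL) != 0)
        return -1;
    long result = request_sync(value, 0);
    request_sync(0, 1);
    pthread_join(thread, NULL);
    return result;
}
```

As in cs.c, the per-request condition variable shares the server's single mutex, and the server copies what it still needs (here quit) before signalling done, since the client may free the request right afterwards.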
// === cs.c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>  // sleep

#define REQ_READ        1   
#define REQ_WRITE       2      
#define REQ_QUIT        3 

typedef struct RequestTag 
{
    struct RequestTag  *next;   // form requestList   
    int                 operationCode;      
    int                 synchronous;  // 0: no 1: yes
    int                 done;     // used for Predicate
    pthread_cond_t      doneCv;   // use server's mutex, because server has only one thread 
    char                promptForRead[32];   // only used for read  
    char                text[128];           // Read/write text: fill by server / client 
} Request;

typedef struct ServerContextTag 
{
    int               running;  // 1 once the server thread has been created
    Request           *first;   // requestList & predicate associated with requestListNotEmptyCv
    Request           *last;    // FIFO: process order
    pthread_mutex_t   mutex;
    pthread_cond_t    requestListNotEmptyCv;
} ServerContext;

// shared data 
ServerContext serverContext = 
{
    0, NULL, NULL,
    PTHREAD_MUTEX_INITIALIZER, 
    PTHREAD_COND_INITIALIZER
};

int clientThreadsNum = 2;
pthread_mutex_t clientMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t clientsDone = PTHREAD_COND_INITIALIZER;
    
void *serverRoutine (void *arg)
{
    static pthread_mutex_t prompt_mutex = PTHREAD_MUTEX_INITIALIZER;
    Request *request;
    int operationCode, len;

    while (1) 
    {
        pthread_mutex_lock (&serverContext.mutex);  // ===

        // (1) 
        while (serverContext.first == NULL) 
            pthread_cond_wait (
                &serverContext.requestListNotEmptyCv, &serverContext.mutex);
        
        // (2) 
        request = serverContext.first;
        serverContext.first = request->next;
        if (serverContext.first == NULL)
            serverContext.last = NULL;
            
        pthread_mutex_unlock (&serverContext.mutex); // ===

        // (3)
        operationCode = request->operationCode;
        switch (operationCode) 
        {
            case REQ_QUIT:
                break;
            
            case REQ_READ:
                if (strlen(request->promptForRead) > 0)
                {
                    // never pass caller-supplied text as the format string
                    printf("%s", request->promptForRead);
                    fflush(stdout);
                }
                
                if (fgets(request->text, 128, stdin) == NULL)
                    request->text[0] = '\0';
            
                len = strlen (request->text);
                
                if (len > 0 && request->text[len-1] == '\n')
                    request->text[len-1] = '\0';
                    
                break;
            
            case REQ_WRITE:
                puts (request->text);
                break;
                
            default:
                break;
        }
        
        // (4) 
        if (request->synchronous) 
        {
            pthread_mutex_lock (&serverContext.mutex);   // ===

            request->done = 1;
            pthread_cond_signal (&request->doneCv);
            
            pthread_mutex_unlock (&serverContext.mutex); // ===

        } 
        else
            free (request);
        
        // (5) 
        if (operationCode == REQ_QUIT)
            break;
    }
    return NULL;
}

void requestToServer (
    int         operationCode,
    int         sync,
    const char  *promptForRead,
    char        *string) // Note: output parameter for a read request, input parameter for a write request
{
    Request *request;

    pthread_mutex_lock (&serverContext.mutex);

    // (1)
    if (!serverContext.running)     
    {
        pthread_t thread;
        pthread_attr_t detached_attr;

        pthread_attr_init (&detached_attr);

        pthread_attr_setdetachstate (
            &detached_attr, PTHREAD_CREATE_DETACHED);
        
        serverContext.running = 1; 

        pthread_create (&thread, &detached_attr,
                        serverRoutine, NULL);

        pthread_attr_destroy (&detached_attr);
    }

    // (2) 
    request = (Request*)malloc (sizeof (Request));
    request->next = NULL;
    request->operationCode = operationCode;
    request->synchronous = sync;
    
    // 1) read & syn 
    if (sync) 
    {
        request->done = 0;
        pthread_cond_init (&request->doneCv, NULL);
    }

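    if (promptForRead != NULL)
    {
        // strncpy does not terminate the copy when the source fills the buffer
        strncpy (request->promptForRead, promptForRead, sizeof (request->promptForRead) - 1);
        request->promptForRead[sizeof (request->promptForRead) - 1] = '\0';
    }
    else
        request->promptForRead[0] = '\0';
        
    // 2) write & nonSyn
    if (operationCode == REQ_WRITE && string != NULL)
    {
        strncpy (request->text, string, sizeof (request->text) - 1);
        request->text[sizeof (request->text) - 1] = '\0';
    }
    else
        request->text[0] = '\0';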

    // (3) 
    if (serverContext.first == NULL) 
    {
        serverContext.first = request;
        serverContext.last = request;
    } 
    else 
    {
        (serverContext.last)->next = request;
        serverContext.last = request;
    }

    // (4) 
    pthread_cond_signal (&serverContext.requestListNotEmptyCv);

    // (5) 
    if (sync) 
    {
        while (!request->done) 
            pthread_cond_wait (
                &request->doneCv, &serverContext.mutex);

        if (operationCode == REQ_READ) 
        {
            if (strlen (request->text) > 0)
                strcpy (string, request->text);
            else
                string[0] = '\0';
        }
        pthread_cond_destroy (&request->doneCv);

        free (request);
    }
    pthread_mutex_unlock (&serverContext.mutex);
}

void *clientRoutine (void *arg) // clientThraedIndex
{
    int clientThraedIndex = (int)(long)arg;  // the index was packed into the pointer by main
    
    char promptForRead[32];
    char string[128], formattedStr[128];
    sprintf (promptForRead, "Client %d> ", clientThraedIndex);

    // (1)
    while (1) 
    {
        requestToServer (REQ_READ, 1, promptForRead, string);

        if (strlen (string) == 0)
            break;
            
        for (int loops = 0; loops < 2; loops++) 
        {
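            // bounded: the prefix plus a 127-char string would overflow formattedStr with sprintf
            snprintf(formattedStr, sizeof (formattedStr), "(%d#%d) %s", clientThraedIndex, loops, string);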
                
            requestToServer (REQ_WRITE, 0, NULL, formattedStr);
            
            sleep (1);
        }
    }
    
    // (2)  
    pthread_mutex_lock (&clientMutex);   // ===

    clientThreadsNum--;
    
    if (clientThreadsNum <= 0) 
        pthread_cond_signal (&clientsDone);
    
    pthread_mutex_unlock (&clientMutex); // ===

    return NULL;
}

int main (int argc, char *argv[])
{
    pthread_t thread;
    
    // (1) snapshot the count first: clients that finish decrement clientThreadsNum concurrently
    int clientsToStart = clientThreadsNum;
    for (int count = 0; count < clientsToStart; count++) 
        pthread_create (&thread, NULL,
                        clientRoutine, (void*)(long)count);
    
    // (2) 
    pthread_mutex_lock (&clientMutex);     // ===
    
    while (clientThreadsNum > 0)
        pthread_cond_wait (&clientsDone, &clientMutex);
        
    pthread_mutex_unlock (&clientMutex);  // ===
    
    printf ("All clients done\n");
    
    // (3) 
    requestToServer (REQ_QUIT, 1, NULL, NULL);
    
    return 0;
}
// print
Client 1> hello
Client 0> world
(1#0) hello
(1#1) hello
Client 1> 
(0#0) world
(0#1) world
Client 0> ok
(0#0) ok
(0#1) ok
Client 0> 
All clients done

4.3 Work crew
