chapter4 线程 模型 - Programming with POSIX Threads
2022-05-21 本文已影响0人
my_passion
chapter4 线程 模型: 3 种
TABLE 4.1 Thread programming models
Pipeline/流水线
each thread 对一系列 data set 重复执行 `相同操作`
将 result 传给 another thread 以进行 next stage
Work crew/工作组
工作组中 各线程
独立
可执行 `相同/不同 操作`
Client/server
C 为每个 job 与 独立 S "签约"
3 模型 可以 任意组合
Pipeline 1个 step 从 server 请求服务
server 用 Work crew
Work crew 多 workers 用 Pipeline
4.1 pipeline
多线程 有序 `串行处理 (processed serially)` "data items" 流
每个线程 按序对每个 item 执行特定操作, 将 result 传递到 pipeline 中 next thread
pipe.c
pipeline 中 每个线程 将其输入值加 1, 传给 next 线程
main 从 stdin 读 一系列 "命令行"
1] 数字: 被送入 pipeline 开头
2] "=" 字符: 程序 从 pipeline 末尾读取 next result, 并将其打印到 stdout
2级指针: 间接 改变指针的值
link2LevPtr 1个 变量 有 2 个作用
1] link2LevPtr 本身: 可 取 &pHead 或 &pItem->next
2] *link2LevPtr = pNewItem 即 phead 指向 pNewItem, 或 pItem->next 链到 pNewItem
常见应用:
[1] caller 的 ptr 实参 想被 (callee)函数修改 => &ptr 传入函数: para 为 2级指针
[2] 改变 链式结点 link 域, 以实现
1] 链到 newNode : *link2LevPtr = pNewItem;
2] update link2LevPtr: link2LevPtr = &pNewItem->next;
(1) mainThread
1) pipe_start (Pipe *pipe, long value)
调 pipe_send (pipe->head, value)
发送 data 到 firstStage
不考虑传输, 认为 <=> firstStage 收到
(2) resultThread
void *pipe_result (void *arg)
从 lastStage 取 result
activeDataNum > 0 则可取
(3) stageThread: 除 last stage 之外 n-1
pipe_stage (void *arg) // arg: Stage*
1) 等待 currentStage 上 dataCanBeProcessed
2) process
3) 发送到 nextStage 中 <=> next stage 上收成功:
[1] 等待 nextStage 上 dataCanBeAccepted <=> 等待 current Stage 上 data 发送成功
[2] nextStage 上 接收
[3] signal nextStage 上 dataCanBeProcessed
pipe_send (Stage *nextStage, long data)
4) signal currentStage 上 dataCanBeAccepted
=>
并发性 :
视 startThread 为 firstStage 的 preThread
视 resultThread 为 lastStage 的 Thread
otherStage: 有自己的 thread
则
1) curStageThread 与 preThread
preThread 发送成功 -> signal curStageThread 从 dataCanBeProcessed Cv 唤醒
2) preThread 与 curStageThread
curStageThread 处理并发送成功 -> signal preThread 从 (curStage 的) dataCanBeAccepted Cv 唤醒
stage->dataCanBeProcessed == 1 / 0 分别为
dataCanBeProcessedCv / dataCanBeAcceptedCv 唤醒 的 predicate 满足条件
// ====== pipe.c
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h> // malloc
#include <string.h>
/*
 * One node of the pipeline's singly linked stage list.
 *
 * A stage is a one-slot mailbox: `data` holds at most one value and
 * `dataCanBeProcessed` says whether that slot is currently occupied.
 */
typedef struct StageTag
{
    pthread_mutex_t  mutex;                /* held while data / dataCanBeProcessed are accessed */
    long             data;                 /* the value currently parked on this stage */
    pthread_cond_t   dataCanBeProcessedCv; /* signalled after a value has been stored */
    pthread_cond_t   dataCanBeAcceptedCv;  /* signalled after the stored value has been consumed */
    int              dataCanBeProcessed;   /* predicate: 1 = slot full, 0 = slot free */
    pthread_t        thread;               /* worker thread id, filled in by pipe_create */
    struct StageTag *next;                 /* following stage, NULL on the last one */
} Stage;
/*
 * The pipeline as a whole: the stage list plus a count of submitted values.
 */
typedef struct pipeTag
{
    pthread_mutex_t mutex;          /* guards activeDataNum */
    int             activeDataNum;  /* incremented by pipe_start for every value sent in */
    Stage          *head;           /* first stage; pipe_start sends here */
    Stage          *tail;           /* last stage; pipe_result reads from here */
    int             stageNum;       /* the stage count passed to pipe_create */
} Pipe;
/*
 * Argument bundle passed to the pipe_result thread.
 */
typedef struct RetTag
{
    Pipe *pipe;    /* in:  pipeline to read a result from */
    long  result;  /* out: value taken from the pipeline's tail stage */
} Ret;
/*
 * Deliver `data` to `nextStage`.
 *
 * Blocks until the stage's one-value slot is free, stores the value, marks
 * the slot as full and wakes whichever thread is waiting to process it.
 */
void pipe_send (Stage *nextStage, long data)
{
    Stage *const target = nextStage;

    pthread_mutex_lock (&target->mutex);

    /* The slot is busy while dataCanBeProcessed is non-zero: wait for the
       consumer to announce that it has been emptied. */
    while (target->dataCanBeProcessed != 0) {
        pthread_cond_wait (&target->dataCanBeAcceptedCv, &target->mutex);
    }

    target->data = data;

    /* Trace output when the value reaches the final stage of the list. */
    if (target->next == NULL) {
        printf ("signal tail\n");
    }

    /* Publish the value: set the predicate first, then signal. */
    target->dataCanBeProcessed = 1;
    pthread_cond_signal (&target->dataCanBeProcessedCv);

    pthread_mutex_unlock (&target->mutex);
}
/*
 * Thread start routine for one pipeline stage.
 *
 * arg is the Stage this thread serves.  Forever: wait for a value to arrive
 * on the stage, add 1 to it, pass the sum to the next stage, then mark this
 * stage's slot as free again.  The stage mutex is held the whole time except
 * while blocked inside pthread_cond_wait.
 */
void *pipe_stage (void *arg)
{
    Stage *const self = arg;
    Stage *const downstream = self->next;

    pthread_mutex_lock (&self->mutex);

    for (;;) {
        printf ("=== stage waiting...\n");

        /* Sleep until a sender has filled our slot. */
        while (self->dataCanBeProcessed != 1) {
            pthread_cond_wait (&self->dataCanBeProcessedCv, &self->mutex);
        }
        printf ("=== stage process %ld start...\n", self->data);

        /* The "processing" of this demo: increment and forward. */
        const long processed = self->data + 1;
        pipe_send (downstream, processed);
        printf ("=== stage process&send %ld end...\n", processed);

        /* Slot is free again: let the upstream sender proceed. */
        self->dataCanBeProcessed = 0;
        pthread_cond_signal (&self->dataCanBeAcceptedCv);
    }
    /* Not reached: the loop has no exit, so the mutex is never released here. */
}
/*
 * Build a pipeline and start its worker threads.
 *
 * The stage list gets stageNum + 1 nodes: stageNum working stages, each
 * served by its own pipe_stage thread, followed by one final stage that has
 * no thread and only holds the finished value until pipe_result collects it.
 *
 * pipe     - pipeline object to initialise (caller-owned storage).
 * stageNum - number of working stages; must be >= 0.
 *
 * Returns 0 on success.  Returns -1 if stageNum is negative or a stage
 * cannot be allocated; in that case no thread has been started, every stage
 * already allocated is released and pipe->head / pipe->tail are NULL.
 * If starting a thread fails, the pthread_create error code is returned;
 * threads started before the failure keep running, so the stage list is
 * left in place.
 */
int pipe_create (Pipe *pipe, int stageNum)
{
    Stage **link = &pipe->head;   /* where the next node's address must be stored */
    Stage *new_stage = NULL;

    pthread_mutex_init (&pipe->mutex, NULL);
    pipe->stageNum = stageNum;
    pipe->activeDataNum = 0;
    pipe->head = NULL;
    pipe->tail = NULL;

    /* A negative count would leave the list empty and make the thread loop
       below dereference a NULL head. */
    if (stageNum < 0)
        return -1;

    /* (1) Build the list: "<=" is intentional, it adds the thread-less
       final stage. */
    for (int i = 0; i <= stageNum; i++) {
        new_stage = malloc (sizeof *new_stage);
        if (new_stage == NULL) {
            /* Unwind: nothing is running yet, so the partial list can go. */
            Stage *doomed = pipe->head;
            while (doomed != NULL) {
                Stage *following = doomed->next;
                pthread_cond_destroy (&doomed->dataCanBeAcceptedCv);
                pthread_cond_destroy (&doomed->dataCanBeProcessedCv);
                pthread_mutex_destroy (&doomed->mutex);
                free (doomed);
                doomed = following;
            }
            pipe->head = NULL;
            return -1;
        }

        pthread_mutex_init (&new_stage->mutex, NULL);
        pthread_cond_init (&new_stage->dataCanBeProcessedCv, NULL);
        pthread_cond_init (&new_stage->dataCanBeAcceptedCv, NULL);
        new_stage->dataCanBeProcessed = 0;
        new_stage->data = 0;
        new_stage->next = NULL;   /* keeps the list terminated at every step */

        *link = new_stage;        /* hook the node into head or previous->next */
        link = &new_stage->next;
    }
    pipe->tail = new_stage;

    /* (2) One thread per stage, except the last one. */
    for (Stage *stage = pipe->head; stage->next != NULL; stage = stage->next) {
        int status = pthread_create (&stage->thread, NULL, pipe_stage, stage);
        if (status != 0)
            return status;
    }
    return 0;
}
/*
 * Feed one value into the front of the pipeline.
 *
 * Counts the value in pipe->activeDataNum, then hands it to the head stage;
 * the call blocks inside pipe_send while the head stage is still occupied.
 * Always returns 0.
 */
int pipe_start (Pipe *pipe, long value)
{
    pthread_mutex_t *const counterLock = &pipe->mutex;

    /* Book-keeping first: one more value is now in flight. */
    pthread_mutex_lock (counterLock);
    pipe->activeDataNum += 1;
    pthread_mutex_unlock (counterLock);

    /* The caller plays the role of the head stage's upstream sender. */
    pipe_send (pipe->head, value);

    return 0;
}
// wait until the pipeline output a result -> write to caller's memory
// int pipe_result (Pipe *pipe, long *result)
void *pipe_result (void *arg)
{
pthread_detach( pthread_self() );
Pipe *pipe = ((Ret*)arg)->pipe;
printf("result init: %ld\n", ((Ret*)arg)->result);
long *result = &((Ret*)arg)->result;
Stage *tail = pipe->tail;
int empty = 0;
// (1) pipe־>activeDataNum check, and modify
pthread_mutex_lock (&pipe->mutex); // ===
printf("activeDataNum: %d\n", pipe->activeDataNum);
if (pipe->activeDataNum <= 0)
empty = 1;
else
pipe->activeDataNum--;
pthread_mutex_unlock (&pipe->mutex); // ===
pthread_mutex_lock ( &tail->mutex);
printf("pipe_result waiting...\n");
// (2) wait until dataCanBeProcessed // Note: result/mian 线程 要先 wait on dataCanBeProcessedCv
while (! (tail->dataCanBeProcessed == 1))
pthread_cond_wait (&tail->dataCanBeProcessedCv, &tail->mutex);
printf("pipe_result process...\n");
// (3) process / store data to caller's storage
*result = tail->data;
// (4) modify predicate & notify curStageThread dataCanBeProcessed
tail->dataCanBeProcessed = 0;
pthread_cond_signal (&tail->dataCanBeAcceptedCv);
pthread_mutex_unlock (&tail->mutex);
return NULL;
}
int main (int argc, char *argv[])
{
long value, result;
char line[128];
int stageNum = 3;
pthread_t myTid;
// (1) create stageNum 个 stageThread
Pipe my_pipe;
pipe_create (&my_pipe, stageNum);
printf ("Enter integer values, or \"=\" for next result\n");
// (2) create result 线程: 要先 wait on dataCanBeProcessedCv
Ret* res = (Ret*)malloc(sizeof(Ret) );
res->pipe = &my_pipe;
res->result = 0;
pthread_create(&myTid, NULL, pipe_result, (void*)res );
while (1)
{
printf ("Data> ");
// (3) line 含 "Enter" 最后的 "\n"
fgets (line, sizeof (line), stdin);
printf("line %s len: %d\n", line, (int)strlen (line));
if ( strlen (line) <= 1 )
continue;
if (strlen (line) <= 2 && line[0] == '=')
{
// (4)
printf("result: %ld\n", res->result);
free(res);
/*
if (pipe_result(&my_pipe, &result) )
printf ("Result is %ld\n", result);
else
printf ("Pipe is empty\n");
*/
}
else
{
// (5)
sscanf (line, "%ld", &value);
printf("value put to pipeline: %ld\n", value);
pipe_start (&my_pipe, value);
}
}
}
=== stage waiting...
Enter integer values, or "=" for next result
Data> result init: 0
=== stage waiting...
=== stage waiting...
activeDataNum: 0
pipe_result waiting...
2 // Note: first input
line 2
len: 2
value put to pipeline: 2
Data> === stage process 2 start...
=== stage process&send 3 end...
=== stage waiting...
=== stage process 3 start...
=== stage process&send 4 end...
=== stage waiting...
=== stage process 4 start...
signal tail
=== stage process&send 5 end...
=== stage waiting...
pipe_result process...
= // Note: second input
line =
len: 2
result: 5
Data>
4.2 客户/服务器 (C/S)
0 data
见 code
1 main
(1) 建 n 个 clients threads
(2) clientsDoneCv 上 等待 all clientthreads 结束
(3) quit request: terminate server thread(detached)
2 void *clientRoutine (void *arg) // clientThreadIndex
(1) 循环
1) 向 server 读请求
从 stdin 读 1 行 字符串 到 client's string: 同步
直到 server 上 读到 空行 (fgets + "Enter") 时, break 以结束 clientThread
fgets (request->text, 128, stdin)
-> request->text 尾部 '\n' 转化为 '\0'
-> copy 到 client's string
2) 格式化 string 为 formattedStr: 带上 threadIndex 等
3) 向 server 写请求: 写 formattedStr 到 stdout
(2) client 线程数 --, 若 减为 0, notify(唤醒) main 线程
3 void requestToServer (
int operationCode,
int sync,
const char *promptForRead,
char *string) // Note: paraOut/paraIn: read/write
=== client 侧
Note: client 调用, 不考虑 client 与 server 间 传输(如 socket) => requestToServer() <=> serverRespond()
(1) 第 1 次被 (client) 调用 时, 才 创建 1 个 serverThread & 置 running = 1
(2) request structure: allocate & fill
(3) Add request to list
(4) signal server/requestListNotEmptyCv to wake up & process request
(5) If request syn -> wait 直到 request->done
if read, copy request->text 到 client's string
4 void *serverRoutine (void *arg)
/* === server 侧
无限循环, 直到 收到 main 的 quit request
(1) cv.wait 直到 requestList not empty
(2) 取出 first request
(3) process request: 据 不同 operationCode
1) readRequest
-> read/fgets a line (from stdin) to(到) request->text,
-> turn line's newline("Enter") to null('\0' )
Note: 读到 空行 或 EOF (fgets 返回 NULL) 时, request->text 为 空串 => 用于 结束 client thread
2) writeRequest
request->text( requestToServer() 用 fill ) 输出到 stdout
(4) if request syn(read), signal client / request->doneCv, first request is done
else, free request's memory
(5) if request quit, server quit from loop
// === cs.c
#include <math.h>
#include <pthread.h>
#include <stdint.h>   // intptr_t
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>   // sleep
/* Operation codes carried in Request.operationCode. */
enum
{
    REQ_READ  = 1,  /* read a line from stdin into the request's text */
    REQ_WRITE = 2,  /* write the request's text to stdout */
    REQ_QUIT  = 3   /* make the server thread leave its loop */
};
/*
 * One unit of work queued for the server thread.
 */
typedef struct RequestTag
{
    struct RequestTag *next;              /* link in the server's FIFO request list */
    int                operationCode;     /* REQ_READ, REQ_WRITE or REQ_QUIT */
    int                synchronous;       /* non-zero: the requester waits for completion */
    int                done;              /* predicate for doneCv; set to 1 by the server */
    pthread_cond_t     doneCv;            /* waited on together with the server's mutex */
    char               promptForRead[32]; /* prompt printed before a read; may be empty */
    char               text[128];         /* line read by the server, or text to be written */
} Request;
/*
 * State shared between the server thread and every requester.
 */
typedef struct ServerContextTag
{
    int             running;               /* non-zero once the server thread has been started */
    Request        *first;                 /* oldest queued request; NULL means the queue is empty */
    Request        *last;                  /* newest queued request; new requests are appended here */
    pthread_mutex_t mutex;                 /* guards the queue, `running` and each request's `done` */
    pthread_cond_t  requestListNotEmptyCv; /* signalled whenever a request has been queued */
} ServerContext;
// shared data
ServerContext serverContext =
{
0, NULL, NULL,
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_COND_INITIALIZER
};
int clientThreadsNum = 2;
pthread_mutex_t clientMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t clientsDone = PTHREAD_COND_INITIALIZER;
/*
 * Server thread: takes requests off serverContext's queue, oldest first,
 * and executes them one at a time until a REQ_QUIT request has been handled.
 *
 *   REQ_READ   print the request's prompt (if any), read one line from
 *              stdin into request->text and strip the trailing newline;
 *              EOF or a read error yields an empty string.
 *   REQ_WRITE  print request->text followed by a newline.
 *   REQ_QUIT   no work; the loop ends after the completion step below.
 *
 * Completion: a synchronous request is marked done and its requester is
 * signalled (the requester frees it); an asynchronous request is freed here.
 *
 * arg is unused.  Always returns NULL.
 */
void *serverRoutine (void *arg)
{
    (void)arg;

    for (;;)
    {
        // (1) wait until the queue is non-empty
        pthread_mutex_lock (&serverContext.mutex);
        while (serverContext.first == NULL)
            pthread_cond_wait (
                &serverContext.requestListNotEmptyCv, &serverContext.mutex);

        // (2) unlink the oldest request
        Request *request = serverContext.first;
        serverContext.first = request->next;
        if (serverContext.first == NULL)
            serverContext.last = NULL;
        pthread_mutex_unlock (&serverContext.mutex);

        // (3) execute it; the code is copied because a synchronous request
        //     may be freed by its requester as soon as it is marked done
        const int operationCode = request->operationCode;
        switch (operationCode)
        {
        case REQ_READ:
        {
            if (request->promptForRead[0] != '\0')
            {
                // Never use the prompt as the format string.  The precision
                // bounds the output even if the buffer lacks a terminator.
                printf ("%.*s", (int)sizeof request->promptForRead,
                        request->promptForRead);
                fflush (stdout);   // make the prompt visible before blocking
            }
            if (fgets (request->text, sizeof request->text, stdin) == NULL)
                request->text[0] = '\0';
            size_t len = strlen (request->text);
            if (len > 0 && request->text[len - 1] == '\n')
                request->text[len - 1] = '\0';
            break;
        }
        case REQ_WRITE:
            // Same output as puts(), but bounded by the buffer size.
            printf ("%.*s\n", (int)sizeof request->text, request->text);
            break;
        case REQ_QUIT:
        default:
            break;
        }

        // (4) completion
        if (request->synchronous)
        {
            pthread_mutex_lock (&serverContext.mutex);
            request->done = 1;
            pthread_cond_signal (&request->doneCv);
            pthread_mutex_unlock (&serverContext.mutex);
        }
        else
        {
            free (request);
        }

        // (5) leave after a quit request
        if (operationCode == REQ_QUIT)
            break;
    }
    return NULL;
}
/*
 * Queue a request for the server thread, starting that thread on first use.
 *
 * operationCode - REQ_READ, REQ_WRITE or REQ_QUIT.
 * sync          - non-zero: block until the server has completed the request.
 * promptForRead - prompt for a read, or NULL for none; truncated to fit
 *                 Request.promptForRead.
 * string        - REQ_WRITE: text to print (input; truncated to fit
 *                 Request.text).
 *                 synchronous REQ_READ: receives the line that was read
 *                 (output); the buffer must hold at least
 *                 sizeof(Request.text) = 128 bytes.
 *                 May be NULL, in which case it is ignored.
 *
 * If the server thread cannot be started or the request cannot be
 * allocated, a message is written to stderr, nothing is queued, and for
 * REQ_READ `string` is set to the empty string.
 *
 * NOTE(review): nothing visible here clears serverContext.running after a
 * REQ_QUIT, so requests issued after the server has quit would never be
 * served -- confirm that no caller does that.
 */
void requestToServer (
    int operationCode,
    int sync,
    const char *promptForRead,
    char *string) // Note: paraOut/paraIn: read/write
{
    Request *request = NULL;

    pthread_mutex_lock (&serverContext.mutex);

    // (1) lazily start the (detached) server thread
    if (!serverContext.running)
    {
        pthread_t thread;
        pthread_attr_t detached_attr;
        int status;

        pthread_attr_init (&detached_attr);
        pthread_attr_setdetachstate (
            &detached_attr, PTHREAD_CREATE_DETACHED);
        status = pthread_create (&thread, &detached_attr,
            serverRoutine, NULL);
        pthread_attr_destroy (&detached_attr);
        if (status != 0)
        {
            fprintf (stderr, "requestToServer: cannot start server: %s\n",
                     strerror (status));
            goto fail;
        }
        // Only flag the server as running once it really exists.
        serverContext.running = 1;
    }

    // (2) allocate and fill the request
    request = malloc (sizeof *request);
    if (request == NULL)
    {
        fprintf (stderr, "requestToServer: out of memory\n");
        goto fail;
    }
    request->next = NULL;
    request->operationCode = operationCode;
    request->synchronous = sync;
    request->done = 0;
    if (sync)
        pthread_cond_init (&request->doneCv, NULL);

    // snprintf always NUL-terminates; strncpy does not when the source
    // fills the whole destination.
    if (promptForRead != NULL)
        snprintf (request->promptForRead, sizeof request->promptForRead,
                  "%s", promptForRead);
    else
        request->promptForRead[0] = '\0';

    if (operationCode == REQ_WRITE && string != NULL)
        snprintf (request->text, sizeof request->text, "%s", string);
    else
        request->text[0] = '\0';

    // (3) append to the FIFO queue
    if (serverContext.first == NULL)
        serverContext.first = request;
    else
        serverContext.last->next = request;
    serverContext.last = request;

    // (4) wake the server
    pthread_cond_signal (&serverContext.requestListNotEmptyCv);

    // (5) synchronous: wait for completion, collect the result, clean up.
    //     An asynchronous request is freed by the server instead.
    if (sync)
    {
        while (!request->done)
            pthread_cond_wait (
                &request->doneCv, &serverContext.mutex);

        // request->text is always terminated here, so this also covers
        // the empty-line / EOF case.
        if (operationCode == REQ_READ && string != NULL)
            strcpy (string, request->text);

        pthread_cond_destroy (&request->doneCv);
        free (request);
    }
    pthread_mutex_unlock (&serverContext.mutex);
    return;

fail:
    // Give a reader a well-defined (empty) result.
    if (operationCode == REQ_READ && string != NULL)
        string[0] = '\0';
    pthread_mutex_unlock (&serverContext.mutex);
}
void *clientRoutine (void *arg) // clientThraedIndex
{
int clientThraedIndex = (int)arg;
char promptForRead[32];
char string[128], formattedStr[128];
sprintf (promptForRead, "Client %d> ", clientThraedIndex);
// (1)
while (1)
{
requestToServer (REQ_READ, 1, promptForRead, string);
if (strlen (string) == 0)
break;
for (int loops = 0; loops < 2; loops++)
{
sprintf(formattedStr, "(%d#%d) %s", clientThraedIndex, loops, string);
requestToServer (REQ_WRITE, 0, NULL, formattedStr);
sleep (1);
}
}
// (2)
pthread_mutex_lock (&clientMutex); // ===
clientThreadsNum--;
if (clientThreadsNum <= 0)
pthread_cond_signal (&clientsDone);
pthread_mutex_unlock (&clientMutex); // ===
return NULL;
}
/*
 * cs.c driver: start the client threads, wait until all of them have
 * finished, then ask the server to quit and wait for it to do so.
 */
int main (int argc, char *argv[])
{
    (void)argc;
    (void)argv;

    // (1) Take a snapshot of the count before any client exists.  Clients
    //     decrement clientThreadsNum when they finish, so using the live
    //     variable as the loop bound could start too few threads and leave
    //     the wait below stuck forever.
    const int clientsToStart = clientThreadsNum;

    for (int count = 0; count < clientsToStart; count++)
    {
        pthread_t thread;
        int status = pthread_create (&thread, NULL,
            clientRoutine, (void*)(intptr_t)count);
        if (status != 0)
        {
            // A client that never started will never report completion:
            // take it out of the count ourselves.
            fprintf (stderr, "cannot start client %d: %s\n",
                     count, strerror (status));
            pthread_mutex_lock (&clientMutex);
            clientThreadsNum--;
            pthread_mutex_unlock (&clientMutex);
            continue;
        }
        // Clients are never joined; completion is tracked via clientsDone.
        pthread_detach (thread);
    }

    // (2) wait for the last client to finish
    pthread_mutex_lock (&clientMutex);
    while (clientThreadsNum > 0)
        pthread_cond_wait (&clientsDone, &clientMutex);
    pthread_mutex_unlock (&clientMutex);
    printf ("All clients done\n");

    // (3) synchronous quit: returns once the server has handled it
    requestToServer (REQ_QUIT, 1, NULL, NULL);
    return 0;
}
// print
Client 1> hello
Client 0> world
(1#0) hello
(1#1) hello
Client 1>
(0#0) world
(0#1) world
Client 0> ok
(0#0) ok
(0#1) ok
Client 0>
All clients done