threadPool of sogou's workflow -

2022-05-24  本文已影响0人  my_passion
参考 https://zhuanlan.zhihu.com/p/503733481
style: return value >= 0 表示 success

1. 三大特点 
    
(1) 线程池 running 时, 无需记录任何 线程 id 或对象, 线程池 terminating 时, 线程池可以通过 一个等一个的方式 优雅地去结束所有线程
    即 每个线程都是对等的

    例: 发起 destroy 的人 若为 mainThread, 如何做到一个等一个退出

        线程(池)的退出, 由 thrdpool_destroy() 设置 pool->terminate 开始
        
        每个线程, 在 while(1) 里 会第一时间发现 terminate (!= NUll), 线程池要退出 了, 然后 break 出 while 循环
        
            注意: 此时, 还 持有 mutex, 拿到 pool 上 唯一的 tid, 放到 自己的 临时变量, 据 tid 是否 == 0 决定是否 join preWorkerThread  
            
                把 自己的 tid == pthread_self() 放到 pool 上(以使 preThread join 他), 再解锁 mutex   
            
        => 
        1] 第1个从 pool 上 拿 tid 的 人, 会发现是 0 值, 可直接结束
            不用负责 等待 任何其他人, 但我 在完全结束前 需要 有人负责等待我的结束, 所以我会把我的 tid 放上去
            
        2] 而如果发现自己从 pool里拿到的 tid 不是 0 值, 说明 我要负责 join 上一个人, 并把我的 tid 放上去,让下一个人负责我
        
        3] 最后 那个人(workerThread), 是发现 pool->nthreads 为 0 的 人, 
            他通过 terminate(condition) 通知 发起 destroy 的人
        
        4] 最后 thrdPoolDestoryerThread 退出
        
        非常优雅的做法!!!

(2) a thrdPoolTask(消费者) 可以 调 another thrdPoolTask; 
    线程池 destorying 时, 线程池中的 task 也可以通过 调 thrdpool_schedule() 来 提交 nextTask
        这很重要, 因为线程本身很可能不知道线程池的状态
        
    即, 每个任务是对等的
    
        another thrdPoolTask / the next thrdPoolTask 是 ( thrdPoolTask routine 中 调)
            [1] thrdpool_schedule()     --- 生产者 
            [2] __thrdpool_terminate()  --- thrdPoolTerminateThread
            [3] thrdpool_destroy()      --- thrdPoolDestoryerThread
    
    只要 taskList 管理得好, 消费者(consumer) 所执行的函数 也可以做 生产者(producer)
    
    执行 routine 的线程, 都是 消费者
    发起 schedule 的线程, 都是 生产者
    
(3) 线程任务 可销毁 线程池

    即 每一种行为是对等的, 包括 destroy
    
2. 线程池中 任务入口: 
    [1] hook: 双向链表 的 node, 无 data 成员, 只有 pre/next 的 link 成员, 以 形成 hook list
    [2] task 靠 关联的 hook 被 挂起来, 形成 task list 
            prototype & context 相同
            
struct __thrdpool_task_entry
{
    struct list_head list;     // task's hook(钩子): 将 task 挂起来, hookList - - - 固定 offset - - - > taskList
    struct thrdpool_task task; // task 
};

3. 线程池
struct __thrdpool 
{
    struct list_head task_queue; // 任务队列 关联的 hookList 的 headNode(invalid, there is no associated task)   
    size_t nthreads;             // 线程数
    size_t stacksize;            // 线程栈 size 
    pthread_t tid;               // pool 运行 时, pool 上记录的 tid 是 zero 值 
    pthread_mutex_t mutex;        
    pthread_cond_t cond;
    pthread_key_t key;
    pthread_cond_t *terminate;
};

没有一个多余, 每个成员都很到位

task_queue
    1) init
    
        thrdpool_create() 中 
            INIT_LIST_HEAD(&pool->task_queue);
                hookList init: invalid 头结点(pool->task_queue) pre/next 都 指向自身
        task_queue 无 associated task
            => task_queue 含义: hookList 的 invalid 头结点 
    
    2) 尾插 task 到 taskList: 变 link 域 (pre/next)
    
        __thrdpool_schedule() 中 
            list_add_tail(&entry->list, &pool->task_queue); 
                尾插 task's hook 到 hookList <=> 尾插 task 到 taskList 
                尾插: 插到 hookList 的 headPre == tail 与 head 之间 
        
    3) 取出 firstTask -> delete it from tastList -> 先解锁 -> 再 执行 firstTask 
        
        取  firstValidHook(==) 的 ptr 
        -> 据 firstValidHook(struct mem) 的 ptr(mem ptr/address 更大) 
            与其 所在 taskEntry(struct) 地址 的 offset 
                求 其 taskEntry 的 addr 
        -> delete firstValidHook from hookList <=> delete firstTask from taskList 
        -> ...
        
        __thrdpool_routine()
        {
            // ...
            headNext = &pool->task_queue.next;
            // ...
            entry = list_entry(*headNext, struct __thrdpool_task_entry, list);
            list_del(*headNext);
            pthread_mutex_unlock(&pool->mutex); 
            
            task_routine = entry->task.routine;
            task_context = entry->task.context;
            free(entry);
            task_routine(task_context);
        }
        
    4) 逐个 取出 (first)task -> 从 taskList 删除 -> 通过 usrFuncPtr 让 user 回调
    
        thrdpool_destroy()
        {
            // ...
            list_for_each_safe(pos, tmp, &pool->task_queue)
                entry = list_entry(pos, struct __thrdpool_task_entry, list);
                list_del(pos);
                if (pending)
                    pending(&entry->task);
        }
    
nthreads
    = 0
        thrdpool_create() 中 
    
    ++ 
        1] __thrdpool_create_threads() 中 pthread_create() 后 
        2] thrdpool_increase()         中 pthread_create() 后 
        
    --
        1] __thrdpool_terminate() 中 in_pool == 1 时 
        
        2] __thrdpool_routine() 中 pool->terminate 被置为 cond 时

tid

    线程 id, 线程池 同时只记录 1个 tid
        线程池 运行时, 不会(没必要)记录 current 正在运行的 worker thread 的 tid => tid 是 空/无效值 0
        线程池 退出时, tid 用来实现 `链式等待`
    
    init: 0 值 
        memset(&pool->tid, 0, sizeof (pthread_t));
        
    use
        /* 
        1) extract old pool->tid: 把 线程池 上记录的 tid(上一个人的 tid) 拿下来, 
                我(current work thread) 来 负责 上一个人 (pre worker thread)
        2) update new pool->tid: 把我自己的 tid 记录到 线程池 上, 下一个人来 负责 我
        3) 每个人都减 1, 最后一个人(不算 发起 destory) 负责叫醒 发起 destroy 的 人 (main 线程)
        4) unlock 
        5) 只有第1个人 拿到的 tid == 0, 不 join other worker thread (there is no other worker thread)
        6) 只要不是 0 值, 我就要负责 等上一个人结束 才能退:
        */
        __thrdpool_routine() 
        {
            // ...
            tid = pool->tid;            
            pool->tid = pthread_self(); 
            if (--pool->nthreads == 0)   
                pthread_cond_signal(pool->terminate);

            pthread_mutex_unlock(&pool->mutex); // === 
            if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0)
                pthread_join(tid, NULL); 
                
            // ...
        }       
        
        /* __thrdpool_terminate() 
         (1) 两种 case 下被调用 
            [1] __thrdpool_create_threads() 中 创建 n 个线程失败 时, 被调用 
            [2] thrdpool_destroy() 中 被调用
        
         (2) 与 __thrdpool_routine() 中 比较 pool->tid 与 0, 然后 join preWorkerThread
                [1] terminateThread/thrdPoolDestoryerThread 等 workerThreadN, 自己 不必被等待
                [2] workerThread1 不必等待
                [3] workerThread2-N 等待 preWorkerThread
        */
        __thrdpool_terminate()
        {
            // ...
            while (pool->nthreads > 0)
            pthread_cond_wait(&term, &pool->mutex); // === 
            // <=> pthread_cond_wait(pool->terminate, &pool->mutex);

            pthread_mutex_unlock(&pool->mutex);         // ===
            if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)
                pthread_join(pool->tid, NULL); 
        }
        
    => pool->tid 含义: 
        
        workerThread1   workerThread2 ...   workerThreadN   thrdPoolDestoryerThread(main / workerThreadNPlus1)
        
        terminate 线程池 时, 池中各线程 & thrdPoolDestoryerThread 逐个 链式等待(join) 前1个 结束
        
        对 当前 workerThread1 & thrdPoolDestoryerThread 来说 preWorkerThread 的 identifer
            对 workerThread1 来说, preWorkerThread 不存在 => pool->tid = 0 表示 无效值 
            对 等待 workerThreadN 的 thrdPoolDestoryerThread 来说, preWorkerThread 是 workerThreadN
            
    Note
        thrdpool_increase() 与 __thrdpool_create_threads() 中 pthread_create(&tid,...) 的 tid 只在 两个函数中用 

cond
    配合 mutex, 线程间同步
        cond 用来 给 生产者和消费者 去操作 taskList 用的
        
    Note: 都在 hold mutex 时进行
    
    1) init 
        __thrdpool_init_locks(thrdpool_t *pool)
        {
            ...
            ret = pthread_cond_init(&pool->cond, NULL);
            ...
        }
    
    2) destory 
        __thrdpool_destroy_locks(thrdpool_t *pool)
        {
            ...
            pthread_cond_destroy(&pool->cond);
        }
    
    3) threadPool 运行时, worker thread waiting on cond 
        等待着 
            1] a task 被 schedule(调度) 进 threadPool, 或 
            2] 收到 结束线程池 的 broadcast(广播)
    
        __thrdpool_routine(void *arg)
        {
            while (1)
            {
                pthread_mutex_lock(&pool->mutex);   // ===
                while (!pool->terminate && list_empty(&pool->task_queue))
                    pthread_cond_wait(&pool->cond, &pool->mutex);
                    
                ...
            }
        }
        
    4) 调度 1个 task 时, signal 
    
        void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
                                 thrdpool_t *pool)
        {
            struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;
            // 1)
            entry->task = *task;
            
            pthread_mutex_lock(&pool->mutex);   // ===
            
            // 2) 尾插 task 到 taskList 
            list_add_tail(&entry->list, &pool->task_queue); 
            
            // 3) 叫醒 a waiting worker thread
            pthread_cond_signal(&pool->cond);               
            
            pthread_mutex_unlock(&pool->mutex); // ===
        }
        
    5) 结束 线程池 时, broadcast
    
        __thrdpool_terminate(int thread_in_pool, thrdpool_t *pool)
        {
            pthread_cond_t term = PTHREAD_COND_INITIALIZER;
            pthread_mutex_lock(&pool->mutex);    // === 
            
            // (1) terminate 置 non-NULL
            pool->terminate = &term;
            
            // (2)
            pthread_cond_broadcast(&pool->cond);
        }
        
key
    线程池 的 key
    赋予 每个由 线程池创建的线程 用于 thread local
    用于 区分 destoryer 线程 是否由 线程池创建 <=> 是否为 consumerThread/workerThread 
    
    1) key_create
    
        thrdpool_create(size_t nthreads, size_t stacksize)
        {
            ...
            ret = pthread_key_create(&pool->key, NULL);
            ...
        }
        
    2) key_delete   
    
        thrdpool_create(size_t nthreads, size_t stacksize)
        {
            ...
            if (__thrdpool_create_threads(nthreads, pool) >= 0)
                    return pool;
                
            // === below: __thrdpool_create_threads(nthreads, pool) fail 时
            pthread_key_delete(pool->key);
            ...
        }
        
        thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
                      thrdpool_t *pool)
        {
            ...
            pthread_key_delete(pool->key);
        }
        
    3) setspecific
    
        // 由线程池创建的线程, 用 key 关联 thread local(TSD 线程特定数据): 均设为 current 线程池结构指针 pool
        // => outside thread: pool->key 关联的不是 pool
        __thrdpool_routine(void *arg)
        {
            ...
            pthread_setspecific(pool->key, pool);
        }
    
    4) getspecific
    
        // 该函数 所在 线程(thrdPoolDestoryerThread) 是否 由线程池创建
        inline int thrdpool_in_pool(thrdpool_t *pool)
        {
            return pthread_getspecific(pool->key) == pool;
        }
        
        void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
                      thrdpool_t *pool)
        {
            // destoryerThread 是否由 线程池创建 <=> 是否为 consumerThread/workerThread
            int thread_in_pool = thrdpool_in_pool(pool);
            ...
        }
    
terminate
    
    2个用途
    1] 线程池 terminate/destory 时的 标记位 
    2] 销毁线程池 的 人(thrdPoolDestoryerThread) 要等待的 condition
        
    1) 线程池创建时, 置 NULL, 标志 线程池 处于 运行中(running)
    
        thrdpool_create(size_t nthreads, size_t stacksize)
        {
            ...
            pool->terminate = NULL;
            ...
        }

    2) 线程池 结束时, 置 non-Null / &term(Conditon), 
        1] 标志 线程池处于 terminating 中
        2] 同时 启用 terminate 作为 链式等待 workerThread 结束 的 CV/Condition 的 作用
        
        __thrdpool_routine(void *arg)
        {
            while (1)
            {
                // (1) 从 任务队列 取出 1个任务, 没有就等待
                pthread_mutex_lock(&pool->mutex);   // ===
                while (!pool->terminate && list_empty(&pool->task_queue)) 
                    pthread_cond_wait(&pool->cond, &pool->mutex); // ===

                if (pool->terminate)
                    break;
                }
                ...
            }
        }
        
        __thrdpool_terminate(int thread_in_pool, thrdpool_t *pool)
        {
            pthread_cond_t term = PTHREAD_COND_INITIALIZER;
            pthread_mutex_lock(&pool->mutex);           // ===
            
            // (1) 标志 线程池 terminate
            pool->terminate = &term;
            ...
            
            // (2) terminate 线程池 时, 池中各线程 & thrdPoolDestoryerThread 逐个 链式等待(join) 前1个 结束 
            while (pool->nthreads > 0)
                pthread_cond_wait(&term, &pool->mutex); // === 
                // <=> pthread_cond_wait(pool->terminate, &pool->mutex);
            ...
        }
    
    3) signal 
    
        __thrdpool_routine(void *arg)
        {
            ...
            if (--pool->nthreads == 0)  // 3) 每个人都减 1, 最后一个人(不算 发起 destory) 负责叫醒 发起 destroy 的 人
                pthread_cond_signal(pool->terminate);
            ...
        
        }
            
4. 问题?

    若 __thrdpool_terminate() 由线程池中 某线程 的 routine() 发起, 则 该线程 等完线程池中 thePreLastWorkerThread 完成, 
        自己却自行 detach, 那谁等 该线程呢?
        因为 这种情形, thrdPoolTerminateThread 或 thrdPoolDestoryerThread 不再是 mainThread,而可能 mainThread 已经终止, 
        那 所有 subThread(线程池中各线程) 就 直接蒸发了

上一篇下一篇

猜你喜欢

热点阅读