IPC

详解底层读写锁

2019-12-15  本文已影响0人  madao756

前言:这篇文章硬核了,我们来看看 pthread 的读写锁是怎么实现的

0X00 读写锁的感性认识

最适合用来理解「读写锁」的就是访问银行账户余额了

0X01 与读写锁相关的函数

#include <pthread.h>

int pthread_rwlock_rdlock (pthread_rwlock_t *rwptr);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwptr);
int pthread_rwlock_unlock (pthread_rwlock_t *rwptr);

上述函数的作用显而易见,其中 pthread_rwlock_rdlock 和 pthread_rwlock_wrlock 都是阻塞的函数

#include <pthread.h>
int pthread_rwlock_tryrdlock (pthread_rwlock_t *rwptr);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwptr);

这上面的函数作用也是显而易见,但是这是非阻塞的函数,如果没有获取成功,会立即返回一个值,而不是阻塞

同样对读写锁的属性也有相应的函数

#include <pthread.h>
int pthread_rwlock_init (pthread_rwlock_t *rwprr, const pthread_rwlockattr_t *attr);

int pthread_rwlock_destroy (pthread_rwlock_t *rwptr);

前者我们可以用来初始化一个读写锁,而当一个线程不再需要某个读写锁的时候,可以用后者摧毁这个读写锁

更多的内容可以参考:《UNIX 网络编程进程间通信》8.3

0X02 读写锁的底层实现

接下来我们将学习如何用「条件变量」和「互斥锁」实现一个读写锁

首先我们得知道读写锁有哪些特性:

我们来看读写锁的 struct

typedef struct {
  pthread_mutex_t   rw_mutex;       /*最基础的读写互斥锁*/
  pthread_cond_t    rw_condreaders; /* 读等待的条件变量 */
  pthread_cond_t    rw_condwriters; /* 写等待的条件变量 */
  int               rw_magic;
  int               rw_nwaitreaders;
  int               rw_nwaitwriters;
  int               rw_refcount;
} pthread_rwlock_t;

主要说后面几个 int 成员的作用

这个成员用来标志「读写锁」是不是初始化成功,是不是被摧毁了。

我们看后面的 init 函数,rw_magic 的赋值一定是在最后面。而看 destory 函数 rw_magic 的赋值一定是在最前面。

因为这就是一个标志,init 的时候,rw_magic 被置数 RW_MAGIC,标志这个「读写锁」全部成员初始化成功,而 destory 的时候,一开始就要将 rw_magic 置 0。哪怕这个其他成员还没来得及 destory 就调度到其他代码了,也能知道这个锁被 destory 了

这两个成员就显而易见了,这个读写锁的等待的「读」和「写」的数

这个成员用来表示本「读写锁」的状态,0 的时候,表示这个锁可用,-1 的时候表示这是一个写锁,大于 0 的值意味着它当前容纳着多少个「读锁」

接下来我们介绍一些相关的函数

初始化与摧毁

int
pthread_rwlock_init(pthread_rwlock_t *rw, pthread_rwlockattr_t *attr)
{
    int     result;

    // 不支持初始化的时候申明属性
    if (attr != NULL)
        return(EINVAL);     /* not supported */

    // 显而易见初始化一个互斥锁和两个条件变量
    if ( (result = pthread_mutex_init(&rw->rw_mutex, NULL)) != 0)
        goto err1;
    if ( (result = pthread_cond_init(&rw->rw_condreaders, NULL)) != 0)
        goto err2;
    if ( (result = pthread_cond_init(&rw->rw_condwriters, NULL)) != 0)
        goto err3;
    
    rw->rw_nwaitreaders = 0;
    rw->rw_nwaitwriters = 0;
    rw->rw_refcount = 0;
    rw->rw_magic = RW_MAGIC;

    return(0);

 // 如果初始化失败,则毁掉之间创建好的互斥锁或者条件变量,并报错 
err3:
    pthread_cond_destroy(&rw->rw_condreaders);
err2:
    pthread_mutex_destroy(&rw->rw_mutex);
err1:
    return(result);         /* an errno value */
}
pthread_rwlock_destroy(pthread_rwlock_t *rw)
{
    if (rw->rw_magic != RW_MAGIC)
        return(EINVAL);
    // 确保在没有使用的情况下销毁
    if (rw->rw_refcount != 0 ||
        rw->rw_nwaitreaders != 0 || rw->rw_nwaitwriters != 0)
        return(EBUSY);

    pthread_mutex_destroy(&rw->rw_mutex);
    pthread_cond_destroy(&rw->rw_condreaders);
    pthread_cond_destroy(&rw->rw_condwriters);
    // 标记已摧毁
    rw->rw_magic = 0;

    return(0);
}

开始难一点的函数剖析了:

int
pthread_rwlock_rdlock(pthread_rwlock_t *rw)
{
    int     result;
    // 检查这个锁是不是被摧毁了
    if (rw->rw_magic != RW_MAGIC)
        return(EINVAL);
   // 每个相关的函数都必须给这个「读写锁」的互斥锁上锁,用来保护当前「读写锁」中的数据
    if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
        return(result);

    // a 检查当前锁是不是读写锁 b 检查当前是否有写锁在等待
    // 如果有写锁在等待,就阻塞自己,等待读的条件变量
    while (rw->rw_refcount < 0 || rw->rw_nwaitwriters > 0) {
        rw->rw_nwaitreaders++;
        result = pthread_cond_wait(&rw->rw_condreaders, &rw->rw_mutex);
        rw->rw_nwaitreaders--;
        if (result != 0)
            break;
    }
    // 取得「读锁」的时候 rw_refcount + 1,表示此时又多容纳了一个新的「读锁」
    if (result == 0)
        rw->rw_refcount++;      /* another reader has a read lock */

    pthread_mutex_unlock(&rw->rw_mutex);
    return (result);
}

接下来是是读锁的非阻塞版本 pthread_rwlock_tryrdlock,这个非常简单,并没有想象中那么难

int
pthread_rwlock_tryrdlock(pthread_rwlock_t *rw)
{
    int     result;

    if (rw->rw_magic != RW_MAGIC)
        return(EINVAL);

    if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
        return(result);

    if (rw->rw_refcount < 0 || rw->rw_nwaitwriters > 0)
        result = EBUSY;         /* held by a writer or waiting writers */
    else
        rw->rw_refcount++;      /* increment count of reader locks */

    pthread_mutex_unlock(&rw->rw_mutex);
    return(result);
}

非阻塞也不是意味着用的都是非阻塞函数,这个非阻塞的版本与之前的阻塞版本也没什么不同,只是不再阻塞等待「写锁」了

我们再来看看「写锁」

int
pthread_rwlock_wrlock(pthread_rwlock_t *rw)
{
    int     result;

    if (rw->rw_magic != RW_MAGIC)
        return(EINVAL);

    if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
        return(result);

    
    // 有两种情况 写锁需要阻塞
    // a 当前锁是一个写锁 也就是 rw->rw_refcount = -1
    // b 有读锁拥有当前锁,此时 rw->rw_refcount > 0
    // 所以 rw->rw_refcount != 0 的时候必须阻塞
    while (rw->rw_refcount != 0) {
        rw->rw_nwaitwriters++;
        result = pthread_cond_wait(&rw->rw_condwriters, &rw->rw_mutex);
        rw->rw_nwaitwriters--;
        if (result != 0)
            break;
    }
    if (result == 0)
        rw->rw_refcount = -1;

    pthread_mutex_unlock(&rw->rw_mutex);
    return(result);
}

而写锁不必等待读锁的条件变量,只需要读锁自己解锁以后 rw->rw_refcount-- 就 ok

我们再来看看上写锁的非阻塞版本:

int
pthread_rwlock_trywrlock(pthread_rwlock_t *rw)
{
    int     result;

    if (rw->rw_magic != RW_MAGIC)
        return(EINVAL);

    if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
        return(result);

    if (rw->rw_refcount != 0)
        result = EBUSY;         /* held by either writer or reader(s) */
    else
        rw->rw_refcount = -1;   /* available, indicate a writer has it */

    pthread_mutex_unlock(&rw->rw_mutex);
    return(result);
}

这个很简单就不说了

最后我们来看看如何给当前「读写锁」解锁的 pthread_rwlock_unlock


int
pthread_rwlock_unlock(pthread_rwlock_t *rw)
{
    int     result;

    if (rw->rw_magic != RW_MAGIC)
        return(EINVAL);

    if ( (result = pthread_mutex_lock(&rw->rw_mutex)) != 0)
        return(result);

    if (rw->rw_refcount > 0)
        rw->rw_refcount--;          /* 释放一个读 */
    else if (rw->rw_refcount == -1)
        rw->rw_refcount = 0;        /* 释放一个写 */
    else
        err_dump("rw_refcount = %d", rw->rw_refcount);

    if (rw->rw_nwaitwriters > 0) {
        // 唤起一个写
        if (rw->rw_refcount == 0)
            result = pthread_cond_signal(&rw->rw_condwriters);
    } else if (rw->rw_nwaitreaders > 0)
        // 唤起所有读
        result = pthread_cond_broadcast(&rw->rw_condreaders);

    pthread_mutex_unlock(&rw->rw_mutex);
    return(result);
}

更详细的内容可以参考:《UNIX 网络编程进程间通信》

0X03 线程取消

如果上面的函数在阻塞的时候,调用的线程被取消了,那么计数器就会出错,因为这个线程被取消了,理应不再持有这个「读写锁」,

但是「读写锁」的一些成员记录着等待的线程数,在取消的时候并没有做相应的处理,所以计数器出错

首先我们得知道到线程取消的函数调用是什么:

#include <pthread.h>

// 若成功则返回 0,若出错则为正的 Exxx 值
int pthread_cancel(pthread_t tid);

一个线程可以被同一进程内的任何其他线程取消,唯一参数就是线程的 tid

如果启动了多个线程执行某个任务,如果其中某个线程发现了一个错误,它和其他线程就有必要终止

因此我们就有这样的两个函数,用来在线程被取消、终止之前执行:

#include <pthread.h>
void pthread_cleanup_push(void (*function) (void *), void *arg);
void pthread_cleanup_pop(int execute);

前者被用来处理被取消(pthread_cancel)的情况,后者用来处理线程自愿终止(调用 pthread_exit 或线程自己结束)

更多的内容可以参考:《UNIX 网络编程进程间通信》8.5

完结撒花~

上一篇 下一篇

猜你喜欢

热点阅读