从多线程到分布式

从多线程到分布式(二):用锁解决单机问题

2023-08-06  本文已影响0人  吟游雪人

问题先从简单的开始解决:
既然在单机上我们有稳定的方式去解决有序的问题,具体到代码,我们应该怎么处理。
答案之一,用锁。

锁的核心实现机制

一、悲观锁

之所以把一类锁称之为悲观锁,在使用这一类锁的时候,首先考虑的是不管有没有人在竞争,直接先给他加上个独占标记,如果加不上,证明有人在用了,这时候就去排队。它认为同时修改资源的概率很高,很容易出现冲突,所以访问共享资源前,先加上锁,总体效率会更优。

synchronized

名词解释:
临界区:受保护的代码块,同时只有一个线程能进入操作执行。

最常见的内置锁
synchronized

使用方式
class Foo {
  // 1.修饰非静态方法
  synchronized void foo() {
    // 临界区
  }

  // 2.修饰静态方法
  synchronized static void bar() {
    // 临界区
  }

  //3.修饰代码块
  Object obj = new Object();
  void baz() {
    synchronized(obj) {
      // 临界区
    }
  }
}  

锁的粒度级别
1.修饰静态方法,相当于整个类

class Foo{
  // 修饰静态方法
  synchronized(X.class) static void bar() {
    // 临界区
  }
}

2.修饰实例方法,相当于锁定当前实例对象

class Foo {
  // 修饰非静态方法
  synchronized(this) void foo() {
    // 临界区
  }
}

简单的例子

class SafeAdd {
  long value = 0L;
  synchronized long get() {
    return value;
  }
  synchronized void addOne() {
    value += 1;
  }
}

常见疑问
1.读取时为什么要加锁
我们可以理解为了防止多线程操作,所以addOne方法要加锁。但是,读取不会修改数据,因该是线程安全的,为什么读取也要放在synchronized里面?
答案就是,需要保证线程之间的可见性。在一个线程中要读取另一个线程中的操作结果,需要遵守之前说的happens before原则。也就是如果get方法不加锁,就有可能读不到操作后的结果。

class SafeAdd {
  long value = 0L;
  long get() {  //如果不加上synchronized,会有可见性问题
    return value;
  }
  synchronized void addOne() {
    value += 1;
  }
}

2.锁不在同一个对象导致失效
下面的例子中,锁对象一个是class,一个是this。此时锁不会按照预期生效。

class SafeAdd {
  static long value = 0L;
  synchronized long get() {
    return value;
  }
  synchronized static void addOne() {
    value += 1;
  }
}

下面的例子中,锁在两个新对象上,也失效。

class SafeAdd {
  long value = 0L;
  long get() {
    synchronized (new Object()) {
      return value;
    }
  }
  void addOne() {
    synchronized (new Object()) {
      value += 1;
    }
  }
}

条件变量
所谓的条件变量,就是满足一定条件后再往下执行。相同条件的可以放进一个队列里排排队。未满足条件时,在执行完临界区的代码之前,判断条件不满足就可以提前结束挂起,当前线程,释放锁,等待下一次被唤醒后再次进入临界区。
简而言之,就是方法不需要执行完,就在中途等待的方式。

拿synchronized 关键字举例,synchronized修饰的代码块,在编译期会自动生成相关加锁和解锁的代码,但是仅支持一个条件变量;任何一个对象都能成为条件变量,但是前提是获取锁。解锁的条件是代码执行完synchronized块或者调用wait方法。

synchroinzed(){
  //进入临界区
  .....
  while(条件不满足){
     wait()  //进入队列等待,并释放锁 
  }
  ....//不需要执行到最后这里就可以通过wait提前释放锁,执行到最后也会自动释放
}

因为notify() 只能保证在通知时间点,条件是满足的。而被通知线程的执行时间点和通知的时间点基本上不会重合,所以当线程执行的时候,很可能条件已经不满足了(保不齐有其他线程插队)。这一点你需要格外注意。所以是wait被唤醒后也要执行相同判断条件,用while可以表达这种重复判断。

下面是一个实际使用的例子,为了破除死锁,方式之一就是同时获取全部需要操作的变量。如果获取不到则释放。我们可以用一个队列保存操作变量,并判断是不是只有单个操作被保存。代码如下:

class Allocator {
  private List<Object> als;
  // 一次性申请所有资源
  synchronized void apply(Object from, Object to){
    // 经典写法
    while(als.contains(from) ||als.contains(to)){ 
      try{
        wait();//去排队去了。由于被唤醒后,是曾经满足条件,而不是现在一定满足,所以还要再判断。这就是为什么用while不用if的原因。因为synchronized锁的是this,所以调用的是this.wait()方法
      }catch(Exception e){
      }   
    }
    als.add(from);
    als.add(to); 
  }
  // 归还资源
  synchronized void free(Object from, Object to){
    als.remove(from);
    als.remove(to);
    notifyAll();
  }
}

如前文所述,synchronized 的锁和条件变量都是隐式的,同时也伴随着一些可能死锁的问题。因此Java并发包提供了一些显示的锁给我们选择。因为条件变量通知需要在不同线程中通信,所以需要锁配合一起使用,单独使用没有太大意义。并且一个锁也可以同时关联多个条件变量,也就是给复杂条件下给不同线程分组排队提供了可能。
Lock类怎么去破坏死锁条件:

  1. 能够响应中断。synchronized 的问题是,持有锁 A 后,如果尝试获取锁 B 失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁 A。这样就破坏了不可抢占条件了。
  2. 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

用Lock实现经典的转账示例:

class Account {
  private int balance;
  private final Lock lock
          = new ReentrantLock();
  // 转账
  void transfer(Account tar, int amt){
    while (true) {
      if(this.lock.tryLock()) {
        try {
          if (tar.lock.tryLock()) {
            try {
              this.balance -= amt;
              tar.balance += amt;
            } finally {
              tar.lock.unlock();
            }
          }//if
        } finally {
          this.lock.unlock();
        }
      }//if
    }//while
  }//transfer
}

用lock配合多个condition(condition可以看做是保存线程的队列)实现阻塞队列。
多条件队列是为了提升性能,下面的这个例子用notifyAll也可以实现相同的正确性,但是无关唤起就会增多,性能降低。

public class BlockedQueue<T>{
  final Lock lock =
    new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull =  lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty = lock.newCondition();

  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满,await阻塞入队,并解锁。
        notFull.await();
      }  
      // 省略入队操作...
      //入队后,通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }  
      // 省略出队操作...
      //出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

锁的基本使用说完了,下一章解密如何实现一个锁。

上一篇 下一篇

猜你喜欢

热点阅读