并发--管程(Monitor)

2021-12-15  本文已影响0人  zhemehao819

共享模型之管程

1、共享带来的问题

线程出现问题的根本原因是因为线程上下文切换,导致线程里的指令没有执行完就切换执行其它线程了。

(1)临界区 Critical Section

例如,下面代码中的临界区:

static int counter = 0;

static void increment() 
// 临界区 
{   
    counter++; 
}

static void decrement() 
// 临界区 
{ 
    counter--; 
}

(2)竞态条件 Race Condition

多个线程在临界区内执行,由于代码的执行序列不同而导致结果无法预测,称之为发生了竞态条件

2、synchronized 解决方案

(1)解决手段

为了避免临界区的竞态条件发生,有多种手段可以达到目的。

本次课使用阻塞式的解决方案:synchronized,来解决上述问题,即俗称的【对象锁】,它采用互斥的方式让同一 时刻至多只有一个线程能持有【对象锁】,其它线程再想获取这个【对象锁】时就会阻塞住(blocked)。这样就能保证拥有锁 的线程可以安全的执行临界区内的代码,不用担心线程上下文切换

(2)synchronized语法

synchronized(对象) {
    //临界区
}

例:

static int counter = 0; 
//创建一个公共对象,作为对象锁的对象
static final Object room = new Object();

public static void main(String[] args) throws InterruptedException {    
    Thread t1 = new Thread(() -> {        
    for (int i = 0; i < 5000; i++) {            
        synchronized (room) {     
            counter++;            
         }       
       }    
    }, "t1");

    Thread t2 = new Thread(() -> {       
        for (int i = 0; i < 5000; i++) {         
            synchronized (room) {            
                counter--;          
            }    
        } 
    }, "t2");

    t1.start();    
    t2.start(); 
    t1.join();   
    t2.join();    
    log.debug("{}",counter); 
}

注意:拿到锁的线程2并不是在一个cpu时间片上把所有同步代码块内的代码执行完,而是就算发生上下文切换时切到线程1时,线程1也因为没拿到锁而blocked,锁还是在之前的那个线程2手里,直到执行完synchronized块后释放锁,并唤醒这个锁对象上的阻塞线程,也就是线程1.

synchronized 实际是用对象锁保证了临界区内代码的原子性,临界区内的代码对外是不可分割的,不会被线程切换所打断。

(3)synchronized加在方法上

3、变量的线程安全分析

成员变量和静态变量是否线程安全?

局部变量是否线程安全?

(1)如果调用的对象被共享,且执行了读写操作,则线程不安全

@Slf4j
public class CodeTest {

    public static void main(String[] args) {
        UnsafeTest unsafeTest = new UnsafeTest();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                unsafeTest.method1();
            }, "t" + i).start();
        }
    }
}

class UnsafeTest {
    List<Integer> list = new ArrayList<>();

    public void method1() {
        for (int i = 0; i < 200; i++) {
            method2();
            method3();
        }
    }

    public void method2() {
        list.add(1);
    }

    public void method3() {
        list.remove(0);
    }
}

首先,存在共享变量list;
其次,ArrayList是非线程安全;
最后,从字节码指令角度分析ArrayList的add操作是分3步的,第一步会获取添加元素的下标 index,第二步对指定的 index 位置添加元素,第三步将 index 往后移。在并发情况下method2和method3均会发生指令交错(上下文切换),导致线程安全问题。

(2)如果是局部变量,则会在堆中创建对应的对象,不会存在线程安全问题。

可以将 list 修改成局部变量,然后将 list 作为引用传入方法中,因为局部变量是每个线程私有的,不会出现共享问题,那么就不会有上述问题了。

class SafeTest {

    public void method1() {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            method2(list);
            method3(list);
        }
    }

    public void method2(List<Integer> list) {
        list.add(1);
    }

    public void method3(List<Integer> list) {
        list.remove(0);
    }
}

思考 private 或 final的重要性

在上诉代码中,其实存在线程安全的问题,因为 method2,method3 方法都是用 public 声明的,如果一个类继承 SafeTest 类,对 method2,method3 方法进行了重写,比如重写 method3 方法,代码如下:

class UnsafeSubTest extends SafeTest {

    @Override
    public void method3(List<Integer> list) {
        new Thread(() -> {
            list.remove(0);
        }).start();
    }
}

可以看到重写的方法中又使用到了线程,当主线程和重写的 method3 方法的线程同时存在,此时 list 就是这两个线程的共享资源了,就会出现线程安全问题,我们可以用 private 访问修饰符解决此问题,代码实现如下:

class SafeTest {
    public final void method1() {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            method2(list);
            method3(list);
        }
    }

    private void method2(List<Integer> list) {
        list.add(1);
    }

    private void method3(List<Integer> list) {
        list.remove(0);
    }
}

从这个例子可以看出 private 或 final 提供【安全】的意义所在,请体会开闭原则中的【闭】。

常见线程安全类

这里说它们是线程安全的是指,多个线程调用它们同一个实例的某个方法时,是线程安全的

不可变类线程安全性

String、Integer 等都是不可变类,因为其内部的状态不可以改变,因此它们的方法都是线程安全的

有同学或许有疑问,String 有 replace,substring 等方法【可以】改变值啊,那么这些方法又是如何保证线程安 全的呢?

这是因为这些方法的返回值都创建了一个新的对象,而不是直接改变String、Integer对象本身。

典型案例分析

@Slf4j(topic = "c.ExerciseTransfer")
public class ExerciseTransfer {
    public static void main(String[] args) throws InterruptedException {
        Account a = new Account(1000);
        Account b = new Account(1000);
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                a.transfer(b, randomAmount());
            }
        }, "t1");
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                b.transfer(a, randomAmount());
            }
        }, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        // 查看转账2000次后的总金额
        log.debug("total:{}", (a.getMoney() + b.getMoney()));
    }

    // Random 为线程安全
    static Random random = new Random();

    // 随机 1~100
    public static int randomAmount() {
        return random.nextInt(100) + 1;
    }
}

// 账户
class Account {
    private int money;

    public Account(int money) {
        this.money = money;
    }

    public int getMoney() {
        return money;
    }

    public void setMoney(int money) {
        this.money = money;
    }

    // 转账
    public void transfer(Account target, int amount) {
        synchronized(Account.class) {
            if (this.money >= amount) {
                this.setMoney(this.getMoney() - amount);
                target.setMoney(target.getMoney() + amount);
            }
        }
    }
}

4、Monitor 概念

1)Java 对象头(Object Header)

以 32 位虚拟机为例,普通对象的对象头结构如下,其中的 Klass Word 为指针,指向对应的 Class 对象(类型指针);

普通对象

数组对象

其中 Mark Word 结构为


Mark Word在不同的锁状态下存储的内容不同。

所以一个对象的结构如下:

2)Monitor 原理

Monitor 被翻译为监视器或者说管程,由操作系统分配来关联锁对象。

每个 java 对象都可以关联一个 Monitor ,如果使用 synchronized 给对象上锁(重量级),该对象头的 Mark Word 中就被设置为指向 Monitor 对象的指针。

5、synchronized 原理进阶

1)synchronized 用于同步代码块与同步方法原理

当一个线程访问同步代码块时,首先是需要得到锁才能执行同步代码,当退出或者抛出异常时必须要释放锁,那么它是如何来实现这个机制的呢?我们先看一段简单的代码:

static final Object lock = new Object(); static int counter = 0;
public static void main(String[] args) { 
    synchronized (lock) {
        counter++; 
    }
}

查看反编译后结果:

  1. monitorenter是加锁
  2. monitorexit释放锁。monitorexit指令出现了两次,第1次为同步正常退出释放锁,第2次为发生异步退出释放锁;

2)轻量级锁

轻量级锁使用场景:当一个对象被多个线程所访问,但访问的时间是错开的(不存在竞争),此时就可以使用轻量级锁来优化。

轻量级锁对使用者是透明的,即语法仍然是 synchronized ,假设有两个方法同步块,利用同一个对象加锁:

    static final Object obj = new Object();
    public static void method1() {
        synchronized (obj) {
            // 同步代码块A
            method2();
        }
    }

    public static void method2() {
        synchronized (obj) {
            // 同步代码块B
        }
    }

2)锁膨胀

static Object obj = new Object();
public static void method1() {
  synchronized( obj ) {
    // 同步块
  }
}

3)自旋优化

重量级锁竞争的时候,还可以使用自旋来进行优化,如果当前线程自旋成功(即这时候持锁线程已经退出了同步块,释放了锁),这时当前线程就可以避免阻塞。

自旋重试成功的情况

自旋重试失败的情况

4)偏向锁(用于优化轻量级锁重入)

轻量级锁在没有竞争时(就自己这个线程),每次重入仍然需要执行 CAS 操作。

Java 6 中引入了偏向锁来做进一步优化:只有第一次使用 CAS 将线程 ID 设置到对象的 Mark Word 头,之后发现这个线程 ID 是自己的就表示没有竞争,不用重新 CAS。以后只要不发生竞争,这个对象就归该线程所有。

例如:

static final Object obj = new Object();
public static void m1() {
  synchronized( obj ) {
    // 同步块 A
    m2();
  }
}

public static void m2() {
  synchronized( obj ) {
    // 同步块 B
    m3();
  }
}

public static void m3() {
  synchronized( obj ) {
    // 同步块 C
  }
}

分析如图:

偏向状态

撤销偏向

以下几种情况会使对象的偏向锁失效:

5)批量重偏向

6)批量撤销

7)锁消除

默认是打开的,原理是JIT优化,消除锁。

6、Wait/Notify

(1)原理

API 介绍

它们都是线程之间进行协作的手段,都属于 Object 对象的方法。必须获得此对象的锁,才能调用这几个方法

public class Test1 {
    final static Object LOCK = new Object();
    public static void main(String[] args) throws InterruptedException {
        //只有在对象被锁住后才能调用wait方法
        synchronized (LOCK) {
            LOCK.wait();
        }
    }
}

演示notify和notifyAll区别

    final static Object obj = new Object();
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    obj.wait(); // 让线程在obj上一直等待下去
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其它代码....");
            }
        }).start();
        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    obj.wait(); // 让线程在obj上一直等待下去
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其它代码....");
            }
        }).start();
        // 主线程两秒后执行
        sleep(2);
        log.debug("唤醒 obj 上其它线程");
        synchronized (obj) {
            obj.notify(); // 唤醒obj上一个线程
            // obj.notifyAll(); // 唤醒obj上所有等待线程
        }
    }

notify 的一种结果:

20:00:53.096 [Thread-0] c.TestWaitNotify - 执行....
20:00:53.099 [Thread-1] c.TestWaitNotify - 执行....
20:00:55.096 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
20:00:55.096 [Thread-0] c.TestWaitNotify - 其它代码....

notifyAll 的结果

19:58:15.457 [Thread-0] c.TestWaitNotify - 执行....
19:58:15.460 [Thread-1] c.TestWaitNotify - 执行....
19:58:17.456 [main] c.TestWaitNotify - 唤醒 obj 上其它线程
19:58:17.456 [Thread-1] c.TestWaitNotify - 其它代码....
19:58:17.456 [Thread-0] c.TestWaitNotify - 其它代码....

(2)Wait与Sleep的区别

不同点

相同点

(3)优雅地使用wait/notify

什么时候适合使用wait

使用wait/notify需要注意什么

synchronized (LOCK) {
    while(//不满足条件,一直等待,避免虚假唤醒) {
        LOCK.wait();
    }
    //满足条件后再运行
}

synchronized (LOCK) {
    //唤醒所有等待线程
    LOCK.notifyAll();
}

7、模式之保护性暂停

1)定义

即 Guarded Suspension,用在一个线程等待另一个线程的执行结果。

要点:

2)实现

public class GuardedObject {

    private Object response;
    private final Object lock = new Object();

    public Object get() {
        synchronized (lock) {
            // 条件不满足则等待
            while (response == null) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 条件满足,干活
            return response;
        }
    }

    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}

一个线程等待另一个线程的执行结果

public static void main(String[] args) {
      GuardedObject guardedObject = new GuardedObject();
        new Thread(() -> {
        try {
            // 子线程执行下载
            List<String> response = download();
            log.debug("download complete...");
            guardedObject.complete(response);
        } catch (IOException e) {
            e.printStackTrace();
        } }).start();

        log.debug("waiting...");
        // 主线程阻塞等待
        Object response = guardedObject.get();
        log.debug("get response: [{}] lines", ((List<String>) response).size());
  }

执行结果

18:42:18.568 [main] c.TestGuardedObject - waiting...
18:42:23.312 [Thread-0] c.TestGuardedObject - download complete... 
18:42:23.312 [main] c.TestGuardedObject - get response: [3] lines

带超时版 GuardedObject

public class GuardedObject {

    private Object response;
    private final Object lock = new Object();

    public Object get(long millis){

        synchronized (lock) {
            // 1) 记录最初时间
            long begin = System.currentTimeMillis();
            // 2) 已经经历的时间
            long timePassed = 0;

            while (response == null) {
                // 4) 假设 millis 是 1000,结果在 400 时唤醒了,那么还有 600 要等
                long waitTime = millis - timePassed;
                if (waitTime <= 0) {
                    break;
                }
                try {
                    lock.wait(waitTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 3) 如果提前被唤醒,这时已经经历的时间假设为 400
                timePassed = System.currentTimeMillis() - begin;
                return response;
            }
        }

    public void complete(Object response) {
        synchronized (lock) {
            // 条件满足,通知等待线程
            this.response = response;
            lock.notifyAll();
        }
    }
}

3)join源码——使用保护性暂停模式

public final synchronized void join(long millis)
    throws InterruptedException {
        long base = System.currentTimeMillis();
        long now = 0;

        if (millis < 0) {
            throw new IllegalArgumentException("timeout value is negative");
        }

        if (millis == 0) {
            while (isAlive()) {
                wait(0);
            }
        } else {
            while (isAlive()) {
                long delay = millis - now;
                if (delay <= 0) {
                    break;
                }
                wait(delay);
                now = System.currentTimeMillis() - base;
            }
        }
    }

4)多任务版 GuardedObject

图中 Futures 就好比居民楼一层的信箱(每个信箱有房间编号),左侧的 t0,t2,t4 就好比等待邮件的居民,右侧的 t1,t3,t5 就好比邮递员。

如果需要在多个类之间使用 GuardedObject 对象,作为参数传递不是很方便,因此设计一个用来解耦的中间类, 这样不仅能够解耦【结果等待者】和【结果生产者】,还能够同时支持多个任务的管理。

rpc 框架的调用中就使用到了这种模式。


public class GuardedObject {
    // 标识 Guarded Object
    private int id;

    public GuardedObject(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    // 结果
    private Object response;
    // 获取结果
    // timeout 表示要等待多久 2000 
    public Object get(long timeout) {
    synchronized (this) {
        // 开始时间 15:00:00
        long begin = System.currentTimeMillis(); // 经历的时间
        long passedTime = 0;
        while (response == null) {
            // 这一轮循环应该等待的时间
            long waitTime = timeout - passedTime; // 经历的时间超过了最大等待时间时,退出循环 
        if (timeout - passedTime <= 0) {
            break;
        } try {
            this.wait(waitTime); // 虚假唤醒 15:00:01
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 求得经历时间
        passedTime = System.currentTimeMillis() - begin; // 15:00:02 1s }
        return response;
    }

    // 产生结果
    public void complete(Object response) {
        synchronized (this) {
            // 给结果成员变量赋值
            this.response = response;
            this.notifyAll();
        }
    }
}

中间解耦类

class Mailboxes {
    private static Map<Integer, GuardedObject> boxes = new Hashtable<>();
    private static int id = 1;
    
    // 产生唯一 id
    private static synchronized int generateId() {
        return id++; 
    }
    
    public static GuardedObject getGuardedObject(int id) { 
        return boxes.remove(id);
    }
    
    public static GuardedObject createGuardedObject() { 
        GuardedObject go = new GuardedObject(generateId()); 
        boxes.put(go.getId(), go);
        return go;
    }
    
    public static Set<Integer> getIds() { 
        return boxes.keySet();
    } 
}

业务相关类

class People extends Thread{

    @Override
    public void run() {
        // 收信
        GuardedObject guardedObject = Mailboxes.createGuardedObject();
        log.debug("开始收信 id:{}", guardedObject.getId());
        Object mail = guardedObject.get(5000);
        log.debug("收到信 id:{}, 内容:{}", guardedObject.getId(), mail);
    }
}
class Postman extends Thread { 
    private int id;
    private String mail;

    public Postman(int id, String mail) { 
        this.id = id;
        this.mail = mail; 
    }

    @Override
    public void run() {
        GuardedObject guardedObject = Mailboxes.getGuardedObject(id);
        log.debug("送信 id:{}, 内容:{}", id, mail); 
        guardedObject.complete(mail);
    } 
}

测试

public  static void main(String[] args) throws InterruptedException { 

      for (int i = 0; i < 3; i++) {
          new People().start(); 
      }
      Sleeper.sleep(1);
      for (Integer id : Mailboxes.getIds()) {
            new Postman(id, "内容" + id).start(); 
      }
}

8、异步模式之生产者/消费者

1)定义

要点:

2)实现

class Message {
    private int id;
    private Object message;

    public Message(int id, Object message) {
        this.id = id;
        this.message = message;
    }
    public int getId() {
        return id;
    }
    public Object getMessage() {
        return message;
    }
}

@Slf4j
class MessageQueue {

    private LinkedList<Message> queue;
    private int capacity;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
        queue = new LinkedList<>();
    }

    public Message take() {
        synchronized (queue) {
            while (queue.isEmpty()) {
                log.debug("没货了, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Message message = queue.removeFirst();
            queue.notifyAll();
            return message;
        }
    }

    public void put(Message message) {
        synchronized (queue) {
            while (queue.size() == capacity) {
                log.debug("库存已达上限, wait");
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.addLast(message);
            queue.notifyAll();
        }
    }
}

应用:

    public static void main(String[] args) {
        MessageQueue messageQueue = new MessageQueue(2);

        // 4 个生产者线程, 下载任务
        for (int i = 0; i < 4; i++) {
            int id = I;
            new Thread(() -> {
                try {
                    log.debug("try put message({})", id);
                    messageQueue.put(new Message(id, "消息" + id));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, "生产者" + i).start();
        }

        // 1 个消费者线程, 处理结果
        new Thread(() -> {
            while (true) {
                Message message = messageQueue.take();
                log.debug("take message,id={}, message={}", message.getId(), message.getMessage());
            }
        }, "消费者").start();
    }

9、Park & Unpark

1)基本使用

它们是 LockSupport 类中的方法

// 暂停当前线程
LockSupport.park();
// 恢复某个线程的运行
LockSupport.unpark(暂停线程对象)

先 park 再 unpark

Thread t1 = new Thread(() -> {
    log.debug("start...");
    sleep(1);
    log.debug("park...");
    LockSupport.park();
    log.debug("resume...");
},"t1");
t1.start();

sleep(2);
log.debug("unpark...");
LockSupport.unpark(t1);

输出

18:42:52.585 c.TestParkUnpark [t1] - start...
18:42:53.589 c.TestParkUnpark [t1] - park...
18:42:54.583 c.TestParkUnpark [main] - unpark...
18:42:54.583 c.TestParkUnpark [t1] - resume...

先 unpark 再 park:

Thread t1 = new Thread(() -> {
    log.debug("start...");
    sleep(2);
    log.debug("park...");
    LockSupport.park();
    log.debug("resume...");
}, "t1");
t1.start();

sleep(1);
log.debug("unpark...");
LockSupport.unpark(t1);

输出:

18:43:50.765 c.TestParkUnpark [t1] - start...
18:43:51.764 c.TestParkUnpark [main] - unpark...
18:43:52.769 c.TestParkUnpark [t1] - park...
18:43:52.769 c.TestParkUnpark [t1] - resume...

特点:
与 Object 的 wait & notify 相比

2)park/unpark 原理

每个线程都有自己的一个 Parker 对象,由三部分组成 _counter , _cond 和 _mutex 打个比喻

  1. 当前线程调用 Unsafe.park() 方法
  2. 检查 _counter ,本情况为 0,这时,获得 _mutex 互斥锁
  3. 线程进入 _cond 条件变量阻塞
  4. 设置 _counter = 0

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 唤醒 _cond 条件变量中的 Thread_0
  3. Thread_0 恢复运行
  4. 设置 _counter 为 0

  1. 调用 Unsafe.unpark(Thread_0) 方法,设置 _counter 为 1
  2. 当前线程调用 Unsafe.park() 方法
  3. 检查 _counter ,本情况为 1,这时线程无需阻塞,继续运行
  4. 设置 _counter 为 0

10、线程中的状态转换

假设有线程 Thread t

情况 1:NEW –> RUNNABLE

情况 2 RUNNABLE <--> WAITING

t 线程用 synchronized(obj) 获取了对象锁后

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class TestWaitNotify {
    
    final static Object obj = new Object();
    
    public static void main(String[] args) {
        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    obj.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其它代码...."); // 断点
            }
        },"t1").start();
        
        new Thread(() -> {
            synchronized (obj) {
                log.debug("执行....");
                try {
                    obj.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.debug("其它代码...."); // 断点
            }
        },"t2").start();
        
        sleep(0.5);
        log.debug("唤醒 obj 上其它线程");
        synchronized (obj) {
            obj.notifyAll(); // 唤醒obj上所有等待线程 断点
        }
    }
}

情况 3 RUNNABLE <--> WAITING

情况 4 RUNNABLE <--> WAITING

情况 5 RUNNABLE <--> TIMED_WAITING

t 线程用 synchronized(obj) 获取了对象锁后

情况 6 RUNNABLE <--> TIMED_WAITING

情况 7 RUNNABLE <--> TIMED_WAITING

情况 8 RUNNABLE <--> TIMED_WAITING

情况 9 RUNNABLE <--> BLOCKED

情况 10 RUNNABLE <--> TERMINATED

11、多把锁

多把不相干的锁

一间大屋子有两个功能:睡觉、学习,互不相干。
现在小南要学习,小女要睡觉,但如果只用一间屋子(一个对象锁)的话,那么并发度很低
解决方法是准备多个房间(多个对象锁)

例如

public class BigRoom {
    public void sleep() {
        synchronized (this) {
            log.debug("sleeping 2 小时");
            Sleeper.sleep(2);
        }
    }
    public void study() {
        synchronized (this) {
            log.debug("study 1 小时");
            Sleeper.sleep(1);
        }
    }
}

执行:

BigRoom bigRoom = new BigRoom();
    new Thread(() -> {
    bigRoom.compute();
},"小南").start();

new Thread(() -> {
    bigRoom.sleep();
},"小女").start();

结果:

12:13:54.471 [小南] c.BigRoom - study 1 小时
12:13:55.476 [小女] c.BigRoom - sleeping 2 小时

改进:

public class BigRoom {
    private final Object studyRoom = new Object();
    private final Object bedRoom = new Object();

    public void sleep() {
        synchronized (bedRoom) {
            log.debug("sleeping 2 小时");
            Sleeper.sleep(2);
        }
    }
    public void study() {
        synchronized (studyRoom) {
            log.debug("study 1 小时");
            Sleeper.sleep(1);
        }
    }
}

执行结果:

12:15:35.069 [小南] c.BigRoom - study 1 小时
12:15:35.069 [小女] c.BigRoom - sleeping 2 小时

将锁的粒度细分

12、活跃性

定义:因为某种原因,使得代码一直无法执行完毕,这样的现象叫做活跃性

死锁

有这样的情况:一个线程需要同时获取多把锁,这时就容易发生死锁
t1 线程 获得 A对象 锁,接下来想获取 B对象的锁, t2 线程 获得 B对象 锁,接下来想获取 A对象的锁 例:

public static void main(String[] args) {
        final Object A = new Object();
        final Object B = new Object();
        new Thread(()->{
            synchronized (A) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (B) {

                }
            }
        }).start();

        new Thread(()->{
            synchronized (B) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (A) {

                }
            }
        }).start();
    }

定位死锁

检测死锁可以使用 jconsole工具,或者使用 jps 定位进程 id,再用 jstack 定位死锁:

cmd > jps
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
12320 Jps
33200 TestDeadLock // JVM 进程
11508 Main
28468 Launcher
cmd > jstack 33200
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8

Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.91-b14 mixed mode):

"DestroyJavaVM" #13 prio=5 os_prio=0 tid=0x0000000003525000 nid=0x2f60 waiting on condition
[0x0000000000000000]
java.lang.Thread.State: RUNNABLE

"Thread-1" #12 prio=5 os_prio=0 tid=0x000000001eb69000 nid=0xd40 waiting for monitor entry
[0x000000001f54f000]
java.lang.Thread.State: BLOCKED (on object monitor)
at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
- waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
- locked <0x000000076b5bf1d0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)

"Thread-0" #11 prio=5 os_prio=0 tid=0x000000001eb68800 nid=0x1b28 waiting for monitor entry
[0x000000001f44f000]
java.lang.Thread.State: BLOCKED (on object monitor)
at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
- waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
- locked <0x000000076b5bf1c0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$1/495053715.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)

// 略去部分输出
Found one Java-level deadlock:
=============================
"Thread-1":
waiting to lock monitor 0x000000000361d378 (object 0x000000076b5bf1c0, a java.lang.Object),
which is held by "Thread-0"
"Thread-0":
waiting to lock monitor 0x000000000361e768 (object 0x000000076b5bf1d0, a java.lang.Object),
which is held by "Thread-1"
Java stack information for the threads listed above:
===================================================
"Thread-1":
at thread.TestDeadLock.lambda$main$1(TestDeadLock.java:28)
- waiting to lock <0x000000076b5bf1c0> (a java.lang.Object)
- locked <0x000000076b5bf1d0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$2/883049899.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
"Thread-0":
at thread.TestDeadLock.lambda$main$0(TestDeadLock.java:15)
- waiting to lock <0x000000076b5bf1d0> (a java.lang.Object)
- locked <0x000000076b5bf1c0> (a java.lang.Object)
at thread.TestDeadLock$$Lambda$1/495053715.run(Unknown Source)
at java.lang.Thread.run(Thread.java:745)
Found 1 deadlock.

哲学家就餐问题

有五位哲学家,围坐在圆桌旁。

筷子类

class Chopstick {
    String name;

    public Chopstick(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "筷子{" + name + '}';
    }
}

哲学家类

@Slf4j
class Philosopher extends Thread {

    Chopstick left;
    Chopstick right;
    public Philosopher(String name, Chopstick left, Chopstick right) {
        super(name);
        this.left = left;
        this.right = right;
    }

    private void eat() {
        log.debug("eating...");
        Sleeper.sleep(1);
    }

    @Override
    public void run() {
        while (true) {
            // 获得左手筷子
            synchronized (left) {
                // 获得右手筷子
                synchronized (right) {
                    // 吃饭
                    eat();
                }// 放下右手筷子
            }// 放下左手筷子
        }
    }
}

就餐

Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();

执行不多会,就执行不下去了

12:33:15.575 [苏格拉底] c.Philosopher - eating...
12:33:15.575 [亚里士多德] c.Philosopher - eating...
12:33:16.580 [阿基米德] c.Philosopher - eating...
12:33:17.580 [阿基米德] c.Philosopher - eating...
// 卡在这里, 不向下运行

使用 jconsole 检测死锁

这种线程没有按预期结束,执行不下去的情况,归类为【活跃性】问题,除了死锁以外,还有活锁和饥饿者两种情况

活锁

活锁出现在两个线程互相改变对方的结束条件,最后谁也无法结束,例如

public class TestLiveLock {
    
    static volatile int count = 10;
    static final Object lock = new Object();
    
    public static void main(String[] args) {
        new Thread(() -> {
            // 期望减到 0 退出循环
            while (count > 0) {
                sleep(0.2);
                count--;
                log.debug("count: {}", count);
            }
        }, "t1").start();
        new Thread(() -> {
            // 期望超过 20 退出循环
            while (count < 20) {
                sleep(0.2);
                count++;
                log.debug("count: {}", count);
            }
        }, "t2").start();
    }
}

饥饿

很多教程中把饥饿定义为,一个线程由于优先级太低,始终得不到 CPU 调度执行,也不能够结束,饥饿的情况不易演示,讲读写锁时会涉及饥饿问题

下面我讲一下我遇到的一个线程饥饿的例子,先来看看使用顺序加锁的方式解决之前的死锁问题
顺序加锁的解决方案

13、ReentrantLock

和synchronized相比具有的的特点

基本语法

//获取ReentrantLock对象
private ReentrantLock lock = new ReentrantLock();
//加锁
lock.lock();
try {
    //需要执行的代码
}finally {
    //释放锁
    lock.unlock();
}

可重入

    static ReentrantLock lock = new ReentrantLock();
    public static void main(String[] args) {
        method1();
    }
    public static void method1() {
        lock.lock();
        try {
            log.debug("execute method1");
            method2();
        } finally {
            lock.unlock();
        }
    }
    public static void method2() {
        lock.lock();
        try {
            log.debug("execute method2");
            method3();
        } finally {
            lock.unlock();
        }
    }
    public static void method3() {
        lock.lock();
        try {
            log.debug("execute method3");
        } finally {
            lock.unlock();
        }
    }

输出:

17:59:11.862 [main] c.TestReentrant - execute method1
17:59:11.865 [main] c.TestReentrant - execute method2
17:59:11.865 [main] c.TestReentrant - execute method3

可打断

如果某个线程处于阻塞状态,可以调用其interrupt方法让其停止阻塞,获得锁失败

简而言之就是:处于阻塞状态的线程,被打断了就不用阻塞了,直接停止运行

public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Thread t1 = new Thread(()-> {
            try {
                //加锁,可打断锁
                lock.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
                //被打断,返回,不再向下执行
                return;
            }finally {
                //释放锁
                lock.unlock();
            }

        });

        lock.lock();
        try {
            t1.start();
            Thread.sleep(1000);
            //打断
            t1.interrupt();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

ReentrantLock lock = new ReentrantLock();//这个锁不可打断。

锁超时

尝试去获得锁,没获取到就立刻失败。

    ReentrantLock lock = new ReentrantLock();
    Thread t1 = new Thread(() -> {
        log.debug("启动...");
        if (!lock.tryLock()) {
            log.debug("获取立刻失败,返回");
            return;
        }
        try {
            log.debug("获得了锁");
        } finally {
            lock.unlock();
        }
    }, "t1");
    
    lock.lock(); 
    log.debug("获得了锁");
    t1.start(); 
    try {
        sleep(2);
    } finally {
        lock.unlock();
    }

输出

20:15:02.918 [main] c.TestTimeout - 获得了锁 
20:15:02.921 [t1] c.TestTimeout - 启动... 
20:15:02.921 [t1] c.TestTimeout - 获取立刻失败,返回

超时失败:

    ReentrantLock lock = new ReentrantLock();
    Thread t1 = new Thread(() -> {
        log.debug("启动...");
        try {
            if (!lock.tryLock(1, TimeUnit.SECONDS)) {
                log.debug("获取等待 1s 后失败,返回");
                return;
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            log.debug("获得了锁");
        } finally {
            lock.unlock();
        }
    }, "t1");
    
    lock.lock(); 
    log.debug("获得了锁"); 
    t1.start();

    try {
        sleep(2);
    } finally {
        lock.unlock();
    }

输出:

20:19:40.537 [main] c.TestTimeout - 获得了锁
20:19:40.544 [t1] c.TestTimeout - 启动...
20:19:41.547 [t1] c.TestTimeout - 获取等待 1s 后失败,返回

使用 tryLock 解决哲学家就餐问题

class Chopstick extends ReentrantLock {
    String name;

    public Chopstick(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "筷子{" + name + '}';
    }
}
class Philosopher extends Thread {
    Chopstick left;
    Chopstick right;

    public Philosopher(String name, Chopstick left, Chopstick right) {
        super(name);
        this.left = left;
        this.right = right;
    }

    @Override
    public void run() {
        while (true) {
            // 尝试获得左手筷子
            if (left.tryLock()) {
                try {
                    // 尝试获得右手筷子
                    if (right.tryLock()) {
                        try {
                            eat();
                        } finally {
                            right.unlock();
                        }
                    }
                } finally {
                    left.unlock();
                }
            }
        }
    }

    private void eat() {
        log.debug("eating...");
        Sleeper.sleep(1);
    }
}

公平锁

一般时为了解决饥饿问题,公平锁一般没有必要,会降低并发度。

条件变量

synchronized 中也有条件变量,就是我们讲原理时那个 waitSet 休息室,当条件不满足时进入 waitSet 等待 。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量的,这就好比

使用要点:

    static ReentrantLock lock = new ReentrantLock();
    static Condition waitCigaretteQueue = lock.newCondition();
    static Condition waitbreakfastQueue = lock.newCondition();
    static volatile boolean hasCigrette = false;
    static volatile boolean hasBreakfast = false;

    public static void main(String[] args) {
        new Thread(() -> {
            try {
                lock.lock();
                while (!hasCigrette) {
                    try {
                        waitCigaretteQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("等到了它的烟");
            } finally {
                lock.unlock();
            }
        }).start();
        new Thread(() -> {
            try {
                lock.lock();
                while (!hasBreakfast) {
                    try {
                        waitbreakfastQueue.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                log.debug("等到了它的早餐");
            } finally {
                lock.unlock();
            }
        }).start();
        sleep(1);
        sendBreakfast();
        sleep(1);
        sendCigarette();
    }

    private static void sendCigarette() {
        lock.lock();
        try {
            log.debug("送烟来了");
            hasCigrette = true;
            waitCigaretteQueue.signal();
        } finally {
            lock.unlock();
        }
    }

    private static void sendBreakfast() {
        lock.lock();
        try {
            log.debug("送早餐来了");
            hasBreakfast = true;
            waitbreakfastQueue.signal();
        } finally {
            lock.unlock();
        }
    }

输出

20:52:27.680 [main] c.TestCondition - 送早餐来了 
20:52:27.682 [Thread-1] c.TestCondition - 等到了它的早餐 
20:52:28.683 [main] c.TestCondition - 送烟来了 
20:52:28.683 [Thread-0] c.TestCondition - 等到了它的烟

14、同步模式之顺序控制

1. 固定运行顺序

比如,必须先 2 后 1 打印

1.1 wait notify 版

    // 用来同步的对象
    static Object obj = new Object();
    // t2 运行标记, 代表 t2 是否执行过
    static boolean t2runed = false;

    public static void main(String[] args) {

        Thread t1 = new Thread(() -> {
            synchronized (obj) {
                // 如果 t2 没有执行过
                while (!t2runed) {
                    try {
                        // t1 先等一会
                        obj.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
            System.out.println(1);
        });

        Thread t2 = new Thread(() -> {
            System.out.println(2);
            synchronized (obj) {
                // 修改运行标记
                t2runed = true;
                // 通知 obj 上等待的线程(可能有多个,因此需要用 notifyAll)
                obj.notifyAll();
            }
        });
        t1.start();
        t2.start();
    }

1.2 Park Unpark 版

可以看到,实现上很麻烦:

可以使用 LockSupport 类的 park 和 unpark 来简化上面的题目:

    Thread t1 = new Thread(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }
        // 当没有『许可』时,当前线程暂停运行;有『许可』时,用掉这个『许可』,当前线程恢复运行 
        LockSupport.park();
        System.out.println("1");
    });
    
    Thread t2 = new Thread(() -> {
        System.out.println("2");
        // 给线程 t1 发放『许可』(多次连续调用 unpark 只会发放一个『许可』) 
        LockSupport.unpark(t1);
    });
    
    t1.start(); 
    t2.start();

park 和 unpark 方法比较灵活,他俩谁先调用,谁后调用无所谓。并且是以线程为单位进行『暂停』和『恢复』, 不需要『同步对象』和『运行标记』

2. 交替输出

线程 1 输出 a 5 次,线程 2 输出 b 5 次,线程 3 输出 c 5 次。现在要求输出 abcabcabcabcabc 怎么实现

2.1 Lock 条件变量版

class AwaitSignal extends ReentrantLock {

    public void start(Condition first) {
        this.lock();
        try {
            log.debug("start");
            first.signal();
        } finally {
            this.unlock();
        }
    }

    public void print(String str, Condition current, Condition next) {
        for (int i = 0; i < loopNumber; i++) {
            this.lock();
            try {
                // 先把所有线程都放在waitset休息室休息,后面先唤醒一个开始
                current.await();
                log.debug(str);
                next.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                this.unlock();
            }
        }
    }

    // 循环次数
    private int loopNumber;

    public AwaitSignal(int loopNumber) {
        this.loopNumber = loopNumber;
    }
}
        AwaitSignal as = new AwaitSignal(5);
        Condition aWaitSet = as.newCondition();
        Condition bWaitSet = as.newCondition();
        Condition cWaitSet = as.newCondition();

        new Thread(() -> {
            as.print("a", aWaitSet, bWaitSet);
        }).start();
        new Thread(() -> {
            as.print("b", bWaitSet, cWaitSet);
        }).start();
        new Thread(() -> {
            as.print("c", cWaitSet, aWaitSet);
        }).start();

        as.start(aWaitSet);

2.2 Park Unpark 版

class SyncPark {

    private int loopNumber;
    private Thread[] threads;

    public SyncPark(int loopNumber) {
        this.loopNumber = loopNumber;
    }

    public void setThreads(Thread... threads) {
        this.threads = threads;
    }

    public void print(String str) {
        for (int i = 0; i < loopNumber; i++) {
            LockSupport.park();
            System.out.print(str);
            LockSupport.unpark(nextThread());
        }
    }

    private Thread nextThread() {
        Thread current = Thread.currentThread();
        int index = 0;
        for (int i = 0; i < threads.length; i++) {
            if (threads[i] == current) {
                index = I;
                break;
            }
        }
        if (index < threads.length - 1) {
            return threads[index + 1];
        } else {
            return threads[0];
        }
    }

    // 先把3个线程都放在数组里,每个线程都是先park,然后再先唤醒第一个
    public void start() {
        for (Thread thread : threads) {
            thread.start();
        }
        LockSupport.unpark(threads[0]);
    }
}
        SyncPark syncPark = new SyncPark(5);
        
        Thread t1 = new Thread(() -> {
            syncPark.print("a");
        });
        Thread t2 = new Thread(() -> {
            syncPark.print("b");
        });
        Thread t3 = new Thread(() -> {
            syncPark.print("c\n");
        });
        
        syncPark.setThreads(t1, t2, t3);
        syncPark.start();
上一篇 下一篇

猜你喜欢

热点阅读