Java高级开发

Lock接口及其实现

2020-01-16  本文已影响0人  依弗布德甘

锁的本质

在多线程中,同一时间对同一资源进行读写操作,为了保证数据的一致性与准确性,解决线程安全问题的实现

synchronized

Lock


Locks包 类层次结构

Lock接口

方法前面 描述
void lock(); 获取锁(一直抢锁,阻塞WAITING)
boolean tryLock(); 获取锁(只抢锁一次,不阻塞)
boolean tryLock(long time,TimeUnit unit) throws InterruptedException; 获取锁(有超时时间的抢锁,阻塞时间)
void lockInterruptibly() throws InterruptedException; 获取锁(由外部提醒中断的抢锁,阻塞)
void unlock(); 释放锁
Condition newCondition(); --

抢锁试例代码

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Demo {
    //公平锁 - 先进先出
    //static Lock lock = new ReentrantLock(true);
    //非公平锁
    static Lock lock = new ReentrantLock();

    public static void main(String args[]) throws InterruptedException {
        //主线程 拿到锁
        lock.lock();

        Thread th =  new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("开始抢锁...");
                // 子线程抢锁,线程处于Waiting 状态,一直要等到抢锁成功
                lock.lock();  

                //子线程抢锁,成功抢到返回ture,否则抢锁失败返回false,线程不会阻塞
                boolean result = lock.tryLock();
                System.out.println("是否获得到锁:" +result);*/

                //子线程带有超时时间的抢锁,抢锁时间内线程处于Waiting 状态
                try {
                    boolean result = lock.tryLock(5, TimeUnit.SECONDS);
                    System.out.println("是否获得到锁:" +result);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 

                //子线程抢锁,未抢到锁线程处于Waiting状态,需要别的地方来调用interrupt终止抢锁操作
                try { 
                    lock.lockInterruptibly();
                } catch (InterruptedException e) {
                    e.printStackTrace(); 
                } 

            }
        });
        th.start();
        Thread.sleep(3000);
        System.out.println(th.getState());

        //中断线程
        Thread.sleep(5000);
        th.interrupt();

        Thread.sleep(100000L);
        lock.unlock();
    }

}


Condition

//使当前线程在接收到信号前或被中断前一直保持等待状态. 
void await()
 
//使当前线程在接收到信号前或被中断前或达到指定时间前一直保持等待状态(TimeUnit为时间单位). 
boolean await(long time, TimeUnit unit)
 
//使当前线程在接收到信号前或被中断前或达到指定时间前一直保持等待状态(单位为毫秒). 
long awaitNanos(long nanosTimeout)
 
//使当前线程在接收到信号前或被中断前或达到最后日期期限前一直保持等待状态. 
boolean awaitUntil(Date deadline)
 
//唤醒一个在该Condition实例等待的线程. 
void signal()
 
//唤醒所有在该Condition实例等待的线程.     
void signalAll()
Condition试列代码
    private static Lock lock = new ReentrantLock();
    //相当于Synchronized里面的WaitSet等待池
    static Condition condition = lock.newCondition();    

    public static void main(String args[]){

        Thread th = new Thread(){
            @Override
            public void run() {
                super.run();

                lock.lock();
                try {
                    //await 不仅将线程挂起,还会释放锁
                    condition.await();     
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        };
        th.start();
 
        lock.lock();
        try {
            condition.signal();
        }finally {
            lock.unlock();
        } 
    }


/*
  1、自己实现一个阻塞队列,只能存储  n个元素
   put时,若队列未满,直接put,
         若队列满,就阻塞,直到再有空间
   get时,若队列中有元素,则获取到元素
         若无元素,则等待元素
 */

class DemoQueue{
    Lock lock = new ReentrantLock();
    Condition putCondition = lock.newCondition();
    Condition takeCondition = lock.newCondition();

    List<Object> list = new ArrayList<>();

    private int length;
    public DemoQueue(int length){
        this.length = length;
    }

    public void put(Object obj){
        lock.lock();
        try {
            if (list.size() <length){
                list.add(obj);
                takeCondition.signal();
                System.out.println("put:"+obj);
            }else{
                putCondition.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public Object take(){
        lock.lock();
        Object obj = null;
        try {
            if (list.size() > 0){
                obj = list.remove(0);
                putCondition.signal();
                System.out.println("take:" +obj);
            }else{
                takeCondition.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            return obj;
        }
    } 
}


ReentrantLock - 重入

public class Demo {
    //可重入锁
    static Lock lock =  new ReentrantLock();  

    public static void main(String args[]) throws InterruptedException {
        //当前线程已获取锁
        lock.lock();    
        System.out.println("get lock 1...");
        //再次获取,是否能成功
        lock.lock();   
        System.out.println("get lock 2...");
        //没有完全解锁
        lock.unlock();       
        //释放了锁
        lock.unlock();   
        //第三次释放锁-报错    
        lock.unlock();        
    }
  1. 初始值,owner=null 、count=0
  2. 第一次加锁,owner=mainThread 、count=1
  3. 第二次加锁,owner=mainThread 、count=2
  4. 第一次解锁,owner=mainThread 、count=1
  5. 第二次解锁,owner=null 、count=0
  6. 第三次解锁,报错

线程来加锁,都是先判断count是否为0,再判断owner


ReentrantLock原理解析

ReentrantLock

假设有4个线程(t1、t2、t3、t4),其中 t1、t2、t3同时来抢锁

  1. 首先线程都会先读取count里面的值
  2. 判断count等于0未被锁,再通过CAS操作修改count
  3. 假设t1线程修改成功,则owner = t1 ,count = 1 . 加锁成功。
  4. t2、t3自旋,判断count=1已上锁,再判断owner是否为自己,如果不是加入waiters锁池
  5. 如果t1线程再次调用 lock方法加锁,则会先判断count是否被上锁,如果是再判断owner是否为自己,如果是自己则修改count + 1
ReentrantLock
  1. 此时t1线程调用unlock方法,则会让count-1.
  2. 如果unlock方法使count=0 时,解锁成功owner=null,并且会唤醒队列头部的t2线程
  3. t2线程被唤醒后,按照第1第2步骤开始抢锁。
  4. 如果t2线程被唤醒的时候后,t4线程同样也来抢锁,并不一定是t2会抢锁成功。所以这里是非公平锁

ReadWriteLock读写锁

一对关联锁,一个用于读一个用于写;读锁可以由多个线程同时持有,写锁同一时间只能有一个线程持有

ReadWriteLock ReadWriteLock

ReadWriteLock锁降级

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/*
// 缓存示例
读取数据,使用数据的过程中,数据不能被修改
 */

public class DemoCacheData {

    public static void main(String args[]){
        //System.out.println(TeacherInfoCache.get("Kody"));;
    }

}

//实现缓存
//使用数据

class TeacherInfoCache {
    //数据是否可用
    static volatile boolean cacheValid;

    //锁
    static final ReadWriteLock rwl = new ReentrantReadWriteLock();

    //查询并使用数据,
    static void processCacheData(String dataKey){
        Object data =null;

        rwl.readLock().lock();
        try {
            //判断数据是否可用
            if (cacheValid){
                data = Redis.data.get(dataKey);
            }else{
                rwl.readLock().unlock();
                rwl.writeLock().lock();
                try {
                    if (!cacheValid){
                        //从数据库读取
                        data = DataBase.queryUserInfo();
                        Redis.data.put(dataKey, data);
                        cacheValid = true;
                    }
                }finally {
                    rwl.readLock().lock();    //锁降级
                    rwl.writeLock().unlock();
                }
            }

            //使用数据, like 写数据库,落地到文件,做某种计算

        }finally {
            rwl.readLock().unlock();
        }
    }
}

class DataBase{
    static String queryUserInfo(){
        System.out.println("查询数据库。。。");
        return "name:Kody,age:40,gender:true,";
    }
}

class Redis{
    static Map<String, Object> data = new HashMap<>();
}


Demo 代码示例

ReentrantLock:
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;

public class DemoReentrantLock implements Lock {
    //记录锁的拥有者
    AtomicReference<Thread> owner = new AtomicReference<>();
    //等待队列,锁池
    private LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    //重入次数
    AtomicInteger count = new AtomicInteger(0);


    @Override
    public boolean tryLock() {
        //判断count值是否为0
        int ct = count.get();
        if (ct !=0){
            //锁被占用,判断是否是当前线程
            if (owner.get() == Thread.currentThread()){
                count.set(ct + 1);
                return true;
            }else{
                return false;
            }
        }else{
            //所未被占用,用CAS修改count值,抢锁
            if (count.compareAndSet(ct, ct + 1)){
                owner.set(Thread.currentThread());
                return true;
            }else{
                return false;
            }
        }
    }

    @Override
    public void lock() {
        if (!tryLock()){
            //将线程放入度列排队
            waiters.offer(Thread.currentThread());

            for (;;){
                //判断是否是队列头部,防止伪唤醒
                Thread head = waiters.peek();
                if (head == Thread.currentThread()){
                    if (!tryLock()){
                        //没抢到所,继续park
                        LockSupport.park();
                    }else{
                        //抢到锁,将线程出队列
                        waiters.poll();
                        return;
                    }
                }else{
                    //为唤醒后,继续park
                    LockSupport.park();
                }
            }
        }
    }

    @Override
    public void unlock() {
        if (tryUnlock()){
            Thread head = waiters.peek();
            if  (head != null){
                LockSupport.unpark(head);
            }
        }
    }

    public boolean tryUnlock(){
        //判断是否是当前线程,若不是,抛异常
        if (owner.get() != Thread.currentThread()){
            throw new IllegalMonitorStateException();
        }else{
            //需要将count-1
            int ct = count.get();
            int nextc = ct -1 ;
            count.set(nextc);

            //若count变为0,解锁成功
            if (nextc == 0){
                owner.compareAndSet(Thread.currentThread(), null);
                return true;
            }else{
                return false;
            }
        }
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public Condition newCondition() {
        return null;
    }
}

ReadWriteLock:
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;


public class DemoReadWriteLock {
    volatile AtomicInteger readCount = new AtomicInteger(0);
    AtomicInteger writeCount = new AtomicInteger(0);

    //独占锁 拥有者
    AtomicReference<Thread> owner = new AtomicReference<>();

    //等待队列
    public volatile LinkedBlockingQueue<WaitNode> waiters = new LinkedBlockingQueue<WaitNode>();
    class WaitNode{
        int type = 0;   //0 为想获取独占锁的线程,  1为想获取共享锁的线程
        Thread thread = null;
        int arg = 0;

        public WaitNode(Thread thread, int type, int arg){
            this.thread = thread;
            this.type = type;
            this.arg = arg;
        }
    }


    //获取独占锁
    public void lock() {
        int arg = 1;
        //尝试获取独占锁,若成功,退出方法,    若失败...
        if (!tryLock(arg)){
            //标记为独占锁
            WaitNode waitNode = new WaitNode(Thread.currentThread(), 0, arg);
            waiters.offer(waitNode);    //进入等待队列

            //循环尝试拿锁
            for(;;){
                //若队列头部是当前线程
                WaitNode head = waiters.peek();
                if (head!=null && head.thread == Thread.currentThread()){
                    if (!tryLock(arg)){      //再次尝试获取 独占锁
                        LockSupport.park();     //若失败,挂起线程
                    } else{     //若成功获取
                        waiters.poll();     //  将当前线程从队列头部移除
                        return;         //并退出方法
                    }
                }else{  //若不是队列头部元素
                    LockSupport.park();     //将当前线程挂起
                }
            }
        }
    }

    //释放独占锁
    public boolean unlock() {
        int arg = 1;

        //尝试释放独占锁 若失败返回true,若失败...
        if(tryUnlock(arg)){
            WaitNode next = waiters.peek(); //取出队列头部的元素
            if (next !=null){
                Thread th = next.thread;
                LockSupport.unpark(th);     //唤醒队列头部的线程
            }
            return true;                //返回true
        }
        return false;
    }

    //尝试获取独占锁
    public boolean tryLock(int acquires) {
        //如果read count !=0 返回false
        if (readCount.get() !=0)
            return false;

        int wct = writeCount.get();     //拿到 独占锁 当前状态

        if (wct==0){
            if (writeCount.compareAndSet(wct, wct + acquires)){     //通过修改state来抢锁
                owner.set(Thread.currentThread());  //  抢到锁后,直接修改owner为当前线程
                return true;
            }
        }else if (owner.get() == Thread.currentThread()){
            writeCount.set(wct + acquires);     //修改count值
            return true;
        }
        return false;
    }

    //尝试释放独占锁
    public boolean tryUnlock(int releases) {
        //若当前线程没有 持有独占锁
        if(owner.get()!= Thread.currentThread()){
            throw new IllegalMonitorStateException();       //抛IllegalMonitorStateException
        }

        int wc= writeCount.get();
        int nextc = wc - releases;      //计算 独占锁剩余占用
        writeCount.set(nextc);      //不管是否完全释放,都更新count值

        if (nextc==0){  //是否完全释放
            owner.compareAndSet(Thread.currentThread(), null);
            return true;
        }else{
            return false;
        }
    }


    //获取共享锁
    public void lockShared() {
        int arg = 1;

        if (tryLockShared(arg) < 0){    //如果tryAcquireShare失败
            //将当前进程放入队列
            WaitNode node = new WaitNode(Thread.currentThread(), 1, arg);
            waiters.offer(node);  //加入队列

            for (;;){
                //若队列头部的元素是当前线程
                WaitNode head = waiters.peek();
                if (head!=null && head.thread == Thread.currentThread()){
                    if (tryLockShared(arg) >=0){    //尝试获取共享锁,  若成功
                        waiters.poll();      //将当前线程从队列中移除

                        WaitNode next = waiters.peek();
                        if (next!=null && next.type==1){    //如果下一个线程也是等待共享锁
                            LockSupport.unpark(next.thread);    //将其唤醒
                        }
                        return;     //退出方法
                    }else{                      //若尝试失败
                        LockSupport.park();     //挂起线程
                    }
                }else{  //若不是头部元素
                    LockSupport.park();
                }
            }
        }
    }

    //解锁共享锁
    public boolean unLockShared() {
        int arg = 1;

        if (tryUnLockShared(arg)){     //当read count变为0,才叫release share成功
            WaitNode next = waiters.peek();
            if (next!=null){
                LockSupport.unpark(next.thread);
            }
            return true;
        }
        return false;
    }

    //尝试获取共享锁
    public int tryLockShared(int acquires) {
        for (;;){
            if (writeCount.get()!=0 &&
                    owner.get() != Thread.currentThread())
                return -1;

            int rct = readCount.get();
            if (readCount.compareAndSet(rct, rct + acquires)){
                return 1;
            }
        }
    }

    //尝试解锁共享锁
    public boolean tryUnLockShared(int releases) {
        for(;;){
            int rc = readCount.get();
            int nextc = rc - releases;
            if (readCount.compareAndSet(rc, nextc)){
                return nextc==0;
            }
        }
    }
}

上一篇下一篇

猜你喜欢

热点阅读