显示锁(Lock)和阻塞对列(BlockingQueue)

2016-07-18  本文已影响545人  SilenceDut

synchronized是不错,但它并不完美。它有一些功能性的限制:比如它无法中断一个正在等候获得锁的线程;

显示锁LocK

java.util.concurrent.lock 中的Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,而不是作为语言的特性来实现。这就为Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。

ReentrantLock

ReentrantLock 类实现了Lock ,它拥有与synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。)

class LockStudy {            
    private Lock lock = new ReentrantLock();// 锁对象            
    public void output(String name) {                              
        lock.lock();      // 得到锁                
        try {                   
            //doSomething            
        } finally {                  
            lock.unlock();// 释放锁                
        }            
    }        
}   

需要注意的是,用synchronized修饰的方法或者语句块在代码执行完之后锁自动释放,而是用Lock需要我们手动释放锁,所以为了保证锁最终被释放(发生异常情况),要把互斥区放在try内,释放锁放在finally内。

Condition

ReentrantLock里有个函数newCondition(),该函数得到一个锁上的"条件",用于实现线程间的通信,条件变量很大一个程度上是为了解决Object.wait/notify/notifyAll难以使用的问题。
Condition拥有await(),signal(),signalAll(),await对应于Object.waitsignal对应于Object.notifysignalAll对应于Object.notifyAll。特别说明的是Condition的接口改变名称就是为了避免与Object中的wait/notify/notifyAll的语义和使用上混淆,因为Condition同样有wait/notify/notifyAll方法()因为任何类都拥有这些方法。
每一个Lock可以有任意数据的Condition对象,Condition是与Lock绑定的,所以就有Lock的公平性特性:如果是公平锁,线程为按照FIFO的顺序从Condition.await中释放,如果是非公平锁,那么后续的锁竞争就不保证FIFO顺序了。下面是一个用Lock和Condition实现的一个生产者消费者的模式:

import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;
public class ProductQueue<T> {  

    private final T[] items;  

    private final Lock lock = new ReentrantLock();  

    private Condition notFull = lock.newCondition();  

    private Condition notEmpty = lock.newCondition();  


    private int head, tail, count;  

    public ProductQueue(int maxSize) {  
        items = (T[]) new Object[maxSize];  
    }  

    public ProductQueue() {  
        this(10);  
    }  

    public void put(T t) throws InterruptedException {  
        lock.lock();  
        try {  
            while (count == getCapacity()) {  
                notFull.await();  
            }  
            items[tail] = t;  
            if (++tail == getCapacity()) {  
                tail = 0;  
            }  
            ++count;  
            notEmpty.signalAll();  
        } finally {  
            lock.unlock();  
        }  
    }  

    public T take() throws InterruptedException {  
        lock.lock();  
        try {  
            while (count == 0) {  
                notEmpty.await();  
            }  
            T ret = items[head];  
            items[head] = null;//GC  
      
            if (++head == getCapacity()) {  
                head = 0;  
            }  
            --count;  
            notFull.signalAll();  
            return ret;  
        } finally {  
            lock.unlock();  
        }  
    }  

    public int getCapacity() {  
        return items.length;  
    }  

    public int size() {  
        lock.lock();  
        try {  
            return count;  
        } finally {  
            lock.unlock();  
        }  
    }
}  

这就是多个Condition的强大之处,假设缓存队列中已经存满,那么阻塞的肯定是写线程,唤醒的肯定是读线程,相反,阻塞的肯定是读线程,唤醒的肯定是写线程,那么假设只有一个Condition会有什么效果呢,缓存队列中已经存满,这个Lock不知道唤醒的是读线程还是写线程了,如果唤醒的是读线程,皆大欢喜,如果唤醒的是写线程,那么线程刚被唤醒,又被阻塞了,这时又去唤醒,这样就浪费了很多时间。

读写锁ReadWriteLock

读-写锁定允许对共享数据进行更高级别的并发访问。虽然一次只有一个线程(writer 线程)可以修改共享数据,但在许多情况下,任何数量的线程可以同时读取共享数据(reader 线程)。

阻塞队列——BlockingQueue

阻塞队列(BlockingQueue)是java.util.concurrent下的主要用来控制线程同步的工具。如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒。同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。BlockingQueue是具体实现的接口。具体的实现类有LinkedBlockingQueue,ArrayBlockingQueued等。一般其内部的都是通过Lock和Condition来实现阻塞和唤醒。下面是一个通过BlockingQueue实现的生产者消费者的例子。

生产者:

public class Producer implements Runnable {  
    BlockingQueue<String> queue;  

    public Producer(BlockingQueue<String> queue) {  
        this.queue = queue;  
    }  

    @Override  
    public void run() {  
        try {  
            String temp = "A Product, 生产线程:"  
                + Thread.currentThread().getName(    );  
            System.out.println("I have made a product:"  
                + Thread.currentThread().getName());  
            queue.put(temp);//如果队列是满的话,会阻塞当前线程  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  

消费者:

public class Consumer implements Runnable{  
    BlockingQueue<String> queue;  
  
    public Consumer(BlockingQueue<String> queue){  
        this.queue = queue;  
    }  
  
    @Override  
    public void run() {  
        try {  
            String temp = queue.take();//如果队列为空,会阻塞当前线程  
            System.out.println(temp);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}  

测试类:

public class ProducerConsumeTest{  

    public static void main(String[] args) {  
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);  
        // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();  
        //不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE  
      
        // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);  
        Consumer consumer = new Consumer(queue);  
        Producer producer = new Producer(queue);  
        for (int i = 0; i < 5; i++) {  
            new Thread(producer, "Producer" + (i + 1)).start();  
            new Thread(consumer, "Consumer" + (i + 1)).start();  
        }  
    }  
}

参考:Java线程(九):Condition-线程通信更高效的方式

上一篇下一篇

猜你喜欢

热点阅读