Reentrantlock和Condition使用案例
多线程编程是开发高并发应用的重点和难点,是许多互联网公司面试环节必不可少的部分.打算围绕多线程编程总结一些核心概念及它们之间的关系,本篇是ReentrantLock和Condition的使用.
ReentrantLock
简介
ReentrantLock是在JDK1.5中引入的,具有可重入、可中断、可限时、公平锁/非公平锁等特性.关于它的适用场景可以参考ReentrantLock可重入锁的使用场景
公平锁和非公平锁
ReentrantLock只有2个构造方法,对应生成非公平锁和公平锁:
private ReentrantLock lock = new ReentrantLock(); //参数默认false,不公平锁
private ReentrantLock lock = new ReentrantLock(true); //公平锁
公平锁指的是先来的线程会先得到锁,后来的线程后得到锁,因此会产生线程排队的消耗,效率比非公平锁低.非公平锁后来的线程可能会先得到锁.
可重入
先看一段代码:
lock.lock();
lock.lock();
try {
doSomething();
} catch (Exception e) {
} finally {
lock.unlock();
lock.unlock();
}
这段代码中同一个线程先后2次获得了同一个锁,这种锁就是可重入锁.当然,获取了2次,就必须释放2次,否则会导致死锁.再看一段代码:
public class ReentrantDemo {
Lock lock = new Lock();
int count = 0;
public void A() {
lock.lock();
B();
lock.unlock();
}
public void B() {
lock.lock();
C();
lock.unlock();
}
public void C() {
lock.lock();
count++;
lock.unlock();
}
}
这段代码中方法A调用了方法B,方法B又调用了方法C.线程执行到A时已经获得了锁,到B时可以再次获得同一把锁,到C时仍然可以获得同一个把锁,这就是可重入.注意,获得了几次锁,就必须释放几次锁,否则会导致死锁.关于可重入也可以看看这篇Java并发编程——锁与可重入锁
可中断
线程获取锁的过程中如果锁被另一线程持有则会进入等待状态,该线程可以被中断从而退出等待.等待过程可被中断的线程不能通过一般的lock.lock()
方式获得锁,而是需要通过lock.lockInterruptibly()
方式获得锁.使用lock()方法时是不能被中断的.关于中断的案例可以参考ReentrantLock的使用
可限时
通过lock.tryLock(long timeout, TimeUnit unit)
方式获取锁时可以指定等待的时间,超出这个时间以后会返回false并退出等待.
Condition
简介
Condition是JDK1.5引入的.Condition依赖于lock,必须和一个lock绑定,通过lock的newCondition()方法创建.
Condition和监视器(monitor)方法对比
监视器方法包括wait()/notify()/notifyAll(),需要配合synchronized使用.而Condition是配合lock使用的.因此,Condition类似于监视器方法,Condition的await()类似于监视器方法wait(),Conditon的signal()类似于监视器方法notify(),Condition的signalAll()类似于监视器方法notifyAll().区别是如果对某个对象中的临界资源(即存在线程安全问题的资源)进行操作,使用该对象的监视器方法配合synchronized只能所有的线程都在一个等待集合中(wait-set).但是通过为该对象创建多个Condition可以实现等待一个lock的线程根据condition的不同被划分到多个等待集合中(wait-set),进行更复杂的线程同步.Oracle官网是这样说的:
Condition factors out the Object monitor methods (wait, notify and notifyAll) into distinct objects to give the effect of having multiple wait-sets per object, by combining them with the use of arbitrary Lock implementations. Where a Lock replaces the use of synchronized methods and statements, a Condition replaces the use of the Object monitor methods.
condition调用await()方法后线程会不会释放lock?
会释放lock,接收到同一个condition的signal后线程需要重新获得锁.Oracle官网对await()的解释是这样的:
Causes the current thread to wait until it is signalled or interrupted.
The lock associated with this Condition is atomically released and the current thread becomes disabled for thread scheduling purposes and lies dormant until one of four things happens:
• Some other thread invokes the signal() method for this Condition and the current thread happens to be chosen as the thread to be awakened; or
• Some other thread invokes the signalAll() method for this Condition; or
• Some other thread interrupts the current thread, and interruption of thread suspension is supported; or
• A "spurious wakeup" occurs.
In all cases, before this method can return the current thread must re-acquire the lock associated with this condition.
再来看signal()方法的解释:
Wakes up one waiting thread.
If any threads are waiting on this condition then one is selected for waking up. That thread must then re-acquire the lock before returning from await.
其实仔细思考还有一个问题,线程继续执行以后是从方法的lock.lock()处开始执行还是接着await()继续执行?目前并没有查到任何资料,Oracle也没有提到.个人目前猜测是await()方法中会做一次lock.unlock()操作,也就是释放锁,同时把线程加入到当前condition的等待队列中,当线程被唤醒时,会做一次lock.lock()操作,也就是重新获得锁,然后线程从await()方法处继续向下执行,参考深入浅出 Java Concurrency (9): 锁机制 part 4
案例
案例简介
建立一个缓冲队列,读线程从队列中循环读取数据,写线程向队列中循环写入数据.该案例根据Oracle官网给出的Conditon代码示例进行了改编.
案例代码
包括4个类: BoundedBuffer、BoundedBufferWrite、BounedBufferRead和BoundedBufferTest类
BoundedBuffer.java代码如下:
package com.ms.thread.reentrantlock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author chenxin
* @since 2018-04-21
*/
public class BoundedBuffer {
//锁对象
final Lock lock = new ReentrantLock();
//写线程条件
final Condition notFull = lock.newCondition();
//读线程条件
final Condition notEmpty = lock.newCondition();
// 缓存队列
final Object[] items = new Object[100];
int putptr = 0;
int takeptr = 0;
int count = 0;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
//如果队列满了阻塞写线程.写线程会释放lock,写线程接收到signal时需要重新获得lock
notFull.await();
//赋值
items[putptr] = x;
if (++putptr == items.length)
//如果写索引写到队列的最后一个位置了,那么重置为0,开启下一轮
putptr = 0;
//队列中已存在的元素个数加1
++count;
//发出非空signal,等待非空signal的线程会被唤醒
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
//如果队列为空,等待非空signal.释放lock,接收到signal后需要先获得lock
notEmpty.await();
Object x = items[takeptr];// 取值
if (++takeptr == items.length)
//如果读索引读到队列的最后一个位置了,那么重置为0
takeptr = 0;
//队列中已存在的元素个数减1
--count;//
//发出condition队列不满signal,等待该signal的线程继续执行.
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
BoundedBufferWrite.java代码如下:
package com.ms.thread.reentrantlock;
/**
* 写线程
* @author chenxin
* @since 2018-04-21
*/
public class BoundedBufferWrite implements Runnable{
private BoundedBuffer buffer;
public BoundedBufferWrite(BoundedBuffer boundedBuffer) {
this.buffer = boundedBuffer;
}
@Override
public void run() {
try {
while (true) {
buffer.put("iengchen");
System.out.println(Thread.currentThread().getName()+":写入成功");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
BounedBufferRead.java代码如下:
package com.ms.thread.reentrantlock;
/**
* 读线程
* @author chenxin
* @since 2018-04-21
*/
public class BounedBufferRead implements Runnable {
private BoundedBuffer buffer;
public BounedBufferRead(BoundedBuffer boundedBuffer) {
this.buffer = boundedBuffer;
}
@Override
public void run() {
try {
while (true) {
buffer.take();
System.out.println(Thread.currentThread().getName()+"读取成功");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
BoundedBufferTest.java代码如下:
package com.ms.thread.reentrantlock;
/**
* 用于测试BoundedBuffer、BoundedBufferWrite和BounedBufferRead类
* @author chenxin
* @since 2018-04-21
*/
public class BoundedBufferTest {
//定义写线程数量
private static int writeNum = 3;
//定义读线程数量
private static int readNum = 3;
public static void main(String[] args) {
//所有的写线程和读线程共同操作的缓冲队列
BoundedBuffer buffer = new BoundedBuffer();
//启动写线程
for(int i=0; i<writeNum; i++) {
new Thread(new BoundedBufferWrite(buffer), "写入线程-"+i).start();
System.out.println("启动写线程—"+i);
}
//启动读线程
for(int j=0; j<readNum; j++) {
new Thread(new BounedBufferRead(buffer), "读取线程-"+j).start();
System.out.println("启动读线程-"+j);
}
}
}
执行结果
打印部分结果如下:
写入线程-2:写入成功
读取线程-1读取成功
读取线程-1读取成功
写入线程-2:写入成功
写入线程-2:写入成功
读取线程-0读取成功
读取线程-0读取成功
写入线程-2:写入成功
写入线程-2:写入成功
写入线程-1:写入成功
读取线程-2读取成功
读取线程-2读取成功
读取线程-2读取成功
读取线程-2读取成功
读取线程-2读取成功
读取线程-2读取成功
读取线程-2读取成功
读取线程-2读取成功
读取线程-2读取成功
写入线程-0:写入成功
写入线程-0:写入成功
写入线程-0:写入成功
读取线程-2读取成功
读取线程-2读取成功
读取线程-2读取成功
写入线程-1:写入成功
写入线程-1:写入成功
写入线程-1:写入成功
写入线程-1:写入成功
写入线程-1:写入成功
写入线程-1:写入成功
写入线程-1:写入成功
写入线程-1:写入成功
写入线程-2:写入成功
读取线程-0读取成功
读取线程-0读取成功
读取线程-0读取成功
读取线程-0读取成功
读取线程-0读取成功
读取线程-0读取成功
读取线程-0读取成功
读取线程-1读取成功
读取线程-1读取成功
读取线程-1读取成功
读取线程-1读取成功
读取线程-1读取成功
读取线程-1读取成功
读取线程-1读取成功
读取线程-1读取成功
读取线程-1读取成功
读取线程-1读取成功
读取线程-0读取成功
读取线程-0读取成功
读取线程-0读取成功
写入线程-2:写入成功
写入线程-2:写入成功
写入线程-2:写入成功
写入线程-2:写入成功
写入线程-2:写入成功
写入线程-2:写入成功
写入线程-2:写入成功