多线程并发框架使用一
2018-03-19 本文已影响0人
丹青水
Semaphore
[ˈseməfɔ:(r)] 计数信号量,是用来控制线程的并发数量,它可以协调各个线程,以保证合理是使用资源,常用于流量控制
ReentrantLock
一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。如果当前线程已经拥有该锁定,此方法将立即返回可使用
Condition
java中条件变量都实现了java.util.concurrent.locks.Condition接口,条件变量的实例化是通过一个Lock对象上调用newCondition()方法来获取的,条件就和一个锁对象绑定起来了。因此,Java中的条件变量只能和锁配合使用,来控制并发程序访问竞争资源的安全。条件变量的出现是为了更精细控制线程等待与唤醒,在Java5之前,线程的等待与唤醒依靠的是Object对象的wait()和notify()/notifyAll()方法,这样的处理不够精细。简单讲,就是消费者/生产者的场景中,在原来的基础上,增加了队列满时及时通知消费者,队列空时及时通知生产者的优化,通常是两个条件变量一起出现,一个控制值,但两个条件变量可以毫无关系,终归来说还是在Lock的范围内。所以,从本质上来说,是对Object监视器的场景性优化,而不是全新机制的引入。
Exchanger
它允许在并发任务之间交换数据。具体来说,Exchanger类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中。
例子1(多进顺序执行)
public class SemaphoreDemo1 {
private Semaphore semaphore=new Semaphore(10);//初始化信号量的大小,new Semaphore(10,isFair) isfair代表是否公平信号的标识,公平就是按照调用顺序,非公平系统随机
private ReentrantLock reentrantLock=new ReentrantLock();//重用锁
public void fly(){
try {
System.out.println("first:" + Thread.currentThread().getName() + "firstTime:" +System.currentTimeMillis());
semaphore.acquire(3);//每个线程需要的信号量,假如有10个线程同时进来,每个需要3个通行证,则同时能进来3个,7个需要等待。
long start= System.currentTimeMillis();
System.out.println("threadName:" + Thread.currentThread().getName() + "beginTime:" +start);
Thread.sleep(500);
System.out.println("threadName:" + Thread.currentThread().getName() + "endTime:" + (System.currentTimeMillis()));
System.out.println("avaliable permit x:"+semaphore.availablePermits()); //可使用的通行证数量
System.out.println("qune thread : " + semaphore.getQueueLength());//获取等待许可的数量
reentrantLock.lock();
for(int i=0;i<5;i++){
System.out.println("threadName:" + Thread.currentThread().getName() + "out:" + i);
}
reentrantLock.unlock();
}catch (Exception e){
System.err.println(e.getMessage());
}finally {
semaphore.release(3);
System.out.println("avaliable permit xx:" + semaphore.availablePermits());
}
}
}
public class SemaphoreTest {
public static void main(String[] ags){
SemaphoreDemo1 semaphoreDemo1=new SemaphoreDemo1();
List<Thread> threadList= new ArrayList<>();
for(int i=0;i<10;i++){
threadList.add( new Thread(new DoWhat(semaphoreDemo1)));
}
for(Thread thread:threadList){
thread.start();
}
}
};
class DoWhat implements Runnable {
private SemaphoreDemo1 semaphoreDemo1;
public DoWhat(SemaphoreDemo1 semaphoreDemo1) {
this.semaphoreDemo1 = semaphoreDemo1;
}
public void run() {
semaphoreDemo1.fly();
}
}
例子2(模拟线程池)
public class PoolConnectDemo {
private int maxSize=10;
private int semaphoreLimit=2;
private List<String> connections;
private Semaphore limit=new Semaphore(semaphoreLimit);
private ReentrantLock lock=new ReentrantLock();
private Condition condition=lock.newCondition();//条件
public PoolConnectDemo() {
super();
for(int i=0;i<maxSize;i++){
this.connections.add("pool:"+i);
}
}
public String get(){
String returnStr=null;
try {
limit.acquire();//限制并发线程
lock.lock();//加锁
while (connections.size()==0){//如果么有
condition.await();//线程等待其他线程释放,挂起
}
returnStr=connections.remove(0);
lock.unlock();//释放锁
}catch (Exception e){
}
return returnStr;
}
public void put(String connect){
lock.lock();
connections.add(connect);
condition.signalAll();//通知其他等待线程继续执行,相当于notify和notifyAll
lock.unlock();
limit.release();
}
}
例子3(两个线程之间数据交换)
public class ExchangerDemo {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
final Exchanger<String> exchanger = new Exchanger<String>();
service.execute( new Runnable() {
public void run() {
try {
String food = "food" ;
System.out.println("threadName: " + Thread.currentThread().getName() + "exchange " + food + " in");
Thread.sleep(( long ) (Math.random() * 10000 ));
String money = (String) exchanger.exchange(food);
System.out.println( "threadName: " + Thread.currentThread().getName()
+ "exchange data out: " + money);
} catch (Exception e) {
}
}
});
service.execute( new Runnable() {
public void run() {
try {
String money = "money" ;
System.out.println("threadName: " + Thread.currentThread().getName() + "exchange " + money + " in" );
Thread.sleep(( long ) (Math.random() * 10000 ));
String food = (String) exchanger.exchange(money);
System.out.println( "threadName: " + Thread.currentThread().getName() + "exchage data out:" + food);
} catch (Exception e) {
}
}
});
service.shutdown();
}
}