生产者-消费者模式 系列 之二 ReentrantLock, C

2017-06-21  本文已影响0人  飞奔的蚂蚁

这篇文章将主要集中在JDK 1.5中引入的 lock 机制和其对应的condition 条件变量. 将通过ReentrantLock 来实现系列一中同样功能的生产者-消费者模式.
Lock : Lock概念的出现,乍一看是把Java中一直使用的隐式对象锁(可参考系列一文章synchronized中锁类型的实验例子)以一种独立明确的方式进行了定义. 使得在多线程环境中对共享资源的访问控制,除synchronized之外,又多了一种选择. 但仔细琢磨你会发现, Lock还有很多synchronized无法比拟的优势.

try{   
lock.lock();   
//do something  
} finally{  
lock.unlock();  
} 
private Lock lock = new ReentrantLock();  
synchronized(lock){  
  //do something  
  lock.notify();  
} 
if ( condition variable == true )   
  signal;   
else  
  await;  
end;  

所以, 在命名Condition 变量时, 要根据你的上下文,给它赋予一个有意义,好辩识的名字.

生产者消费者场景假定

import java.util.Stack;  
import java.util.concurrent.locks.Condition;  
import java.util.concurrent.locks.Lock;  
import java.util.concurrent.locks.ReentrantLock;  
  
public class ReentrantLockCPTest {  
  
  
  
    public static void main(String[] args) {  
          
        Repository s = new Repository();  
        Maker f1 = new Maker(s,"P001");  
        Maker f2 = new Maker(s,"P002");  
          
        f1.start();  
        f2.start();  
  
        Taker c1 = new Taker(s,"C001");  
        Taker c2 = new Taker(s,"C002");  
        Taker c3 = new Taker(s,"C003");  
        c1.start();  
        c2.start();  
        c3.start();  
          
    }  
  
}  
class Repository{  
      
    private  final static int MAX_ELEMENT = 10;  
      
    private Stack<String> _store = new Stack<String>();  
      
    private Lock lock = new ReentrantLock();  
    private final Condition notEmpty = lock.newCondition();  
    private final Condition notFull = lock.newCondition();  
  
      
      
    public  void add(String in) throws InterruptedException{  
          
        lock.lock();  
          
        try{  
            while (this._store.size() >= MAX_ELEMENT) {//这里一定是Loop 循环,因为被阻塞的生产者线程被唤醒后要继续执行,但之前必须判断产品库是否已满.  
                System.out.println(Thread.currentThread().getName() + " is waiting on add.");  
                notFull.await(); //如果参数中数量达到10个的最大值.生产者线程等待.  
                System.out.println(Thread.currentThread().getName() + " is after waiting on add.");  
            }  
            this._store.push(in);  
            System.out.println(Thread.currentThread().getName() + " is adding product "+ in+". Remaining size is "+ this._store.size());  
            notEmpty.signal(); //唤醒那些因为产品库为零时等待的消费者进程.  
  
        }finally{  
            lock.unlock();  
        }  
  
  
  
  
    }  
      
    public   String  get() throws InterruptedException{  
        lock.lock();  
        String rtn = "";  
          
        try{  
              
            while (this._store.isEmpty()) {//这里一定是Loop 循环,因为被阻塞的消费者线程被唤醒后要继续执行,但之前必须判断产品库是否为空.  
                System.out.println(Thread.currentThread().getName() + " is waiting on get.");  
                notEmpty.await();//如果产品库为空,消费者线程等待.直到产品库中有产品时被唤醒.  
                System.out.println(Thread.currentThread().getName() + " is after waiting on get.");  
                  
            }  
              
            rtn = this._store.pop();  
            System.out.println(Thread.currentThread().getName() + " is getting product "+ rtn+". Remaining size is "+ this._store.size());  
            notFull.signal();//唤醒因产品库为空可能导致等待的生产者线程.  
              
        }finally{  
            lock.unlock();  
        }  
      
  
  
        return rtn;  
    }  
  
}  
  
class Maker implements Runnable {  
         
    private Repository _store;  
    private String _name;  
    private Thread _thread;  
    public Maker(Repository s,String name) {  
    super();  
    this._store = s;  
    this._name = name;  
    this._thread = new Thread(this,name);  
    }  
      
    public void start(){  
        this._thread.start();  
    }  
      
    @Override  
    public void run() {  
        int i = 0;  
        while(true){  
            try {  
                this._store.add(this._name + " Product "+ ++i);  
                Thread.sleep(1000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            } catch (Exception e){  
                e.printStackTrace();  
            }  
        }  
  
    }  
  
}  
  
  
class Taker implements Runnable {  
        
    private Repository _store;  
    private Thread _thread;  
    public Taker(Repository s,String name) {  
        super();  
        this._store =s;  
        this._thread = new Thread(this,name);  
    }  
    public void start(){  
        this._thread.start();  
    }  
      
    @Override  
    public void run() {  
        while(true){  
            try {  
                this._store.get();  
                Thread.sleep(5000);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }catch (Exception e) {  
                e.printStackTrace();  
            }  
        }  
  
    }  
  
}  

运行结果:

P002 is adding product P002 Product 1. Remaining size is 1  
P001 is adding product P001 Product 1. Remaining size is 2  
C002 is getting product P001 Product 1. Remaining size is 1  
C001 is getting product P002 Product 1. Remaining size is 0  
C003 is waiting on get.  
P001 is adding product P001 Product 2. Remaining size is 1  
P002 is adding product P002 Product 2. Remaining size is 2  
C003 is after waiting on get.  
C003 is getting product P002 Product 2. Remaining size is 1  
P002 is adding product P002 Product 3. Remaining size is 2  
P001 is adding product P001 Product 3. Remaining size is 3  
P001 is adding product P001 Product 4. Remaining size is 4  
P002 is adding product P002 Product 4. Remaining size is 5  
P002 is adding product P002 Product 5. Remaining size is 6  
P001 is adding product P001 Product 5. Remaining size is 7  
C001 is getting product P001 Product 5. Remaining size is 6  
P002 is adding product P002 Product 6. Remaining size is 7  
C002 is getting product P002 Product 6. Remaining size is 6  
P001 is adding product P001 Product 6. Remaining size is 7  
C003 is getting product P001 Product 6. Remaining size is 6  
P002 is adding product P002 Product 7. Remaining size is 7  
P001 is adding product P001 Product 7. Remaining size is 8  
P002 is adding product P002 Product 8. Remaining size is 9  
P001 is adding product P001 Product 8. Remaining size is 10  
P002 is waiting on add.  
P001 is waiting on add.  
C001 is getting product P001 Product 8. Remaining size is 9  
C002 is getting product P002 Product 8. Remaining size is 8  
P002 is after waiting on add.  
上一篇 下一篇

猜你喜欢

热点阅读