生产者消费者模型

2022-12-13  本文已影响0人  arkliu

传统的生产者消费者模型

使用两个线程,操作同一个变量,一个进行+1操作,另一个进行-1操作。

public class ThreadTest3 {

    public static void main(String[] args) {
        Data data = new Data();
        new Thread(()->{for(int i=0; i < 10; i++) {try {
            data.increment();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }}},"线程1").start();
        new Thread(()->{for(int i=0; i < 10; i++) {try {
            data.decrement();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }}},"线程2").start();
    }
}

// 判断等待  业务  通知
class Data {
    private int num = 0;
    
    // +1操作
    public synchronized void increment() throws InterruptedException {
        while (num != 0) {
            this.wait(); //等待
        }
        num++;
        System.out.println(Thread.currentThread().getName()+"==>"+num);
        this.notifyAll(); // 通知其他线程
    }
    
    // -1操作
    public synchronized void decrement() throws InterruptedException {
        while (num != 1) { 
            this.wait();//等待
        }
        num--;
        System.out.println(Thread.currentThread().getName()+"==>"+num);
        this.notifyAll(); // 通知其他线程
    }
}
image.png

Lock版本生产者消费者

public class ThreadTest3 {

    public static void main(String[] args) {
        Data data = new Data();
        new Thread(()->{for(int i=0; i < 10; i++) {try {
            data.increment();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }}},"线程1").start();
        new Thread(()->{for(int i=0; i < 10; i++) {try {
            data.decrement();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }}},"线程2").start();
        new Thread(()->{for(int i=0; i < 10; i++) {try {
            data.increment();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }}},"线程3").start();
        new Thread(()->{for(int i=0; i < 10; i++) {try {
            data.decrement();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }}},"线程4").start();
    }
}

// 判断等待  业务  通知
class Data {
    private int num = 0;
    private Lock mLock = new ReentrantLock();
    private Condition mCondition = mLock.newCondition();
    // +1操作
    public void increment() throws InterruptedException {
        
        try {
            mLock.lock(); // 1.上锁
            while (num != 0) {
                mCondition.await();// 2.等待
            }
            num++;
            System.out.println(Thread.currentThread().getName()+"==>"+num);
            mCondition.signalAll();// 3.通知其他线程
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            mLock.unlock(); // 4.解锁
        }
    }
    
    // -1操作
    public void decrement() throws InterruptedException {
        try {
            mLock.lock();
            while (num != 1) { 
                mCondition.await();//等待
            }
            num--;
            System.out.println(Thread.currentThread().getName()+"==>"+num);
            mCondition.signalAll();// 通知其他线程
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            mLock.unlock();
        }
    }
}
image.png

上面一次唤醒所有线程,所有的线程都去共同争抢资源,导致线程执行顺序是无序的,而不是我们想看到的(线程1, 线程2,线程2,线程4 依次执行的效果)。

使用Condition实现精确唤醒


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadTest4 {

    public static void main(String[] args) {
        Data2 data2 = new Data2();
        new Thread(()->{for(int i=0; i < 10; i++) {try {
            data2.printA();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }}},"线程1").start();
        new Thread(()->{for(int i=0; i < 10; i++) {try {
            data2.printB();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }}},"线程2").start();
        new Thread(()->{for(int i=0; i < 10; i++) {try {
            data2.printC();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }}},"线程3").start();
    }
}

// 判断等待  业务  通知
class Data2 {
    private int num = 1; // 1A  2B  3C
    private Lock mLock = new ReentrantLock();
    private Condition mCondition1 = mLock.newCondition();
    private Condition mCondition2 = mLock.newCondition();
    private Condition mCondition3 = mLock.newCondition();
    public void printA() throws InterruptedException {
        
        try {
            mLock.lock(); // 1.上锁
            while (num != 1) {
                mCondition1.await();// 2.等待
            }
            System.out.println(Thread.currentThread().getName()+"==AAAAA");
            num = 2;
            mCondition2.signal();// 3.通知mCondition2 await的线程
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            mLock.unlock(); // 4.解锁
        }
    }
    
    public void printB() throws InterruptedException {
        
        try {
            mLock.lock(); // 1.上锁
            while (num != 2) {
                mCondition2.await();// 2.等待
            }
            System.out.println(Thread.currentThread().getName()+"==BBBBB");
            num = 3;
            mCondition3.signal();// 3.通知mCondition3 await的线程
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            mLock.unlock(); // 4.解锁
        }
    }
    
    public void printC() throws InterruptedException {
        
        try {
            mLock.lock(); // 1.上锁
            while (num != 3) {
                mCondition3.await();// 2.等待
            }
            System.out.println(Thread.currentThread().getName()+"==CCCCC");
            num = 1;
            mCondition1.signal();// 3.通知mCondition1 await的线程
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            mLock.unlock(); // 4.解锁
        }
    }
}
image.png
上一篇下一篇

猜你喜欢

热点阅读