生产者/消费者问题

2021-03-20  本文已影响0人  霍胖子

生产者消费者问题

背景

在并发编程中,生产者消费者问题(producer/consumer)是一个经典的老生常谈的问题,有时也称为有界缓冲区问题。
问题的基本背景假设是:我们有一个固定大小的缓冲区,这个缓冲区分别有两种工作性质不同的线程去操作。其中一种线程负责向缓冲区中写入数据,我们称之为生产者线程。另一种线程则负责从缓冲区中拿取数据,并称之为消费者线程。

同时两种线程的写入和拿取工作要遵循一定的规则:

  1. 缓冲区未写满时,生产者线程可以向缓冲区中写入数据。但是消费者线程不能从缓冲区中读取数据。
  2. 缓冲区写满时,生产者线程不能向缓冲区中写入数据,消费者线程可以冲缓冲区中读取数据。
  3. 不管是那种性质的线程,在操作缓冲区时,均不可出现并发安全问题。

分析可以得知,解决生产者消费者问题,其实就是要解决线程同步问题与共享资源互斥访问问题。互斥问题的解决可以借助锁来实现,而线程同步则需借助信号量或其他工具来实现。

Java实现

class FixedSizeBuffer
{
    private static final int DEFAULT_BUFFER_SIZE = 1024;

    private final ReentrantLock lock = new ReentrantLock(); // 共享资源访问锁
    private final Condition isFull = lock.newCondition(); // buffer是否已满
    private final Condition isEmpty = lock.newCondition(); // buffer是否还空着

    private final int size; // buffer的大小
    private final byte[] buffer; // buffer
    private int cursor; // 写入游标


    public FixedSizeBuffer()
    {
        this(DEFAULT_BUFFER_SIZE);
    }

    public FixedSizeBuffer(int size)
    {
        if (size <= 0) throw new IllegalArgumentException();
        this.size = size;
        this.buffer = new byte[size];
        cursor = -1;
    }

    /**
     * 向buffer中写入一个字节的数据
     * @param content 数据内容
     * @throws InterruptedException 中断异常
     */
    public void putByte(byte content) throws InterruptedException
    {
        /*
            由于要对共享资源buffer进行访问,所以要加锁。
         */
        lock.lock();
        try
        {
            /*
                如果写入游标等于数组的最大下标,这时要停止写入。
                (因为,使用cursor = -1代表buffer内容被清空)
             */
            while (cursor == (size - 1))
            {
                System.out.println(Thread.currentThread().getName() + " : 缓冲区已满");
                isEmpty.signalAll(); // 唤醒消费者线程,可以从buffer中拿走数据了。
                /*
                    await() 方法会暂时挂起当前线程,并且释放当前线程所持有的锁。
                    当该线程被唤醒时,线程会从await代码下一处位置开始执行。
                    在线程被唤醒的同时,也会重新获取当前Condition所关联的锁。
                    
                    所以这里要使用while循环来判断,因为线程是在此处代码被唤醒的。
                    这样就做到了多重检查的作用。
                 */
                isFull.await(); 
            }
            ++cursor;
            System.out.println(Thread.currentThread().getName() + " => Buffer [cursor = " + cursor + "]");
            buffer[cursor] = content;
        }finally
        {
            lock.unlock();
        }


    }

    /**
     * 取出当前buffer的所有内容
     * @return buffer content
     * @throws InterruptedException 中断异常
     */
    public byte[] takeAll() throws InterruptedException
    {
        lock.lock();
        try
        {
            while (cursor != (size - 1))
                isEmpty.await(); // 当前buffer未满,需要等待写入线程唤醒。
            final byte[] result = new byte[size];
            System.arraycopy(buffer, 0, result, 0, size); // 将内容拷贝
            cursor = -1; // 重置写入游标
            System.out.println(Thread.currentThread().getName() + " <= Buffer [cursor = -1]");
            isFull.signalAll(); // 唤醒生产者线程
            return result;
        }finally
        {
            lock.unlock();
        }

    }
}
上一篇下一篇

猜你喜欢

热点阅读