JAVA

阻塞队列(二)(ArrayBlockingQueue )

2019-11-14  本文已影响0人  逗逼程序员

前言

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列,其大小在构造函数来决定,确认之后就不能改变了,其内部按照先进先出的原则对元素进行排序,其中put方法和take方法 为添加和删除的阻塞方法。下面通过一个生产者-消费者模式来了解其使用方式:

生产者-消费者模型

public class BlockQueueDemo {

    private static final BlockingQueue<Person> QUEUE = new ArrayBlockingQueue<>(50);

    public static void main(String[] args) {
        new Thread(new Producer(QUEUE)).start();
        new Thread(new Consumer(QUEUE)).start();
        System.out.println("主线程结束-----------------");
    }
}

class Person {

    private String name;

    private String cardno;

    public Person(String name, String cardno) {
        this.name = name;
        this.cardno = cardno;
    }

    @Override
    public String toString() {
        return "name:" + this.name + ",cardno:" + this.cardno;
    }
}

class Producer implements Runnable {

    private BlockingQueue queue;

    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while (true) {
            try {
                //阻塞放入随机产生的对象Person
                queue.put(new Person(UUID.randomUUID().toString(), UUID.randomUUID().toString()));
                System.out.println("Produce Person-------Queue Size : " + queue.size());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

class Consumer implements Runnable {

    private BlockingQueue queue;

    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            //睡眠等待队列被生产者填充完毕
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        while (true) {
            try {
                //阻塞获取队列中的Person对象
                System.out.println("Consumer Person :" + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

ArrayBlockingQueue的线程阻塞是通过重入锁ReenterLock和Condition条件队列实现的, 所以ArrayBlockingQueue 阻塞等待队列 存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列,而非公平锁,当队列可用时,阻塞的线程将进入争夺资源的竞争中,也就是谁先抢到谁先执行,没有固定的先后顺序,

ArrayBlockingQueue原理概要

ArrayBlockingQueue 的内部通过一个可重入锁ReentrantLock 和两个Condition 条件对象来实现阻塞,内部成员变量如下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
    //底层存储数组
    final Object[] items;
    //获取数据索引
    int takeIndex;
    //添加数据索引
    int putIndex;
    //队列元素个数
    int count;
    //控制并发访问锁
    final ReentrantLock lock;
    //notEmpty条件对象,用于通知take方法队列已有元素,可执行获取操作 
    private final Condition notEmpty;
    //notFull条件对象,用于通知put方法队列未满,可执行添加操作
    private final Condition notFull;
    //迭代器
    transient Itrs itrs = null;
}

从成员变量上看,ArrayBlockingQueue 内部通过数组对象items来存储数据。通过ReentrantLock 来同时控制添加线程和移除线程的并发访问。而对于notEmpty 条件对象则是用于存放等待或唤醒take 方法的线程。告诉他们队列已有元素,可以执行获取元素。

下面分析一下 阻塞的添加方法:

  public void put(E e) throws InterruptedException {
      //判断新增元素不为null
        checkNotNull(e);
      //获取锁
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//该方法可中断
        try {
            //元素个数等于数组长度无法添加
            while (count == items.length)
                //当前线程挂起,添加到等待队列等待唤起
                notFull.await();
            //直接进去队列
            enqueue(e);
        } finally {
            //释放锁
            lock.unlock();
        }
  }

put 方法是一个阻塞方法,如果队列元素已满,那么当前线程会被notFull 条件对象挂起加到等待队列中,直到队列有空才会唤起添加操作。

1.png

其他方法的分析 同理。。。。

上一篇下一篇

猜你喜欢

热点阅读