IT人故事会

一文读懂JAVA并发容器类ArrayList,Set,Queue

2021-03-01  本文已影响0人  IT人故事会

上节说了ConcurrentHashMap,之前的知识会映射到今天的内容点上面,学了这些方法到底怎么用,更多List,Set,Queue要去看源码的时候,掌握现有知识点,源码对你难度不太大了,里面的变量命名比较麻烦。本次说说List,重要的说里面的原理,使用这块老铁们应该都明白。

(一)ArrayList

List 接口的大小可变数组的实现。实现了所有可选列表操作,并允许包括 null 在内的所有元素。

ArrayList默认有一个空的数组, 数据的顺序插入,如果当前的数组长度不够存储的时候,进行扩容处理,直接去创建一个新的数组,创建完成之后,把数组进行拷贝,本身是线程非安全的,不要一边遍历,一边删除代码。扩容的时候存在i++,之前也说过i++的情况下很容易存在高并发问题。

(二)CopyOnWriteArrayList

List 接口的大小可变数组的实现。实现了所有可选列表操作,并允许包括 null 在内的所有元素。

内部持有一个ReentrantLock lock = new ReentrantLock();底层是用volatile transient声明的数组 array
读写分离,写时复制出一个新的数组,完成插入。修改或者移除操作后将新数组赋值给array

(三)ArrayList 和 CopyOnWriteArrayList

CopyOnWriteArrayList容器即写时复制的容器和ArrayList比较,优点是并发安全,缺点有两个

  1. 多了内存占用,写数据是copy一份完成的数据,单独进行操作,占用两份内存。
  2. 数据一致性:数据写完之后,其他线程不一定是马上读取到最新内容。

(四)Set

无序(没有下标) 集合中的元素不重复。

因为是基于HashMap的,只要保证key不重复,其实内部就不会重复。

CopyOnWriteArrayList中允许有重复的元素;但是,CopyOnWriteArraySet是一个集合,所以它不能有重复集合,因此,CopyOnWriteArrayList额外提供了addIfAbsent()和addAllAbsent()这两个添加元素的API,通过这些API来添加元素时,只有当元素不存在时才执行添加操作!

所有操作都是无阻塞的,所有操作都可以并行,包括写,实现了ConcurrentMap接口,直接支持一些原子复合操作(与ConcurrentHashMap类似),可排序(与TreeMap一样),默认按键自然有序,可以传递比较器自定义排序,实现了SortedMap和NavigableMap接口。

(五)Queue

基本上,一个队列就是一个先入先出(FIFO)的数据结构。

  1. 我去买一本书,立即买到了,或者没有就走了,这就是非阻塞;(编程中设置IO成非阻塞,返回后再去检查描述符,或者等待通知,然后再去读取。相当于老板告诉我可以先忙点别的,过一会再来问问,或者老板通知我。但期间这个窗口(文件描述符)别人是用不了的)("立即买到了"在IO中也需要等待,不能算非阻塞IO)
  2. 如果恰好书店没有,我就等一直等到书店有了这本书买到了才走,这就是阻塞;而排在我后面的人呢只有我买到了书后才能再买书了。
  3. 如果书店恰好没有,我就告诉书店老板,书来了告诉我一声让我来取或者直接送到我家,然后我就走了,去做别的事了,这就是异步。这时候如果很多人来买书,都是老板登记一下完事。 (从IO角度来说,“告诉我来取”,这个近似于信号驱动IO,不能算异步IO。必须书送到我家才算是异步,如果不送到我家,我想看这本书之前,终究还是需要我跑一趟)
  4. 前面两种情况,非阻塞和阻塞都可以称为同步。

ArrayBlockingQueue是采用数组实现的有界阻塞线程安全队列。如果向已满的队列继续塞入元素,将导致当前的线程阻塞。如果向空队列获取元素,那么将导致当前线程阻塞。

import java.util.concurrent.ArrayBlockingQueue;


// 它是基于数组的阻塞循环队列, 此队列按 FIFO(先进先出)原则对元素进行排序。
public class ArrayBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 构造时需要指定容量(量力而行),可以选择是否需要公平(最先进入阻塞的,先操作)
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3, false);
        // 1秒消费数据一个
        new Thread(() -> {
            while (true) {
                try {
                    // queue.task();
                    System.out.println("取到数据:" + queue.poll()); // poll非阻塞
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
        }).start();

        Thread.sleep(3000L); // 让前面的线程跑起来,上边是消费者,下面的方法是生产者。

        // 三个线程塞数据
        for (int i = 0; i < 3; i++) {
            new Thread(() -> {
                try {
                     queue.put(Thread.currentThread().getName()); // put阻塞(如果当前的队列已经塞满了数据,线程不会继续往下执行,等待其他线程把
                    // 队列的数据拿出去// )
//                    queue.offer(Thread.currentThread().getName()); // offer非阻塞,满了返回false
                    System.out.println(Thread.currentThread() + "塞入完成");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

前面三秒是没有数据的,等数据存储完毕后才可以读取。用了6个线程来完成。队列对于线程池,连接池都会有队列的使用,存放一些对象和数据。
存放:里面设置了lock,拿到一把锁,然后进行入队列,数量++,索引自行维护putIndex。
移除:数量--,找到对应的节点,lock设置了一把锁。索引自行维护takeIndex(记录下一个要获取的)。

DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed 元素。
DelayQueue是一个用来延时处理的队列,所谓延时处理就是说可以为队列中元素设定一个过期时间,相关的操作受到这个设定时间的控制。

import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// (基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,
// 只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。
// 如果延迟都还没有期满,则队列没有头部,并且poll将返回null。
// 当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,
// 则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。
public class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Message> delayQueue = new DelayQueue<Message>();
        // 这条消息5秒后发送
        Message message = new Message("message - 00001", new Date(System.currentTimeMillis() + 5000L));
        delayQueue.add(message);

        while (true) {
            System.out.println(delayQueue.poll());
            Thread.sleep(1000L);
        }
        // 线程池中的定时调度就是这样实现的
    }
}

// 实现Delayed接口的元素才能存到DelayQueue
class Message implements Delayed {

    // 判断当前这个元素,是不是已经到了需要被拿出来的时间
    @Override
    public long getDelay(TimeUnit unit) {
        // 默认纳秒
        long duration = sendTime.getTime() - System.currentTimeMillis();
        return TimeUnit.NANOSECONDS.convert(duration, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return o.getDelay(TimeUnit.NANOSECONDS) > this.getDelay(TimeUnit.NANOSECONDS) ? 1 : -1;
    }

    String content;
    Date sendTime;

    /**
     * @param content  消息内容
     * @param sendTime 定时发送
     */
    public Message(String content, Date sendTime) {
        this.content = content;
        this.sendTime = sendTime;
    }

    @Override
    public String toString() {
        return "Message{" +
                "content='" + content + '\'' +
                ", sendTime=" + sendTime +
                '}';
    }
}

延迟的排序是根据谁快,谁的位置最靠前。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// 它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。
// 如果有阻塞需求,用这个。类似生产者消费者场景
public class LinkedBlockingQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // 构造时可以指定容量,默认Integer.MAX_VALUE
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
        // 1秒消费数据一个
        new Thread(() -> {
            while (true) {
                try {
                    System.out.println("取到数据:" + queue.poll()); // poll非阻塞
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
        }).start();

        Thread.sleep(3000L); // 让前面的线程跑起来

        // 三个线程阻塞数据
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    // queue.put(Thread.currentThread().getName()); // put阻塞
                    queue.offer(Thread.currentThread().getName()); // offer非阻塞,满了返回false
                    System.out.println(Thread.currentThread() + "塞入完成");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

存放数据的方式:链表的形式。
入队列用了Atomic的形式,原子性。
LinkedBlockingQueue 和 ConcurrentBlockingQueue 两个的区别是 LinkedBlockingQueue 是通过lock加锁的方式,ConcurrentBlockingQueue 是通过cas的方式。

import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;

// 包装了 PriorityQueue
// 是一个带优先级的 队列,而不是先进先出队列。
// 元素按优先级顺序被移除,该队列也没有上限
// 没有容量限制的,自动扩容
// 虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),
// 但是如果队列为空,
// 那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,
// 入该队列中的元素要具有比较能力
public class PriorityBlockingQueueDemo {
    public static void main(String[] args) {
        // 可以设置比对方式
        PriorityBlockingQueue<String> priorityQueue = new PriorityBlockingQueue<>(2);
        priorityQueue.add("c");
        priorityQueue.add("a");
        priorityQueue.add("b");

        System.out.println(priorityQueue.poll());
        System.out.println(priorityQueue.poll());
        System.out.println(priorityQueue.poll());
    }
}
import java.util.Comparator;
import java.util.PriorityQueue;

public class PriorityQueueDemo {
    public static void main(String[] args) {
        // 可以设置比对方式
        PriorityQueue<String> priorityQueue = new PriorityQueue<>(new Comparator<String>() {
            @Override //
            public int compare(String o1, String o2) {
                // 实际就是 元素之间的 比对。
                return 0;
            }
        });
        priorityQueue.add("c");
        priorityQueue.add("a");
        priorityQueue.add("b");

        System.out.println(priorityQueue.poll());
        System.out.println(priorityQueue.poll());
        System.out.println(priorityQueue.poll());

        PriorityQueue<MessageObject> MessageObjectQueue = new PriorityQueue<>(new Comparator<MessageObject>() {
            @Override
            public int compare(MessageObject o1, MessageObject o2) {
                return o1.order > o2.order ? -1 : 1;
            }
        });
    }
}

class MessageObject {
    String content;
    int order;
}

一个带优先级的 队列,而不是先进先出队列。
元素按优先级顺序被移除,该队列也没有上限, 没有容量限制的,自动扩容。
虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError), 但是如果队列为空, 那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外, 入该队列中的元素要具有比较能力
new Comparator<MessageObject>() 比较器

import java.util.concurrent.SynchronousQueue;

// 这是一个神奇的队列, 因为他不存数据。 手把手的交互数据
public class SynchronousQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
        // synchronousQueue.add("a"); // IllegalStateException
        // synchronousQueue.offer("a");
        System.out.println(synchronousQueue.poll()); // 非阻塞

        // 阻塞式的用法
        new Thread(() -> {
            try {
                System.out.println("等数据....");
                System.out.println(synchronousQueue.take());
                System.out.println("执行完毕....");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        Thread.sleep(1000L);
        System.out.println("准备赛数据了");
        synchronousQueue.put("a");// 等待有人取走他
    }
}

SynchronousQueue 就是不存队列的,里面是不存数据的。源码里面不存在存储单位。
这个方法就是等待,

PS:ArrayList链表,Set不重复链表,Queue队列,90%的可能都是使用现有JDK已经提供的方法,很少自己去实现这些功能。针对这几个源码可以好好的看下,尝试画下图,其实花不了多少时间的,JVM,JDK都是很基础的东西,翻过这座山就比别人强,一定要手把手的摸过,手把手的爬过去的,好记性不如烂笔头,一定要实战才能出真知,磨刀不误砍柴工,就是从哪个阶段过来的,理解都渴望学习新框架,新技术的心情,相信厚积薄发吧。

上一篇下一篇

猜你喜欢

热点阅读