一文读懂JAVA并发容器类ArrayList,Set,Queue
上节说了ConcurrentHashMap,之前的知识会映射到今天的内容点上面,学了这些方法到底怎么用,更多List,Set,Queue要去看源码的时候,掌握现有知识点,源码对你难度不太大了,里面的变量命名比较麻烦。本次说说List,重要的说里面的原理,使用这块老铁们应该都明白。
(一)ArrayList
- ① 介绍
List 接口的大小可变数组的实现。实现了所有可选列表操作,并允许包括 null 在内的所有元素。
- ② 内部的存储方式
ArrayList默认有一个空的数组, 数据的顺序插入,如果当前的数组长度不够存储的时候,进行扩容处理,直接去创建一个新的数组,创建完成之后,把数组进行拷贝,本身是线程非安全的,不要一边遍历,一边删除代码。扩容的时候存在i++,之前也说过i++的情况下很容易存在高并发问题。
(二)CopyOnWriteArrayList
- ① 介绍
List 接口的大小可变数组的实现。实现了所有可选列表操作,并允许包括 null 在内的所有元素。
- ② 内部的存储方式
内部持有一个ReentrantLock lock = new ReentrantLock();底层是用volatile transient声明的数组 array
读写分离,写时复制出一个新的数组,完成插入。修改或者移除操作后将新数组赋值给array
(三)ArrayList 和 CopyOnWriteArrayList
CopyOnWriteArrayList容器即写时复制的容器和ArrayList比较,优点是并发安全,缺点有两个
- 多了内存占用,写数据是copy一份完成的数据,单独进行操作,占用两份内存。
- 数据一致性:数据写完之后,其他线程不一定是马上读取到最新内容。
(四)Set
- ① 介绍
无序(没有下标) 集合中的元素不重复。
- ② Set集合
- ③ HashSet
因为是基于HashMap的,只要保证key不重复,其实内部就不会重复。
- ④ CopyOnWriteArraySet
CopyOnWriteArrayList中允许有重复的元素;但是,CopyOnWriteArraySet是一个集合,所以它不能有重复集合,因此,CopyOnWriteArrayList额外提供了addIfAbsent()和addAllAbsent()这两个添加元素的API,通过这些API来添加元素时,只有当元素不存在时才执行添加操作!
- ⑤ ConcurrentSkipListSet
所有操作都是无阻塞的,所有操作都可以并行,包括写,实现了ConcurrentMap接口,直接支持一些原子复合操作(与ConcurrentHashMap类似),可排序(与TreeMap一样),默认按键自然有序,可以传递比较器自定义排序,实现了SortedMap和NavigableMap接口。
(五)Queue
- ① 介绍
基本上,一个队列就是一个先入先出(FIFO)的数据结构。
- ② 阻塞和非阻塞
- 我去买一本书,立即买到了,或者没有就走了,这就是非阻塞;(编程中设置IO成非阻塞,返回后再去检查描述符,或者等待通知,然后再去读取。相当于老板告诉我可以先忙点别的,过一会再来问问,或者老板通知我。但期间这个窗口(文件描述符)别人是用不了的)("立即买到了"在IO中也需要等待,不能算非阻塞IO)
- 如果恰好书店没有,我就等一直等到书店有了这本书买到了才走,这就是阻塞;而排在我后面的人呢只有我买到了书后才能再买书了。
- 如果书店恰好没有,我就告诉书店老板,书来了告诉我一声让我来取或者直接送到我家,然后我就走了,去做别的事了,这就是异步。这时候如果很多人来买书,都是老板登记一下完事。 (从IO角度来说,“告诉我来取”,这个近似于信号驱动IO,不能算异步IO。必须书送到我家才算是异步,如果不送到我家,我想看这本书之前,终究还是需要我跑一趟)
- 前面两种情况,非阻塞和阻塞都可以称为同步。
- ③ 常用方法
- ④ ArrayBlockingQueue
ArrayBlockingQueue是采用数组实现的有界阻塞线程安全队列。如果向已满的队列继续塞入元素,将导致当前的线程阻塞。如果向空队列获取元素,那么将导致当前线程阻塞。
import java.util.concurrent.ArrayBlockingQueue;
// 它是基于数组的阻塞循环队列, 此队列按 FIFO(先进先出)原则对元素进行排序。
public class ArrayBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 构造时需要指定容量(量力而行),可以选择是否需要公平(最先进入阻塞的,先操作)
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3, false);
// 1秒消费数据一个
new Thread(() -> {
while (true) {
try {
// queue.task();
System.out.println("取到数据:" + queue.poll()); // poll非阻塞
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
}
}).start();
Thread.sleep(3000L); // 让前面的线程跑起来,上边是消费者,下面的方法是生产者。
// 三个线程塞数据
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
queue.put(Thread.currentThread().getName()); // put阻塞(如果当前的队列已经塞满了数据,线程不会继续往下执行,等待其他线程把
// 队列的数据拿出去// )
// queue.offer(Thread.currentThread().getName()); // offer非阻塞,满了返回false
System.out.println(Thread.currentThread() + "塞入完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
前面三秒是没有数据的,等数据存储完毕后才可以读取。用了6个线程来完成。队列对于线程池,连接池都会有队列的使用,存放一些对象和数据。
存放:里面设置了lock,拿到一把锁,然后进行入队列,数量++,索引自行维护putIndex。
移除:数量--,找到对应的节点,lock设置了一把锁。索引自行维护takeIndex(记录下一个要获取的)。
- ⑤ DelayQueue
DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed 元素。
DelayQueue是一个用来延时处理的队列,所谓延时处理就是说可以为队列中元素设定一个过期时间,相关的操作受到这个设定时间的控制。
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
// (基于PriorityQueue来实现的)是一个存放Delayed 元素的无界阻塞队列,
// 只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。
// 如果延迟都还没有期满,则队列没有头部,并且poll将返回null。
// 当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于或等于零的值时,
// 则出现期满,poll就以移除这个元素了。此队列不允许使用 null 元素。
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<Message> delayQueue = new DelayQueue<Message>();
// 这条消息5秒后发送
Message message = new Message("message - 00001", new Date(System.currentTimeMillis() + 5000L));
delayQueue.add(message);
while (true) {
System.out.println(delayQueue.poll());
Thread.sleep(1000L);
}
// 线程池中的定时调度就是这样实现的
}
}
// 实现Delayed接口的元素才能存到DelayQueue
class Message implements Delayed {
// 判断当前这个元素,是不是已经到了需要被拿出来的时间
@Override
public long getDelay(TimeUnit unit) {
// 默认纳秒
long duration = sendTime.getTime() - System.currentTimeMillis();
return TimeUnit.NANOSECONDS.convert(duration, TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return o.getDelay(TimeUnit.NANOSECONDS) > this.getDelay(TimeUnit.NANOSECONDS) ? 1 : -1;
}
String content;
Date sendTime;
/**
* @param content 消息内容
* @param sendTime 定时发送
*/
public Message(String content, Date sendTime) {
this.content = content;
this.sendTime = sendTime;
}
@Override
public String toString() {
return "Message{" +
"content='" + content + '\'' +
", sendTime=" + sendTime +
'}';
}
}
延迟的排序是根据谁快,谁的位置最靠前。
- ⑥ LinkedBlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
// 它是基于链表的队列,此队列按 FIFO(先进先出)排序元素。
// 如果有阻塞需求,用这个。类似生产者消费者场景
public class LinkedBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 构造时可以指定容量,默认Integer.MAX_VALUE
LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
// 1秒消费数据一个
new Thread(() -> {
while (true) {
try {
System.out.println("取到数据:" + queue.poll()); // poll非阻塞
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
}
}).start();
Thread.sleep(3000L); // 让前面的线程跑起来
// 三个线程阻塞数据
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
// queue.put(Thread.currentThread().getName()); // put阻塞
queue.offer(Thread.currentThread().getName()); // offer非阻塞,满了返回false
System.out.println(Thread.currentThread() + "塞入完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
存放数据的方式:链表的形式。
入队列用了Atomic的形式,原子性。
LinkedBlockingQueue 和 ConcurrentBlockingQueue 两个的区别是 LinkedBlockingQueue 是通过lock加锁的方式,ConcurrentBlockingQueue 是通过cas的方式。
- ⑦ PriorityBlockingQueue
import java.util.PriorityQueue;
import java.util.concurrent.PriorityBlockingQueue;
// 包装了 PriorityQueue
// 是一个带优先级的 队列,而不是先进先出队列。
// 元素按优先级顺序被移除,该队列也没有上限
// 没有容量限制的,自动扩容
// 虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError),
// 但是如果队列为空,
// 那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外,
// 入该队列中的元素要具有比较能力
public class PriorityBlockingQueueDemo {
public static void main(String[] args) {
// 可以设置比对方式
PriorityBlockingQueue<String> priorityQueue = new PriorityBlockingQueue<>(2);
priorityQueue.add("c");
priorityQueue.add("a");
priorityQueue.add("b");
System.out.println(priorityQueue.poll());
System.out.println(priorityQueue.poll());
System.out.println(priorityQueue.poll());
}
}
- ⑧ PriorityQueueDemo
import java.util.Comparator;
import java.util.PriorityQueue;
public class PriorityQueueDemo {
public static void main(String[] args) {
// 可以设置比对方式
PriorityQueue<String> priorityQueue = new PriorityQueue<>(new Comparator<String>() {
@Override //
public int compare(String o1, String o2) {
// 实际就是 元素之间的 比对。
return 0;
}
});
priorityQueue.add("c");
priorityQueue.add("a");
priorityQueue.add("b");
System.out.println(priorityQueue.poll());
System.out.println(priorityQueue.poll());
System.out.println(priorityQueue.poll());
PriorityQueue<MessageObject> MessageObjectQueue = new PriorityQueue<>(new Comparator<MessageObject>() {
@Override
public int compare(MessageObject o1, MessageObject o2) {
return o1.order > o2.order ? -1 : 1;
}
});
}
}
class MessageObject {
String content;
int order;
}
一个带优先级的 队列,而不是先进先出队列。
元素按优先级顺序被移除,该队列也没有上限, 没有容量限制的,自动扩容。
虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError), 但是如果队列为空, 那么取元素的操作take就会阻塞,所以它的检索操作take是受阻的。另外, 入该队列中的元素要具有比较能力
new Comparator<MessageObject>() 比较器
- ⑨ SynchronousQueue
import java.util.concurrent.SynchronousQueue;
// 这是一个神奇的队列, 因为他不存数据。 手把手的交互数据
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
// synchronousQueue.add("a"); // IllegalStateException
// synchronousQueue.offer("a");
System.out.println(synchronousQueue.poll()); // 非阻塞
// 阻塞式的用法
new Thread(() -> {
try {
System.out.println("等数据....");
System.out.println(synchronousQueue.take());
System.out.println("执行完毕....");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000L);
System.out.println("准备赛数据了");
synchronousQueue.put("a");// 等待有人取走他
}
}
SynchronousQueue 就是不存队列的,里面是不存数据的。源码里面不存在存储单位。
这个方法就是等待,
PS:ArrayList链表,Set不重复链表,Queue队列,90%的可能都是使用现有JDK已经提供的方法,很少自己去实现这些功能。针对这几个源码可以好好的看下,尝试画下图,其实花不了多少时间的,JVM,JDK都是很基础的东西,翻过这座山就比别人强,一定要手把手的摸过,手把手的爬过去的,好记性不如烂笔头,一定要实战才能出真知,磨刀不误砍柴工,就是从哪个阶段过来的,理解都渴望学习新框架,新技术的心情,相信厚积薄发吧。