11.并发容器类二
2020-03-03 本文已影响0人
强某某
List
- ArrayList
ArrayList<String> arrayList=new ArrayList<>();
arrayList.add("a");
arrayList.add("b");
arrayList.add("c");
Iterator<String> iterator=arrayList.iterator();
while (iterator.hasNext()) {
// iterator.remove();//IllegalStateException
// arrayList.remove(iterator.next());//ConcurrentModificationException
}
总结:arraylist并发不安全,底层是数组,遍历同时删除,会导致异常;可通过CopyOnWriteArrayList实现遍历时修改
- CopyOnWriteArrayList容器即写时复制的容器和ArrayList比较,优点是并发安全,缺点有两个:
- 多个内存占用:写数据时copy一份完整的数据,单独进行操作。占用双份内存
- 数据一致性:数据写完后,其他线程不一定时马上读取到最新内容
-
底层也是数组实现
5.png
CopyOnWriteArrayList<String> arrayList=new CopyOnWriteArrayList<>();
arrayList.add("a");
arrayList.add("b");
arrayList.add("c");
Iterator<String> iterator=arrayList.iterator();
while (iterator.hasNext()) {
// iterator.remove();//IllegalStateException 也会有异常
arrayList.remove(iterator.next());//正常
}
Set
- HashSet:基于HashMap实现,线程不安全
- CopyOnWriteArraySet: 基于CopyOnWriteArrayList,线程安全
- ConcurrentSkipListSet:基于ConcurrentSkipListMap,线程安全,有序,查询快
Queue
队列数据结构的实现,分为阻塞队列和非阻塞队列。下列的蓝色区块,为阻塞队列特有的方法

- 非阻塞队列实现的都是Queue
- 阻塞队列实现的都是BlockingQueue
- 阻塞的put方法内部通过Conditon(底层park/unpark)配合可重入锁实现等待唤醒机制
- ArrayBlockingQueue
import java.util.concurrent.ArrayBlockingQueue;
//它是基于数组的阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序
public class ArrayBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
//构造时需要指定容量,可以选择是否需要公平(最闲进入阻塞的,先操作),此时是针对可重入锁初始化时候是公平还是非公平
//同步通过可重入锁实现
ArrayBlockingQueue<String> queue=new ArrayBlockingQueue<>(3,false);
new Thread(()->{
while (true) {
try {
System.out.println("取到数据:"+queue.poll());//poll非阻塞
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
}
}).start();
Thread.sleep(3000L);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
queue.offer(Thread.currentThread().getName());//offer非阻塞,满了返回false
// queue.put(Thread.currentThread().getName());//put阻塞,如果塞满了,不会继续执行,直到其他线程取出数据
System.out.println(Thread.currentThread()+"塞入完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
- LinkedBlockingQueue
import java.util.concurrent.LinkedBlockingQueue;
public class LinkedBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
//底层是链表,构造时需要指定容量,默认Integer.MAX_VALUE
//同步通过可重入锁实现
LinkedBlockingQueue<String> queue=new LinkedBlockingQueue<>(3);
new Thread(()->{
while (true) {
try {
System.out.println("取到数据:"+queue.poll());//poll非阻塞
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
}
}).start();
Thread.sleep(3000L);
for (int i = 0; i < 6; i++) {
new Thread(()->{
try {
queue.offer(Thread.currentThread().getName());//offer非阻塞,满了返回false
// queue.put(Thread.currentThread().getName());//put阻塞,如果塞满了,不会继续执行,直到其他线程取出数据
System.out.println(Thread.currentThread()+"塞入完成");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}
- PriorityQueue-优先级队列
import java.util.Comparator;
import java.util.PriorityQueue;
/**
* 是一个带优先级的队列,而不是先进先出队列
* 元素按优先级顺序被移除,该队列也没有上限
* 没有容量限制的,自动扩容
* 虽然次队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致oom
* 但是如果队列为空
* 那么取元素的操作take就会被阻塞,所以它的检索操作是受阻的
* 另外,入该队列中的元素要具有比较能力
*
* 该队列是有序的插入的时候会进行比对
*/
public class PriorityQueueDemo {
public static void main(String[] args) {
PriorityQueue<String> priorityQueue=new PriorityQueue<>();
// PriorityQueue<String> priorityQueue=new PriorityQueue<>(new Comparator<String>() {
// @Override
// public int compare(String o1, String o2) {
//可自定义比较对象的规则:默认是元素直接的比对
// return 0;
// }
// });
priorityQueue.add("c");
priorityQueue.add("a");
priorityQueue.add("b");
System.out.println(priorityQueue.poll());
System.out.println(priorityQueue.poll());
System.out.println(priorityQueue.poll());
}
/**
* 输出:
* a
* b
* c
*/
}
- DelayQueue-延迟队列-底层基于优先级队列
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class DelayQueueDemo {
public static void main(String[] args) {
DelayQueue<Message> delayQueue=new DelayQueue();
//这条消息5秒后发送
Message message=new Message("messgae-00001",new Date(System.currentTimeMillis()+5000L));
delayQueue.add(message);
while (true) {
//循环进入,同时试图取出,但是取出操作内部通过优先级操作,会调用compareTo(内部又会调用getDelay)从而判断时间是否符合取出条件
System.out.println(delayQueue.poll());
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
//重要:线程池的定时调度就是这么实现的
}
}
}
class Message implements Delayed{
String content;
Date sendTime;
public Message(String s, Date date) {
this.content=s;
this.sendTime=date;
}
//判断当前这条消息是不是已经到了需要处理的时间
@Override
public long getDelay(TimeUnit unit) {
//默认纳秒
long duration=sendTime.getTime()-System.currentTimeMillis();
return TimeUnit.NANOSECONDS.convert(duration,TimeUnit.MILLISECONDS);
}
//比较规则,当前是通过时间比较
@Override
public int compareTo(Delayed o) {
return o.getDelay(TimeUnit.NANOSECONDS)>this.getDelay(TimeUnit.NANOSECONDS)?1:-1;
}
}
- ConcurrentLinkedQueue
- 线程安全依赖于cas实现
- 底层链表
- SynchronousQueue - 单个阻塞队列
//一个很神奇的队列,一次只能放入一个,然后必须取出之后才能继续放
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<String> queue=new SynchronousQueue<>();
// queue.add("a");//lllegalStateExection
// queue.offer("a");//不报错,但是也是无效
System.out.println(queue.poll());//非阻塞 输出null
//阻塞式的用法
new Thread(()->{
try {
System.out.println("等数据...");
System.out.println(queue.take());
System.out.println("执行完毕...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000L);
System.out.println("准备塞数据了");
queue.put("a");//等待有其他操作取走
}
}
补充:还有一大类Vector(栈),后进先出,类似于队列,不在细写