知识回顾|线程安全集合
线程安全
java.util.concurrent包的层次结构:
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-tree.html
- 快速失败和安全失败
它们是针对迭代器而言的。
快速失败:当在迭代一个集合的时候,如果有另外一个线程在修改这个集合,就会抛出ConcurrentModification异常,java.util下都是快速失败。
安全失败:在迭代时候会在集合二层做一个拷贝,所以在修改集合上层元素不会影响下层。在java.util.concurrent下都是安全失败
Collections.synchronized
① 实现原理
在 List的操作外包加了一层 synchronize 同步控制;需注意的是,当用户通过迭代器遍历返回列表时必须手动同步。
CopyOnWriteArrayList
① 如何保证线程安全的
内部持有锁lock -- 实现了对CopyOnWriteArrayList集合操作的互斥访问
“动态数组”机制 -- 内部有个“volatile数组”(array)来保存数据,通过volatile保证“读取到的数据总是最新的”。
“写时复制”机制 -- 在“添加/修改/删除”数据时,都会进行一次数组复制操作,然后对新复制的数组进行些操作,最后再将该数组赋值给“volatile数组”。而读操作并没有对数组修改,不会产生线程安全问题。
② 适用场景及缺陷
CopyOnWriteArrayList 和 Collections.synchronizedList 是实现线程安全的列表的两种方式。
多线程下,CopyOnWriteArrayList的写操作性能很差,读操作性能较好;而Collections.synchronizedList的写操作性能较好,读操作较差。
在不同的应用场景下,应该选择不同的多线程安全实现类。
③ 为什么没有ConcurrentArrayList
java.util.concurrent包中没有加入并发的ArrayList实现的主要原因是:很难去开发一个通用并且没有并发瓶颈的线程安全的List。
像“Array List”这样的数据结构,你不知道如何去规避并发的瓶颈。拿contains() 这样一个操作来说,当你进行搜索的时候如何避免锁住整个list?add(int index, E element)的操作可能需要对index之后的元素,可能是整个数组都加锁?
Queue 和 Deque 有并发的实现(ConcurrentLinkedQueue)是因为他们的接口相比List的接口有更多的限制,这些限制使得实现并发成为可能。
http://ifeve.com/why-is-there-not-concurrent-arraylist-in-java-util-concurrent-package/
ConcurrentHashMap
① 实现原理
jdk7:使用分段锁,类似于将hashmap又抽象了一层,每个segment就是一个hashmap,先将数据hash到segment的槽位,然后再hashsegment中hashmap的槽位,当插入或删除时先对segment的槽位加锁。
jdk8:使用synchronized和cas,底层和HashMap一样,但是在插入的时候会锁住单个槽位,与jdk7相比,锁的粒度小了很多。
② 扩容时做的优化
并发扩容机制-——多线程同时参与扩容
数组桶上为链表结构时,扩容源码大致逻辑:
1.创建 nextTable数组,新容量是旧容量的 2 倍。
2.将原 table数组 的所有桶逆序分配给多个线程,每个线程每次最小分配 16 个桶,防止资源竞争导致的效率下降,指定范围的桶可能分配给多个线程同时处理。
3.扩容时遇到空的桶,采用 CAS 设置为 ForwardingNode 节点,表示该桶扩容完成。
4.扩容时遇到 ForwardingNode 节点,表示该桶已扩容过了,直接跳过。
5.一个桶内的链表迁移时根据 lastRun 节点拆分成了两条新的链表,迁移完成将桶设置为 ForwardingNode 节点。
6.其他线程想增/删元素时,如果访问的桶是 ForwardingNode 节点,则表明当前正处于扩容状态,协助一起扩容完成后再完成相应的数据更改操作。
链表迁移图示
③ 复合操作可能出现的原子性问题
使用ConcurrentHashMap复合操作不当,也可能出现并发问题。
例如:containsKey + put --> putIfAbsent
import cn.hutool.core.thread.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 判断用户是否存在,不存在则注册,正常情况只会注册一次
*/
@Slf4j
public class ConcurrentHashMapTest {
private final Map<String, User> userMap = new ConcurrentHashMap<>();
@Test
public void testRegister1(){
ConcurrentHashMapTest concurrentHashMapTest = new ConcurrentHashMapTest();
ThreadUtil.concurrencyTest(100, () -> concurrentHashMapTest.registerUnsafe(new User("张三", 1)));
}
public void registerUnsafe(User user) {
if (userMap.containsKey(user.getUsername())) {
log.info("用户{}已存在, {}", user.getUsername(), user.getAge());
} else {
userMap.put(user.getUsername(), user);
log.info("用户注册成功======================, {}, {}", user.getUsername(), user.getAge());
}
}
@Test
public void testRegister2(){
ConcurrentHashMapTest concurrentHashMapTest = new ConcurrentHashMapTest();
ThreadUtil.concurrencyTest(100, () -> concurrentHashMapTest.registerSafeSynchronized(new User("张三", 1)));
}
public synchronized void registerSafeSynchronized(User user) {
if (userMap.containsKey(user.getUsername())) {
} else {
userMap.put(user.getUsername(), user);
log.info("用户注册成功, {}, {}", user.getUsername(), user.getAge());
}
}
@Test
public void testRegister3(){
ConcurrentHashMapTest concurrentHashMapTest = new ConcurrentHashMapTest();
ThreadUtil.concurrencyTest(100, () -> concurrentHashMapTest.registerSafe(new User("张三", 1)));
}
public void registerSafe(User user) {
User absent = userMap.putIfAbsent(user.getUsername(), user);
if (absent == null) {
log.info("用户注册成功, {}, {}", user.getUsername(), user.getAge());
}
}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
class User {
private String username;
private int age;
}
ConcurrentSkipListMap
① 什么是ConcurrentSkipListMap?
通过跳表实现的线程安全的、有序的哈希表。
跳表是可以实现二分查找的多层的有序链表,其插入、删除和查找的时间复杂度为O(logn),空间复杂度为O(n)。
② TreeMap、ConcurrentSkipListMap、ConcurrentHashMap的区别?
| 数据结构 | 线程安全性 | 是否有序 | |
|---|---|---|---|
| TreeMap | 红黑树 | 非线程安全 | key有序 |
| Collections.synchronizedSortedMap(TreeMap) | 红黑树 | 线程安全 | key有序 |
| ConcurrentSkipListMap | 跳表 | 线程安全 | key有序 |
| ConcurrentHashMap | 数组、单向链表、红黑树 | 线程安全 | key无序 |
BlockingQueue
① 什么是BlockingQueue?
BlockingQueue,是java.util.concurrent 包提供的用于解决并发生产者 - 消费者问题的最有用的类,它的特性是在任意时刻只有一个线程可以进行take或者put操作,并且BlockingQueue提供了超时return null的机制,在许多生产场景里都可以看到这个工具的身影。
在某些情况下对阻塞队列的访问可能会造成阻塞:
当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
② BlockingQueue大家族有哪些?
● ArrayBlockingQueue 由数组支持的有界队列
ArrayBlockingQueue 的大小一旦初始化就无法修改。
ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储,队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
● LinkedBlockingQueue 由链接节点支持的可选有界队列
如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。
● PriorityBlockingQueue 由优先级堆支持的无界优先级队列
基于数组,数据结构为二叉堆,数组第一个也是树的根节点总是最小值。
所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口,该队列中元素的排序就取决于 Comparable 实现。
● DelayQueue 由优先级堆支持的、基于时间的调度队列
注入其中的元素必须实现Delayed接口,元素进入队列后会进行排序,只有getDelay() 方法返回的值为0的时候,该元素才有资格从队列中取出
● SynchronousQueue 同时只能够容纳单个元素的同步队列
不提供任何空间来存储元素,可以选择公平模式(先来后到)和非公平模式。
如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。
同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。
参考文章
https://pdai.tech/md/java/collection/java-collection-all.html
https://javadoop.com/post/java-concurrent-queue
https://segmentfault.com/a/1190000039315702