BiBi - 并发编程 -11- 并发容器
From:Java并发编程的艺术
- 目录
BiBi - 并发编程 -0- 开篇
BiBi - 并发编程 -1- 挑战
BiBi - 并发编程 -2- volatile
BiBi - 并发编程 -3- 锁
BiBi - 并发编程 -4- 原子操作
BiBi - 并发编程 -5- Java内存模型
BiBi - 并发编程 -6- final关键字
BiBi - 并发编程 -7- DCL
BiBi - 并发编程 -8- 线程
BiBi - 并发编程 -9- ReentrantLock
BiBi - 并发编程 -10- 队列同步器
BiBi - 并发编程 -11- 并发容器
BiBi - 并发编程 -12- Fork/Join框架
BiBi - 并发编程 -13- 并发工具类
BiBi - 并发编程 -14- 线程池
BiBi - 并发编程 -15- Executor框架
1. ConcurrentHashMap
JDK1.7中的ConcurrentHashMap
ConcurrentHashMap使用锁分段技术
HashTable效率低下,使用synchronized来保证线程安全,线程1使用put进行元素添加,线程2不但不能使用put进行添加也不能使用get来获取元素。
ConcurrentHashMap把数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其它段的数据也能被其它线程访问。
ConcurrentHashMap由Segment数组和HashEntry数组组成,其中Segment继承ReentrantLock,static class Segment<K,V> extends ReentrantLock
,扮演锁的角色;HashEntry用于存储键值对数据。Segment和HashEntry的数据结构类似为:数组和链表结构。一个ConcurrentHashMap中包含一个Segment数组,每个Segment中包含一个HashEntry数组。当对HashEntry数组进行修改时,需要获得与他对应的Segment锁。
ConcurrentHashMap实现细节
1)为了使用【按位与的散列算法】来定位segments数组的索引,segments数组的长度必须是2的N次方。
2)定位Segment。使用Wang/Jenkins hash的变种算法对元素的hashCode进行一次再散列,来减少散列冲突。
3)get操作不需要加锁。原因是他的get方法里将要使用的共享变量都定义成了volatile类型,如:用于统计当前Segment大小的count和用于存储值的HashEntry的value。
4)put需要加锁。在插入元素前会先判断Segment里的HashEntry数组是否超过容量,在扩容的时候,首先会创建一个容量是原来容量两倍的数组,然后将原数组里的元素进行再散列后插入到新的数组里。为了高效,ConcurrentHashMap不会对整个容器进行扩容,而只对某个segment进行扩容。
put操作具体细节:Segment实现了ReentrantLock,也就带有锁的功能,当执行put操作时,会进行第一次key的hash来定位Segment的位置,如果该Segment还没有初始化,即通过CAS操作进行赋值,然后进行第二次hash操作,找到相应的HashEntry的位置,这里会利用继承过来的锁的特性,在将数据插入指定的HashEntry位置时(链表的尾端),会通过继承ReentrantLock的tryLock()方法尝试去获取锁,如果获取成功就直接插入相应的位置,如果已经有线程获取该Segment的锁,那当前线程会以自旋的方式去继续的调用tryLock()方法去获取锁,超过指定次数就挂起,等待唤醒。
5)size操作。第一种方案他会使用不加锁的模式去尝试多次计算ConcurrentHashMap的size,最多三次,比较前后两次计算的结果,结果一致就认为当前没有元素加入,计算的结果是准确的;第二种方案是如果第一种方案不符合,他就会给每个Segment加上锁,然后计算ConcurrentHashMap的size返回。
JDK1.8中的ConcurrentHashMap
JDK1.8中的ConcurrentHashMap取消了segment分段锁,而采用CAS和synchronized来保证并发安全。数据结构跟HashMap1.8的结构一样,Node数组+链表/红黑二叉树。synchronized只锁定当前链表或红黑二叉树的首节点,这样只要hash不冲突,就不会产生并发,效率又提升N倍。
Node结点
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K, V> next;
}
注意:val和next为volatile类型。key和hash为final,即只允许对数据进行查找,不允许进行修改。【HashMap的key和hash也是final】
TreeNode
TreeNode继承与Node,但是数据结构换成了二叉树结构,它是红黑树的数据的存储结构,用于红黑树中存储数据,当链表的节点数大于8时会转换成红黑树的结构,他就是通过TreeNode作为存储结构代替Node来转换成黑红树。
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K, V> parent; // red-black tree links
TreeNode<K, V> left;
TreeNode<K, V> right;
TreeNode<K, V> prev; // needed to unlink next upon deletion
boolean red;
}
TreeBin
TreeBin从字面含义中可以理解为存储树形结构的容器,而树形结构就是指TreeNode,所以TreeBin就是封装TreeNode的容器,它提供转换黑红树的一些条件和锁的控制。
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K, V> root;
volatile TreeNode<K, V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock
}
put
对当前的table进行无条件自循环直到put成功,可以分成以下六步流程来概述。
1)如果没有初始化就先调用initTable()方法来进行初始化过程。
2)如果没有hash冲突就直接CAS插入。
3)如果还在进行扩容操作就先进行扩容。
4)如果存在hash冲突,就加锁来保证线程安全,这里有两种情况,一种是链表形式就直接遍历到尾端插入,一种是红黑树就按照红黑树结构插入。
5)最后一个如果该链表的数量大于阈值8,就要先转换成黑红树的结构,break再一次进入循环。
6)如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容。
-
在某个桶的迁移过程中,别的线程想要对该桶进行put操作怎么办?
一旦某个桶在迁移过程中了,必然要获取该桶的锁,所以其他线程的put操作要被阻塞,一旦迁移完毕,该桶中第一个元素就会被设置成ForwardingNode节点,所以其他线程put时需要重新判断下桶中第一个元素是否被更改了,如果被改了重新获取重新执行逻辑,如下代码 -
某个桶已经迁移完成(其他桶还未完成),别的线程想要对该桶进行put操作怎么办?
该线程会首先检查是否还有未分配的迁移任务,如果有则先去执行迁移任务,如果没有即全部任务已经分发出去了,那么此时该线程可以直接对新的桶进行插入操作(映射到的新桶必然已经完成了迁移,所以可以放心执行操作)
get
1)计算hash值,定位到该table索引位置,如果是首节点符合就返回。
2)如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find()方法,查找该节点,匹配就返回。
3)以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null。
虽然ConcurrentHashMap的读不需要锁,但是需要保证能读到最新数据,所以必须加volatile。即数组的引用需要加volatile,同时一个Node节点中的val和next属性也必须要加volatile。
size
在JDK1.8版本中,对于size的计算,在扩容和addCount()方法就已经有处理了。
扩容
https://juejin.im/post/5b00160151882565bd2582e0
其实helpTransfer()方法的目的就是调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作。
ForwardingNode的作用就是支持扩容操作,用于占位,将已处理的节点和空节点置为ForwardingNode,并发处理时当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点往后遍历。
当线程2访问到了ForwardingNode节点,如果线程2执行的put或remove等写操作,那么就会先帮其扩容。如果线程2执行的是get等读方法,则会调用ForwardingNode的find方法,去nextTable里面查找相关元素。
多线程无锁扩容的关键就是通过CAS设置sizeCtl与transferIndex变量,协调多个线程对table数组中的node进行迁移。
2. 队列
非阻塞队列ConcurrentLinkedQueue
使用循环CAS方式实现。
入队列:循环获取尾结点,只有当next节点为null时,才能添加元素。【使用CAS算法将入队节点设置成尾结点的next节点,如不成功则重试】尾结点可能是tail节点,也可能是tail节点的next节点。
阻塞队列的种类
1)ArrayBlockingQueue 数组结构的有界阻塞队列
2)LinkedBlockingQueue 链表结构的无界阻塞队列
3)PriorityBlockingQueue 支持优先级排序的无界阻塞队列
4)DelayQueue 使用优先级队列实现的无界阻塞队列
支持延时获取元素的无界阻塞队列,适用于:缓存系统的设计和定时任务的调度
5)SynchronousQueue 一个不存储元素的阻塞队列
每一个put操作必须等待一个take操作,否则不能继续添加元素。
6)LinkedTransferQueue 链表结构的无界阻塞队列
transfer方法:可以把生产者传入的元素立刻transfer给消费者,等消费者消费了才能返
回,期间CPU进行自旋。
tryTransfer方法:该方法无论 消费者是否接收,方法都会立刻返回。
7)LinkedBlockingDeque 链表结构的双向阻塞队列
运用在【工作窃取】模式中。