58 concurrentHashMap

2020-10-03  本文已影响0人  滔滔逐浪

jdk1.8 ConcurrentHashMap
1,去除了Segment 分段锁
2,synchronized+cas 保证node节点线程安全问题

HashMap1.8与jdk1.8 ConcurrentHashMap 数据结构模型是一样的
concurrentHashMap 数组+链表+红黑树
concurrentHashMap对我们 index 下标对应的node 节点上锁
ConcurrenthashMap锁的竞争:
多个线程同时落到put key的时候。如果 多个key都落入到同一个index node节点的时候;

如果没有落到一个index node 节点不需要做锁的竞争。
注意:
只需要计算一次index的值。

区别:
hashtable 对我们整个Table 数组上锁。
jdk1.7 ConcurrentHashmap

1,基于Segment 分段锁设计
2,lock+cas 保证node节点线程安全问题
ConcurrenthashMap1.5 synchronized性能没有做优化 tryLock方法
需要计算2次index 值: 第一次计算存放哪个Segement 对象中。
第二次计算Segement 对象中哪个hashEntry<k,v>[]table;

使用传统的hashTable 保证线程安全问题,是采用synchronized 锁将整个hashTable中的数组锁住,在多个线程中只允许一个线程访问put或者get,效率非常低,但是能保证线程安全。
[图片上传失败...(image-c18fcf-1601685672405)]

jdk官方不推荐在多线程的情况下使用hashtable 或者hashmap 建议使用Concurrenthashmap 分段锁,效率非常高。
Concurrenthashmap 将一个大 的 hashmap 集合拆成n多个不同的小的hashtable(segement)。默认的情况下分成16个不同的segment每个segment中都有独立的hashEntry<k,v>[]table;
简单实现Concurrenthashmap:

package com.taotao.hashmap001;

import java.util.Hashtable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

/**
 *@author tom
 *Date  2020/9/27 0027 7:19
 *concurrentHashmap底层原理
 */
public class MyConcurrentHashMap<K, V> {
    /**
     * segments
     */
    private Hashtable<K, V>[] segments;

    public MyConcurrentHashMap() {
        segments = new Hashtable[16];
    }

    public void put(K k, V v) {
        //第一次计算index, 计算key存放在那个hashtable
        int segmentIndex = k.hashCode() & (segments.length - 1);
        Hashtable<K, V> segment = segments[segmentIndex];
        if (segment == null) {
            segment = new Hashtable<>();
        }
        segment.put(k, v);
      segments[segmentIndex]=segment;
    }


    public V get(K k) {
        //第一次计算index, 计算key存放在那个hashtable
        int segmentIndex = k.hashCode() & (segments.length - 1);
        Hashtable<K, V> segment = segments[segmentIndex];
        if (segment != null) {
            return segment.get(k);
        }
        return null;

    }

    public static void main(String[] args) {
        MyConcurrentHashMap<String, String> myConcurrentHashMap = new MyConcurrentHashMap<>();
        ConcurrentHashMap map=new ConcurrentHashMap();

        for (int i = 0; i < 10; i++) {
            myConcurrentHashMap.put(i + "", i + "");
        }
        for (int i = 0; i < 10; i++) {
            myConcurrentHashMap.get(i + "");
            System.out.println(  myConcurrentHashMap.get(i + ""));
        }
    }
}


核心源码分析:

1,无参构造函数分析:
initialCapacity  --16
loadFactor  hashEntry<K,V>[]table; 加载因子 0.75
concurrnetcyLevel并发级别 默认是16
2,并发级别是能够大于2的16次方
if(concurrentlevel > MAX_SEGMENTS)
concurrentlevel =MAX_SEGMENTS;
3,sshift 左移位的次数 ssize 作用,记录segment数组大小。
int sshift=0;
int ssize=1;
while(ssize < concurrentcyLevel){
   ++sshift=0;
int ssize=1;
while(ssize<concurrencyLevel){
       ++sshift;
ssize<<=1;

}
}

###4,segmentShift segmentMask: ssize-1 做与运算的时候能够将key均匀存放:
this.segmentSHift=32-shift;
this.segmentmask=ssize-1;
###5.初始化 Segment0 赋值下标0 的位置
Segment<K,V>s0=new Segment<K,V>(loadFactory,(int)(cap *loadFactor),(hashEntry<K,V>[])new hashEntry[cap]);
####6 采用cas修改复制给Segment数组;
UNSAFE.putOrderedObject(ss,SBASE,s0);//or







put方法底层实现:


        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        ###计算key存放那个Segment数组下标位置;
        int hash = hash(key);
        int j = (hash >>> segmentShift  2 8) & segmentMask 15;
        ###使用cas 获取Segment[10]对象 如果没有获取到的情况下,则创建一个新的segment对象
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        ### 使用lock锁对put方法保证线程安全问题
        return s.put(key, hash, value, false);



0000 0000 00000 0000 0000 0000 0000 0011
                                    0000 0000 00000 0000 0000 0000 0000 0011



深度分析:
Segment<K,V> ensureSegment(int k) 
  
    private Segment<K,V> ensureSegment(int k) {
        final Segment<K,V>[] ss = this.segments;
        long u = (k << SSHIFT) + SBASE; // raw offset
        Segment<K,V> seg;
        ### 使用UNSAFE强制从主内存中获取 Segment对象,如果没有获取到的情况=null
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
            ## 使用原型模式 将下标为0的Segment设定参数信息 赋值到新的Segment对象中
            Segment<K,V> proto = ss[0]; // use segment 0 as prototype
            int cap = proto.table.length;
            float lf = proto.loadFactor;
            int threshold = (int)(cap * lf);
            HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
            #### 使用UNSAFE强制从主内存中获取 Segment对象,如果没有获取到的情况=null
            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                ###创建一个新的Segment对象
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    ###使用CAS做修改
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }
        }
        return seg;
}

   final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    ###尝试获取锁,如果获取到的情况下则自旋
            HashEntry<K,V> node = tryLock() ? null :
                scanAndLockForPut(key, hash, value);
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                ###计算该key存放的index下标位置
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    if (e != null) {
                        K k;
                        ###查找链表如果链表中存在该key的则修改
                        if ((k = e.key) == key ||
                            (e.hash == hash && key.equals(k))) {
                            oldValue = e.value;
                            if (!onlyIfAbsent) {
                                e.value = value;
                                ++modCount;
                            }
                            break;
                        }
                        e = e.next;
                    }
                    else {
                        if (node != null)
                            node.setNext(first);
                        else
                            ###创建一个新的node结点 头插入法
                            node = new HashEntry<K,V>(hash, key, value, first);
                        int c = count + 1;
                          ###如果达到阈值提前扩容
                        if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                            rehash(node);
                        else
                            setEntryAt(tab, index, node);
                        ++modCount;
                        count = c;
                        oldValue = null;
                        break;
                    }
                }
            } finally {
                ###释放锁
                unlock();
            }
            return oldValue;
        }

核心Api

GetObjectVolatile
此方法和上面的getObject功能类似,不过附加了'volatile'加载语义,也就是强制从主存中获取属性值。类似的方法有getIntVolatile、getDoubleVolatile等等。这个方法要求被使用的属性被volatile修饰,否则功能和getObject方法相同。

tryLock() 方法是有返回值的,他表示用来尝试获取锁,如果获取成功,则返回true,

如果获取失败(其他线程已经获取到锁),则返回false,这个方法无论如何都会立即返回,在拿不到锁的时候不会一直在哪里等待。

[图片上传失败...(image-748a07-1601689703313)]

   /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
//计算该key存放的index下标位置 查找node节点  如果没有发生index冲突的
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//有多个线程同时修改 没有发生index冲突下标位置  cas修改。
//如果cas修改成功的话在退出自旋
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
//synchronized 的时候:index已经发生冲突 使用node节点上锁
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
//转红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

binCount //记录数组长度 ,如果长度>8转为红黑树。

    for (Node<K,V>[] tab = table;;) {}  // 自旋
  if (tab == null || (n = tab.length) == 0)
            tab = initTable();       //如果table为空 则初始化
  /**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
//如果发现有其他线程正在扩容的时候,当前线程释放cpu执行权
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
//使用cas 修改当前的sizeCtl 为-1   sizeCtl 默认为0,-1表示其他的线程已经在扩容,-2表示2个其他线程再扩容
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
//对table 默认长度为16
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }


image.png

[图片上传失败...(image-572d22-1601775904418)]

上一篇 下一篇

猜你喜欢

热点阅读