HashMap 的不安全性引发的

2019-04-29  本文已影响0人  Wi1ls努力努力再努力

HashMap 没有任何锁的机制,注定了 HashMap 是线程不安全的。
1.当两个线程put 数据的时候,恰好两个数据哈希冲突在同一个 slot,包括在这个 slot 构造单链表的时候。
2.当表的大小达到了负载(默认表的大小 16,负载因子 0.75),此时要进行扩容,扩容的实质是实例化新的数组,将老的数组迁移过来。在迁移的过程中,会有多线程安全造成的死循环。主要在transfer(newTable)。当你看过 HashMap 的时候,发现 transfer(Entry[] newTable)是逆序转移。
比如 oldTable[1]有单链表: A->B->C;假设在新链表中,不幸 A,B,C 哈希冲突,并且其都在 newTable[2]处,则 newTable[2]有单链表:C->B->A

@jdk6 中的实现
void transfer(Entry[] newTable) {
        Entry[] src = table;
        int newCapacity = newTable.length;
        for (int j = 0; j < src.length; j++) {
            Entry<K,V> e = src[j];
            if (e != null) {
                src[j] = null;
                do {
                    Entry<K,V> next = e.next;
                    int i = indexFor(e.hash, newCapacity);
                    e.next = newTable[i];
                    newTable[i] = e;
                    e = next;
                } while (e != null);
            }
        }
    }

关于在 transfer 中的死循环可以参见这个https://coolshell.cn/articles/9606.html,其中在并发下的 Rehash 我觉得不好的是在线程 1 唤醒二,其实操作的是线程 2 已经 rehash 完成的表,而并非一个新表,因为对于线程 1 和线程2,newTable 是同一个,结果还是一个循环。


HashTable 是线程安全的,看源码可以看到几乎所有的方法都 syncronized 进行了同步,当多线程进行抢占的时候,效率是非常低的。


另外对于 HashMap,是允许 Key 或 value为null 的,而 HashTable 不允许 key 为 null,也不允许 value 为 null

@HashMap(jdk6)
public V put(K key, V value) {
        if (key == null)
            return putForNullKey(value);
        int hash = hash(key.hashCode());
        int i = indexFor(hash, table.length);
        for (Entry<K,V> e = table[i]; e != null; e = e.next) {
            Object k;
            if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }

        modCount++;
        addEntry(hash, key, value, i);
        return null;
    }

 private V putForNullKey(V value) {
        for (Entry<K,V> e = table[0]; e != null; e = e.next) {
            if (e.key == null) {
                V oldValue = e.value;
                e.value = value;
                e.recordAccess(this);
                return oldValue;
            }
        }
        modCount++;
        addEntry(0, null, value, 0);
        return null;
    }

@HashTable
public synchronized V put(K key, V value) {
        // 如果 value 为 null
        if (value == null) {
            throw new NullPointerException();
        }

        Entry tab[] = table;
        //如果 key 为 null,则也挂了
        int hash = key.hashCode();
        int index = (hash & 0x7FFFFFFF) % tab.length;
        for (Entry<K,V> e = tab[index] ; e != null ; e = e.next) {
            if ((e.hash == hash) && e.key.equals(key)) {
                V old = e.value;
                e.value = value;
                return old;
            }
        }

        modCount++;
        if (count >= threshold) {
            // Rehash the table if the threshold is exceeded
            rehash();

            tab = table;
            index = (hash & 0x7FFFFFFF) % tab.length;
        }

        // Creates the new entry.
        Entry<K,V> e = tab[index];
        tab[index] = new Entry<K,V>(hash, key, value, e);
        count++;
        return null;
    }

集合系列


对于多线程并发,有并发集合 ConcurrentHashMap支持,ConcurrentHashMap 的核心是利用将数据分段,不用的段用不同锁,这样可以让不同线程在不同的段可以并发,只有在不同的线程在相同段才进行同步。降低了同步发生的概率。

//initialCapacity:初始容量大小;
//loadFactory:负载因子
//concurrencyLevel:线程并发数
public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        //最大是 1<<16(65536)
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        //sshift 和 ssize 是和 2 的指数次相关的,
        //  比如 concurrencyLevel 处于 2^(n-1)~2^n之间,
        //那么ssize 就是 2^n,sshift就是 n
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;
        this.segments = Segment.newArray(ssize);

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;

        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }

//从上分析,如果传入的 concurrencyLevel 是 15,则 分段 segments[]的长度为16, 传入的 initialCapacity 为31 ,则c是3(保证 concurrenclLevel*c>initialCapacity),则 cap=23(2c),则每段 Segment 的初始值为 cap=8;

Segment(int initialCapacity, float lf) {
            loadFactor = lf;
            setTable(HashEntry.<K,V>newArray(initialCapacity));
        }

当想向ConcurrentHaspMap存入元素的时候,可以看到 key 和 value 都不能为 null

public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key.hashCode());
        return segmentFor(hash).put(key, hash, value, false);
    }

final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }

@ConcurrentHashMap
 V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

也可以看到,先利用 hash 值,从 segements[]从选取对应的 Segment,然后再利用这个 hash 进行 mode 选取对应的 Entry。
可以看到,在 ConcurrentHashMap 的锁是在 put()@Segment 中的,并且用的是可重入所 ReentrantLock,因此比 HashTable 性能好。


ConCurrentHashMap对于 get()方法没有加锁,因为在内部 HashEntry<K,V>[ ] table 是 volatile 的,可以保证线程的可见性。

上一篇下一篇

猜你喜欢

热点阅读