浅谈Collections线程安全包装和ConcurrentHa

2018-04-25  本文已影响260人  siriusing

青春须早为,岂能长少年

Collections.synchronizedXXX

线程安全包装类是怎么实现的?

        ArrayList<Integer> list = new ArrayList<>();
        List<Integer> integers = Collections.synchronizedList(list);

ArrayList 实现了RandomAccess


public class ArrayList<E> extends AbstractList<E>
        implements List<E>, RandomAccess, Cloneable, java.io.Serializable
...

Collections.synchronizedList 是这么实现的:

    //根据指定的list返回一个线程安全的list。
    public static <T> List<T> synchronizedList(List<T> list) {
        return (list instanceof RandomAccess ?
                new SynchronizedRandomAccessList<>(list) :
                new SynchronizedList<>(list));
    }

然后去看看SynchronizedRandomAccessList,这个是一个静态内部类来着,父类是SynchronizedList

    static class SynchronizedRandomAccessList<E>
        extends SynchronizedList<E>
        implements RandomAccess {

        SynchronizedRandomAccessList(List<E> list) {
            super(list);
        }
        private static final long serialVersionUID = 1530674583602358482L;
        // 省略...

    }

去看看父类SynchronizedList


    /**
     * @serial include
     */
    static class SynchronizedList<E>
        extends SynchronizedCollection<E>
        implements List<E> {
        private static final long serialVersionUID = -7754090372962971524L;

        final List<E> list;

        SynchronizedList(List<E> list) {
            super(list);
            this.list = list;
        }
        SynchronizedList(List<E> list, Object mutex) {
            super(list, mutex);
            this.list = list;
        }
        //省略...
    }

可以看到还是用synchronized来实现的,那么mutex这把锁默认是什么呢?
去看一下父类:SynchronizedCollection

/**
     * @serial include
     */
    static class SynchronizedCollection<E> implements Collection<E>, Serializable {
        private static final long serialVersionUID = 3053995032091335093L;

        final Collection<E> c;  // Backing Collection
        final Object mutex;     // Object on which to synchronize

        SynchronizedCollection(Collection<E> c) {
            this.c = Objects.requireNonNull(c);
            mutex = this;
        }

        SynchronizedCollection(Collection<E> c, Object mutex) {
            this.c = Objects.requireNonNull(c);
            this.mutex = Objects.requireNonNull(mutex);
        }

        //省略...
    }

好,现在可以知道默认的锁就是this,也就是自己,还可以在创建时订制锁。

一言蔽之,Collections.synchronizedList 是通过把list所有已知的方法都用synchronized关键字包装起来,从而达到串行访问。

ConcurrentHashMap

ConcurrentHashMap 如何实现线程安全?

JDK1.7的实现

    public V put(K key, V value) {

        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();

        //得到hash值
        int hash = hash(key);
        // 获取到段号
        int j = (hash >>> segmentShift) & segmentMask;


        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
             //段为空,创建段
            s = ensureSegment(j);
        // 
        return s.put(key, hash, value, false);
    }

为了保证创建时的线程安全,采用了volatile和CAS操作


    private Segment<K,V> ensureSegment(int k) {
        //省略...

            if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                == null) { // recheck
                Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
                while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                       == null) {
                    if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                        break;
                }
            }

        final V put(K key, int hash, V value, boolean onlyIfAbsent) {

            //尝试获取锁
            //tryLock:如果当前线程可以得到锁,返回true
            //scanAndLockForPut:
            HashEntry<K,V> node = tryLock() ? null :   scanAndLockForPut(key, hash, value);
            
            V oldValue;
            try {
                HashEntry<K,V>[] tab = table;
                int index = (tab.length - 1) & hash;
                HashEntry<K,V> first = entryAt(tab, index);
                for (HashEntry<K,V> e = first;;) {
                    //省略...
                }
            } finally {
                //释放锁
                unlock();
            }
            return oldValue;
        }

总结
大概总结一下就是:

JDK 1.8的实现

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        volatile V val;
        volatile Node<K,V> next;

        Node(int hash, K key, V val, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.val = val;
            this.next = next;
        }

        public final K getKey()       { return key; }
        public final V getValue()     { return val; }
        public final int hashCode()   { return key.hashCode() ^ val.hashCode(); }
        public final String toString(){ return key + "=" + val; }
        public final V setValue(V value) {
            throw new UnsupportedOperationException();
        }

        public final boolean equals(Object o) {
            Object k, v, u; Map.Entry<?,?> e;
            return ((o instanceof Map.Entry) &&
                    (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
                    (v = e.getValue()) != null &&
                    (k == key || k.equals(key)) &&
                    (v == (u = val) || v.equals(u)));
        }

这个是线程安全的,也就是说节点是线程安全的


    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        
        if (key == null || value == null) 
            throw new NullPointerException();

        //获得hash
        int hash = spread(key.hashCode());

        int binCount = 0;
        
        //table是volitate的,垃圾箱,第一次使用时创建
        //transient volatile Node<K,V>[] table;
        for (Node<K,V>[] tab = table;;) {
            
            Node<K,V> f;
            int n, i, fh;
            
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

    private transient volatile int sizeCtl;

控制表初始化和resize

size()获取大小是估算值,不是实时的,直接拿历史每个槽位的统计值相加

    public int size() {
        //不加任何锁,直接数一遍,是不精确的
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }

    final long sumCount() {
        
        //当前槽位数
        CounterCell[] as = counterCells; 
        CounterCell a;
        long sum = baseCount;
        if (as != null) {
            //直接把每一个槽位拥有的数量加起来
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

    /**
     * counterCells 的长度就是表长度
     * counterCells[k]=n;表示第n个节点,存在hash相同的元素为n个
     * 这n个对象以红黑树存储
     */
    private transient volatile CounterCell[] counterCells;


    //内部类,volatile保证可见性
    @sun.misc.Contended static final class CounterCell {

        volatile long value;
        CounterCell(long x) { value = x; }
    }

    /**
     * Returns the value to which the specified key is mapped,
     * or {@code null} if this map contains no mapping for the key.
     *
     * <p>More formally, if this map contains a mapping from a key
     * {@code k} to a value {@code v} such that {@code key.equals(k)},
     * then this method returns {@code v}; otherwise it returns
     * {@code null}.  (There can be at most one such mapping.)
     *
     * @throws NullPointerException if the specified key is null
     */
    public V get(Object key) {

        Node<K,V>[] tab; 
        Node<K,V> e, p;
        int n, eh;
        K ek;
        //获得hash
        int h = spread(key.hashCode());
        //如果表已创建
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }


    //保证读取可见性
    @SuppressWarnings("unchecked")
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }


    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    /** Implementation for put and putIfAbsent */
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        
        //不允许key,value为空
        if (key == null || value == null) 
            throw new NullPointerException();
        //得到hash
        int hash = spread(key.hashCode());
        int binCount = 0;

        //死循环,直到成功才退出
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; 
            int n, i, fh;

            //table 是懒初始化的,没初始化就初始化一遍
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 计算出table中对象的位置i,获取对象f,找不到
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                //CAS赋值到table[i]
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }// 对象
            // MOVED: 前置节点的hash
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                //在node上加锁,就是对应一个hash 加锁
                synchronized (f) {

                    if (tabAt(tab, i) == f) {
                    /*
                    MOVED     = -1; // hash for forwarding nodes
                    TREEBIN   = -2; // hash for roots of trees
                    RESERVED  = -3; // hash for transient reservations
                    HASH_BITS = 0x7fffffff; // usable bits of normal node hash
                    */  
                        // 链表节点
                        if (fh >= 0) {
                            //省略...
                        }//树节点
                        else if (f instanceof TreeBin) {
                            //省略...
                        }
                    }
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }


    //CAS操作
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

方法总结:

对比

JDK1.7 和JDK1.8 在ConcurrentHashMaps上的对比:

synchronized和ReentrantLock 比较

        final Lock lock = new ReentrantLock(true);
        final Condition notFull = lock.newCondition();
        final Condition notEmpty = lock.newCondition();

  1. JDK1.8对 synchronized的优化已经等于或优于ReentrantLock了

上一篇 下一篇

猜你喜欢

热点阅读