Java多线程

15.多线程总结(二)

2020-04-04  本文已影响0人  任振铭

原子类

什么是原子类

原子曾经被当作不可分割的最小单位,所以原子类可以认为他的操作都是不可分割的,具有原子性,为什么需要原子类呢?对于多线程访问同一个变量,需要加锁,而加锁是比较消耗性能的,JDK1.5之后,新增的原子类提供了一种用法简单,性能高效,线程安全的更新一个变量的方式,这些类位于JUC下的atomic包,发展到JDK1.8,该包下共有17个类,包括了原子更新基本类型,原子更新数组,原子更新属性,原子更新引用几种
1.8新增的:DoubleAccumulator,DoubleAdder,LongAccumulator,LongAdder,Striped64

原子更新基本类型

发展至JDk1.8,基本类型原子类有以下几个:
AtomicBoolean AtomicLong AtomicInteger,DoubleAccumulator,DoubleAdder,LongAccumulator,LongAdder
大致可分为3类:
AtomicBoolean、AtomicInteger、AtomicLong 元老级的原子更新,方法几乎一模一样,DoubleAdder、 LongAdder 对Double、Long的原子更新性能进行优化提升,DoubleAccumulator、LongAccumulator 支持自定 义运算

原子更新数组类型

AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray

原子更新属性

原子的更新某个类的某个字段时,就需要使用原子更新字段类,atomic包提供了4个类进行原子字段更新,AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicStampedReference,AtomicReferenceFieldUpdater

使用上述类的时候,必须遵循以下原则:
1.字段必须是volatile类型的,在线程之间共享变量时保证立即可见
2.字段的描述类型是与调用者与操作对象字段的关系一致。也就是说调用者能够直接操作对象字段,那么就可以反射进行原子操作。
3.对于父类的字段,子类是不能直接操作的,尽管子类可以访问父类的字段。
4.只能是实例变量,不能是类变量,也就是说不能加static关键字。
5.只能是可修改变量,不能使final变量,因为final的语义就是不可修改。
6.对于AtomicIntegerFieldUpdater和AtomicLongFieldUpdater只能修改int/long类型的字段,不能修改其包装
类型(Integer/Long)。
7.如果要修改包装类型就需要使用AtomicReferenceFieldUpdater。

原子更新引用

AtomicReference:用于对引用的原子更新
AtomicMarkableReference:带版本戳的原子引用类型,版本戳为boolean类型
AtomicStampedReference:带版本戳的原子引用类型,版本戳为int类型。

容器

同步容器与并发容器

1.同步容器

Vector、HashTable -- JDK提供的同步容器类 Collections.synchronizedXXX 本质是对相应的容器进行包装
2.同步容器类的缺点
在单独使用里面的方法的时候,可以保证线程安全,但是,复合操作需要额外加锁来保证线程安全 使用 Iterator迭代容器或使用使用for-each遍历容器,在迭代过程中修改容器会抛出 ConcurrentModificationException异常。想要避免出现ConcurrentModificationException,就必须在迭代过程 持有容器的锁。但是若容器较大,则迭代的时间也会较长。那么需要访问该容器的其他线程将会长时间等待。 从而会极大降低性能。 若不希望在迭代期间对容器加锁,可以使用"克隆"容器的方式。使用线程封闭,由于 其他线程不会对容器进行修改,可以避免ConcurrentModificationException。但是在创建副本的时候,存在较 大性能开销。 toString,hashCode,equalse,containsAll,removeAll,retainAll等方法都会隐式的Iterate, 也即可能抛出ConcurrentModificationException。

单线程下发生并发修改异常

/******************单线程下发生并发修改异常**********************/
import java.util.Vector;
public class TestVector {
    public static void main(String[] args) {
        Vector<String> v = new Vector<>();
        for (int i = 0; i < 100; i++) {
            //在单独使用里面的方法的时候,可以保证线程安全(add方法时同步的)
            v.add("num"+i);
        }
        //使用 Iterator迭代容器或使用使用for-each遍历容器,在迭代过程中修改容
        //器会抛出 ConcurrentModificationException异常
        for (String s : v) {
            if ("num2".equals(s)){
                v.remove(s);//java.util.ConcurrentModificationException
            }
            System.out.println(s);
        }
    }
}

  public synchronized boolean add(E e) {
        modCount++;
        ensureCapacityHelper(elementCount + 1);
        elementData[elementCount++] = e;
        return true;
    }

单线程下使用迭代器不会发生并发修改异常

public class TestVector {
    /*****************单线程下使用迭代器不会发生并发修改异常**********************/
    public static void main(String[] args) {
        Vector<String> v = new Vector<>();
        for (int i = 0; i < 100; i++) {
            v.add("num"+i);
        }
        Iterator<String> iterator = v.iterator();
        while(iterator.hasNext()){
            String next = iterator.next();
            if ("num2".equals(next)){
                iterator.remove();
            }
            System.out.println(next);
        }
    }
}

多线程下使用迭代器发生异常

public class TestVector {
    public static void main(String[] args) {
        Vector<String> v = new Vector<>();
        for (int i = 0; i < 100; i++) {
            v.add("num"+i);
        }
        Iterator<String> iterator = v.iterator();
        for (int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while(iterator.hasNext()){
                        //NoSuchElementException看源码是从next方法抛出的,我们来分析一下
                        //本身next方法是同步的,是安全的,但是,复合操作下就不是了,
                        //假设两个线程,线程1走到while(iterator.hasNext())时,是第99个,判断还有下一个
                        //但是由于时间片切换,此时线程1在这个位置休眠了,然后线程2
                        //也走到了这个位置,由于游标还没有移动,所以iterator.hasNext()
                        //还是成立的,所以他也进来了,然后线程2继续,走完了iterator.next()
                        //此时其实游标已经到最后一个了,然后线程1醒过来,由于他
                        //已经判断过iterator.hasNext(),所以他也会执行iterator.next()
                        //问题就在这个时候发生了, if (i >= elementCount)满足条件,触发了异常
                        String next = iterator.next();
                        if ("num2".equals(next)){
                            iterator.remove();
                        }
                    }
                }
            }).start();
        }
    }
}
    //Vector的next方法
    public E next() {
            synchronized (Vector.this) {
                checkForComodification();
                int i = cursor;
                if (i >= elementCount)
                    throw new NoSuchElementException();
                cursor = i + 1;
                return elementData(lastRet = i);
            }
        }
Exception in thread "Thread-0" java.util.NoSuchElementException
    at java.util.Vector$Itr.next(Vector.java:1166)
    at TestVector$1.run(TestVector.java:16)
    at java.lang.Thread.run(Thread.java:748)

解决上边问题的办法就是加锁,将hasNext和next方法锁在一起

public class TestVector {
    public static void main(String[] args) {
        Vector<String> v = new Vector<>();
        for (int i = 0; i < 100; i++) {
            v.add("num"+i);
        }
        Iterator<String> iterator = v.iterator();
        for (int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    synchronized (iterator) {
                        while (iterator.hasNext()) {
                            String next = iterator.next();
                            if ("num2".equals(next)) {
                                iterator.remove();
                            }
                        }
                    }
                }
            }).start();
        }
    }
}

3.并发容器
CopyOnWrite、Concurrent、BlockingQueue 根据具体场景进行设计,尽量避免使用锁,提高容器的并发访问 性。
ConcurrentBlockingQueue:基于queue实现的FIFO的队列。队列为空,取操作会被阻塞
ConcurrentLinkedQueue,队列为空,取得时候就直接返回空

2.并发容器
CopyOnWriteArrayList

同步容器我们以Vector为例通过代码演示了他在使用中可能存在的问题,并发容器我们以CopyOnWriteArrayList为例来进行演示,在同样的场景下是否也会有这种问题?
单线程下遍历过程中删除,未发生并发修改异常

public class TestCopyOnWriteArrayList {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 1000; i++) {
            list.add("item"+i);
        }
        list.forEach(e->{
            if("item2".equals(e)){
                list.remove(e);
            }
        });
    }
}

单线程下使用迭代器遍历并删除元素
Exception in thread "main" java.lang.UnsupportedOperationException,CopyOnWriteArrayList不支持迭代器遍历

public class TestCopyOnWriteArrayList {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 1000; i++) {
            list.add("item"+i);
        }
        Iterator<String> iterator = list.iterator();
        while(iterator.hasNext()){
            String next = iterator.next();
            if ("item0".equals(next)){
                System.out.println("item="+next);
                iterator.remove();
            }else{
                System.out.println(next);
            }
        }
        System.out.println("done");
    }
}

多线程下遍历并移除指定元素,不会发生ConcurrentModifyException,我们来看看remove方法源码

import java.util.concurrent.CopyOnWriteArrayList;
public class TestCopyOnWriteArrayList {
    public static void main(String[] args) {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 100; i++) {
            list.add("item"+i);
        }
        for (int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //根据index移除
                    for (int i1 = 0; i1 < list.size(); i1++) {
                        String s = list.get(i1);
                        System.out.println("bbbbbbbbbb=="+s);
                        if ("item2".equals(s)){
                            System.out.println("移除item2");
                            list.remove(i1);
                        }
                    }
                }
            }).start();
        }
        System.out.println("11111");
        for (int i = 0; i < 4; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    //直接移除对象
                    for (String s : list) {
                        System.out.println("aaaaaaaaaaaa=="+s);
                        if ("item3".equals(s)){
                            System.out.println("移除item3");
                            list.remove(s);
                        }
                    }
                }
            }).start();
        }
        System.out.println("222222");
    }
}

remove(index)
根据index进行元素移除的时候,有加锁,所以多线程不会同时进行移除,不会发生问题。拿到底层数组,把除被移除的元素之外的元素重新拷贝一份到新的数组种,所以每次移除都会导致数组的拷贝

public E remove(int index) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            E oldValue = get(elements, index);
            int numMoved = len - index - 1;
            if (numMoved == 0)
                setArray(Arrays.copyOf(elements, len - 1));
            else {
                Object[] newElements = new Object[len - 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index + 1, newElements, index,
                                 numMoved);
                setArray(newElements);
            }
            return oldValue;
        } finally {
            lock.unlock();
        }
    }

remove(Object)
根据元素对象进行移除,会判断当前数组种是否存在这个元素,如果已经被其他线程移除了,那么就不会再次移除,返回false,否则才去移除元素,所以也不会发生异常

public boolean remove(Object o) {
        Object[] snapshot = getArray();
        int index = indexOf(o, snapshot, 0, snapshot.length);
        return (index < 0) ? false : remove(o, snapshot, index);
    }
为什么ArrayList 遍历的时候remove会发生并发修改异常,而Vector和CopyOnWriteArrayList不会,就是因为后两者都是枷锁的,ArrayList只有在单线程中才能保证安全
LinkedBlockingQueue

在并发编程中LinkedBlockingQueue使用非常频繁,因为它可以作为生产者和消费者的中间商,下面对它几个主要方法的源码进行探秘:

public static void main(String[] args) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        boolean add = queue.add(1);
        System.out.println("add添加1"+add);
        try {
            queue.put(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        boolean offer = queue.offer(3);
        System.out.println("offer添加3"+offer);

        Integer remove = queue.remove();
        System.out.println("remove取出"+remove);
        Integer poll = queue.poll();
        System.out.println("poll取出"+poll);
        try {
            Integer take = queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("take取出"+poll);
    }
往队列中添加

#######add():调用offer方法实现的,区别是在队列满的时候会抛出异常

public boolean add(E e) {
        if (offer(e))
            return true;
        else //容量已满,抛出异常
            throw new IllegalStateException("Queue full");
    }
public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        //获取到队列长度
        final AtomicInteger count = this.count;
        //判断长度是否已经达到容量值
        if (count.get() == capacity)
            //容量已满,返回false
            return false;
        int c = -1;
        //创建一个node,保存当前元素和下一个元素
        Node<E> node = new Node<E>(e);
        //拿到put锁
        final ReentrantLock putLock = this.putLock;
        //锁住
        putLock.lock();
        try {
            //再次判断当前容量是否没满
            if (count.get() < capacity) {
                //没满,就入队
                enqueue(node);
                //cas的方式自增1.因为新入队了一个元素,getAndIncrement返回值是
                //自增前的值,所以添加第一个元素后,自增后是1,返回值是0
                c = count.getAndIncrement();
                //入队之后,如果当前队列至少还能容纳一个元素
                if (c + 1 < capacity)
                    //至少还能容纳一个元素,通知其他线程添加
                    //notFull是一个Condition,它的作用是唤醒notFull.await()方法,而
                    //notFull.await()方法只有以put方式添加元素的时候有用到,因为只
                    //有通过put添加元素,在元素满的时候会进行阻塞,这里调用它的
                    //目的是,有可能此时有其他线程在通过put方法往队列中添加元素
                    //但是上一次它判断队列满了阻塞在那里了,然后本线程在天加的
                    //时候发现队列有空间了,就去随机唤醒一个阻塞中的线程
                    notFull.signal();
            }
        } finally {
            //释放锁
            putLock.unlock();
        }
        //c=0的情况只有队列为空,添加第一个元素之后,满足,也就是
        //添加了一个元素,通知其他线程队列有元素了,可以取了
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }

private void signalNotEmpty() {
        //拿到take锁,
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            //添加一个元素后,队列不再是空的,通知出去,notEmpty.await()在take()方法
            //中有被调用,就是当队列为空的时候进行阻塞,这里已经向队列中
            //添加了一个元素,不再是空的,所以唤醒阻塞的线程去取元素
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

可以看到这里调用了两个唤醒线程的操作notFull.signal(),只要队列还能容纳至少一个元素,就通知其他线程往里边加,notEmpty.signal()只要队列从空变成了添加了一个元素,就通知其他线程往外取
接下来进入enqueue(node),很简单,就是把这个元素加到了队列上

private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

#######offer():同上,区别是队列满了直接返回false,不会抛出异常
#######put():队列满,阻塞

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        //拿到put锁
        final ReentrantLock putLock = this.putLock;
        //拿到当前长度
        final AtomicInteger count = this.count;
        //也是加锁,和lock的区别就是会抛出中断异常,因为它会阻塞
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                //满了,阻塞,这个时候如果调用了Thread.interrupt方法,就会抛出
                //InterruptException
                notFull.await();
            }
            //入队,和上边一样的
            enqueue(node);
            //自增1
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                //同样,唤醒其他线程添加元素
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            //唤醒其他线程取元素
            signalNotEmpty();
    }
从队列中取

#######remove():直接调用poll,区别是空的时候会抛异常

public E remove() {
        E x = poll();
        if (x != null)
            return x;
        else   //队列为空,抛出异常
            throw new NoSuchElementException();
    }

public E poll() {
        final AtomicInteger count = this.count;
        //如果队列长度为空,返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        //获取到take锁
        final ReentrantLock takeLock = this.takeLock;
        //锁住
        takeLock.lock();
        try {
            //再次判断队列中有没有元素
            if (count.get() > 0) {
                //从队列中取出
                x = dequeue();
                //队列数量减1
                c = count.getAndDecrement();
                //元素被取出一个之后,只要还有剩余的元素,就唤醒阻塞中的线程
                //取元素,这里被唤醒的是take方法中的notEmpty.await()
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();//解锁
        }
        if (c == capacity)
            //只有当队列是满的,此时取出了一个元素之后,会满足这里的条件,
            //count.getAndDecrement()会将队列长度减1然后返回减1之前的值
            //也就是capacity,此时队列已经不满了,通知其他线程往里边put(put方法调用如果队列满了会阻塞,这里就是唤醒那个线程)
            signalNotFull();
        return x;
    }

#######poll():空的时候返回null,不抛异常,源码见上边
#######take():队列为空的时候,进入等待

public E take() throws InterruptedException {
        E x;
        int c = -1;
        //队列长度
        final AtomicInteger count = this.count;
        //take锁
        final ReentrantLock takeLock = this.takeLock;
        //锁住,但是在await的情况下有可能抛出InterruptException
        takeLock.lockInterruptibly();
        try {
            //如果队列为空,阻塞
            while (count.get() == 0) {
                notEmpty.await();
            }
            // 取出一个元素
            x = dequeue();
            //长度减1
            c = count.getAndDecrement();
            if (c > 1)
                //取出一个之后,只要还有元素,就唤醒可能在等待中的线程取元素
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            //只有当队列是满的,此时取出了一个元素之后,会满足这里的条件,
            //count.getAndDecrement()会将队列长度减1然后返回减1之前的值
            //也就是capacity,此时队列已经不满了,通知其他线程往里边put(put方法调用如果队列满了会阻塞,这里就是唤醒那个线程)
            signalNotFull();
        return x;
    }
上一篇下一篇

猜你喜欢

热点阅读