Java 并发编程

Java并发编程笔记(四):并发容器

2018-06-30  本文已影响3人  yeonon

Java类库中存在很多并发基础构建模块,例如Collections工具类,J.U.C包下的绝大多数类等等。利用这些现有的模块可以大大提高开发效率。

一、同步容器类

同步容器包括Vector,Hashtable,这两位比较古老了,很多书或者博客都说到这两个类基本已经不用了。这两个类和Collections下synchronized为前缀的方法的实现方式是差不多的:将状态封装起来,并对每个共有方法进行同步(synchronized关键字修饰)。虽然效率可能有些低,但终究是线程安全的,不过仍旧逃不开同步容器类共有的问题。

同步容器类的问题

先看看示例代码:

public class NoSafe {
    
    public static int getFromVector(Vector<Integer> list) {
        int last = list.size() - 1;
        return list.get(last);
    }

    public static void remove(Vector<Integer> list) {
        int last = list.size() - 1;
        list.remove(last);
    }

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        Vector<Integer> vector = new Vector<>();
        for (int i = 0; i < 50000; i++) {
            vector.add(random.nextInt(1000));
        }

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 1000; i++)
                NoSafe.getFromVector(vector);
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 1000; i++)
                NoSafe.remove(vector);
        });

        t1.start();
        t2.start();
        t1.join();
        t2.join();

    }
}

多运行几次代码,会看到有

java.lang.ArrayIndexOutOfBoundsException: Array index out of range

为什么呢?首先得明确Vector类本身肯定是线程安全的,即remove,get等操作是安全的,但是为什么还会出现问题呢?原因是上述NoSafe类的两个方法执行的都是“先检查再运行”操作。对Vector的操作在“先验条件”之后,而NoSafe类的这两个方法并没有任何同步处理,所以仍然会发生并发问题。如图所示:


get和remove操作交替执行

一图胜千言,上图描述得非常清楚了,不再赘述。

还有一个问题就是著名的“ConcurrentModificationException”异常。这个问题发生在使用迭代器的方式遍历容器元素时有对容器元素的修改操作。如下代码:

public static void main(String[] args) {

        Random random = new Random();
        List<Integer> integers = Stream.generate(() -> random.nextInt(1000)).limit(100000).collect(Collectors.toList());

        //启动一个定时任务,时间尽量短一些,在主线程遍历完成之前启动
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.schedule(() -> {
            integers.add(1);
        }, 5, TimeUnit.MILLISECONDS);
        //java8 lambda foreache也是使用Java5提供的语法糖,即使用迭代器的方式遍历
        integers.forEach(System.out::println);

    }

如上,运行上述代码,只要电脑不是特别快,应该会看到控制台有java.util.ConcurrentModificationException异常。这种机制被称作“fail-fast” ,但其并不是一种完备的机制,仅仅是一种提醒,抛出异常就停止遍历了,没有做其他的处理。一种避免该异常的方式是在遍历之前先获得一份容器的拷贝,对拷贝进行遍历,但显然这样会导致不小的性能开销和空间占用。

容器的hashcode和equal方法甚至toString方法(不一定,具体看实现逻辑)也会间接的执行对容器的迭代遍历,所以也有可能出现java.util.ConcurrentModificationException异常。

二、并发容器

Java5提供了多种并发容器来改进同步容器的性能。例如ConcurrentHashMap,CopyOnWriteArrayList,BlockingQueue等。从名字基本能看出这些容器的作用对象。

这些容器类基本都在J.U.C包下

ConcurrentHashMap

总所周知,HashMap不是线程安全的,在多线程的环境下,对HashMap的操作可能会带来一些问题。在Java5.0之前,要么使用Collections里的同步工厂方法来实现同步,要不使用Hashtable这个类(现在基本已经不再使用)来代替,但这两种方式的性能都很糟糕,直到ConcurrentHashMap出现才缓解了这个尴尬。

ConcurrentHashMap的和HashMap的结构差不多,都是基于hash算法确定元素位置并插入,每个元素都包含在一个Entiy里(Java8里改成了Node),hash冲突的话就使用链表实现插入(Java8中,当链表节点达到一定阈值之后,会转换成红黑树),这种方式称为“拉链法”

不同的地方是ConcurrentHashMap是线程安全的,但不并不是直接在共有方法上加上synchronized修饰,而是使用一种叫做“分段锁”的技术实现粒度更小的并发控制。另一个特定是与古老的同步容器想比较的,上面讲到“古老”的同步容器几乎都有ConcurrentModificationException问题,而ConcurrentHashMap没有这个问题,这是因为ConcurrentHashMap返回的迭代器具有“弱一致性”,当遍历容器元素的时候,其他线程可以修改容器里的内容,对本次遍历不会起作用,但会保证最终一定会将修改同步到容器里。

关于ConcurrentHashMap是一个大主题,这里只是简单粗略的讨论了一下。

CopyOnWriteArrayList

从名字中可以看到这是一个“写时复制”容器,什么是“写时复制”?写时复制在linux中被广泛应用,不过这里的CopyOnWriteArrayList容器没有涉及到操作系统,不过思想都是一致的。

CopyOnWriteArrayList包含一个Object类型的数组array,且有volatile修饰,保证可见性。类中没有公开操作这个数组的接口,要操作元素只能间接操作,这样就保证了一定的线程安全。这个类的一个特点是当有修改操作来的时候,先用锁来开辟一段临界区,在临界区里将元素拷贝到一个新的数组里,然后在新数组里修改元素,最后将其赋值给array完成本次操作。如下代码所示:

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

这实现了底层的数组是不变的,有点类似String,String类也是每次修改都会返回一个新的String对象。

和ConcurrentHashMap一样,CopyOnWriteArrayList也不会抛出ConcurrentModificationException异常,也是仅保证“弱一致性”。实现方式是CopyOnWriteArrayList的迭代器类持有一个开始迭代时最新的array数组引用,整个迭代过程都是对整个数组迭代。如果在迭代的过程中,有其他线程对数组进行修改,不会影响到迭代过程。这是因为修改数组会改变array引用指向,但是迭代器中的数组引用仍然指向原来的数据。故不会受到影响。可以写demo尝试一下,如下代码:

    public static void main(String[] args) {
        CopyOnWriteArrayList<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();

        for (int i = 0; i < 100000; i++) {
            copyOnWriteArrayList.add(i);
        }


        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.schedule(() -> {
            copyOnWriteArrayList.add(-10);
            System.out.println("添加-10");
        }, 2, TimeUnit.MILLISECONDS);
        AtomicInteger count = new AtomicInteger();
        System.out.println("开始遍历");
        copyOnWriteArrayList.forEach((n) -> {
            count.incrementAndGet();
        });

        System.out.println(count.get());
        count.set(0);
        copyOnWriteArrayList.forEach((n) -> {
            count.incrementAndGet();
        });
        System.out.println(count.get());
    }

结果如下:

开始遍历
添加-10
100000
100001

即第一次遍历的过程中即使添加了一个元素,仍然仅仅遍历原来的那100000个元素。运行的时候也可以感觉到,添加100000个元素比起其他容器会慢很多,所以一般CopyOnWriteArrayList用在读多写少的场合。

三、阻塞队列和生产者-消费者模式

BlockingQueue是阻塞队列的父接口,提供阻塞队列的基本抽象,他有很多实现类,例如LinkedBlockingQueue,ArrayBlockQueue等等。他提供了put和take方法支持阻塞操作,当队列满的时候,put操作被阻塞,当队列空的时候take操作被阻塞。同时当队列从满的状态到非满状态的时候,put操作的线程会被唤醒,继续执行put操作,当队列从空到非空状态时,take操作的线程会被唤醒,继续执行take操作。这其实就是生产者-消费者模式。阻塞队列就像是载体,承载生产者生成的元素,消费者从队列里拿出元素做处理。一个简单的Demo如下所示:

public class PCExample {


    private static class Producer {
        private final BlockingQueue<Integer> queue;

        private AtomicInteger count = new AtomicInteger(0);

        public Producer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        public void produce() {
            try {
                Thread.sleep(new Random().nextInt(3000));
                queue.put(count.incrementAndGet());
                System.out.println("生产者生产 : " + count.get());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static class Consumer {
        private final BlockingQueue<Integer> queue;

        public Consumer(BlockingQueue<Integer> queue) {
            this.queue = queue;
        }

        public void consum() {
            try {
                Thread.sleep(new Random().nextInt(3000));
                System.out.println("消费者消费 :" + queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(20);
        Producer producer = new Producer(blockingQueue);
        Consumer consumer = new Consumer(blockingQueue);
        ExecutorService service = Executors.newFixedThreadPool(4);
        service.submit(() -> {
            for (int i = 0; i < 10000; i++) {
                producer.produce();
            }
        });

        service.submit(() -> {
            for (int i = 0; i < 10000; i++) {
                consumer.consum();
            }
        });

        service.shutdown();
        service.awaitTermination(20, TimeUnit.SECONDS);
    }
}

运行代码,可以看出上面描述的情况。
在Demo里,队列的容量被设置为20,故这个队列是一个有界队列,也可以不设置,默认是Integer.MAX_VALUE。理论上队列也可以是无界队列,即可以存储无数个元素,put操作也就永远不会被阻塞,不过这样可能会导致内存爆炸。

双端队列

还有一种队列是可以从队列的两遍插入和移除的,这种队列被称为 “双端队列”。在Fork-Join框架里用到了双端队列,主要原因是双端队列可以提供“工作窃取”的功能,即当一个线程空闲的时候,当前的工作队列又没有元素待处理,那么他就可以去其他队列的尾端拿出元素做处理,提高CPU效率以及吞吐量。

线程安全

生产者和消费者往往是不同的线程,他们共享阻塞队列,所以肯定会存在线程安全问题,不过用户不用太担心,阻塞队列已经在内部做了很多同步处理,使得插入,删除都是安全的操作。

小结

使用Java类库提供的并发容器可以大大提升开发效率,同时拥有不错的性能,其中CopyOnWriteArrayList、ConcurrentHashMap、BlockingQueue及其实现类比较有代表性,故稍微讨论了一下它们。

上一篇 下一篇

猜你喜欢

热点阅读