CopyOnWrite思想

2019-05-20  本文已影响0人  王金松

读写锁的弊端

读写锁的思想是读读不互斥,读写互斥,写写互斥
最大的问题,其实就在于写锁和读锁的互斥。假设写操作频率很低,读操作频率很高,是写少读多的场景。那么偶尔执行一个写操作的时候,是不是会加上写锁,此时大量的读操作过来是不是就会被阻塞住,无法执行
如果用读写锁互斥的话,会导致写锁阻塞大量读操作,影响并发性能

copyOnwrite思想

你在读数据的时候,其实不加锁也没关系,大家左右都是一个读罢了,互相没影响。
  问题主要是在写的时候,写的时候你既然不能加锁了,那么就得采用一个策略。
  假如说你的ArrayList底层是一个数组来存放你的列表数据,那么这时比如你要修改这个数组里的数据,你就必须先拷贝这个数组的一个副本。
  然后你可以在这个数组的副本里写入你要修改的数据,但是在这个过程中实际上你都是在操作一个副本而已。
  这样的话,读操作是不是可以同时正常的执行?这个写操作对读操作是没有任何的影响的吧!

大家看下面的图,一起来体会一下这个过程:

image

关键问题来了,那那个写线程现在把副本数组给修改完了,现在怎么才能让读线程感知到这个变化呢?
  关键点来了,划重点!这里要配合上volatile关键字的使用。
  笔者之前写过文章,给大家解释过volatile关键字的使用,核心就是让一个变量被写线程给修改之后,立马让其他线程可以读到这个变量引用的最近的值,这就是volatile最核心的作用。
  所以一旦写线程搞定了副本数组的修改之后,那么就可以用volatile写的方式,把这个副本数组赋值给volatile修饰的那个数组的引用变量了。
  只要一赋值给那个volatile修饰的变量,立马就会对读线程可见,大家都能看到最新的数组了
如果多个线程都去写操作的话 是否需要copy很多的副本,答案是否定的,因为在写操作的时候采用了锁操作,保证所有的写操作只对一个副本进行操作

场景

读多写少
比如我们的实时操作,每隔一个小时需要更新一下内存(ArrayList 或者ArraySet),因为写的操作不频繁,一个小时一次,所以我们可以采用CopyOnWrite思想
切记,写操作过多的情况下不能使用,因为写操作的时候牵扯到数据拷贝的情况,实际上是很耗费资源的

Copy On Write思想在kafka的使用

在Kafka的内核源码中,有这么一个场景,客户端在向Kafka写数据的时候,会把消息先写入客户端本地的内存缓冲,然后在内存缓冲里形成一个Batch之后再一次性发送到Kafka服务器上去,这样有助于提升吞吐量。
话不多说,大家看下图:

image

这个时候Kafka的内存缓冲用的是什么数据结构呢?大家看源码:

private final ConcurrentMaptopicpartition, span= deque=batches = new CopyOnWriteMap();

package org.apache.kafka.common.utils;
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {
    private volatile Map<K, V> map;
    public CopyOnWriteMap() {
        this.map = Collections.emptyMap();
    }

    public CopyOnWriteMap(Map<K, V> map) {
        this.map = Collections.unmodifiableMap(map);
    }
    @Override
    public synchronized V put(K k, V v) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        V prev = copy.put(k, v);
        this.map = Collections.unmodifiableMap(copy);
        return prev;
    }

    @Override
    public synchronized void putAll(Map<? extends K, ? extends V> entries) {
        Map<K, V> copy = new HashMap<K, V>(this.map);
        copy.putAll(entries);
        this.map = Collections.unmodifiableMap(copy);
    }
}


ConcurrentMap<TopicPartition, Deque<RecordBatch>> map= new CopyOnWriteMap();

key为TopicPartition, value 为 ArrayDeque<ProducerBatch>。它是RecordAccumulator 的一个成员变量,RecordAccumulator 会为每一个ProducerBatch分配一个batch.size 大小的Buffer,用于存储Byte字节流。
所以Kafka这个核心数据结构在这里之所以采用CopyOnWriteMap思想来实现,就是因为这个Map的key-value对,其实没那么频繁更新。
  但是他的get操作却是高频的读取请求,因为会高频的读取出来一个TopicPartition对应的Deque数据结构,来对这个队列进行入队出队等操作,所以对于这个map而言,高频的是其get操作。
  这个时候,Kafka就采用了CopyOnWrite思想来实现这个Map,避免更新key-value的时候阻塞住高频的读操作,实现无锁的效果,优化线程并发的性能。

参考

https://www.douban.com/note/717799759/

上一篇下一篇

猜你喜欢

热点阅读