ConcurrentLinkedHashMap源码分析

2021-04-14  本文已影响0人  lim快乐_无限

一、简介

ConcurrentLinkedHashMap是google 开源的线程安全的方便并发的Map, Map利用LRU缓存机制对Map中存储对象进行换入换出管理。采用两套资源控制机制,一套同步机制,使用ConcurrentMap对对象数据进行KV存储,保证多线程并发安全地调用Map资源,而对于存储对象的换入换出管理则采用异步机制,使用Queue buffer存储每次的因对象读写而产生的对象换入换出任务,当遇到读任务超过阈值或写任务时,加锁后,执行buffer中的多个任务,依次对evictionDeque进行节点调整,需要移除的数据,从map中移除。在jdk1.8后官方建议用Caffeine代替

二、代码解析

2.1 相关类
**Weigher **是一个计算每条记录占用存储单元数接口,项目在类Weighers中给了许多针对不同需求的计算方式,如Byte 数组可以通过数组长度计算为存储单元个数,而就一般应用的存储对象,可以直接用SingletonWeigher,每条记录占用一个存储单元。

@ThreadSafe
public interface Weigher<V> {

  /**
   * Measures an object's weight to determine how many units of capacity that
   * the value consumes. A value must consume a minimum of one unit.
   *
   * @param value the object to weigh
   * @return the object's weight
   */
  int weightOf(V value);
}

**WeightedValue **是对Value的装饰,包含了Value占用的存储单元个数weight值,以及根据weight值计算状态

**Node **实现了链接表中Linked Node,便于LinkedDeque的双向索引,是Map以及evictionDeque存储对象。
**Task **是针对存储对象LRU顺序操作的抽象类,继承自Task的有ReadTask、AddTask、UpdateTask、RemoveTask, 每一个Task有一个根据创建顺序分配的order。

2.2 ConcurrentLinkedHashMap主要属性

// 存储数据
final ConcurrentMap<K, Node<K, V>> data; 
// 实际存储大小
final AtomicLong weightedSize;
// 维护对象换入换出
final LinkedDeque<Node<K, V>> evictionDeque;
// 限制存储大小
final AtomicLong capacity;
// 回调
final EvictionListener<K, V> listener;

2.3 主要操作过程
get 操作,首先从Map中读取,再添加一个addTask用于调整queue中LRU order

@Override
  public V get(Object key) {
    final Node<K, V> node = data.get(key);
    if (node == null) {
      return null;
    }
    afterRead(node); // 处理LRU 异步调整队列顺序 注意不是立刻调整,当满足32条时才调整
    return node.getValue();
  }

put 操作稍微复杂,需要判断是否只在缺少才插入(putIfAbsent),如果不存在,直接插入,如果存在, 而不是只在不存在的情况下插入,则更新。

 V put(K key, V value, boolean onlyIfAbsent) {
    checkNotNull(key);
    checkNotNull(value);

    final int weight = weigher.weightOf(key, value);
    final WeightedValue<V> weightedValue = new WeightedValue<V>(value, weight);
    final Node<K, V> node = new Node<K, V>(key, weightedValue);

    for (;;) {
      final Node<K, V> prior = data.putIfAbsent(node.key, node);
      if (prior == null) {
          // 处理LRU 异步添加到队列中
        afterWrite(new AddTask(node, weight));
        return null;
      } else if (onlyIfAbsent) {
        afterRead(prior);
        return prior.getValue();
      }
       // 以下代码是更新时更新权重大小
      for (;;) {
        final WeightedValue<V> oldWeightedValue = prior.get();
        if (!oldWeightedValue.isAlive()) {
          break;
        }

        if (prior.compareAndSet(oldWeightedValue, weightedValue)) {
          final int weightedDifference = weight - oldWeightedValue.weight;
          if (weightedDifference == 0) {
            afterRead(prior);
          } else {
            afterWrite(new UpdateTask(prior, weightedDifference));
          }
          return oldWeightedValue.value;
        }
      }
    }
  }

remove

 @Override
  public V remove(Object key) {
    final Node<K, V> node = data.remove(key);
    if (node == null) {
      return null;
    }
    // 设置节点状态 改为 retire
    makeRetired(node);
    // 处理LRU 队列删除节点
    afterWrite(new RemovalTask(node));
    return node.getValue();
  }

2.4 LRU 处理过程

void afterWrite(Runnable task) {
    // 添加写任务到缓存区
    writeBuffer.add(task);
    // 设置处理状态为必须
    drainStatus.lazySet(REQUIRED);
    // 尝试去添加并调整顺序
    tryToDrainBuffers();
    // 通知
    notifyListener();
  }

void afterRead(Node<K, V> node) {
    final int bufferIndex = readBufferIndex();
    // 把此次查询节点放入缓存区 并返回待调整的数量
    final long writeCount = recordRead(bufferIndex, node);
    // 尝试调整
    drainOnReadIfNeeded(bufferIndex, writeCount);
    notifyListener();
  }

void drainOnReadIfNeeded(int bufferIndex, long writeCount) {
    final long pending = (writeCount - readBufferDrainAtWriteCount[bufferIndex].get());
    // READ_BUFFER_THRESHOLD 为32  当待调整数量大于32时进入调整
    final boolean delayable = (pending < READ_BUFFER_THRESHOLD);
    final DrainStatus status = drainStatus.get();
    if (status.shouldDrainBuffers(delayable)) {
      tryToDrainBuffers();
    }
  }

void tryToDrainBuffers() {
    if (evictionLock.tryLock()) {
      try {
        drainStatus.lazySet(PROCESSING);
        drainBuffers();
      } finally {
        drainStatus.compareAndSet(PROCESSING, IDLE);
        evictionLock.unlock();
      }
    }
  }

  @GuardedBy("evictionLock")
  void drainBuffers() {
    // 处理读缓存区
    drainReadBuffers();
    // 处理写缓存区
    drainWriteBuffer();
  }

@GuardedBy("evictionLock")
  void drainReadBuffers() {
    final int start = (int) Thread.currentThread().getId();
    final int end = start + NUMBER_OF_READ_BUFFERS;
    for (int i = start; i < end; i++) {
      drainReadBuffer(i & READ_BUFFERS_MASK);
    }
  }

  @GuardedBy("evictionLock")
  void drainReadBuffer(int bufferIndex) {
    final long writeCount = readBufferWriteCount[bufferIndex].get();
    for (int i = 0; i < READ_BUFFER_DRAIN_THRESHOLD; i++) {
      final int index = (int) (readBufferReadCount[bufferIndex] & READ_BUFFER_INDEX_MASK);
      final AtomicReference<Node<K, V>> slot = readBuffers[bufferIndex][index];
      final Node<K, V> node = slot.get();
      if (node == null) {
        break;
      }
      slot.lazySet(null);
      applyRead(node);
      readBufferReadCount[bufferIndex]++;
    }
    readBufferDrainAtWriteCount[bufferIndex].lazySet(writeCount);
  }

2.5 任务实现过程
AddTask

final class AddTask implements Runnable {
    final Node<K, V> node;
    final int weight;

    AddTask(Node<K, V> node, int weight) {
      this.weight = weight;
      this.node = node;
    }

    @Override
    @GuardedBy("evictionLock")
    public void run() {
      weightedSize.lazySet(weightedSize.get() + weight);

      // ignore out-of-order write operations
      if (node.get().isAlive()) {
        evictionDeque.add(node);
        // 踢出多余的
        evict();
      }
    }
  }
@GuardedBy("evictionLock")
  void evict() {
    // Attempts to evict entries from the map if it exceeds the maximum
    // capacity. If the eviction fails due to a concurrent removal of the
    // victim, that removal may cancel out the addition that triggered this
    // eviction. The victim is eagerly unlinked before the removal task so
    // that if an eviction is still required then a new victim will be chosen
    // for removal.
    while (hasOverflowed()) {
      final Node<K, V> node = evictionDeque.poll();

      // If weighted values are used, then the pending operations will adjust
      // the size to reflect the correct weight
      if (node == null) {
        return;
      }

      // Notify the listener only if the entry was evicted
      if (data.remove(node.key, node)) {
        pendingNotifications.add(node);
      }

      makeDead(node);
    }
  }

RemovalTask

final class RemovalTask implements Runnable {
    final Node<K, V> node;

    RemovalTask(Node<K, V> node) {
      this.node = node;
    }

    @Override
    @GuardedBy("evictionLock")
    public void run() {
      // add may not have been processed yet
      evictionDeque.remove(node);
      makeDead(node);
    }
  }

UpdateTask

final class UpdateTask implements Runnable {
    final int weightDifference;
    final Node<K, V> node;

    public UpdateTask(Node<K, V> node, int weightDifference) {
      this.weightDifference = weightDifference;
      this.node = node;
    }

    @Override
    @GuardedBy("evictionLock")
    public void run() {
      weightedSize.lazySet(weightedSize.get() + weightDifference);
      applyRead(node);
      evict();
    }
  }

三、示例

  1. 限制内存大小
EntryWeigher<K, V> memoryUsageWeigher = new EntryWeigher<K, V>() {
  final MemoryMeter meter = new MemoryMeter();

  @Override public int weightOf(K key, V value) {
    long bytes = meter.measure(key) + meter.measure(value);
    return (int) Math.min(bytes, Integer.MAX_VALUE);
  }
};
ConcurrentMap<K, V> cache = new ConcurrentLinkedHashMap.Builder<K, V>()
    .maximumWeightedCapacity(1024 * 1024) // 1 MB
    .weigher(memoryUsageWeigher)
    .build();
  1. 限制条数
ConcurrentLinkedHashMap<String, String> map = new ConcurrentLinkedHashMap.Builder<String, String>()
            .maximumWeightedCapacity(2).weigher(Weighers.singleton()).build();
上一篇下一篇

猜你喜欢

热点阅读