ConcurrentLinkedHashMap源码分析
一、简介
ConcurrentLinkedHashMap是google 开源的线程安全的方便并发的Map, Map利用LRU缓存机制对Map中存储对象进行换入换出管理。采用两套资源控制机制,一套同步机制,使用ConcurrentMap对对象数据进行KV存储,保证多线程并发安全地调用Map资源,而对于存储对象的换入换出管理则采用异步机制,使用Queue buffer存储每次的因对象读写而产生的对象换入换出任务,当遇到读任务超过阈值或写任务时,加锁后,执行buffer中的多个任务,依次对evictionDeque进行节点调整,需要移除的数据,从map中移除。在jdk1.8后官方建议用Caffeine代替
二、代码解析
2.1 相关类
**Weigher **是一个计算每条记录占用存储单元数接口,项目在类Weighers中给了许多针对不同需求的计算方式,如Byte 数组可以通过数组长度计算为存储单元个数,而就一般应用的存储对象,可以直接用SingletonWeigher,每条记录占用一个存储单元。
@ThreadSafe
public interface Weigher<V> {
/**
* Measures an object's weight to determine how many units of capacity that
* the value consumes. A value must consume a minimum of one unit.
*
* @param value the object to weigh
* @return the object's weight
*/
int weightOf(V value);
}
**WeightedValue **是对Value的装饰,包含了Value占用的存储单元个数weight值,以及根据weight值计算状态
- active 存在于map和队列中
- retire 从map里删除但队列里还未删除
- dead map和队列里都删除
**Node **实现了链接表中Linked Node,便于LinkedDeque的双向索引,是Map以及evictionDeque存储对象。
**Task **是针对存储对象LRU顺序操作的抽象类,继承自Task的有ReadTask、AddTask、UpdateTask、RemoveTask, 每一个Task有一个根据创建顺序分配的order。
2.2 ConcurrentLinkedHashMap主要属性
// 存储数据
final ConcurrentMap<K, Node<K, V>> data;
// 实际存储大小
final AtomicLong weightedSize;
// 维护对象换入换出
final LinkedDeque<Node<K, V>> evictionDeque;
// 限制存储大小
final AtomicLong capacity;
// 回调
final EvictionListener<K, V> listener;
2.3 主要操作过程
get 操作,首先从Map中读取,再添加一个addTask用于调整queue中LRU order
@Override
public V get(Object key) {
final Node<K, V> node = data.get(key);
if (node == null) {
return null;
}
afterRead(node); // 处理LRU 异步调整队列顺序 注意不是立刻调整,当满足32条时才调整
return node.getValue();
}
put 操作稍微复杂,需要判断是否只在缺少才插入(putIfAbsent),如果不存在,直接插入,如果存在, 而不是只在不存在的情况下插入,则更新。
V put(K key, V value, boolean onlyIfAbsent) {
checkNotNull(key);
checkNotNull(value);
final int weight = weigher.weightOf(key, value);
final WeightedValue<V> weightedValue = new WeightedValue<V>(value, weight);
final Node<K, V> node = new Node<K, V>(key, weightedValue);
for (;;) {
final Node<K, V> prior = data.putIfAbsent(node.key, node);
if (prior == null) {
// 处理LRU 异步添加到队列中
afterWrite(new AddTask(node, weight));
return null;
} else if (onlyIfAbsent) {
afterRead(prior);
return prior.getValue();
}
// 以下代码是更新时更新权重大小
for (;;) {
final WeightedValue<V> oldWeightedValue = prior.get();
if (!oldWeightedValue.isAlive()) {
break;
}
if (prior.compareAndSet(oldWeightedValue, weightedValue)) {
final int weightedDifference = weight - oldWeightedValue.weight;
if (weightedDifference == 0) {
afterRead(prior);
} else {
afterWrite(new UpdateTask(prior, weightedDifference));
}
return oldWeightedValue.value;
}
}
}
}
remove
@Override
public V remove(Object key) {
final Node<K, V> node = data.remove(key);
if (node == null) {
return null;
}
// 设置节点状态 改为 retire
makeRetired(node);
// 处理LRU 队列删除节点
afterWrite(new RemovalTask(node));
return node.getValue();
}
2.4 LRU 处理过程
void afterWrite(Runnable task) {
// 添加写任务到缓存区
writeBuffer.add(task);
// 设置处理状态为必须
drainStatus.lazySet(REQUIRED);
// 尝试去添加并调整顺序
tryToDrainBuffers();
// 通知
notifyListener();
}
void afterRead(Node<K, V> node) {
final int bufferIndex = readBufferIndex();
// 把此次查询节点放入缓存区 并返回待调整的数量
final long writeCount = recordRead(bufferIndex, node);
// 尝试调整
drainOnReadIfNeeded(bufferIndex, writeCount);
notifyListener();
}
void drainOnReadIfNeeded(int bufferIndex, long writeCount) {
final long pending = (writeCount - readBufferDrainAtWriteCount[bufferIndex].get());
// READ_BUFFER_THRESHOLD 为32 当待调整数量大于32时进入调整
final boolean delayable = (pending < READ_BUFFER_THRESHOLD);
final DrainStatus status = drainStatus.get();
if (status.shouldDrainBuffers(delayable)) {
tryToDrainBuffers();
}
}
void tryToDrainBuffers() {
if (evictionLock.tryLock()) {
try {
drainStatus.lazySet(PROCESSING);
drainBuffers();
} finally {
drainStatus.compareAndSet(PROCESSING, IDLE);
evictionLock.unlock();
}
}
}
@GuardedBy("evictionLock")
void drainBuffers() {
// 处理读缓存区
drainReadBuffers();
// 处理写缓存区
drainWriteBuffer();
}
@GuardedBy("evictionLock")
void drainReadBuffers() {
final int start = (int) Thread.currentThread().getId();
final int end = start + NUMBER_OF_READ_BUFFERS;
for (int i = start; i < end; i++) {
drainReadBuffer(i & READ_BUFFERS_MASK);
}
}
@GuardedBy("evictionLock")
void drainReadBuffer(int bufferIndex) {
final long writeCount = readBufferWriteCount[bufferIndex].get();
for (int i = 0; i < READ_BUFFER_DRAIN_THRESHOLD; i++) {
final int index = (int) (readBufferReadCount[bufferIndex] & READ_BUFFER_INDEX_MASK);
final AtomicReference<Node<K, V>> slot = readBuffers[bufferIndex][index];
final Node<K, V> node = slot.get();
if (node == null) {
break;
}
slot.lazySet(null);
applyRead(node);
readBufferReadCount[bufferIndex]++;
}
readBufferDrainAtWriteCount[bufferIndex].lazySet(writeCount);
}
2.5 任务实现过程
AddTask
final class AddTask implements Runnable {
final Node<K, V> node;
final int weight;
AddTask(Node<K, V> node, int weight) {
this.weight = weight;
this.node = node;
}
@Override
@GuardedBy("evictionLock")
public void run() {
weightedSize.lazySet(weightedSize.get() + weight);
// ignore out-of-order write operations
if (node.get().isAlive()) {
evictionDeque.add(node);
// 踢出多余的
evict();
}
}
}
@GuardedBy("evictionLock")
void evict() {
// Attempts to evict entries from the map if it exceeds the maximum
// capacity. If the eviction fails due to a concurrent removal of the
// victim, that removal may cancel out the addition that triggered this
// eviction. The victim is eagerly unlinked before the removal task so
// that if an eviction is still required then a new victim will be chosen
// for removal.
while (hasOverflowed()) {
final Node<K, V> node = evictionDeque.poll();
// If weighted values are used, then the pending operations will adjust
// the size to reflect the correct weight
if (node == null) {
return;
}
// Notify the listener only if the entry was evicted
if (data.remove(node.key, node)) {
pendingNotifications.add(node);
}
makeDead(node);
}
}
RemovalTask
final class RemovalTask implements Runnable {
final Node<K, V> node;
RemovalTask(Node<K, V> node) {
this.node = node;
}
@Override
@GuardedBy("evictionLock")
public void run() {
// add may not have been processed yet
evictionDeque.remove(node);
makeDead(node);
}
}
UpdateTask
final class UpdateTask implements Runnable {
final int weightDifference;
final Node<K, V> node;
public UpdateTask(Node<K, V> node, int weightDifference) {
this.weightDifference = weightDifference;
this.node = node;
}
@Override
@GuardedBy("evictionLock")
public void run() {
weightedSize.lazySet(weightedSize.get() + weightDifference);
applyRead(node);
evict();
}
}
三、示例
- 限制内存大小
EntryWeigher<K, V> memoryUsageWeigher = new EntryWeigher<K, V>() {
final MemoryMeter meter = new MemoryMeter();
@Override public int weightOf(K key, V value) {
long bytes = meter.measure(key) + meter.measure(value);
return (int) Math.min(bytes, Integer.MAX_VALUE);
}
};
ConcurrentMap<K, V> cache = new ConcurrentLinkedHashMap.Builder<K, V>()
.maximumWeightedCapacity(1024 * 1024) // 1 MB
.weigher(memoryUsageWeigher)
.build();
- 限制条数
ConcurrentLinkedHashMap<String, String> map = new ConcurrentLinkedHashMap.Builder<String, String>()
.maximumWeightedCapacity(2).weigher(Weighers.singleton()).build();