Caffeine: Extended Notes

2022-06-12  右耳菌

1. Common cache eviction algorithms

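A cache eviction algorithm works within limited resources to identify, as accurately as it can, which data is likely to be reused in the near future, and so raises the cache hit rate. Common eviction algorithms include LRU, LFU, and FIFO.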
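To make the simplest of these policies concrete, here is a minimal LRU sketch built on the JDK's LinkedHashMap. It is not how Caffeine implements eviction; the class name LruCacheSketch and the capacity parameter are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal LRU sketch: an access-ordered LinkedHashMap that drops its eldest entry. */
final class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private final int capacity; // hypothetical parameter: maximum number of entries

    LruCacheSketch(int capacity) {
        // accessOrder = true: iteration runs from least to most recently used
        super(16, 0.75f, true);
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after every insertion; returning true evicts the least recently used entry
        return size() > capacity;
    }

    public static void main(String[] args) {
        LruCacheSketch<String, Integer> cache = new LruCacheSketch<>(2);
        cache.put("a", 1);
        cache.put("b", 2);
        cache.get("a");    // "a" becomes the most recently used entry
        cache.put("c", 3); // over capacity, so the least recently used entry "b" is evicted
        System.out.println(cache.keySet()); // [a, c]
    }
}
```

LRU looks only at recency, so a single scan over many cold keys can flush hot entries. That weakness is one motivation for frequency-aware policies such as the one in the next section.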


2. The W-TinyLFU eviction algorithm

[Figure: FrequencySketch]


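As the figure above shows, W-TinyLFU splits the cache into two regions, the window cache and the main cache:

  • New data first enters the window cache.
  • When the window cache is full, the entry it evicts is moved to A1, the probation segment of the main cache.
  • If the main cache is also full, the candidate evicted from the window cache is compared with the eviction candidate from the A1 probation segment, and the loser is evicted. The exact comparison logic in Caffeine is the admit method of BoundedLocalCache: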
 /**
  * Determines if the candidate should be accepted into the main space, as determined by its
  * frequency relative to the victim. A small amount of randomness is used to protect against hash
  * collision attacks, where the victim's frequency is artificially raised so that no new entries
  * are admitted.
  *
  * @param candidateKey the key for the entry being proposed for long term retention
  * @param victimKey the key for the entry chosen by the eviction policy for replacement
  * @return if the candidate should be admitted and the victim ejected
  */
 @GuardedBy("evictionLock")
 boolean admit(K candidateKey, K victimKey) {
   int victimFreq = frequencySketch().frequency(victimKey);
   int candidateFreq = frequencySketch().frequency(candidateKey);
   if (candidateFreq > victimFreq) {
     return true;
   } else if (candidateFreq <= 5) {
     // The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack
     // exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm
     // candidate reduces the number of random acceptances to minimize the impact on the hit rate.
     return false;
   }
   int random = ThreadLocalRandom.current().nextInt();
   return ((random & 127) == 0);
 }
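
The decision structure of admit can be tried in isolation with the sketch below. It is a simplification under stated assumptions: exact per-key counters in a HashMap stand in for Caffeine's FrequencySketch, and the class AdmissionSketch and its methods recordAccess and reset are hypothetical names, not Caffeine APIs. Only the three branches of admit mirror the quoted code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/** Simplified admission filter; exact counters stand in for Caffeine's FrequencySketch. */
final class AdmissionSketch<K> {
    private static final int MAX_FREQUENCY = 15; // counters saturate at 15, as the quoted comment says
    private final Map<K, Integer> counts = new HashMap<>();
    private final Random random;

    AdmissionSketch(Random random) {
        this.random = random;
    }

    /** Records one access to the key, saturating at MAX_FREQUENCY. */
    void recordAccess(K key) {
        counts.merge(key, 1, (old, one) -> Math.min(MAX_FREQUENCY, old + one));
    }

    int frequency(K key) {
        return counts.getOrDefault(key, 0);
    }

    /** Ages the history by halving every counter (15 becomes 7). */
    void reset() {
        counts.replaceAll((key, count) -> count / 2);
    }

    /** Same three branches as the quoted admit method. */
    boolean admit(K candidateKey, K victimKey) {
        int victimFreq = frequency(victimKey);
        int candidateFreq = frequency(candidateKey);
        if (candidateFreq > victimFreq) {
            return true;  // the candidate is hotter than the victim: admit it
        } else if (candidateFreq <= 5) {
            return false; // a cold candidate never displaces the victim
        }
        // Warm candidate, victim at least as hot: admit roughly 1 time in 128
        return (random.nextInt() & 127) == 0;
    }

    public static void main(String[] args) {
        AdmissionSketch<String> sketch = new AdmissionSketch<>(new Random());
        for (int i = 0; i < 10; i++) {
            sketch.recordAccess("hot");
        }
        sketch.recordAccess("cold");
        System.out.println(sketch.admit("hot", "cold")); // true: 10 > 1
        System.out.println(sketch.admit("cold", "hot")); // false: 1 <= 10 and 1 <= 5
    }
}
```

The random branch is the defence described in the quoted Javadoc: without it, an attacker who inflates the victim's frequency through hash collisions could keep every new entry out indefinitely.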

3. Caffeine core class diagram

[Figure: Caffeine core class diagram]

4. Categories of Caffeine caches

[Figure: Categories of Caffeine caches]

5. Atomicity of Caffeine operations

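Caffeine's load, put, and invalidate operations are atomic: on the same key the three are mutually exclusive, so a load and a put cannot run at the same time, and neither can a load and an invalidate. The following test demonstrates this with a loader that takes one second.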

package cn.lazyfennec.caffeine;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.junit.Test;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * @Author: Neco
 * @Description:
 * @Date: create in 2022/6/12 15:18
 */
public class CaffeineTest {
    @Test
    public void test() throws Exception {
        AtomicInteger atomicInteger=new AtomicInteger();
        LoadingCache<String, String> cache = Caffeine.newBuilder().maximumSize(3).build(key -> {
            Thread.sleep(1000);
            return atomicInteger.incrementAndGet()+"";
        });

        cache.get("test");
        cache.invalidate("test");

        new Thread() {
            @Override
            public void run() {
                cache.get("test");

            }
        }.start();

        new Thread() {
            @Override
            public void run() {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long start = System.currentTimeMillis();
                cache.invalidate("test");
                System.out.println("use ms:"+(System.currentTimeMillis() - start));
            }
        }.start();

        Thread.sleep(1200);
        System.out.println("========" + cache.asMap());
        System.out.println("========" + cache.get("test"));
    }
}
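
Output:

use ms:797
========{}
========3

The invalidate call was issued about 200 ms into a 1000 ms load and blocked for roughly 800 ms, until the load finished, and then removed the freshly loaded entry. The map is therefore empty afterwards, and the final get runs the loader a third time and returns 3.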
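
The same blocking behaviour can be reproduced with the JDK alone. The sketch below is only an analogy using ConcurrentHashMap, not Caffeine's own code; the class PerKeyAtomicitySketch and its method removeDuringLoad are hypothetical names. A removal issued while computeIfAbsent is still computing the same key waits for that computation, then removes the value it produced.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

/** JDK-only analogy: a removal on a key waits for an in-flight computation on that key. */
final class PerKeyAtomicitySketch {

    /** Returns "<value seen by remove>:<final map size>". */
    static String removeDuringLoad() {
        ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
        CountDownLatch loadStarted = new CountDownLatch(1);

        Thread loader = new Thread(() -> map.computeIfAbsent("test", key -> {
            loadStarted.countDown(); // signal that the computation is in progress
            sleepQuietly(300);       // simulate a slow loader
            return "loaded";
        }));
        loader.start();

        try {
            loadStarted.await();                 // the computation now owns this key's bin
            String removed = map.remove("test"); // waits for the computation, then removes its result
            loader.join();
            return removed + ":" + map.size();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(removeDuringLoad());
    }
}
```

Because the removal cannot slip in between the start and the end of the computation, it always observes the computed value rather than an empty slot, which matches the empty map seen in the Caffeine test output.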

6. How Caffeine's expiration policy works

Start with the afterWrite and afterRead methods of the BoundedLocalCache class:

  /**
   * Performs the post-processing work required after a write.
   *
   * @param task the pending operation to be applied
   */
  void afterWrite(Runnable task) {
    if (buffersWrites()) {
      for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
        if (writeBuffer().offer(task)) {
          scheduleAfterWrite();
          return;
        }
        scheduleDrainBuffers();
      }

      // The maintenance task may be scheduled but not running due to all of the executor's threads
      // being busy. If all of the threads are writing into the cache then no progress can be made
      // without assistance.
      try {
        performCleanUp(task);
      } catch (RuntimeException e) {
        logger.log(Level.SEVERE, "Exception thrown when performing the maintenance task", e);
      }
    } else {
      scheduleAfterWrite();
    }
  }
  /**
   * Performs the post-processing work required after a read.
   *
   * @param node the entry in the page replacement policy
   * @param now the current time, in nanoseconds
   * @param recordHit if the hit count should be incremented
   */
  void afterRead(Node<K, V> node, long now, boolean recordHit) {
    if (recordHit) {
      statsCounter().recordHits(1);
    }

    boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
    if (shouldDrainBuffers(delayable)) {
      scheduleDrainBuffers();
    }
    refreshIfNeeded(node, now);
  }

Both methods call scheduleDrainBuffers, which schedules the maintenance task that, among other work, removes expired entries.

  /**
   * Attempts to schedule an asynchronous task to apply the pending operations to the page
   * replacement policy. If the executor rejects the task then it is run directly.
   */
  void scheduleDrainBuffers() {
    if (drainStatus() >= PROCESSING_TO_IDLE) {
      return;
    }
    if (evictionLock.tryLock()) {
      try {
        int drainStatus = drainStatus();
        if (drainStatus >= PROCESSING_TO_IDLE) {
          return;
        }
        lazySetDrainStatus(PROCESSING_TO_IDLE);
        executor().execute(drainBuffersTask);
      } catch (Throwable t) {
        logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
        maintenance(/* ignored */ null);
      } finally {
        evictionLock.unlock();
      }
    }
  }
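
The pattern in scheduleDrainBuffers (check a status flag, try the lock, re-check, then submit at most one task) can be sketched as follows. This is a reduced model, not Caffeine's code: it uses three hypothetical states instead of Caffeine's drain statuses, and the class DrainSchedulerSketch with its drainRuns counter exists only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/** Reduced model of "schedule at most one pending maintenance task". */
final class DrainSchedulerSketch {
    // Hypothetical, simplified states; Caffeine's real state machine has more of them
    private static final int IDLE = 0;
    private static final int PROCESSING = 2;

    private final AtomicInteger drainStatus = new AtomicInteger(IDLE);
    private final ReentrantLock evictionLock = new ReentrantLock();
    private final AtomicInteger drainRuns = new AtomicInteger();
    private final Executor executor;

    DrainSchedulerSketch(Executor executor) {
        this.executor = executor;
    }

    void scheduleDrain() {
        if (drainStatus.get() >= PROCESSING) {
            return; // a task is already scheduled or running
        }
        if (evictionLock.tryLock()) { // never block the calling reader or writer
            try {
                if (drainStatus.get() >= PROCESSING) {
                    return; // re-check under the lock
                }
                drainStatus.set(PROCESSING);
                executor.execute(this::drain);
            } finally {
                evictionLock.unlock();
            }
        }
    }

    private void drain() {
        evictionLock.lock();
        try {
            drainRuns.incrementAndGet(); // placeholder for draining buffers, expiring, evicting
        } finally {
            drainStatus.set(IDLE);
            evictionLock.unlock();
        }
    }

    int drainRuns() {
        return drainRuns.get();
    }

    public static void main(String[] args) {
        List<Runnable> queue = new ArrayList<>(); // an executor that only queues tasks
        DrainSchedulerSketch scheduler = new DrainSchedulerSketch(queue::add);
        scheduler.scheduleDrain();
        scheduler.scheduleDrain();
        scheduler.scheduleDrain();
        System.out.println(queue.size()); // 1: repeated requests collapse into one task
        queue.remove(0).run();
        System.out.println(scheduler.drainRuns()); // 1
    }
}
```

Using tryLock rather than lock means a caller that loses the race simply moves on, which keeps reads and writes from stalling behind maintenance work.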

scheduleDrainBuffers submits drainBuffersTask, which turns out to be a PerformCleanupTask. The details of that class are as follows:

[Figure: PerformCleanupTask]

The performCleanUp method that it uses looks like this:

  /**
   * Performs the maintenance work, blocking until the lock is acquired. Any exception thrown, such
   * as by {@link CacheWriter#delete}, is propagated to the caller.
   *
   * @param task an additional pending task to run, or {@code null} if not present
   */
  void performCleanUp(@Nullable Runnable task) {
    evictionLock.lock();
    try {
      maintenance(task);
    } finally {
      evictionLock.unlock();
    }
  }

Next, look at the maintenance method:

  /**
   * Performs the pending maintenance work and sets the state flags during processing to avoid
   * excess scheduling attempts. The read buffer, write buffer, and reference queues are
   * drained, followed by expiration, and size-based eviction.
   *
   * @param task an additional pending task to run, or {@code null} if not present
   */
  @GuardedBy("evictionLock")
  void maintenance(@Nullable Runnable task) {
    lazySetDrainStatus(PROCESSING_TO_IDLE);

    try {
      drainReadBuffer();

      drainWriteBuffer();
      if (task != null) {
        task.run();
      }

      drainKeyReferences();
      drainValueReferences();

      expireEntries();
      evictEntries();
    } finally {
      if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
        lazySetDrainStatus(REQUIRED);
      }
    }
  }

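As shown above, maintenance runs drainReadBuffer, drainWriteBuffer, drainKeyReferences, drainValueReferences, expireEntries, and evictEntries in that order. As its Javadoc states, the read buffer, write buffer, and reference queues are drained first, followed by expiration and then size-based eviction. Expired entries are therefore reclaimed as part of this maintenance pass.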
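
To illustrate what an expiration step inside such a maintenance pass might do, here is a minimal expire-after-write sketch. It is a hypothetical model, not Caffeine's expireEntries: the class ExpireAfterWriteSketch keeps entries in write order in a LinkedHashMap, takes the current time as an explicit argument, assumes timestamps never decrease, and is not thread-safe (in a real cache this would run under the eviction lock).

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal expire-after-write model: entries are kept in write order, oldest first. */
final class ExpireAfterWriteSketch<K, V> {
    private static final class TimedValue<T> {
        final T value;
        final long writeTime;

        TimedValue(T value, long writeTime) {
            this.value = value;
            this.writeTime = writeTime;
        }
    }

    private final long ttlNanos; // hypothetical parameter: lifetime after the last write
    private final Map<K, TimedValue<V>> data = new LinkedHashMap<>();

    ExpireAfterWriteSketch(long ttlNanos) {
        this.ttlNanos = ttlNanos;
    }

    /** Writes the entry; assumes 'now' never decreases between calls. */
    void put(K key, V value, long now) {
        data.remove(key); // re-insert so the key moves to the tail (newest write)
        data.put(key, new TimedValue<>(value, now));
    }

    /** Removes expired entries, stopping at the first live one because of the write order. */
    int expireEntries(long now) {
        int removed = 0;
        Iterator<TimedValue<V>> it = data.values().iterator();
        while (it.hasNext() && (now - it.next().writeTime) >= ttlNanos) {
            it.remove();
            removed++;
        }
        return removed;
    }

    int size() {
        return data.size();
    }

    public static void main(String[] args) {
        ExpireAfterWriteSketch<String, String> cache = new ExpireAfterWriteSketch<>(10);
        cache.put("a", "1", 0);
        cache.put("b", "2", 5);
        System.out.println(cache.expireEntries(9));  // 0: nothing is 10 ticks old yet
        System.out.println(cache.expireEntries(10)); // 1: "a" has expired
        System.out.println(cache.size());            // 1: only "b" remains
    }
}
```

Because the entries are ordered by write time, the pass does work proportional to the number of expired entries rather than to the size of the cache.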


7. Summary

Caffeine is a large topic. For more detail, consult its documentation or read the source code.

