使用DelayQueue 和 FutureTask 实现java
2017-08-01 本文已影响550人
jijs
使用DelayQueue、ConcurrentHashMap、FutureTask实现的缓存工具类。
DelayQueue 简介
DelayQueue是一个支持延时获取元素的无界阻塞队列。DelayQueue内部队列使用PriorityQueue来实现。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
DelayQueue非常有用,可以将DelayQueue运用在以下应用场景。
- 缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询
DelayQueue,一旦能从DelayQueue中获取元素时,表示缓存有效期到了。 - 定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从
DelayQueue中获取到任务就开始执行,比如TimerQueue就是使用DelayQueue实现的。
ConcurrentHashMap和FutureTask,详见以下:
缓存工具类实现
- 支持缓存多长时间,单位毫秒。
- 支持多线程并发。
比如:有一个比较耗时的操作,此时缓冲中没有此缓存值,一个线程开始计算这个耗时操作,而再次进来线程就不需要再次进行计算,只需要等上一个线程计算完成后(使用FutureTask)返回该值即可。
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
/**
* @author jijs
* @date 2017/08/04
*/
public class CacheBean<V> {
// 缓存计算的结果
private final static ConcurrentMap<String, Future<Object>> cache = new ConcurrentHashMap<>();
// 延迟队列来判断那些缓存过期
private final static DelayQueue<DelayedItem<String>> delayQueue = new DelayQueue<>();
// 缓存时间
private final int ms;
static {
// 定时清理过期缓存
Thread t = new Thread() {
@Override
public void run() {
dameonCheckOverdueKey();
}
};
t.setDaemon(true);
t.start();
}
private final Computable<V> c;
/**
* @param c Computable
*/
public CacheBean(Computable<V> c) {
this(c, 60 * 1000);
}
/**
* @param c Computable
* @param ms 缓存多少毫秒
*/
public CacheBean(Computable<V> c, int ms) {
this.c = c;
this.ms = ms;
}
public V compute(final String key) throws InterruptedException {
while (true) {
//根据key从缓存中获取值
Future<V> f = (Future<V>) cache.get(key);
if (f == null) {
Callable<V> eval = new Callable<V>() {
public V call() {
return (V) c.compute(key);
}
};
FutureTask<V> ft = new FutureTask<>(eval);
//如果缓存中存在此可以,则返回已存在的value
f = (Future<V>) cache.putIfAbsent(key, (Future<Object>) ft);
if (f == null) {
//向delayQueue中添加key,并设置该key的存活时间
delayQueue.put(new DelayedItem<>(key, ms));
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(key, f);
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
/**
* 检查过期的key,从cache中删除
*/
private static void dameonCheckOverdueKey() {
DelayedItem<String> delayedItem;
while (true) {
try {
delayedItem = delayQueue.take();
if (delayedItem != null) {
cache.remove(delayedItem.getT());
System.out.println(System.nanoTime() + " remove " + delayedItem.getT() + " from cache");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class DelayedItem<T> implements Delayed {
private T t;
private long liveTime;
private long removeTime;
public DelayedItem(T t, long liveTime) {
this.setT(t);
this.liveTime = liveTime;
this.removeTime = TimeUnit.MILLISECONDS.convert(liveTime, TimeUnit.MILLISECONDS) + System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
if (o == null)
return 1;
if (o == this)
return 0;
if (o instanceof DelayedItem) {
DelayedItem<T> tmpDelayedItem = (DelayedItem<T>) o;
if (liveTime > tmpDelayedItem.liveTime) {
return 1;
} else if (liveTime == tmpDelayedItem.liveTime) {
return 0;
} else {
return -1;
}
}
long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return diff > 0 ? 1 : diff == 0 ? 0 : -1;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(removeTime - System.currentTimeMillis(), unit);
}
public T getT() {
return t;
}
public void setT(T t) {
this.t = t;
}
@Override
public int hashCode() {
return t.hashCode();
}
@Override
public boolean equals(Object object) {
if (object instanceof DelayedItem) {
return object.hashCode() == hashCode() ? true : false;
}
return false;
}
}
Computable接口
/**
* @author jijs
* @date 2017/08/04
*/
public interface Computable<V> {
V compute(String k);
}
测试类
/**
* @author jijs
* @date 2017/08/04
*/
public class FutureTaskDemo {
public static void main(String[] args) throws InterruptedException {
// 子线程
Thread t = new Thread(() -> {
CacheBean<String> cb = new CacheBean<>(k -> {
try {
System.out.println("模拟计算数据,计算时长2秒。key=" + k);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "你好:" + k;
}, 5000);
try {
while (true) {
System.out.println("thead2:" + cb.compute("b"));
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
t.start();
// 主线程
while (true) {
CacheBean<String> cb = new CacheBean<>(k -> {
try {
System.out.println("模拟计算数据,计算时长2秒。key=" + k);
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "你好:" + k;
}, 5000);
System.out.println("thead1:" + cb.compute("b"));
TimeUnit.SECONDS.sleep(1);
}
}
}
执行结果:
Paste_Image.png
两个线程同时访问同一个key的缓存。从执行结果发现,每次缓存失效后,同一个key只执行一次计算,而不是多个线程并发执行同一个计算然后缓存。
想了解更多精彩内容请关注我的公众号