利用Callable,Future以及FutureTask实现一

2017-01-11  本文已影响69人  evil_ice
一,利用Callable,Future以及FutureTask实现一个缓存

利用Callable,Future以及FutureTask实现一个缓存实现一个多线程环境下的缓存
一步一步进行设计分析优化

二,实现
interface Computable<K, V>{
    V compute(K key) throws InterruptedException;
}

class Memoizer<K, V> {
    private final Map<K,V> cache = new HashMap<K,V>();
    Computable<K, V> computation;
    Memoizer(Computable<K, V> computation){
        this.computation = computation;
    }
    public synchronized V get(K key) throws InterruptedException {
              //从缓存中取结果. 如果结果存在,则直接返回; 若结果不存在,则进行计算
        V result = cache.get(key);
        if(result == null){
            result = computation.compute(key);
            cache.put(key,result);
        }
        return result;
    }
}

上面的代码实现存在一个问题-----伸缩性问题. 由于get方法被synchronized关键字修饰,每次只能有一个线程能够取结果.如果当一个线程结果不存在,需要计算执行的时候,而这个任务执行时间很长,那么其他线程就需要等待很长时间.
我们可以使用线程安全的ConcurrentHashMap进行替换HashMap,且去掉get方法的synchronized关键字

interface Computable<K, V>{
    V compute(K key) throws InterruptedException;
}

class Memoizer<K, V> {
    private final Map<K,V> cache = new ConcurrentHashMap<K,V>();
    Computable<K, V> computation;
    Memoizer(Computable<K, V> computation){
        this.computation = computation;
    }
    public V get(K key) throws InterruptedException {
        //此处由ConcurrentHashMap来保证线程安全
        V result = cache.get(key);
        //以下代码可以并发执行
        if(result == null){
            result = computation.compute(key);
            cache.put(key,result);
        }
        return result;
    }
}

这种方案解决了伸缩性问题,吞吐量得到了提高.
但是存在一个重复计算的问题. 假设一个任务需要执行很长时间,当第一次通过get取结果的时候, 结果为空,需要执行,且是长时间的执行,那么这时候第二线程也过来取同样key的结果,第一次进行运算的结果(需要长时间运行)还没有出来,那么第二次得到的结果也是空(第二个线程不知道其他线程正在进行计算),那么再次进行了重复的运算.
在这里我们考虑使用FutureTask进行优化,利用他的FutureTask的get方法.

interface Computable<K, V>{
    V compute(K key) throws InterruptedException;
}

class Memoizer<K, V> {
    private final Map<K,Future<V>> cache = new ConcurrentHashMap<K,Future<V>>();
    Computable<K, V> computation;
    Memoizer(Computable<K, V> computation){
        this.computation = computation;
    }
    public V get(K key) throws InterruptedException {
        //此处由ConcurrentHashMap来保证线程安全
        Future<V> result = cache.get(key);
        
        if(result == null){
            Callable<V> task = new Callable<V>(){
                @Override
                public V call() throws Exception {
                    return computation.compute(key);
                }
            };
            
            FutureTask<V> futureTask = new FutureTask<V>(task);
            //缓存不存在则加入
            result = cache.putIfAbsent(key, futureTask);
            if(result == null){
                result = futureTask;
                futureTask.run();
            }
            
            try {
                //等待运算结果返回
                return result.get();
            } catch (ExecutionException e) {
                e.printStackTrace();
                //遇到任务的终止,或者其他异常则移除缓存
                cache.remove(key,result);
            }
        }
        return null;
        
    }
}

这个缓存实现存在一个问题-----缓存逾期的问题

参考:
<<java编发编程实战>>

上一篇下一篇

猜你喜欢

热点阅读