Flink维表关联系列3-LRU策略

2019-11-08  本文已影响0人  LZhan
1.前言

LRU(Least Recently Used),最近最少使用缓存淘汰算法,认为最近访问过的数据在将来被访问的概率也比较大,当内存达到上限去淘汰那些最近访问较少的数据。

在Flink中做维表关联时,如果维表的数据比较大,无法一次性全部加载到内存中,而在业务上也允许一定的延时,那么可以使用LRU策略加载维表数据。
但是如果一条维表数据一直都被缓存命中,这条数据永远都不会被淘汰,这时维表的数据已经发生改变,那么将会在很长时间或者永远都无法更新这条改变,所以需要设置缓存超时时间TTL,当缓存时间超过ttl,会强制性使其失效重新从外部加载进来。

2.常用的LRU使用

<1> Guava Cache
google guava提供了Cache缓存模块,轻量级,适合做本地缓存,能够做到以下几点:
a.可配置本地缓存大小
b.可配置缓存过期时间
c.可配置淘汰策略

实例,通常在使用时,是在继承异步查询类RichAsyncFunction中的open方法中定义一个缓存,如下所示,
应用CacheBuilder来构建

 @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        this._storeCache = CacheBuilder
                .newBuilder()
                .expireAfterWrite(7, TimeUnit.DAYS)
                .concurrencyLevel(this.capacity)
                .build(new CacheLoader<Long, Optional<Store>>() {

                    @Override
                    public Optional<Store> load(Long storeId) throws Exception {
                        Store s = null;

                        List<Store> results = createQueryRunner()
                                .query(JoinStore.this.sql, new BeanListHandler<>(Store.class), storeId);
                        if (results != null && results.size() > 0) {
                            s =  results.get(0);
                        }

                        return Optional.ofNullable(s);
                    }

                });
    }

    private Future<Optional<Item>> query(Long itemId) {
        return asyncCall(() -> this._itemCache.get(itemId));
    }

CacheBuilder通过使用CacheLoader进行自动加载,通过使用get方法获得返回结果,如果有则直接返回,如果没有,则使用CacheLoader的load方法去运算,接着将结果放到缓存中,最后才返回结果。


<2> LinkedHashMap
LinkedHashMap是双向链表+hash表的结构,普通的hash表访问是没有顺序的,通过加上元素之间的指向关系保证元素之间的顺序,默认是按照插入顺序的,插入是链表尾部,取数据是链表头部,也就是访问的顺序与插入的顺序是一致的。要想其具有LRU特性,那么就将其改为访问顺序,插入还是在链表尾部,但是数据访问会将其移动达到链表的尾部,那么最近插入或者访问的数据永远都在链表尾部,被访问较少的数据就在链表的头部,给 LinkedHashMap设置一个大小,当数据大小超过该值,就直接从链表头部移除数据。
LinkedHashMap本身不具有ttl功能,就是无法知晓数据是否过期,可以通过给数据封装一个时间字段insertTimestamp,表示数据加载到内存的时间,当这条记录被命中,首先判断当前时间currentTimestamp与insertTimestamp差值是否达到ttl, 如果达到了就重新从外部存储中查询加载到内存中。

3.LRU方式读取Hbase

实现思路:

public class HbaseAsyncLRU extends RichAsyncFunction<OrderItemOld, OrderItemOld> {



    private HBaseClient hBaseClient;

    private LoadingCache<Long, Optional<Item>> _memberCache;

    private String tableName;

    public HbaseAsyncLRU(String tableName,String zk){
        this.hBaseClient=new HBaseClient(zk);
        this.tableName=tableName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        _memberCache= CacheBuilder
                .newBuilder()
                .maximumSize(2000)
                .expireAfterWrite(1, TimeUnit.HOURS)
                .build(new CacheLoader<Long, Optional<Item>>(){

                    @Override
                    public Optional<Item> load(Long memberId) throws Exception {

                        //执行hbase查询工作
                        GetRequest get=new GetRequest(tableName,memberId.toString());
                        hBaseClient.get(get).addCallbacks(new Callback<String, ArrayList<KeyValue>>() {
                            @Override
                            public String call(ArrayList<KeyValue> keyValues) throws Exception {
                                //todo
                                return null;
                            }
                        }, new Callback<String, Exception>() {
                            @Override
                            public String call(Exception e) throws Exception {
                                //todo
                                return null;
                            }
                        });

                        return Optional.empty();
                    }
                });
    }


    @Override
    public void asyncInvoke(OrderItemOld orderItemOld, ResultFuture<OrderItemOld> resultFuture) throws Exception {

    }

    @Override
    public void timeout(OrderItemOld input, ResultFuture<OrderItemOld> resultFuture) throws Exception {

    }

    @Override
    public void close() throws Exception {
        super.close();
    }

}

AsyncHBase的文档地址 http://opentsdb.github.io/asynchbase/ 打不开了,所以关于HBaseClient的部分没有完成。 等之后看看能不能用梯子打开这个文档,再将代码补充完成。

上一篇下一篇

猜你喜欢

热点阅读