大数据平台技术笔记

滚动查询

2022-06-08  本文已影响0人  OkGogogooo

1. 引言

  在微服务场景下,考虑这样一个例子。

一个程序想调用一个微服务接口,读取并解析一个数据库表的数据,这个数据表有10万条数据。

  此时如果用一个微服务接口,将这10万条数据组成一个JSON格式,一次性返回,显然不合适。JSON是一种闭包形式的数据格式,有开始标记,有结束标记,通常是从头到尾解析完才能使用(现在已有库支持边解析,边回调消费,但这不是通常情况下使用JSON的方式)。服务端把这么庞大的数据构造成JSON格式-->网络传输-->客户端再解析,这过程消耗的时间和资源都是非常大的。
  我们想到JDBC是通过迭代器的思想解决这个问题的,即 “fetch -- >迭代消费-->fetch-->迭代消费-->...”。这个思路同样同样可以应用在微服务的场景下,但它是有状态的接口调用,而不是无状态的。所以当后台多实例,中间有负载均衡时,应该以适当的方式告诉负载均衡服务(例如在HTTP请求的header中加一个特殊的头),这一批调用要指向后台同一个服务实例,别分派到多个不同实例。

2. 滚动查询

  在微服务场景下,基于迭代器思想实现的这种大规模数据查询的方式,我们将之称为“滚动查询”(ScrollQuery)。它的逻辑过程时这样的:


image.png

客户端不用主动调用关闭,如果数据获取完,服务端自动会把资源关闭;如果获取了部分就放弃继续获取,则服务端在发现资源超过指定时间没有被使用之后,就会自动关闭。这个idle时间通常是30秒或1分钟。所以滚动查询需要一次性连续使用完。

3. 构件

image.png

实现:

public class ScrollQuerySite
{
    static ScrollQuerySite sInstance ;
    
    public static ScrollQuerySite getInstance()
    {
        if(sInstance == null)
            sInstance = new ScrollQuerySite() ;
        
        return sInstance ;
    }
    
    final AutoCleanHashMap<String , IScrollQuery<?>> mResMap = AutoCleanHashMap.withExpired_Idle(1, true) ;
    
    private ScrollQuerySite()
    {
    }
    
    public JSONObject scrollNext(String aHandle , int aMaxSize)
    {
        IScrollQuery<?> sq =  mResMap.remove(aHandle) ;
        Assert.notNull(sq, "查询句柄[%s]无效,可能已经过期,过期时间2分钟!" , aHandle) ;
        return sq.scrollNext(aMaxSize) ;
    }
    
    public void cacheScrollQuery(IScrollQuery<?> aScrollQuery)
    {
        if(aScrollQuery.getHandle() != null)
            mResMap.put(aScrollQuery.getHandle() , aScrollQuery) ;
    }
    
    
}
public interface IScrollQuery<T> extends Closeable
{
    JSONObject scrollNext(int aMaxSize) ;
    
    String getHandle() ;
}

一种实现

public class JScroll implements IScrollQuery<ResultSet>
{
    Connection mConn ;
    PreparedStatement mPStm ;
    ResultSet mRs ;
    
    EFunction<JSONArray, Object, SQLException> mFac ;
    Comparator<Object> mComparator ;

    String mHandle ;
    int mMaxSize ;
    
    BiPredicate<ResultSet , JSONArray> mPred ;
    
    boolean mLookAhead = false ;

    
    public JScroll(Connection aConn , PreparedStatement aPstm , ResultSet aRs , int aMaxSize
            , BiPredicate<ResultSet , JSONArray> aPred
            , EFunction<JSONArray, Object, SQLException> aFac
            , Comparator<Object> aComparator)
    {
        mConn = aConn ;
        mPStm = aPstm ;
        mRs = aRs ;
        mMaxSize = aMaxSize<=0?500:aMaxSize ;
        mPred = aPred ;
        mFac = aFac ;
        mComparator = aComparator ;
    }

    @Override
    public void close()
    {
        StreamAssist.closeAll(mRs , mPStm , mConn) ; 
    }

    @Override
    public JSONObject scrollNext(int aMaxSize)
    {
        int count = 0 ;
        String handle = null ;
        if(aMaxSize<=0)
            aMaxSize = mMaxSize ;
        JSONArray ja = new JSONArray() ;
        try
        {
            while(mLookAhead || mRs.next())
            {
                if(count++>=aMaxSize)
                {
                    handle = UUID.randomUUID().toString() ;
                    mLookAhead = true ; 
                    break ;
                }
                mLookAhead = false ;
                if(!mPred.test(mRs , ja))
                {
                    mHandle = null ;
                    return null ;
                }
            }
        }
        catch (Exception e)
        {
            WrapException.wrapThrow(e) ;
        }
        
        if(mComparator != null && ja.isNotEmpty())
            ja.sort(mComparator) ;
        
        Object data = ja ;
        JSONObject resultJo = null ;
        if(mFac != null)
        {
            try
            {
                Object genObj = mFac.apply(ja) ;
                if(!(genObj instanceof JSONObject))
                {
                    data = genObj ;
                }
                else
                {
                    resultJo = (JSONObject)genObj ;
                }
            }
            catch (Exception e)
            {
                WrapException.wrapThrow(e) ;
            }
        }
        
        mHandle = handle ;
        if(handle != null)
            ScrollQuerySite.getInstance().cacheScrollQuery(this) ;
        
        if(resultJo == null)
            resultJo = new JSONObject() ;
        
        return resultJo.put("data", data)
                .put("returnAmount", ja.length())
                .put("handle", mHandle)
                .put("hasMore", mHandle != null) ;
    }

    @Override
    public String getHandle()
    {
        return mHandle ;
    }

}
上一篇下一篇

猜你喜欢

热点阅读