滚动查询
2022-06-08 本文已影响0人
OkGogogooo
1. 引言
在微服务场景下,考虑这样一个例子。
一个程序想调用一个微服务接口,读取并解析一个数据库表的数据,这个数据表有10万条数据。
此时如果用一个微服务接口,将这10万条数据组成一个JSON格式,一次性返回,显然不合适。JSON是一种闭包形式的数据格式,有开始标记,有结束标记,通常是从头到尾解析完才能使用(现在已有库支持边解析,边回调消费,但这不是通常情况下使用JSON的方式)。服务端把这么庞大的数据构造成JSON格式-->网络传输-->客户端再解析,这过程消耗的时间和资源都是非常大的。
我们想到JDBC是通过迭代器的思想解决这个问题的,即 “fetch -- >迭代消费-->fetch-->迭代消费-->...”。这个思路同样同样可以应用在微服务的场景下,但它是有状态的接口调用,而不是无状态的。所以当后台多实例,中间有负载均衡时,应该以适当的方式告诉负载均衡服务(例如在HTTP请求的header中加一个特殊的头),这一批调用要指向后台同一个服务实例,别分派到多个不同实例。
2. 滚动查询
在微服务场景下,基于迭代器思想实现的这种大规模数据查询的方式,我们将之称为“滚动查询”(ScrollQuery)。它的逻辑过程时这样的:
image.png
客户端不用主动调用关闭,如果数据获取完,服务端自动会把资源关闭;如果获取了部分就放弃继续获取,则服务端在发现资源超过指定时间没有被使用之后,就会自动关闭。这个idle时间通常是30秒或1分钟。所以滚动查询需要一次性连续使用完。
3. 构件
image.png实现:
public class ScrollQuerySite
{
static ScrollQuerySite sInstance ;
public static ScrollQuerySite getInstance()
{
if(sInstance == null)
sInstance = new ScrollQuerySite() ;
return sInstance ;
}
final AutoCleanHashMap<String , IScrollQuery<?>> mResMap = AutoCleanHashMap.withExpired_Idle(1, true) ;
private ScrollQuerySite()
{
}
public JSONObject scrollNext(String aHandle , int aMaxSize)
{
IScrollQuery<?> sq = mResMap.remove(aHandle) ;
Assert.notNull(sq, "查询句柄[%s]无效,可能已经过期,过期时间2分钟!" , aHandle) ;
return sq.scrollNext(aMaxSize) ;
}
public void cacheScrollQuery(IScrollQuery<?> aScrollQuery)
{
if(aScrollQuery.getHandle() != null)
mResMap.put(aScrollQuery.getHandle() , aScrollQuery) ;
}
}
public interface IScrollQuery<T> extends Closeable
{
JSONObject scrollNext(int aMaxSize) ;
String getHandle() ;
}
一种实现
public class JScroll implements IScrollQuery<ResultSet>
{
Connection mConn ;
PreparedStatement mPStm ;
ResultSet mRs ;
EFunction<JSONArray, Object, SQLException> mFac ;
Comparator<Object> mComparator ;
String mHandle ;
int mMaxSize ;
BiPredicate<ResultSet , JSONArray> mPred ;
boolean mLookAhead = false ;
public JScroll(Connection aConn , PreparedStatement aPstm , ResultSet aRs , int aMaxSize
, BiPredicate<ResultSet , JSONArray> aPred
, EFunction<JSONArray, Object, SQLException> aFac
, Comparator<Object> aComparator)
{
mConn = aConn ;
mPStm = aPstm ;
mRs = aRs ;
mMaxSize = aMaxSize<=0?500:aMaxSize ;
mPred = aPred ;
mFac = aFac ;
mComparator = aComparator ;
}
@Override
public void close()
{
StreamAssist.closeAll(mRs , mPStm , mConn) ;
}
@Override
public JSONObject scrollNext(int aMaxSize)
{
int count = 0 ;
String handle = null ;
if(aMaxSize<=0)
aMaxSize = mMaxSize ;
JSONArray ja = new JSONArray() ;
try
{
while(mLookAhead || mRs.next())
{
if(count++>=aMaxSize)
{
handle = UUID.randomUUID().toString() ;
mLookAhead = true ;
break ;
}
mLookAhead = false ;
if(!mPred.test(mRs , ja))
{
mHandle = null ;
return null ;
}
}
}
catch (Exception e)
{
WrapException.wrapThrow(e) ;
}
if(mComparator != null && ja.isNotEmpty())
ja.sort(mComparator) ;
Object data = ja ;
JSONObject resultJo = null ;
if(mFac != null)
{
try
{
Object genObj = mFac.apply(ja) ;
if(!(genObj instanceof JSONObject))
{
data = genObj ;
}
else
{
resultJo = (JSONObject)genObj ;
}
}
catch (Exception e)
{
WrapException.wrapThrow(e) ;
}
}
mHandle = handle ;
if(handle != null)
ScrollQuerySite.getInstance().cacheScrollQuery(this) ;
if(resultJo == null)
resultJo = new JSONObject() ;
return resultJo.put("data", data)
.put("returnAmount", ja.length())
.put("handle", mHandle)
.put("hasMore", mHandle != null) ;
}
@Override
public String getHandle()
{
return mHandle ;
}
}