GraphQL AI-大数据

第8课 GraphQL集成DataLoader

2021-09-08  本文已影响0人  笔名辉哥

N+1问题

首先来看看什么是N+1问题,假设我们有如下模型:

image

然后我们有这样的查询:

  school(schoolId:"school1"){
    teachers
    {
      teacherName
    }
  }

会得到类似下面的结果:

  "data": {
    "school": {
      "teachers": [
        {
          "teacherName": "老师11"
        },
        {
          "teacherName": "老师12"
        },
        {
          "teacherName": "老师13"
        }
      ]
    }
  }

根据我们之前的经验,GraphQL会这样执行查询逻辑:

  1. 根据schoolId查到学校里的teacherId列表
  2. 遍历TeacherId列表,查到每个Teacher对象
  3. 获取Teacher对象的teacherName属性

很容易发现,遍历teacherId列表取查询每个Teacher对象是极不经济的,而N+1指的就是N次Teacher查询+1次teacherId列表的查询。

Java-DataLoader

DataLoader通过将多次查询合并成一次来减少查询次数。比如上面的例子,需要执行三次对Teacher对象的查询,DataLoader会自动将三次查询合并成一次批量查询。除此之外,就算是不同层级的查询,DataLaoder也会自动进行合并,比如将上面的查询改成:

## 查询全校老师名字和每个班级里的老师名字
school(schoolId:"school1"){
  classList{
    teachers{
      teacherName
    }
  }
  teachers
  {
    teacherName
  }
}

虽然是不同层级,DataLoader也会将针对Teacher对象的查询合并成一次批量查询,同时会过滤掉重复的TeacherId,保证最佳的查询性能。

GraphQL集成Java-DataLoader

这里以优化Teacher对象的查询演示如何集成Java-DataLoader(GraphQL-JAVA默认引入了Java-DataLoader,不需要额外引入)。

1. 实现TeacherDataLoader

class TeacherDataLoader : BatchLoader<String, Teacher> {
    override fun load(keys: List<String>): CompletableFuture<List<Teacher>> {
        return CompletableFuture.supplyAsync {
            // 这里是根据ID批量查询Teacher列表
            DataStore.getTeachersByTeacherIds(keys)
        }
    }
}

2. 注入DataLoader

val executionInputBuilder = ExecutionInput.Builder()

// 省略其他内容的注入
// ...

val dataLoaderRegister = DataLoaderRegistry()
dataLoaderRegister.register("teacherBatchLoader", DataLoader.newDataLoader(TeacherDataLoader))
executionInputBuilder.dataLoaderRegistry(buildDataLoaderRegistry())

val executionResult = graphQL.execute(executionInput)

3. 改写Resolver

将所有对Teacher的Fetch修改为通过DataLoader中间层取获取数据(这里以需改SchoolResolver中的teachers为例):

class SchoolResolver : GraphQLResolver<School> {
    fun teachers(school: School, env: DataFetchingEnvironment): CompletableFuture<List<Teacher>> {
        val teacherIds = DataStore.schoolStore.first { it.schoolId == school.schoolId }.teachers
        val dataLoader = env.getDataLoader<String, Teacher>(DataLoaderConstants.TEACHER_DATA_LOADER)
        return dataLoader.loadMany(teacherIds)
    }
}

通过以上三步就完成了DataLoader的集成,接下来分析DataLoader的工作原理。

DataLoader原理

DataLoader主要利用了Java的CompletableFuture异步任务收集再批量处理,最后将结果写回对应任务。

image

以下是部分重点源码解读:

// key的合并和缓存处理
CompletableFuture<V> load(K key, Object loadContext) {
        synchronized (dataLoader) {
            Object cacheKey = getCacheKey(nonNull(key));
            stats.incrementLoadCount();

            boolean batchingEnabled = loaderOptions.batchingEnabled();
            boolean cachingEnabled = loaderOptions.cachingEnabled();

            // cache是默认开启的,同样的key直接拿缓存
            if (cachingEnabled) {
                if (futureCache.containsKey(cacheKey)) {
                    stats.incrementCacheHitCount();
                    return futureCache.get(cacheKey);
                }
            }

            CompletableFuture<V> future = new CompletableFuture<>();
            if (batchingEnabled) {
                //把key和future对应收集起来,合并key批量查询后写回future
                loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext));
            } else {
                stats.incrementBatchLoadCountBy(1);
                // immediate execution of batch function
                future = invokeLoaderImmediately(key, loadContext);
            }
            if (cachingEnabled) {
                futureCache.set(cacheKey, future);
            }
            return future;
        }
    }

// 调用我们写的DataLoader
private CompletableFuture<List<V>> dispatchQueueBatch(List<K> keys, List<Object> callContexts, List<CompletableFuture<V>> queuedFutures) {
        stats.incrementBatchLoadCountBy(keys.size());
        // 调用我们写的TeacherDataLoader
        CompletionStage<List<V>> batchLoad = invokeLoader(keys, callContexts);
        return batchLoad
                .toCompletableFuture()
                .thenApply(values -> {
                    // keys和结果一定要对应,一个key对应一个future,一个future对应一个结果
                    assertResultSize(keys, values);

                    for (int idx = 0; idx < queuedFutures.size(); idx++) {
                        Object value = values.get(idx);
                        CompletableFuture<V> future = queuedFutures.get(idx);
                        if (value instanceof Throwable) {
                            stats.incrementLoadErrorCount();
                            future.completeExceptionally((Throwable) value);
                            // we don't clear the cached view of this entry to avoid
                            // frequently loading the same error
                        } else if (value instanceof Try) {
                            // we allow the batch loader to return a Try so we can better represent a computation
                            // that might have worked or not.
                            Try<V> tryValue = (Try<V>) value;
                            if (tryValue.isSuccess()) {
                                future.complete(tryValue.get());
                            } else {
                                stats.incrementLoadErrorCount();
                                future.completeExceptionally(tryValue.getThrowable());
                            }
                        } else {
                            // 把结果写回缓存中的future
                            V val = (V) value;
                            future.complete(val);
                        }
                    }
                    return values;
                }).exceptionally(ex -> {
                    stats.incrementBatchLoadExceptionCount();
                    for (int idx = 0; idx < queuedFutures.size(); idx++) {
                        K key = keys.get(idx);
                        CompletableFuture<V> future = queuedFutures.get(idx);
                        future.completeExceptionally(ex);
                        // clear any cached view of this key because they all failed
                        dataLoader.clear(key);
                    }
                    return emptyList();
                });
    }

除此之外Java-DataLoader还做了一个Statistics用于收集DataLoader执行过程中的状态,比如缓存命中多少次,已经load了多少个对象,有多少次error等。默认情况下是不会执行数据收集的,需要通过DataLoaderDispatcherInstrumentation进行注入:

val options = DataLoaderDispatcherInstrumentationOptions
        .newOptions().includeStatistics(true)
val dispatcherInstrumentation = DataLoaderDispatcherInstrumentation(options)

但是有一个问题,在构建GraphQL时只支持一个instrumentation,那么是不是我们仅只能写一个instrumentation呢?好在GraphQL用组合模式提供了一个ChainedInstrumentation,我们得以组合多个instrumentation。

作者:Johnny_
链接:https://www.jianshu.com/p/196472a0f813
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

上一篇下一篇

猜你喜欢

热点阅读