MongoDB $merge $out 去除返回结果

2022-01-05  本文已影响0人  itkkanae

在 MongoTemplate 中执行带 $merge 的聚合操作时,发现返回了 into 目标集合的全表查询结果。实际使用 $merge 通常只是为了把聚合结果写入目标集合,并不需要查询结果。
去除返回结果需要给 Aggregation 添加 skipOutput 选项($out 同理),具体如下:

    /**
     * Demonstrates running a {@code $merge} aggregation with the
     * {@code skipOutput} option set, through both the reactive and the
     * blocking template.
     */
    @Test
    void mergeTest() throws InterruptedException {
        // skipOutput suppresses the result set that a $merge aggregation would otherwise return.
        AggregationOptions skipOutputOptions = AggregationOptions.builder().skipOutput().build();

        Aggregation mergeAggregation = Aggregation
                .newAggregation(
                        Aggregation.match(Criteria.where("a").is("b")),
                        Aggregation.merge().intoCollection("newCollection").on("_id").build())
                .withOptions(skipOutputOptions);

        // The option works the same way for both templates.
        // Reactive template:
        reactiveMongoTemplate
                .aggregate(mergeAggregation, "collection", Document.class)
                .subscribe(System.out::println);

        // Blocking template:
        List<Document> mappedResults = mongoTemplate
                .aggregate(mergeAggregation, "collection", Document.class)
                .getMappedResults();

    }

直接在数据库中执行时,带 $merge 的聚合操作并不会返回查询结果。之所以会返回 into 目标集合的查询结果,是因为 MongoDB Java 驱动(MongoTemplate 底层调用)在执行完 $merge / $out 聚合之后,又追加了一次对目标集合的 find 操作,即实际进行了两次请求。

追加 find 操作的位置分别在 MongoDB Java 驱动的 AggregateIterableImpl 和 AsyncAggregateIterableImpl 中:

// Excerpt of the synchronous aggregate iterable; only asReadOperation() is shown here.
class AggregateIterableImpl<TDocument, TResult> extends MongoIterableImpl<TResult> implements AggregateIterable<TResult> {

    /**
     * Builds the read operation that backs iteration over this aggregate.
     *
     * <p>When {@code getOutNamespace()} returns a namespace, the pipeline is first
     * executed as an aggregate-to-collection operation, and the operation returned
     * to the caller is a {@code find} with an empty filter against that namespace.
     * Otherwise a plain aggregate read operation is returned.
     *
     * @return the read operation to run for this iterable
     */
    public ReadOperation<BatchCursor<TResult>> asReadOperation() {
        MongoNamespace outNamespace = getOutNamespace();
        // Checks whether the pipeline contains a $merge or $out stage
        // (presumably getOutNamespace() is non-null only in that case - confirm in the driver source).
        if (outNamespace != null) {
            // First request: run the pipeline so its output is written to the target collection.
            getExecutor().execute(operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation,
                    hint, comment, aggregationLevel), getReadConcern(), getClientSession());

            // Carry the aggregate's collation and (if set) batch size over to the follow-up find.
            FindOptions findOptions = new FindOptions().collation(collation);
            Integer batchSize = getBatchSize();
            if (batchSize != null) {
                findOptions.batchSize(batchSize);
            }
            // The extra find is added here: the empty BsonDocument filter matches every
            // document in the target namespace, which is the second request.
            return operations.find(outNamespace, new BsonDocument(), resultClass, findOptions);
        } else {
            // No output namespace: ordinary aggregate whose cursor yields the pipeline results.
            return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, getBatchSize(), collation,
                    hint, comment, allowDiskUse, aggregationLevel);
        }
    }

}
// Excerpt of the asynchronous aggregate iterable; only asAsyncReadOperation() is shown here.
class AsyncAggregateIterableImpl<TDocument, TResult> extends AsyncMongoIterableImpl<TResult> implements AsyncAggregateIterable<TResult> {

    /**
     * Builds the asynchronous read operation that backs iteration over this aggregate.
     *
     * <p>When {@code getOutNamespace()} returns a namespace, an aggregate-to-collection
     * write operation and a {@code find} with an empty filter against that namespace are
     * created and returned together as a {@code WriteOperationThenCursorReadOperation}.
     * Otherwise a plain aggregate read operation is returned.
     *
     * @return the asynchronous read operation to run for this iterable
     */
    AsyncReadOperation<AsyncBatchCursor<TResult>> asAsyncReadOperation() {
        MongoNamespace outNamespace = getOutNamespace();
        // Checks whether the pipeline contains a $merge or $out stage
        // (presumably getOutNamespace() is non-null only in that case - confirm in the driver source).
        if (outNamespace != null) {
            // Write step: the pipeline whose output goes to the target collection.
            AsyncWriteOperation<Void> aggregateToCollectionOperation =
                    operations.aggregateToCollection(pipeline, maxTimeMS, allowDiskUse, bypassDocumentValidation, collation, hint, comment,
                            aggregationLevel);

            // Carry the aggregate's collation and (if set) batch size over to the follow-up find.
            FindOptions findOptions = new FindOptions().collation(collation);
            Integer batchSize = getBatchSize();
            if (batchSize != null) {
                findOptions.batchSize(batchSize);
            }

            // Read step: the empty BsonDocument filter matches every document in the target namespace.
            AsyncReadOperation<AsyncBatchCursor<TResult>> findOperation =
                    operations.find(outNamespace, new BsonDocument(), resultClass, findOptions);
            // The extra find is attached here: both operations are wrapped into one
            // (judging by the wrapper's name, the write runs first and then the find cursor is returned).
            return new WriteOperationThenCursorReadOperation<TResult>(aggregateToCollectionOperation, findOperation);
        } else {
            // No output namespace: ordinary aggregate whose cursor yields the pipeline results.
            return operations.aggregate(pipeline, resultClass, maxTimeMS, maxAwaitTimeMS, getBatchSize(), collation,
                    hint, comment, allowDiskUse, aggregationLevel);
        }

    }

}
上一篇 下一篇

猜你喜欢

热点阅读