mongoDB聚合和管道, 2022-09-02
(2022.09.02 Fri)
MongoDB提供了三种聚合(aggregation)方法:
- 聚合流程(the aggregation pipeline)
- 单目标聚合(single purpose aggregation method)
- map-reduce函数
MongoDB的聚合架构建立在数据处理流程基础之上(data processing pipeline)。文档可通过多步流程转换为聚合结果。
聚合操作可用于:
- 将多个文档中的数据分组
- 对分组数据进行操作并返回单一值(a single result)
- 分析数据如何随时间变化
聚合流程 aggregation pipeline
Aggregation pipeline包含了处理文档的一个或多个阶段:
- 每个阶段对输入文档执行一个操作,比如文档过滤(filter document),文档分组(group documents),和计算统计值
- pipeline中每个阶段的文档输出传入下个阶段作为文档输入
- Agg pipeline可以为文档分组产生聚合统计值,比如求和、平均、极大、极小值等。
从MongoDB 4.2版本开始,可通过agg pipeline修改文档的内容
注意,
以
db.<coll_name>.aggregate()
形式执行的aggregation pipeline并不会修改colleciton中的文档,除非pipeline包含了$merge
或$out
。
案例
下面给出若干案例,基于如下文档,该文档名testmongo
,保存披萨饼店订单数据的一部分
db.testmongo.insertMany( [
{ _id: 0, name: "Pepperoni", size: "small", price: 19,
quantity: 10, date: ISODate( "2021-03-13T08:14:30Z" ) },
{ _id: 1, name: "Pepperoni", size: "medium", price: 20,
quantity: 20, date : ISODate( "2021-03-13T09:13:24Z" ) },
{ _id: 2, name: "Pepperoni", size: "large", price: 21,
quantity: 30, date : ISODate( "2021-03-17T09:22:12Z" ) },
{ _id: 3, name: "Cheese", size: "small", price: 12,
quantity: 15, date : ISODate( "2021-03-13T11:21:39.736Z" ) },
{ _id: 4, name: "Cheese", size: "medium", price: 13,
quantity:50, date : ISODate( "2022-01-12T21:23:13.331Z" ) },
{ _id: 5, name: "Cheese", size: "large", price: 14,
quantity: 10, date : ISODate( "2022-01-12T05:08:13Z" ) },
{ _id: 6, name: "Vegan", size: "small", price: 17,
quantity: 10, date : ISODate( "2021-01-13T05:08:13Z" ) },
{ _id: 7, name: "Vegan", size: "medium", price: 18,
quantity: 10, date : ISODate( "2021-01-13T05:10:13Z" ) }
] )
案例1建立另一个agg pipeline,其中含有两个阶段,返回了medium尺寸披萨的订购量,并按披萨名字分组。
db.orders.aggregate( [
// Stage 1: 用$match方法找出所有medium尺寸披萨订单
{
$match: { size: "medium" }
},
// Stage 2: 对前一个阶段的输出结果按名字分组,并计算订单量
{
$group: { _id: "$name", totalQuantity: { $sum: "$quantity" } }
}
] )
返回
{ "_id" : "Pepperoni", "totalQuantity" : 20 }
{ "_id" : "Cheese", "totalQuantity" : 50 }
{ "_id" : "Vegan", "totalQuantity" : 10 }
在该pipeline中,aggregate
方法的参数是一个列表[ ]
,不同步骤按从左到右的顺序依次排列。
$match
阶段,找出所有size=medium的订单。$group
阶段,按披萨名字对前一步的输出结果分组,按组计算quantity
之和,并赋名为totalQuantity
。
案例2指定了两个日期,计算在这两个日期之间的订单价值和平均单量。
db.testmongo.aggregate( [
// 阶段1,指定日期间隔
{
$match:
{
"date": { $gte: new ISODate( "2020-01-30" ), $lt: new ISODate( "2022-01-30" ) }
}
},
// 阶段2,对选定日期的文档计算所求值
{
$group:
{
_id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } },
totalOrderValue: { $sum: { $multiply: [ "$price", "$quantity" ] } },
averageOrderQuantity: { $avg: "$quantity" }
}
},
// 阶段3,对文档中的totalOrderValue按降序排列
{
$sort: { totalOrderValue: -1 }
}
] )
返回
{ "_id" : "2022-01-12", "totalOrderValue" : 790, "averageOrderQuantity" : 30 }
{ "_id" : "2021-03-13", "totalOrderValue" : 770, "averageOrderQuantity" : 15 }
{ "_id" : "2021-03-17", "totalOrderValue" : 630, "averageOrderQuantity" : 30 }
{ "_id" : "2021-01-13", "totalOrderValue" : 350, "averageOrderQuantity" : 10 }
限制
agg pipeline如三方面限制
- 结果尺寸限制:
aggregate
命令可以默认返回一个光标(cursor)或将结果存入colleciton中。输出结果的文档应该服从BSON文件的16MB(megabyte)限制。若单一文档超过该限制,则aggregation返回错误。该限制只应用于最终返回文档,而pipeline处理过程中,文档可以超过此限制。 - 阶段数的限制:从MongoDB 5.0开始,单一aggregation pipeline中最多包含1000个阶段。
- 内存限制:从MongoDB 6.0开始
allowDiskUseByDefault
参数用以控制pipeline是否需要100MB内存执行默认情况下写入磁盘的临时文件。若该参数设为true
,pipeline的步骤中需要超过100MB内存才能执行的步骤将默认将临时文件写入磁盘(pipeline stages that require more than 100 megabytes of memory to execute write temporary files to disk by default)。设为false
,不写入磁盘而直接返回错误。注意$search
聚合步骤并不受限于100MB内存,因为它运行于一个独立进程。
可将临时文件写入磁盘的阶段包括,$bucket
,$bucketAuto
,$group
,$setWindowFields
,$sort
,$sortByCount
。
如果含有$sort
的pipeline超过了内存限制,可以考虑加入$limit
步骤。
从MongoDB 4.2开始profiler log messages和diagnostic log messages包含了usedDisk
标志位,应对pipeline出现因内存限制写数据到临时文件的情况。
pipeline可操作的步骤和表达式列表
-
$project
:修改输入文档的结构。可以用来重命名、增加或删除域,也可以用于创建计算结果以及嵌套文档。 -
$match
:用于过滤数据,只输出符合条件的文档。$match使用MongoDB的标准查询操作。 -
$limit
:用来限制MongoDB聚合管道返回的文档数。 -
$skip
:在聚合管道中跳过指定数量的文档,并返回余下的文档。 -
$unwind
:将文档中的某一个数组类型字段拆分成多条,每条包含数组中的一个值。 -
$group
:将集合中的文档分组,可用于统计结果。 -
$sort
:将输入文档排序后输出。 -
$geoNear
:输出接近某一地理位置的有序文档。
1 | 2 | 3 |
---|---|---|
$sum |
计算总和 | db.<coll_name>.aggregate([{$group : {_id : "$by_user", num_tutorial : {$sum : "$likes"}}}]) |
$avg |
计算平均值 | db.<coll_name>.aggregate([{$group : {_id : "$by_user", num_tutorial : {$avg : "$likes"}}}]) |
$min |
获取集合中所有文档对应值得最小值 | db.<coll_name>.aggregate([{$group : {_id : "$by_user", num_tutorial : {$min : "$likes"}}}]) |
$max |
获取集合中所有文档对应值得最大值 | db.<coll_name>.aggregate([{$group : {_id : "$by_user", num_tutorial : {$max : "$likes"}}}]) |
$push |
将值加入一个数组中,不会判断是否有重复的值 | db.<coll_name>.aggregate([{$group : {_id : "$by_user", url : {$push: "$url"}}}]) |
$addToSet |
将值加入一个数组中,会判断是否有重复的值,若相同的值在数组中已经存在了,则不加入 | db.<coll_name>.aggregate([{$group : {_id : "$by_user", url : {$addToSet : "$url"}}}]) |
$first |
根据资源文档的排序获取第一个文档数据 | db.<coll_name>.aggregate([{$group : {_id : "$by_user", first_url : {$first : "$url"}}}]) |
$last |
根据资源文档的排序获取最后一个文档数据 | db.<coll_name>.aggregate([{$group : {_id : "$by_user", last_url : {$last : "$url"}}}]) |
单目标聚合 single purpose aggregation
单目标聚合对单独一个collection的文档做聚合。方法简单,但无法实现聚合pipeline。
方法 | 描述 |
---|---|
db.<coll_name>.estimatedDocumentCount() |
collection或view中文档的大致数目 |
db.<coll_name>.count() |
collection或view中文档的数目 |
db.<coll_name>.distinct(<field_name>) |
文档中特定字段的独特值列表 |
map-reduce函数
skip
从MongoDB 5.0开始,map-reduce函数下线(deprecated),相应的功能可由aggregation pipeline完成。
Reference
1 MongoDB manual, official website
2 runoob教程