MongoDB 基础系列十七:聚合查询之 Aggregation Pipeline - Aggregation Method

前言

此篇博文是 Mongdb 基础系列之一;

本文为作者的原创作品,转载需注明出处;

定义

1
db.collection.aggregate(pipeline, options)

参数

该方法包含两个参数 pipelineoptions

  • pipeline
    type: array
    description: 由一系列的 stages 所构成;
  • options
    type: document
    description: 可选;在执行的 pipeline 的额外选项;

options 参数

有关 options 的参数描述如下,

  • explain
    boolean
    可选;会额外返回 pipeline 有关聚合操作的相关信息;

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    db.orders.aggregate(
    [
    { $match: { status: "A" } },
    { $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
    { $sort: { total: -1 } }
    ],
    {
    explain: true
    }
    )

    调用结束以后,会额外返回 pipeline 有关聚合操作的相关信息;详情参考返回 aggregation 操作信息

  • allowDiskUsage
    boolean
    可选;允许 MongoDB 在执行 pipeline 聚合操作的时候,使用临时文件;当设置为 true 以后,MongoDB 会将临时数据写入 dbPath 的子目录 _tmp 中;

  • cursor
    document
    可选;通过该参数可以设置 cursor 的初始 batch size;

    1
    cursor: { batchSize: <int> }

    看一个例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    db.orders.aggregate(
    [
    { $match: { status: "A" } },
    { $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
    { $sort: { total: -1 } },
    { $limit: 2 }
    ],
    {
    cursor: { batchSize: 0 }
    }
    )

    A batchSize of 0 means an empty first batch and is useful for quickly returning a cursor or failure message without doing significant server-side work.
    在没有进行显著的服务器后台操作的前提下,快速返回;笔者补充,常用在调试的过程中,或者是检查;

  • maxTimeMS
    非负数
    可选,设置 cursor 的超时时间,单位毫秒;如果你没有设置该值,那么操作将永远不会超时;如果设置为 0,则认为不会超时;

  • bypassDocumentValidation
    boolean
    可选;当你指定了 $out aggregation 操作符以后有效;当设置为 true,那么将会使得 Pipeline 在处理 documents 的时候绕开 document 的验证;

  • readConcern
    document
    可选;参看 Mongodb 基础系列十五:增删查改 CRUD 之 Query - Write Concern 获取更多信息;

  • collation
    document

    Collation allows users to specify language-specific rules for string comparison, such as rules for lettercase and accent marks.

返回值

db.collection.aggregate() 的返回;

A cursor to the documents produced by the final stage of the aggregation pipeline operation, or if you include the explain option, the document that provides details on the processing of the aggregation operation.

Aggregation Pipeline 的最后一个 stage 处理完以后,将会向客户端返回一个 cursor;如果你使用了 explain 选项,那么除了 Cursor 以外,还会向客户端返回 aggreation 操作的详细信息;

If the pipeline includes the \$out operator, aggregate() returns an empty cursor. See $out for more information.

当 pipeline 中使用了 \$out 操作符以后,将会返回一个空的 cursor;取而代之,\$out 将会把结果写入一个由其所指定的 collection 当中;看下面这样的一个例子,

假设我们有如下的一个 books collection,

1
2
3
4
5
6
7
db.books.insertMany([
{ "_id" : 8751, "title" : "The Banquet", "author" : "Dante", "copies" : 2 },
{ "_id" : 8752, "title" : "Divine Comedy", "author" : "Dante", "copies" : 1 },
{ "_id" : 8645, "title" : "Eclogues", "author" : "Dante", "copies" : 2 },
{ "_id" : 7000, "title" : "The Odyssey", "author" : "Homer", "copies" : 10 },
{ "_id" : 7020, "title" : "Iliad", "author" : "Homer", "copies" : 10 }
])

然后,我们队 author 进行分组操作,并且将每个 author 的著作放入字段 books 当中,

1
2
3
4
db.books.aggregate( [
{ $group : { _id : "$author", books: { $push: "$title" } } },
{ $out : "authors" }
] )

我们将结果放入了一个新的 collection authors 中;然后,我们可以通过查询该 authors collection 来获取结果,

1
2
3
> db.authors.find({})
{ "_id" : "Homer", "books" : [ "The Odyssey", "Iliad" ] }
{ "_id" : "Dante", "books" : [ "The Banquet", "Divine Comedy", "Eclogues" ] }

从上面的例子可以清晰地看到,db.collection.aggregate() 返回的是一个空的 cursor,取而代之的是,将结果放入了一个新的 collection authors 中;

例子

假设我们有如下的 orders collection,

1
2
3
4
5
6
7
db.orders.insertMany([
{ _id: 1, cust_id: "abc1", ord_date: ISODate("2012-11-02T17:04:11.102Z"), status: "A", amount: 50 },
{ _id: 2, cust_id: "xyz1", ord_date: ISODate("2013-10-01T17:04:11.102Z"), status: "A", amount: 100 },
{ _id: 3, cust_id: "xyz1", ord_date: ISODate("2013-10-12T17:04:11.102Z"), status: "D", amount: 25 },
{ _id: 4, cust_id: "xyz1", ord_date: ISODate("2013-10-11T17:04:11.102Z"), status: "D", amount: 125 },
{ _id: 5, cust_id: "abc1", ord_date: ISODate("2013-11-12T17:04:11.102Z"), status: "A", amount: 25 }
])

分组求和 (Group by and sum)

需求,分别统计 status 为 “A” 的用户的消费总额;

1
2
3
4
5
db.orders.aggregate([
{ $match: { status: "A" } },
{ $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } }
])

$match stage,对原始 collection 数据进行初始筛选,

$group stage,对用户进行分组操作,然后计算各个分组的消费总额;

$sort stage,明确指明不需要进行排序操作,加速查询效率

返回值,

1
2
{ "_id" : "xyz1", "total" : 100 }
{ "_id" : "abc1", "total" : 75 }

返回 aggregation 操作信息

1
2
3
4
5
6
7
8
9
10
db.orders.aggregate(
[
{ $match: { status: "A" } },
{ $group: { _id: "$cust_id", total: { $sum: "$amount" } } },
{ $sort: { total: -1 } }
],
{
explain: true
}
)

将会返回如下的详细的操作信息,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
{
"stages" : [
{
"$cursor" : {
"query" : {
"status" : "A"
},
"fields" : {
"amount" : 1,
"cust_id" : 1,
"_id" : 0
},
"queryPlanner" : {
"plannerVersion" : 1,
"namespace" : "test.orders",
"indexFilterSet" : false,
"parsedQuery" : {
"status" : {
"$eq" : "A"
}
},
"winningPlan" : {
"stage" : "COLLSCAN",
"filter" : {
"status" : {
"$eq" : "A"
}
},
"direction" : "forward"
},
"rejectedPlans" : [ ]
}
}
},
{
"$group" : {
"_id" : "$cust_id",
"total" : {
"$sum" : "$amount"
}
}
},
{
"$sort" : {
"sortKey" : {
"total" : -1
}
}
}
],
"ok" : 1
}

当笔者整理到这里的时候,有个疑问,后面两个 Stages 的相关处理信息的 key 都是为 “\$group”、”\$sort”,与 aggregate() 方法中的 stages 的名称是对应的,怎么到了 \$match 的时候,该 key 值变成了 $cursor?答案见 match 的简介;

Reference

https://docs.mongodb.com/manual/reference/method/db.collection.aggregate/#db.collection.aggregate