前言
此篇博文是 Mongdb 基础系列之一;
本文为作者的原创作品,转载需注明出处;
简介
MongoDB Aggregation Pipeline 由大量的 Stages 所组成,完整的 Stages 信息参考 https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/ ,下面笔者就几个关键的 Stages 进行分别介绍,
Stages
$project
参考官网说明;
定义
Reshapes each document in the stream, such as by adding new fields or removing existing fields. For each input document, outputs one document.
将输入的 document 进行重塑,可以添加新的 fields 或者是删除某些 fields 进而构成新的 document 的结构用于输出;参看这样一个例子;
\$project 调用公式,
1 | { $project: { <specification(s)> } } |
The \$project takes a document that can specify the inclusion of fields, the suppression of the _id field, the addition of new fields, and the resetting of the values of existing fields. Alternatively, you may specify the exclusion of fields.
$project 通过指定一个 document 作为参数,可以
- 指定包含哪些字段
- 指定不包含哪些字段
- 可以不包含 _id 字段
- 添加一些新的字段作为输出
公式中的参数 <specification(s)> 包含如下的形式;
这里需要注意第三个条件 <field>: <expression>,可以通过为某个新增字段通过表达式的方式进行赋值;
另外,find() 方法也可以通过 project 来限定输出结果,参考限定返回字段
限定输出文档中所包含字段
假定我们有如下的 books 相关的数据
1 | { |
假设通过聚合查询以后,我们仅需要输出 _id,title 以及 author 字段;使用如下的方式,
1 | db.books.aggregate( [ { $project : { title : 1 , author : 1 } } ] ) |
默认的情况下,_id 是必须输出的,除非显示的 suppress 该字段;得到如下的输出,
1 | { "_id" : 1, "title" : "abc123", "author" : { "last" : "zzz", "first" : "aaa" } } |
Suppress id 字段
继续使用上面的例子,这里我们将使用如下的 \$project 表达式的方式,
1 | db.books.aggregate( [ { $project : { _id: 0, title : 1 , author : 1 } } ] ) |
通过 _id:0 显式的将 _id 字段从输出中过滤掉,
1 | { "title" : "abc123", "author" : { "last" : "zzz", "first" : "aaa" } } |
限定输出文档中所不包含字段
继续使用上述的例子,
1 | db.books.aggregate( [ { $project : { "lastModified": 0 } } ] ) |
这样通过取 0 便可以设置哪些字段不被包含到输出文档中;注意,除了 _id 字段以外,0
和1
的条件是不能混用的;
限定内嵌文档的输出字段
继续使用上述的例子,
1 | db.books.aggregate( [ { $project : { "author.first" : 0, "lastModified" : 0 } } ] ) |
或者可以使用这样的方式,
1 | db.bookmarks.aggregate( [ { $project: { "author": { "first": 0}, "lastModified" : 0 } } ] ) |
通过计算新增输出文档中的字段
假设我们有如下的 books 数据,
1 | { |
通过 \$project 我们可以通过计算来自定义自己想要的输出文档的字段;
1 | db.books.aggregate( |
这里我们得到相关的输出文档如下,
1 | { |
这里需要注意的是,如果我们通过 \$project 使用表达式的方式来为输出文档中指定相关的字段,如果该输出字段与原有文档的字段相重复,那么原有文档中的字段将会被新的字段的内容所覆盖;比如上述的字段 isbn;
$match
Filters the document stream to allow only matching documents to pass unmodified into the next pipeline stage. $match uses standard MongoDB queries. For each input document, outputs either one document (a match) or zero documents (no match).
注意,$match 使用的是标准的 MongoDB 的 queries,这也就是为什么前面的例子返回 aggregation 操作信息中,其针对 \$match stage 返回的操作信息是 $query 的原因,并且还包含了标准的 query planner 的相关信息;
$limit
Passes the first _n_ documents unmodified to the pipeline where n is the specified limit. For each input document, outputs either one document (for the first n documents) or zero documents (after the first n documents).
经过该 Stage 之后,只将前面 n 个文档传递给后续的 Stages 继续进行操作,如果这是最后一个 Stage,将会返回给客户端,看一个例子,
假设我们有如下的 orders 的数据,
1 | db.orders.insertMany([ |
返回其中两个 documents 给客户端,
1 | > db.orders.aggregate( |
返回其中两个 documents 给后续的 Pipeline Stage 继续进行处理,
1 | > db.orders.aggregate( |
$skip
与 $limit 非常类似,不过这里是直接将头 n 个 documents 过滤掉;
$unwind
如果 document 的某个字段值是一个 array,那么将该 array 拆分,拆分后,使用 array 中的每一个值与 document 的其余字段单独构成一个 document;
$group
简介
\$group stage 通过指定的表达式对输入的 documents 进行分组统计操作,然后每一个分组相关的统计信息将会通过一个 document 传递给下一个 stage;输出文档中必须包含一个 _id 字段,来作为分组的键( key ),换句话说,\$group 操作就是根据该 key 来进行分组的;另外输出文档还会包含相关的统计信息,该信息是通过相关的 Accumulator 操作符对该分组的数据进行统计计算得到的;
调用格式
1 | { |
_id 字段
如上述简介所述的那样,_id 字段是必不可少的,它是用来进行 Group 的键( key ),但是,你也可以将该值设置为 null,该操作将会把 input documents 作为一个分组来进行统计,其实言外之意也就是“不分组”;
其余的 field 字段
可选的,该部分设置 Accumulator 操作符来进行统计计算;
Accumulator 操作符
https://docs.mongodb.com/manual/reference/operator/aggregation/#aggregation-accumulator-operators
Group 的执行逻辑
该执行逻辑是从例子通过年月日来进行 Group 推导出来的;先通过 _id 字段进行分组,从该例子中,可以看到,分组以后,得到三个分组,第一个分组只有一个 document,其余两个分组分别包含两个 document,那么后续的 accumulator 操作其实也就显而易见了,针对每个分组的每一个 document 分别进行统计计算,既得到输出结果即可;
例子
常规例子
假设我们有如下的 sales 数据,
1 | db.sales.insertMany([ |
通过年月日来进行 Group
1 | db.sales.aggregate( |
分为两个部分,分组的 key 以及 accumulator 操作符部分,
- _id 字段
通过 _id 字段来构建分组所依赖的键( key ),这里构建出年月日; - 剩余的 field 字段
year 字段通过 accumulator 操作符根据分组日期进行销售数据的统计求和;
averageQuantity 字段计算 quantity 的均值;
count 字段通过 $sum accumulator 来统计每个分组中的数量;其逻辑是对分组中的每个 document 进行 +1 操作,然后返回总数给字段 count;不过有些时候,我们希望将 count 作为一个 stage 来进行处理,而不是 stage 中的某一部分进行统计,参考 $count
输出结果如下,
1 | { "_id" : { "month" : 3, "day" : 15, "year" : 2014 }, "totalPrice" : 50, "averageQuantity" : 10, "count" : 1 } |
从这个例子,我们可以看到 group 的执行逻辑,该部分逻辑,整理到了小节 Group 的执行逻辑中;
Group null
1 | db.sales.aggregate( |
这个例子将分组 key _id 字段设置为 null,意思其实就是说,不进行分组,将整个 sales 中的 documents 作为一个分组进行统计计算,所以得到结果如下,
1 | { "_id" : null, "totalPrice" : 290, "averageQuantity" : 8.6, "count" : 5 } |
distinct
比如,我们想要知道上述测试数据中有多少个唯一的 items?
1 | db.sales.aggregate( [ { $group : { _id : "$item" } } ] ) |
输出
1 | { "_id" : "xyz" } |
数据透视图( Pivot Data )
假设我们有如下的 books collection 信息,
1 | db.sales.insertMany([ |
Author 有哪些 title
1 | db.books.aggregate( |
输出结果,
1 | { "_id" : "Homer", "books" : [ "The Odyssey", "Iliad" ] } |
这里特别要注意的是,使用到了 $push Accumulator 操作符,将多个值输出到一个 array 中;
使用 $$ROOT 对 Author 进行分组
这里,我们通过使用 $$ROOT System Variable 来对 Author 进行分组,该操作将会对原始的 documents 进行重塑,将 Author 相关的记录单独的存放在一个 document 中,这样做的时候,需要注意的是,每个文档的大小不能超过 16M;
1 | db.books.aggregate( |
将会输出如下的结果,
1 | { |
$sort
定义
Sorts all input documents and returns them to the pipeline in sorted order.
将所有输入的文档进行排序,然后将排序后的结果返回给 pipeline;
$sort 的格式为
1 | { |
sort 通过指定的字段 field 和 sort order 来进行排序,<sort order> 可以包含如下的取值,
1
to specify ascending order.-1
to specify descending order.{ $meta: "textScore" }
to sort by the computed textScore metadata in descending order. See Metadata Sort for an example. 通过经过计算的 textScore 元数据进行降序排列;
例子
升序和降序排列的例子
1 | db.users.aggregate( |
上述例子对不同字段同时分别使用到升序和降序排列的例子将会输出一个什么样的结果呢?针对不同的 users,先根据不同的年龄进行降序排列,很显然,这一步以后,得到的结果就是年龄由大到小依次输出;但是后面的 posts 又是升序排列的,如何理解?很简单,假设得到 100 个 28 岁的 users,那么针对这个相同岁数的 users,再根据 posts 的数量升序排列即可;所以,上述的排序可以总结为,针对不同的年龄分段,针对 age 进行降序排列,针对相同年龄段内的数据,针对 posts 进行升序排列;
使用元数据 meta data 进行排列
先来看下官网上的这个例子,
1 | db.users.aggregate( |
如果你直接看,你能够比较直观的看懂的是,在 \$match stage 中通过 \$text search 进行查询,然后将查询得到的结果,使用 \$sort stage 进行排序;对,但是如果你和我一样,还不清楚什么是 MongoDB 的 Text Search,那么当你看到 $meta,还有 textScore 的时候,那简直就是云里雾里了,什么意思?从 $sort 中可以直观的看出,先是通过 score 进行升序排列,然后再通过 posts 字段进行降序排列;等等,score,score 的值是 { $meta: “textScore” },这个值怎么就凭空出现了?而且现在还要使用它来进行排序了;要完全弄懂上面的排序逻辑,实则并没有这么容易,官网的例子中只是简单的一笔带过了,
This operation uses the $text operator to match the documents, and then sorts first by the “textScore” metadata and then by descending order of the posts field. The specified metadata determines the sort order. For example, the “textScore” metadata sorts in descending order. See $meta for more information on metadata.
说到底,官网的这一段话中也没有说清楚 textScore 是什么,只是说了一堆 textScore 可以用来作为一个分数值来进行排序;但是这个分数值到底指什么,上面这段话并没有解释;那么笔者就试图用一句话来概括说明吧,其实当通过 \$test 使用 Text Search 来匹配相关字段的时候,因为是模糊匹配,因此它有一个评分机制,就是哪些字段值越接近的,所得到的分数字越高,而这个分数字就是 textScore,而该 textScore 是存储在 \$meta 既当前查询的元数据中当中,因此,可以从中取值,通过 \$meta: “textScore” 即可;Ok,因此现在就清楚了,这里实际上是通过匹配相似度的高低来进行排序的;
但是想要彻底理解清楚什么是 textScore,那么就必须对 $text、Text Search 以及 $meta 要有基本的理解,下面笔者就这两个方面做进一步简要的分析;
$text
https://docs.mongodb.com/manual/reference/operator/query/text/#text-operator-text-score
$text performs a text search on the content of the fields indexed with a text index. A $text expression has the following syntax:
$text 会在使用了 text index 的字段上实行文本查询,注意,该被查询的字段必须是使用 text index 构建过索引的字段才可以;
text score
https://docs.mongodb.com/manual/reference/operator/query/text/#text-score
The $text operator assigns a score to each document that contains the search term in the indexed fields. The score represents the relevance of a document to a given text search query. The score can be part of a sort() method specification as well as part of the projection expression. The { $meta: “textScore” } expression provides information on the processing of the $text operation.
简而言之,就是 $text 会为每一个匹配的 document 设置一个分数( score ),该 score 可以作为 sort() 或者是 projection 表达式的一部分;通过使用 { $meta: “textScore” } 表达式,便可以获取 $text 操作的有关 text score 的结果;
例子
假设我们有如下的 articles 数据,
1 | db.articles.insert( |
我们需要对 subject 进行相关的 Text Search 操作,查询哪些 subject 包含 coffee 字符;首先,我们需要对 subject 构建 text index;
1 | db.articles.createIndex( { subject: "text" } ) |
然后对 subject 执行相关 text search 的操作;
1 | db.articles.find( { $text: { $search: "coffee" } } ) |
输出,
1 | { "_id" : 2, "subject" : "Coffee Shopping", "author" : "efg", "views" : 5 } |
但是,这个结果中并没有出现 textScore 的信息呀,我们怎么知道 Text Search 执行过程中的打分记录呢?伴随着这个问题,笔者执行了如下的操作,来将 textScore 在进行聚合查询操作的过程中作为一个新的字段返回;如下所述,
1 | db.articles.aggregate( |
返回结果如下
1 | { "_id" : 2, "score" : 0.75 } |
这样,我们就可以清晰的看到 textScore 的相关信息了;之后,便可以像之前的例子使用元数据 meta data 进行排列中所描述的那样,针对 textScore 进行排序了;
$lookup
定义
Performs a left outer join to an
unsharded collection
in the same database to filter in documents from the “joined” collection for processing. The $lookup stage does an equality match between a field from the input documents with a field from the documents of the “joined” collection.
\$lookup 对同一个 database 中的,注意,是未分片
的 collection 进行左外链接操作;官网的描述得不是很清楚,\$lookup 使用两个 collection,一个是源 collection,一个是目标 collection,被 left join 的 collection 是目标 collection,这里所指的未分片
的 collection 指的就是目标 collection,源 collection 有没有这种限制,上述官文并没有描述,但是笔者猜测不会有这种限制,但是需要后续进行例证,这里做下备注;
1 | { |
from
Specifies the collection in the same database to perform the join with. The from collection cannot be sharded.
指定同一个 database 中的另外一个 collection 来执行左外连接操作,注意,该 collection 必须是
未分片
的;该 collection 既是目标 collection;localField
Specifies the field from the documents input to the \$lookup stage. \$lookup performs an equality match on the localField to the foreignField from the documents of the from collection. If an input document does not contain the localField, the $lookup treats the field as having a value of
null
for matching purposes.localField 也就是源 collection 中用来匹配目标 collection 所指定的 field;注意,如果源 collection 中布包好该 localField,\$lookup Stage 将会对 localField 使用
null
值来与目标 collection 的 foreignField 进行匹配;foreignField
Specifies the field from the documents in the from collection. \$lookup performs an equality match on the foreignField to the localField from the input documents. If a document in the from collection does not contain the foreignField, the $lookup treats the value as
null
for matching purposes.如同 localField 那样,foreignField 是目标 collection 用来与源 collection 进行左外连接的字段;同样,如果目标 collection 中并未包含该字段,使用
null
只进行匹配;as
Specifies the name of the new array field to add to the input documents. The new array field contains the matching documents from the from collection. If the specified name already exists in the input document, the existing field is overwritten.
简而言之,就是将目标 collection 的所匹配的记录作为用 as 定义的新的字段输出;具体参考例子中的描述;
例子
来看这样一个例子,orders 和 inventory,假设我们有如下的 Orders 记录
1 | db.orders.insertMany([ |
相应的,假设我们有如下的 inventory 记录,
1 | db.inventory.insertMany([ |
我们使用 orders 中的 item 字段来左外关联 inventory 中的 sku 字段,
1 | db.orders.aggregate([ |
上述查询执行完成以后,我们将会立即得到如下的结果,
1 | { |
简单的分析一下上述的查询结果,
首先,通过由 as 所指定的 inventory_docs 字段来保存相关联的目标 collection inventory 的记录;
其次,可以看到源 collection orders 的 _id = 3 的记录不包含 item 字段,所以会使用 item = null 来进行匹配,因此直接匹配到了 inventory sku = null 的第 5 条记录;同理 inventory 的第 6 条记录并不包含所需要的 sku 字段,所以也会将 sku 视为 null,所同样作为匹配结果与 orders 的第三条记录进行匹配;所以得到了上述输出结果中的第三条记录;
left outer join
这里我们来回顾一下什么是 left outer join,一句话,那就是“如果左表的某行在右表中没有匹配行,则在相关联的结果集行中右表的所有选择列表列均为空值”,就是说,匹配过程中,如果右表中没有与之匹配的记录,坐标记录照常输出,只是右边的记录为空值;
$out
https://docs.mongodb.com/manual/reference/operator/aggregation/out/#pipe._S_out
Takes the documents returned by the aggregation pipeline and writes them to a specified collection. The $out operator
must be the last stage
in the pipeline. The $out operator lets the aggregation framework return result sets of any size.
将经过 pipeline 一系列操作所得到的 documents 输出到一个由 \$out 所指定的 collection 中;注意 \$out 必须是 pipeline 的最后一个 stage;
并且如果指定了 \$out,那么原本默认所返回的 cursor 将会是一个空的 cursor,所有相关的结果都必须从 \$out 所指定的 collection 中去获取;
相关用例参考之前笔者所整理的一个例子;
$sortByCount
New in version 3.4.
Groups incoming documents based on the value of a specified expression, then computes the count of documents in each distinct group.
Each output document contains two fields: an _id field containing the distinct grouping value, and a count field containing the number of documents belonging to that grouping or category.
The documents are sorted by count in descending order.
官网用了上述的三句话来总结 \$sortByCount 的用法,似懂非懂那就对了,证明你真的逐字逐句的读过了;第一次接触,看过几遍以后,不懂它到底想表达什么,很正常;其实这三句话同时在描述 sortByCount 的行为,第一句话表述,$sortByCount 会对通过上一 Stage 操作的结果首先进行 Group 然后针对分组再进行 count 操作;第二句话想表述的意思是,输出结果将包含一个 _id 字段以及一个 count 字段,_id 字段就是用来进行 Group 的 key;第三部就是说,对最后的结果再进行一次以降序方式的 sort 操作;
Ok,如果看过笔者的描述以后,还是似懂非懂,没关系,看下面这个由笔者从官网上借鉴并完善的例子,
假设,我们有这样一批 exhibits 的数据,
1 | db.exhibits.insertMany([ |
执行 \$sortByCount 操作,
1 | db.exhibits.aggregate( [ |
输出结果,
1 | { "_id" : "painting", "count" : 6 } |
简单分析一下上述聚合操作的逻辑,首先,$unwind stage 将 tags 的值 array 进行拆分并根据拆分后的每一个元素值重新构建出多个 documents;然后,通过 \$sortByCount 对 tags 字段值进行排序操作,该操作是建立在对其结果值进行分组并 count 以后的基础之上的,可见,默认使用的是降序朴烈的方式;
使得,\$sortByCount 貌似跳过/合并了多个步骤,count 怎么计算出来的?_id 字段是如何指定的?还是似懂非懂;别怕,再看看下面由笔者分解出来的和使用 \$sortByCount 等价的几个步骤;
其实整个上述 \$sortByCount 的执行操作就相当于执行如下的三个步骤,
1 | db.exhibits.aggregate([ |
- 首先 \$unwind tags
- 其次,对 \$unwind 之后的输出结果进行 \$group 操作,这里便针对 tags 记性分组并计算 count
- 最后,通过 count 值进行降序排列
所以,其实整个 \$sortByCount 操作实际上等价于执行了如下的两步操作,
1 | { $group: { _id: "$tags", count: { $sum: 1 } } }, |
既是将这两步合并成了一步;
$addFields
定义
Adds new fields to documents. \$addFields outputs documents that contain all existing fields from the input documents and newly added fields.
使用 \$addFields 会同时输出新增的字段和原有的字段;这一点与 \$project 不同,\$project 只输出由其所限定的字段信息;除了这一点意外,其用法与 \$project 如出一辙;
调用格式,
1 | { $addFields: { <newField>: <expression>, ... } } |
例子
使用两个 $addFields Stages 添加输出字段
假设我们有如下 scores 相关的数据,
1 | { |
如下的操作通过两次 \$addFields 的操作来为输出的文档添加新的字段,
1 | db.scores.aggregate( [ |
第一次 $addFields 通过对 homework 字段 array 的值求和得到 totalHomework 新增字段,同时通过对 quiz array 值求和得到新增字段 totalQuiz;第二次 $addFields 通过对 totalHomework、totalQuize 以及 extraCredit 三者进行求和,得到新的输出字段 totalScore;于是乎,我们得到了如下的结果,
1 | { |
为内嵌文档添加字段
假设我们有如下包含 specs 内嵌文档的 vechicles 的数据,
1 | { _id: 1, type: "car", specs: { doors: 4, wheels: 4 } } |
假设我们需要对 specs 内嵌文档的输出增加新的字段 fuel_type
1 | db.vehicles.aggregate( [ |
我们将会得到如下的输出结果,
1 | { _id: 1, type: "car", |
这里要特别注意的是,_id : 3 的记录,原本是没有 specs 内嵌文档元素的,经过聚合查询 $addFields 以后,同时包含了该 specs 内嵌文档的元素了;
覆盖原有字段的输出
1 | { _id: 1, dogs: 10, cats: 15 } |
通过 $addFields 覆盖原有文档中的 cats 字段的输出;
1 | db.animals.aggregate( [ |
得到相关的输出结果如下,
1 | { _id: 1, dogs: 10, cats: 20 } |
覆盖 _id 字段的输出
1 | { "_id" : 1, "item" : "tangerine", "type" : "citrus" } |
使用 item 字段来覆盖原有的 _id 字段的输出;
1 | db.fruit.aggregate( [ |
得到如下的输出结果,
1 | { "_id" : "tangerine", "item" : "fruit", "type" : "citrus" } |
$count
假设我们有如下的 scores 数据,
1 | { "_id" : 1, "subject" : "History", "score" : 88 } |
然后,假设我们需要统计分数大于 80 的记录数;
1 | db.scores.aggregate( |
将会输出,
1 | { "passing_scores" : 4 } |
可见,直接通过 $count 便可以对分组中的记录数进行求和;
不过,有时候,count 操作只是某个 stage 中的一部分的时候,需要使用到类似这样的方式 count: { \$sum: 1 } ,通过 accumulator 操作符的方式来实现,参考前面的通过年月日来进行 Group 例子中的描述;
其它
有关其它的 Stages 命令,参考 https://docs.mongodb.com/manual/reference/operator/aggregation-pipeline/