前言
此篇博文是 Mongdb 基础系列之一;
本文为作者的原创作品,转载需注明出处;
简介
本文将全面的介绍 Map Reduce 的内容,包含各个核心要点;
特性
https://docs.mongodb.com/manual/core/map-reduce/#map-reduce-behavior
- map-reduce 可以将结果写入另外一个 collection 中或者是逐行的( inline )进行返回;
- 如果结果是逐行的( inline )的进行返回,那么结果集 documents 的大小的总和不能超过 16M;
- 如果将结果输出到另外一个 collection $C_a$ 中,那么你可以继续使用 $C_a$ 与原有 collection 的“增量部分”继续进行 reduce 操作而无需在对原有的 collection 进行全集的 map-reduce 操作;该部分的详细内容参考incremental map reduce( 增量类型的 map reduce )(这点非常有意思,这可能也是 map-reduce 唯一胜过 pipeline 的亮点);
- map-reduce 也可以对分片的 collection 进行处理,同时 map-reduce; 同样可以将结果输出到分片的 collection 当中;
下面笔者将根据上述的特性来进行介绍;
mapReduce command and method
mapReduce command 和 db.collection.mapReduce() 方法的各个参数意义是一样的;所以将其放到一起来梳理;
下面是 mapReduce command 的调用格式,
1 | db.runCommand( |
下面是 db.collection.mapReduce() 的调用格式,
1 | db.collection.mapReduce( |
可以看到,db.collection.mapReduce() 包含三个参数,<map>
,<reduce>
和一个 document,注意,该 document 包含了所有的可选参数,不过这些可选参数也是非常重要的;
一个简单的模拟调用的例子,注意该例子可以转换为 db.collection.mapReduce() 的调用方式;
1 | var mapFunction = function() { ... }; |
下面我们来看看各个参数的意义,
mapReduce
类型:collection
mapReduce command 中的必填参数;
指定需要进行 map-reduce 操作的输入源 collection;在开始进行 map 操作以前,可以通过下面的 query 步骤进行初步过滤;
map
类型:function
一个不带任何参数的 javascript function,通过在方法体内部引用 this 变量来获取当前的 document,并且通过 emit() 方法输出 key 和 value 键值对;
1 | function() { |
当在自定义该方法的时候,注意如下几点,
- In the map function, reference the current document as
this
within the function.
方法体内部的this
指向的是当前的文档; - The map function should not access the database for any reason.
在该方法体内部不应该有操作 MongoDB 数据库的任何操作; - The map function should be pure, or have no impact outside of the function (i.e. side effects.)
该方法只应该处理方法体内部的逻辑,而不应该影响任何外部的逻辑; - A single emit can only hold half of MongoDB’s maximum BSON document size.
单个 emit 输出的 key、value 键值对不能超过 BSON document size 最大限制的一半( 8M ) - The map function may optionally call emit(key,value) any number of times to create an output document associating key with value.
单个 map 方法内部可以调用 emit(key, value) 方法任意多次来输出 key、value 键值对;注意,由 emit(key, value) 方法输出的键值对( key, value )最终会通过 MongoDB 内部方法对 key 进行 group by,然后输出( key, values )键值对,并赋值给 reduce 方法进行后续的操作;
下面这个 map() 方法会根据 status 的状态调用 emit(kye, value) 方法 0 到 1 次,
1 | function() { |
下面这个 map() 方法会调用 emit(key, value) 多次,
1 | function() { |
reduce
类型:function
1 | function(key, values) { |
一个包含 key 和 values 参数的 function,该 key、values 对应的也就是由 map 方法所输出( emit )的键、以及值的集合;为什么是值的集合?因为 MongoDB 会对由 map 方法输出的每一个 key、value 键值对再通过 key 进行相关的 group by 的操作,生成类似于 key, [value1, value2 …] 这样的一个键值对,然后将其赋值给 reduce 方法;(当笔者整理到这里的时候,有一个疑问,如果 values 数组中的元素过多而导致内存溢出怎么弄?这一点,笔者在下面的第 6 点有合理的推测和分析;)
在使用 reduce 方法的时候,需要注意如下的几点,
- The reduce function should not access the database, even to perform read operations.
reduce 方法不应该有任何操作 MongoDB 数据库的操作; - The reduce function should not affect the outside system.
reduce 方法的内部功能不应该影响外部系统; - MongoDB will not call the reduce function for a key that has only a single value. The values argument is an array whose elements are the value objects that are “mapped” to the key.
reduce 方法所接受的参数 values 数组是由相同 key 的不同 value 所构成的数组; - MongoDB can invoke the reduce function more than once for the same key. In this case, the previous output from the reduce function for that key will become one of the input values to the next reduce function invocation for that key.
MongoDB 可以对同一个 key 进行多次的 reduce 操作;这种情况下,上一次通过 reduce 针对该 key 所生成的输出结果,将会作为下一次 reduce 针对该 key 的输入,并进行相关处理; - The reduce function can access the variables defined in the scope parameter.
reduce 方法可以访问由 scope 所定义的参数; - The inputs to reduce must not be larger than half of MongoDB’s maximum BSON document size. This requirement may be violated when large documents are returned and then joined together in subsequent reduce steps.
单个 reduce 的输入大小不能超过 BSON 文档最大限制的一半,既 8M;
(笔者好奇的是,MongoDB 是如何做到的?试想,如果我们有一个非常庞大的 (key, values) 的键值对作为输入,那么 MongoDB 是如何控制 values 大小的?如果让笔者来实现,一种可能就是对 values 进行拆分,生成一个一个更小的 (key, sub_values) 键值对,然后将原本一步的 reduce(key, values)分解成多个 reduce(key, sub_values) 的步骤来进行处理,不过这样的话,就要求 reduce 方法必须是相关联
( associative )的,这一点将会在下面有关 reduce 特性的部分得到证实)
因为同一个 reduce 方法可以针对通过 key 被调用多次,那么 reduce 方法就必须满足如下的特性
;
- 由 reduce 返回的结果类型( type )必须与 map 所输出的 value 的类型( type )一致;
reduce 调用必须是
相关联
( associative )的,也就是说,下面的等式必须相等;1
reduce( key, [ C, A, B ] ) == reduce(key, [ C, reduce(key, [ A, B ]) ] )
这里回答了笔者在上述第 6 点中所提出的疑问,如果 values 过大怎么办?当时笔者想到的就是对 values 进行拆分,生成多个更小的 sub_values 集合;那么要能够这么做,那么就必须要求 reduce 方法是
相关联
的( associative );因此,我们假设 [C, A, B] 是一个非常大的集合,而且超过了内存的限制,那么这个时候,我们就可以像上述等式右侧的那样,将 [C, A, B] 拆分成两个部分来分别进行处理,先对 ( key, [A,B] ) 进行 reduce 处理得到结果 $R_{sub}$,然后对 ( key, [C, $R_{sub}$] ) 进行 reduce 处理所得到的最终结果与 reduce( key, [C, A, B ] ) 一致;reduce 调用必须是
幂等
( idempotent )的,1
reduce( key, [ reduce(key, valuesArray) ] ) == reduce( key, valuesArray )
也就是说,对一个 valuesArray 进行重复的多次调用( 第二次调用就相当于对第一次计算的结果进行第二次重复调用 ),得到的结果是相等的;这也就满足了 reduce 方法是
可重入
的;- reduce 方法中的参数 values 数组中的元素顺序必须是可交换的( commutative )
1
reduce( key, [ A, B ] ) == reduce( key, [ B, A ] )
out
类型:string 或者 document
指定 map-reduce 操作的输出结果输出到哪;你可以将结果输出到 collection 中或者是一行一行( inline )的返回结果;不过注意的是,如果是主从节点,只能在 primary 的节点上将输出结果到 collection 中,如果是 secondary 的节点,只能使用 inline 的方式;
Output to a Collection
输出到一个 collection 中,如果是主从集群中的 secondary 成员不支持这样的操作;
1 | out: { <action>: <collectionName> |
Output to a Collection with an Action
该操作只对已经输出过的 collection 有效( 言外之意就是说,只对输出的 collection 进行操作 );该操作在主从集群中的从节点上无效;
1 | out: { <action>: <collectionName> |
- <action>,可以使用下面的任意一个 action,
备注,<collectionName> 表示之前由 reduce 所生成的 out collection,这里作为 action 的输入 collection,用 $C_s$ 表示;而由当前 out 操作所输出的 collection 记为 $C_t$;- replace
使用当前由 out 输出的 collection $C_t$ 来完全替换 $C_s$; - merge
将 $C_t$ 和 $C_s$ 进行合并,如果合并过程中,有相同的 key,那么使用 $C_t$ 中的记录来覆盖 $C_s$ 中的记录; - reduce
首先,同样执行 merge 的相关操作,唯一不同的是,当遇到相同的 key 的时候,将会对 $C_t$ 和 $C_s$ 进行一次 reduce 操作,既执行 function(key, [$C_t$, $C_s$]) 的操作,然后将结果覆盖 $C_s$ 中所对应 key 的内容( document );
- replace
- db:
可选的,指定 map-reduce 方法执行在哪个 database 上;默认使用与 $C_s$ 相同的 database; - sharded:
可选的,如果为 true,将会把 $C_t$ 的结果根据 _id 将结果输出到多个分片中; - nonAtomic:
可选的;默认情况下为 false,也就是说,对 out 输出 $C_t$ 的过程启动原子性保护操作,也就是说在输出的过程中,对 $C_t$ 进行上锁,避免其它客户端在 $C_t$ 的输出过程中进行访问;该特性只应用到 merge 和 reduce 的 output 模型上;如果 nonAotmic 被设置为 true,那么也就意味着在 out 的输出过程中,不对 MongoDB 上任何锁,在输出的过程中,其它的客户端将可以直接读取到 $C_t$ 中的临时内容,也就是说正在输出的局部内容;
Output Inline
将结果缓存在内存中并逐行返回,
1 | out: { inline: 1 } |
注意返回结果的大小不能超过 BSON 文档的最大值;
query
类型: document
可选的,
通过 query 操作符来指定查询的条件,这些查询条件将决定哪些 documents 将会作为 map 方法的输入;
sort
类型: document
可选的,
对输入文档进行排序,主要用来对 map-reduce 进行调优;比如将 sort key 与 emit key 设置成一行,这样可以减少 reduce 的调用的次数;并且 sort key 必须是一个 indexing 的字段;
limit
类型: number
可选的,
指定 map 方法所接受的最大的输入文档的数量;一般用在调试阶段使用;
finalize
类型:function
可选的,
紧接在 reduce 方法之后调用,对输出文档进行修改;
1 | function(key, reducedValue) { |
finalize 方法接收两个参数, key 和 reducedValue,key 就是由 map 函数所生成的 emit key,reducedValue 就是由 reduce 方法生成的结果值;使用 finalize 方法的时候,注意如下几点,
- The finalize function should not access the database for any reason.
- The finalize function should be pure, or have no impact outside of the function (i.e. side effects.)
- The finalize function can access the variables defined in the scope parameter.
scope
类型: document
可选的,
指定全局变量,该全局变量可以被 map、reduce 以及 finalize 方法所使用;
jsMode
类型: boolean
可选的,
表示是否需要在 map 执行以后,在 reduce 执行之前,将中间的临时数据转换为 BSON 的格式;
默认为 false;
- 如果取值为
false
- MongoDB 会将由 map 方法 emits 出来的 Javascript 对象转换为 BSON 对象,但是在调用 reduce 方法之前,会将 BSON 对象再转换为 Javascript 对象;
- map-reduce 操作将会临时的数据作为 BSON 对象在磁盘上进行存储;这样的好处是,便于 MongoDB 的 map-reduce 操作能够对海量的数据集进行操作;
- 如果取值为
true
- MongoDB 不会将由 map 方法 emits 出来的 Javascript 对象转换为 BSON 对象;所以也就不会再调用 reduce 方法之前再将 BSON 对象转换回 Javascript 对象了;
- 当在 map 阶段只能最多 emit 500,000 个 distinct 的 key,当然一个 key 可以对应多个 value;
verbose
类型:Boolean
可选的,
指明是否在结果集中包含 timing 的信息;默认为 true,既是包含 timing 的信息;
bypassDocumentValidation
类型: Boolean
可选的,
如果设置为 true,那么将会跳过文档验证的相关步骤;
collation
类型: document
可选的,
Specifies the collation to use for the operation.
Collation allows users to specify language-specific rules for string comparison, such as rules for lettercase and accent marks.
The collation option has the following syntax:
例子
一个基础的例子
假设我们有如下的 orders 的相关数据,以其中的一个 document 为例,
1 | { |
上面的例子中,price 是 items (既当前订单用户所购买商品明细) 金额的总和,其中 qty 是 quantity 既数量的意思;
返回每一个 Customer 的消费总额
首先,创建 map 方法,
1
2
3var mapFunction1 = function() {
emit(this.cust_id, this.price);
};其中的
this
既是对当前文档的引用;再次,定义 reduce 方法,
1
2
3var reduceFunction1 = function(keyCustId, valuesPrices) {
return Array.sum(valuesPrices);
};keyCustId
既是 customer idvaluesPrices
通过对 map 方法 emit 出来的记录,对 key 进行分组以后,取得的 prices 数据的集合,是一个数组;通过 map 和 reduce 方法来构建 mapReduce() 方法
1
2
3
4
5db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)这里将结果输出到 map_reduce_example collection 中;
计算每件商品( sku )的购买总数以及该商品于订单( order )的购买平均数
需求是,使用 map-reduce 方法对日期大于 01/01/2012 的 orders 进行统计运算;计算出 sku 既商品的售卖总数,以及该商品在每个订单中的购买平均数;
定义 map 方法,
1
2
3
4
5
6
7
8
9
10var mapFunction2 = function() {
for (var idx = 0; idx < this.items.length; idx++) {
var key = this.items[idx].sku;
var value = {
count: 1,
qty: this.items[idx].qty
};
emit(key, value);
}
};1) emit key
将商品 sku 作为 emit key;
2) emit value
将相关统计信息 value object 作为 emit value;count 表示对 order 的累加,qty 表示当前订单( order )中,该商品的购买总数;
定义 reduce 方法,将 map 方法中 emit 出来的 value 集合既 values 进行统计运算
1
2
3
4
5
6
7
8
9
10var reduceFunction2 = function(keySKU, countObjVals) {
reducedVal = { count: 0, qty: 0 };
for (var idx = 0; idx < countObjVals.length; idx++) {
reducedVal.count += countObjVals[idx].count;
reducedVal.qty += countObjVals[idx].qty;
}
return reducedVal;
};1) 参数 keySKU
keySKU 对应的就是 map 所 emit 出来的 key
2) 参数 countObjVals
countObjVals 是一个数组,其中的每一个元素对应的就是 map 方法 emit 出来的 value,该 value 是通过对 key 进行分组所得到的;
3) 返回对象 reduceVal
注意,正如 reduce 的
特性
中的第一个点所描述的那样,为了保证 reduce 方法是可以被重复调用的,所以 reduceVal 的类型必须与 map 方法中的 value 的类型相同;所以,这里的 reduceVal 对象也仅包含两个字段,既 count 和 qty;区别是 value 存放的是每一个商品的信息,而 reduceVal 是针对同一种商品的信息的累加;也正是因为这个原因
,所以,如果要想在返回结果中增加新的字段,比如下面第三步当中使用到的 avg 字段,必须在 finalize 方法内部创建;笔者之前一直疑惑为什么需要在 map-reduce 中额外添加一个 finalize 方法,这里算是找到原因
了;构建 finalize 方法,在该方法中计算出商品购买的平均值
1
2
3
4
5
6
7var finalizeFunction2 = function (key, reducedVal) {
reducedVal.avg = reducedVal.qty/reducedVal.count;
return reducedVal;
};1) 参数 key
该 key 对应的就是 map 和 reduce 中的 key2) 参数 reduceVal
对应的就是 reduce 方法的输出;3) 计算平均值 avg
为什么不在 reduce 方法中直接计算平均值在上述 #2 点进行过描述,一方面是因为 reduce 的返回值的类型必须与 map 的 emit value 的类型相同;另一个原因是,传入 reduce 方法的参数 values 很有可能被拆分成多个 sub_values 再作为 reduce 的输入,从而需要进行多次 reduce 的操作,所以如果遇到这种情况,单个 reduce 方法中是无法获得 count 总数的,所以,无论如何,是不应该在 reduce 方法中进行 avg 计算的;最后,all together,我们来构建 map-reduce 方法
1
2
3
4
5
6
7
8
9
10db.orders.mapReduce( mapFunction2,
reduceFunction2,
{
out: { merge: "map_reduce_example" },
query: { ord_date:
{ $gt: new Date('01/01/2012') }
},
finalize: finalizeFunction2
}
)
incremental map reduce( 增量类型的 map reduce )
正如特性部分第一大点的第二小点所描述的那样,如何对源 collection $C_s$ 的“增量”部分进行 map-reduce 计算,而不是再重复的对 $C_s$ 的全集进行聚合 map-reduce 运算?这一块,就是本小节将要进行描述的内容;也是笔者认为 map-reduce 唯一胜过 Aggregation Pipeline 的地方;
If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.
假设我们有如下的 sessions 数据,用来记录用户登录的信息,
1 | db.sessions.save( { userid: "a", ts: ISODate('2011-11-03 14:17:00'), length: 95 } ); |
需求是,我们统计出用户的登录总时长,登录的次数以及平均的登录时间;而我们知道,用户每天都会不断的进行登录,所以会不断的产生新的登录日志信息;那么,我们看看如何使用增量类型的 map-reduce 来进行增量统计;
初始统计 ( Initial Map-Reduce of Current Collection )
首先,我们需要对原始数据进行一次初始的 map-reduce 聚合运算;
map
1
2
3
4
5
6
7
8
9
10
11var mapFunction = function() {
var key = this.userid;
var value = {
userid: this.userid,
total_time: this.length,
count: 1,
avg_time: 0
};
emit( key, value );
};reduce,对 values 队列中的元素进行分别统计
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16var reduceFunction = function(key, values) {
var reducedObject = {
userid: key,
total_time: 0,
count:0,
avg_time:0
};
values.forEach( function(value) {
reducedObject.total_time += value.total_time;
reducedObject.count += value.count;
}
);
return reducedObject;
};finalize,计算平均值
1
2
3
4
5
6
7var finalizeFunction = function (key, reducedValue) {
if (reducedValue.count > 0)
reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
return reducedValue;
};构建 mapReduce()
1
2
3
4
5
6
7db.sessions.mapReduce( mapFunction,
reduceFunction,
{
out: "session_stat",
finalize: finalizeFunction
}
)这里需要注意的是,我们将结果输出到 session_stat collection 中
使用增量 Map-Reduce
假设,随着时间的推移,sessions 数据不断的累加并增加了如下的数据,
1 | db.sessions.save( { userid: "a", ts: ISODate('2011-11-05 14:17:00'), length: 100 } ); |
下面我们看看如何使用“增量 Map-Reduce”的方式来对上述的增量数据和原有 reduce 的数据进行快速的 map-reduce 操作;在每天结束以后,执行如下的操作来进行增量的操作,
1 | db.sessions.mapReduce( mapFunction, |
‘2011-11-05 00:00:00’
上一次执行结束的以后的最大记录值的时间点;通过 Output to a Collection with an Action 中的 reduce action 对 session_stat collection 也就是通过之前 map-reduce 输出的结果与当前的
增量
input collection 进行一次 reduce 操作;然后将结果覆盖掉原有的 session_stat 中的记录;官网上就这样比较含糊的将这个关键的过程给一笔带过了,其实细节上,比这个要稍微复杂些,不应该只是执行了 reduce 方法,还需要执行 finalize 方法,既是使用新的 total_time 除以 count 计算得到新的 average time;
Map-Reduce via Sharded Collections
分片 collection 作为 input
When using sharded collection as the input for a map-reduce operation, mongos will automatically dispatch the map-reduce job to each shard in parallel. There is no special option required. mongos will wait for jobs on all shards to finish.
如果将分片 collection 作为 map-reduce 操作的输入,mongos 服务将会自动的将 map-reduce 任务( job ) 同步的分发到各自的分片上;这里没有额外需要用户进行设置的选项,一切只需要等待 map-reduce jobs 在各个分片上执行完成即可;
分片 collection 作为 output
如果通过 out 设置将结果输出某个到 collection 中,并且结果集中包含一个分片值;MongoDB 将会使用 _id 字段将该 collection 进行分片;
- 如果 output collection 并不存在,那么 MongoDB 将会创建并根据 _id 来进行分片;
- For a new or an empty sharded collection, MongoDB uses the results of the first stage of the map-reduce operation to create the initial chunks distributed among the shards.
意思是说,如果是在新建该分片 collection 的时候,将会使用 map-reduce 操作第一阶段的数据来进行分片操作; - mongos dispatches, in parallel, a map-reduce post-processing job to every shard that owns a chunk. During the post-processing, each shard will pull the results for its own chunks from the other shards, run the final reduce/finalize, and write locally to the output collection.
这里主要是说明,如果在 post-processing 操作过程中,比如执行 finalize 方法或者最后一次 reduce 方法的时候,会从其它的分片中去获取中间数据,并进行计算得到最终的结果;