Nodejs 系列四:深入 Events 和 EventEmitter

前言

本文是笔者所总结的有关 Nodejs 基础系列之一,

注明:本文为作者的原创作品,转载需注明出处;

EventEmitter

在开始本文以前,先来简单的认识一下 Nodejs 中的 EventEmitter,先直接来看一个例子,

1
2
3
4
5
6
7
8
9
10
const EventEmitter = require('events');

class MyEmitter extends EventEmitter {}

const myEmitter = new MyEmitter();
myEmitter.on('event', () => {
console.log('an event occurred!');
});

myEmitter.emit('event');

主进程中(备注,该主进程在 Nodejs 中被称为 Event Loop),

  1. 首先,初始化了一个 EventEmitter;代码 1 - 5 行;
  2. 然后,通过 EventEmitter 的 on() 方法注册了一个名为 ‘event’ 的事件,然后将回调方法通过 lambda 的方式注入;代码 5 - 8 行;
  3. 最后,通过 EventEmitter 的 emit 方法来触发 #2 中的事件方法;

嗯,是的,第一次看到上面这个例子,总感觉哪里不对劲,myEmitter 是主进程中的一个对象,在该对象注册一个事件,再由这个对象来触发对象自己的这个事件,这不都是在主进程中发生的吗?从对象的角度上来说,主进程的一个对象自己在主进程中通过接口回调自己的方法,都在主进程中发生的,何来异步之说?不过当你再过头来看看笔者在 Nodejs 系列二:Child Process 子进程中所介绍的例子,你应该就会恍然大悟了,

1
2
3
4
5
6
const { spawn } = require('child_process');
const ls = spawn('ls', ['-lh', '/usr']);

ls.on('close', (code) => {
console.log(`child process exited with code ${code}`);
});

我们知道,通过 child_process 模块所 spawn 出来的的对象_ls_ 本身就是一个子进程,一旦该对象被创建,就代表一个子进程开始执行,然后,第 4 - 5 行代码的意义就显而易见了,就是给子进程对象注入父进程的回调函数,这里要注意的是,该回调函数是注册在父进程的队列中的;上面代码的逻辑就是,当子进程 close 的时候,会回调主进程中的 lambda 函数既 (code) -> {...};那么还剩下一个疑问,那就是该对象_ls_ 是在什么时候触发回调函数的,也就是在什么时候执行 EventEmitter 的 emit 方法的?我们知道,EventEmitter 的标准流程是,通过调用 EventEmitter 对象的 emit 方法来触发主进程中的回调方法,才能执行回调,那么上面的例子中的 emit 方法是在什么地方执行的呢?可惜的是,这部分逻辑在 Nodejs 的源码 child_process.js 中并没有涉及,

1
2
3
4
5
6
const child_process = require('internal/child_process');
const {
_validateStdio,
setupChannel,
ChildProcess
} = child_process;

可见 child_process 以及 ChildProcess 均是在 internal 源码也就是 V8 引擎中实现的,所以笔者猜测其实现的逻辑是,父进程通过 referece count 引用与子进程保持关联,一旦该引用丢失或者被回收,便可以断定子进程已经 close,那么父进程中的 EventComitter 对象 _ls_ 便立即调用 emit 方法触发 'close' 事件,便实现了子进程对父进程的回调

所以综上所述,EventEmitter 只是一个接口,定义了主进程的回调函数的注册和回调的标准,至于各个模块该如何实现自己的回调函数的注册和回调的方式,按照这个标准来就好了,而模块的执行过程是否采用异步的方式,是否使用子线程,都是由实现了 EventEmitter 接口的该模块自己来决定,就类似于上面的 _ls_ 对象;因此当我们再看到诸如 net.Server、fs.ReadStream 等对象是如何实现事件的注册和回调的逻辑也就不奇怪了,也就了然了;

概念设计

我们知道,Nodejs 的异步核心就是它的 EventLoop 架构,一个 EventLoop 异步的处理所有的请求;这个可类比于 Java NIO 中的 Selector 框架;EventLoop 进程不停的接收 Request,然后将 Request 异步的交由子进程去处理( 备注,这里的异步主要是指,子进程的处理过程并不会打断 EventLoop 进程 ),子进程处理完以后,通过管道 Pipe 五阻塞的通知父进程 EventLoop,EventLoop 会按照回调事件的注册顺序依次进行回调,如果是一个 Web Request,该回调过程中会执行相应的 Response 操作;

多核的情况

multi core eventloop process design.png

眨一看,这个和 Java NIO 中的 Selector 架构简直是一模一样,NIO 中通过维护一个 Selector 进行将回调一样的注入到某个队列中,然后子进程通过管道来通知 NIO 的 Selector,Nodejs 中的异步机制也大同小异;当某个 request 进来以后,主进程会通过 spwan 初始化一个子进程(备注该子进程在 Nodejs 中是一个 EventEmitter 对象),并通过异步的方式使其立即执行;执行过程中,会根据进程的类型不同对应不同的处理方式,

  • 如果是科学计算型的进程,也就是说不需要 I/O 介入;这样该进程在多核的情况下会分配给另外的一个 CPU 执行;
  • 如果是 I/O 型的进程,这个时候,该请求会交付给 DMA 控制器;

当请求执行完成以后,会通过管道无阻塞的直接将结果反馈给 EventEmitter 对象,EventEmitter 对象便会回调该进程在步骤 2 中所注入的事件,通常情况下,如果是一个 Web Request,这个时候需要构造一个 Web Response 进行返回;备注,事件回调过程有可能不是立刻执行的,而是,要根据事件在 EventEmitter 对象中的事件队列中注入的先后顺序来执行的,也就是说是一个接一个按顺序执行的;备注,红色虚线方框内的逻辑表示了一个 Event Loop 的过程,一个 Nodejs 应用可以由这样一个 Event Loop 来构造,这样,就表示回调过程只会回调 EventEmitter 对象中的事件,因此,在这种场景下,我们称作,Nodejs 主进程抽象的概括为是由一个 Event Loop 进程所构成的;因此,国内大部分的教程,直接将 Nodejs 主进程概括为是一个 EventLoop 进程这种说法严格上来说,是不准确的;

上述描述了单个 EventLoop 进程的场景,不过它化繁为简,比较简洁的还原了事物的本质;但是现实场景往往比这个更为复杂,来看看多个 EventLoop 进程的场景,
multi core and multi eventloop process design.png

在真实的应用场景如上图所示,在 main process 中会通过 sub_process 模块的 spawn 命令初始化出多个由 EventEmitter 对象所构成的 child process,也就是说,会创建出更多的 Event Loop 进程;但其实问题的本质依然是一样的,虽然有多个子进程,这些子进程都是科学计算型的进程( Nodejs 通过异步的方式,将 I/O 进程同样转换成了科学计算型进程 ),所以对于 CPU 而言,都可以无中断的对它们依次的高效的执行,说白了,就是 CPU 在执行过程中无需任何的进程上下文切换;那么 CPU 又是如何做到依次执行呢?这就得益于 CPU 的 L2、L3 Cache 了,这些进程将会依次的缓存到 CPU 的 L2、L3 Cache 或者内存中等待执行,所以要想 Nodejs 的执行效率更为高效,强大的 L2、L3 Cache 必不可少,这也就是为什么科学计算型的服务器往往都会配置非常高的 L2、L3 Cache 的原因之所在了;

单核的情况

要能够将一个主机的性能发挥至极致,那么必须考虑单核这样极端的场景;流程图和多核的情况类似,这里不再赘述;要注意的是,如果是单核的情况,那么子进程一定会争用 EventLoop 的 CPU 资源,因为都只能享有唯一的一个 CPU 资源;Nodejs 通过异步的方式能够保证 CPU 不发生中断,因此可以保证将每一个进程转变为科学计算型的进程,这个时候,CPU 不会因为进程同步等发生中断,但是,注意了,如果某个子进程执行的时间非常长,比如,嗯,比如计算某个质数的指数或者对音视频流数据解码之类的这类计算周期非常长的子进程,这个时候,CPU 为了公平的调度每个进程,会中断该长时间的进程转而执行另外的进程,这个时候中断也发生了;所以,在使用 Nodejs 的时候,尽量保证不要出现一个非常长的子进程;如果真的有这样的情况,而且非常的常见,建议将这样的任务单独的剥离出 Nodejs 服务器而选择一个独立的服务器异步的执行;这样便可以最大限度的保证 Nodejs 服务器的性能;

总结,笔者这里考虑了一种即便全是科学计算型的进程也导致中断的问题,要予以足够的重视;

源码分析

笔者不打算对功能性的东西照本宣科了,这部分内容大家可以直接参考官网的 API 文档,而是通过一个例子来对 EventEmitter 的核心源码进行剖析;

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
const EventEmitter = require('events');

class MyEmitter extends EventEmitter {}

const myEmitter = new MyEmitter();

myEmitter.on('event', (a, b) => {

console.log('log prints out:', a, b, this);

});

myEmitter.emit('event', 'a', 'b');

打印结果,

1
log prints out: a b {}

源码剖析

事件注册

例子中的代码第 7 行,会调用 events 模块的 addListener(type, listener) 方法对事件 (a, b) => {...} 进行注册,

1
2
3
EventEmitter.prototype.addListener = function addListener(type, listener) {
return _addListener(this, type, listener, false);
};

并调用 _addListener 方法,可见,_addListener 方法的四个参数分别是,target -> EventLoop 实例本身,type -> 'event',listener -> 回调方法 (a, b) => {}, prepend -> false;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
function _addListener(target, type, listener, prepend) {
var m;
var events;
var existing;

if (typeof listener !== 'function')
throw new TypeError('"listener" argument must be a function');

events = target._events;
if (!events) {
events = target._events = Object.create(null);
target._eventsCount = 0;
} else {
// To avoid recursion in the case that type === "newListener"! Before
// adding it to the listeners, first emit "newListener".
if (events.newListener) {
target.emit('newListener', type,
listener.listener ? listener.listener : listener);

// Re-assign `events` because a newListener handler could have caused the
// this._events to be assigned to a new object
events = target._events;
}
existing = events[type];
}

if (!existing) {
// Optimize the case of one listener. Don't need the extra array object.
existing = events[type] = listener;
++target._eventsCount;
} else {
if (typeof existing === 'function') {
// Adding the second element, need to change to array.
existing = events[type] =
prepend ? [listener, existing] : [existing, listener];
// If we've already got an array, just append.
} else if (prepend) {
existing.unshift(listener);
} else {
existing.push(listener);
}

// Check for listener leak
if (!existing.warned) {
m = $getMaxListeners(target);
if (m && m > 0 && existing.length > m) {
existing.warned = true;
const w = new Error('Possible EventEmitter memory leak detected. ' +
`${existing.length} ${String(type)} listeners ` +
'added. Use emitter.setMaxListeners() to ' +
'increase limit');
w.name = 'MaxListenersExceededWarning';
w.emitter = target;
w.type = type;
w.count = existing.length;
process.emitWarning(w);
}
}
}

return target;
}

该方法主要对应三个部分,

  1. 代码 9 - 25 行
    这部分代码实际上对应到 Nodejs 初始化的过程,重要的是 EventLoop 的 events 的 newListener 属性,不过从调试来看,该 newListener 只有当 removeListener 和 warn 两个 listener 发生注册的时候才会被触发,而一旦 Nodejs 初始化完毕,newListener 会从 events 对象中移除;
  2. 代码 27 - 41 行
    这部分逻辑是核心,打算放到后面深入分析;
  3. 代码 44 - 59 行
    这部分代码主要是检测 events 中所注册的 listeners 数量是否超出了额定的最大的长度,若超出了,则抛出 warning;

在对上面的三个部分有了一个大致的了解以后,我们来看看 #2 部分的源码,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (!existing) {
// Optimize the case of one listener. Don't need the extra array object.
existing = events[type] = listener;
++target._eventsCount;
} else {
if (typeof existing === 'function') {
// Adding the second element, need to change to array.
existing = events[type] =
prepend ? [listener, existing] : [existing, listener];
// If we've already got an array, just append.
} else if (prepend) {
existing.unshift(listener);
} else {
existing.push(listener);
}

... 检查 listener 的数量是否超限?
}
  1. 如果当前类型的( type )事件( event )不存在,则在 events 对象中以当前的 type 新建一个属性,且将该属性赋值为 listener 对象;
    这里要注意的是,javascript 可以通过 obj[val] 的方式,根据 val 的值来访问 obj 的属性,比如 val = ‘hello’,那么这样的访问方式就等价于访问 obj.hello 属性;
  2. 如果当前类型的事件已经存在,分为如下的几种情况进行处理,

    1. 如果是第一次检测到该 event 存在,那么当前的 listener 也就是属性 existing 必然为 'function' 类型,所以这个时候,将该 event 转换成队列的方式;这里要给 Nodejs 中的源码点赞了,注释写得蛮到位的;注意,转换的时候需要判断是否使用 prepend 模式,笔者将其翻译为”优先模式”,意思是,如果 prepend == true,那么将该同类型的事件追加到事件队列的前面;
    2. 如果不是第一次检测到该 event 存在, 则判断是否采用 prepend 模式,若是,则追加到该类型的 events 队列的前面,否则追加到该类型的 events 队列的后面;

可以看到,EventLoop 根据不同类型的 event (the different event type)分别维护了一个 listener 队列,一个类型就是 events 对象的一个属性,而该属性的值就是一个 listener 队列;

事件触发

代码第 13 行会触发 'event' 类型事件;来看看其底层调用的源码,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
EventEmitter.prototype.emit = function emit(type) {
var er, handler, len, args, i, events, domain;
var needDomainExit = false;
var doError = (type === 'error');

events = this._events;
if (events)
doError = (doError && events.error == null);
else if (!doError)
return false;

domain = this.domain;

// If there is no 'error' event listener then throw.
if (doError) {
if (arguments.length > 1)
er = arguments[1];
if (domain) {
if (!er)
er = new Error('Unhandled "error" event');
if (typeof er === 'object' && er !== null) {
er.domainEmitter = this;
er.domain = domain;
er.domainThrown = false;
}
domain.emit('error', er);
} else if (er instanceof Error) {
throw er; // Unhandled 'error' event
} else {
// At least give some kind of context to the user
const err = new Error('Unhandled "error" event. (' + er + ')');
err.context = er;
throw err;
}
return false;
}

handler = events[type];

if (!handler)
return false;

if (domain && this !== process) {
domain.enter();
needDomainExit = true;
}

var isFn = typeof handler === 'function';
len = arguments.length;
switch (len) {
// fast cases
case 1:
emitNone(handler, isFn, this);
break;
case 2:
emitOne(handler, isFn, this, arguments[1]);
break;
case 3:
emitTwo(handler, isFn, this, arguments[1], arguments[2]);
break;
case 4:
emitThree(handler, isFn, this, arguments[1], arguments[2], arguments[3]);
break;
// slower
default:
args = new Array(len - 1);
for (i = 1; i < len; i++)
args[i - 1] = arguments[i];
emitMany(handler, isFn, this, args);
}

if (needDomainExit)
domain.exit();

return true;
};

该段远吗主要分为两个部分,

  1. 处理错误信息的逻辑;
  2. 处理事件回调的逻辑

我们聚焦到处理回调过程的逻辑,此部分逻辑主要对应源码 48 - 70 行;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
var isFn = typeof handler === 'function';
len = arguments.length;
switch (len) {
// fast cases
case 1:
emitNone(handler, isFn, this);
break;
case 2:
emitOne(handler, isFn, this, arguments[1]);
break;
case 3:
emitTwo(handler, isFn, this, arguments[1], arguments[2]);
break;
case 4:
emitThree(handler, isFn, this, arguments[1], arguments[2], arguments[3]);
break;
// slower
default:
args = new Array(len - 1);
for (i = 1; i < len; i++)
args[i - 1] = arguments[i];
emitMany(handler, isFn, this, args);
}

通过 arguments 获取分别获得调用方法的参数,回过头来看看例子中的代码第 13 行,我们有三个参数,分别为 'event', 'a' 和 'b',所以,理所当然,我们将会进入 case 3 的情况,

1
2
3
case 3:
emitTwo(handler, isFn, this, arguments[1], arguments[2]);
break;

第一次看到这种调用方式的时候,emitNone、emitOne、emitTwo、emitThree 以及 default 的调用方式深敢奇怪,因为这四个方法内部的核心内容打通小异,都是通过 call 的方式调用回调函数;这里我们先来看看 emitTwo 方法,

1
2
3
4
5
6
7
8
9
10
function emitTwo(handler, isFn, self, arg1, arg2) {
if (isFn)
handler.call(self, arg1, arg2);
else {
var len = handler.length;
var listeners = arrayClone(handler, len);
for (var i = 0; i < len; ++i)
listeners[i].call(self, arg1, arg2);
}
}

再来看看 emitThree,

1
2
3
4
5
6
7
8
9
10
function emitThree(handler, isFn, self, arg1, arg2, arg3) {
if (isFn)
handler.call(self, arg1, arg2, arg3);
else {
var len = handler.length;
var listeners = arrayClone(handler, len);
for (var i = 0; i < len; ++i)
listeners[i].call(self, arg1, arg2, arg3);
}
}

再来看看 emitMany,

1
2
3
4
5
6
7
8
9
10
function emitMany(handler, isFn, self, args) {
if (isFn)
handler.apply(self, args);
else {
var len = handler.length;
var listeners = arrayClone(handler, len);
for (var i = 0; i < len; ++i)
listeners[i].apply(self, args);
}
}

三个方法的实现逻辑都大同小异,关键是,emitTwo、emitThree 都可以由 emitMany 替换,但是为什么 Nodejs 要这么来做呢?其实答案就在它源码的注解中了,

1
2
3
4
5
6
// slower
default:
args = new Array(len - 1);
for (i = 1; i < len; i++)
args[i - 1] = arguments[i];
emitMany(handler, isFn, this, args);

因为使用 emitMany 的方式会更慢;从这里可以看出,Nodejs 是处处都在考虑它的性能;

总结

这是笔者第一次分析 Nodejs 的源码便深深的体会到 Nodejs 源码的简洁,EventLoop 的时间注册和触发的逻辑非常的简洁,EventLoop 进程通过一个 events 对象衔接了事件的注册和触发的整个逻辑,也体现了 EventLoop 的精髓所在,通过一个主进程来接受、注册并触发事件;这也正是 Event Driven 的核心;

要能更为准确的把握 Event Loop 机制,要知道 Event Loop 中的 IDLE Watcher,I/OCheck 的概念,参考笔者所总结的一篇内容 best practise

this

this 指向 Function 本身,

1
2
3
4
5
const myEmitter = new MyEmitter();
myEmitter.on('event', function(a, b) {
console.log(a, b, this);
});
myEmitter.emit('event', 'a', 'b');

输出,

1
2
3
4
5
a b MyEmitter {
domain: null,
_events: { event: [Function] },
_eventsCount: 1,
_maxListeners: undefined }

要注意的是,lambda 方法的 this 不再指向 Function 本身了;

1
2
3
4
5
const myEmitter = new MyEmitter();
myEmitter.on('event', (a, b) => {
console.log(a, b, this);
});
myEmitter.emit('event', 'a', 'b');

输出,

1
a b {}

附录

通过 obj[a] 访问对象属性的方式

1
2
3
4
5
6
7
8
9
10
11
12
var a = 'x'
var b = 'y'

var events = Object.create(null);

// 给 events 对象分别赋一个 x 和 y 的属性;
events[a] = 'M'
events[b] = 'N'

// 分别调用 events 对象的 x 和 y 属性;
console.log(events[a]+" <==> "+events.x)
console.log(events[b]+" <==> "+events.y)

输出,

1
2
M <==> M
N <==> N