异步编程
有异步I/O,必有异步编程。
# 01. 函数式编程
# 高阶函数
高阶函数可以将函数作为参数或者将函数作为返回值的函数。
function foo(x) {
return function () {
return x;
};
}
通过改动sort()方法的参数,可以决定不同的排序方式。
var points = [40, 100, 1, 5, 25, 10];
points.sort(function(a, b) {
return a - b;
});
// [ 1, 5, 10, 25, 40, 100 ]
在Node中,事件的处理方式是基于高阶函数的特效来完成的。在自定义事件实例中,通过为相同事件注册不同的回调函数,可以很灵活的处理业务逻辑。
var emitter = new events.EventEmitter();
emitter.on('event_foo', function () {
// TODO
});
# 偏函数用法
偏函数用法是指创建一个调用另外一个部分-参数或变量已经预置的函数。
var toString = Object.prototype.toString;
var isString = function (obj) {
return toString.call(obj) == '[object String]';
};
var isFunction = function (obj) {
return toString.call(obj) == '[object Function]';
};
在JavaScript中进行类型判断时,通过类似与上述代码的方法定义。里面需要我们重复去定义一些类似的函数。为了解决重复定义的问题,我们引入了一个新函数,这个新函数可以像工厂一样批量创建一些类似的函数。
例如:通过isType()预先指定type的值,然后返回新的函数。这种通过指定部分参数来产生一个新的定制函数的形式就是 偏函数
var isType = function (type) {
return function (obj) {
return toString.call(obj) == '[object ' + type + ']';
};
};
var isString = isType('String');
var isFunction = isType('Function');
# 02. 异步编程的优势与难点
Node利用JavaScript以及内部异步库,将异步直接提升到业务层面。
# 优势
Node带来的最大特性:基于事件驱动的非阻塞I/O模型。非阻塞I/O使CPU与I/O互不依赖等待,让资源可以得到更好的利用。
异步I/O调用:
传统同步I/O模型:
Node实现异步I/O原理:利用事件循环的方式,JavaScript线程像一个分配任务和处理结果的大管家,I/O线程池里面的各个I/O线程都是小二。负责完成分配来的任务,小二与管理互不关联,所以可以保持整体的高效率。
由于事件循环模型需要应对海量请求,海量请求同时作用在单线程上,就需要防止任何一个计算耗费过多的CPU时间片。至于是计算密集型还是I/O 密集型,只要不计算异步I/O的调度,就构不成问题。建议对CPU得耗用不要超过10ms,或者将大量的计算分解成诸多的小量计算。通过setImmediate()进行调度。
# 难点
- 难点1: 异常处理 过去,我们处理异常,通过使用类java的try/catch/final语句块进行一场捕获。
try {
JSON.parse(json);
} catch (e) {
// TODO
}
但是这对于异步编程不一定适用。异步I/O的实现主要包括两个阶段:提交请求和处理结果
。这两个阶段中间有事件循环的调度,两者彼此不关联,异步方法通常在第一个阶段提交请求后立即返回,因为异常不一定发生在这个阶段。try/catch的功效在此处不会发生任何作用
var async = function (callback) {
process.nextTick(callback);
};
调用async()方法后,callback被存储起来,直到下一个事件循环Tick才会取出来执行。对异步方法进行try/catch操作只能捕获当前事件循环内的异常,对callback执行时抛出来的异常无能为力。
try {
async(callback);
} catch (e) {
// TODO
}
Node在处理异常上形成了一种约定,将异常作为回调函数的第一个实参传回,如果为空值,表明异常调用没有异常抛出。
async(function (err, results) {
// TODO
});
在我们自行编写的异步方法上,需要遵循这样的原则:
- 必须执行调用者传入的回调函数
- 正确传递异常供调用者判断
var async = function (callback) {
process.nextTick(function() {
var results = something;
if (error) {
return callback(error);
}
callback(null, results);
});
};
在异步方法的编写中,另外一个常犯的错误是对用户传递的回调函数进行异常捕获。
try {
req.body = JSON.parse(buf, options.reviver);
callback();
} catch (err){
err.body = buf;
err.status = 400;
callback(err);
}
上述代码的意图是捕获JSON.parse()中可能出现的异常,但是却不小心包含了用户传递的回调函数。这意味着如果回调函数中有异常抛出,将会进入catch()代码块中执行,于是回调函数会被执行两次。 正确的捕获:
try {
req.body = JSON.parse(buf, options.reviver);
} catch (err){
err.body = buf;
err.status = 400;
return callback(err);
}
callback();
在编写异步方法时,只要将异常正确的传递给用户的回调方法即可。无须过多处理。
- 难点2: 函数嵌套过深 在Node中,事务中存在多个异常调用的场景:比如一个遍历目录的操作:
fs.readdir(path.join(__dirname, '..'), function (err, files) {
files.forEach(function (filename, index) {
fs.readFile(filename, 'utf8', function (err, file) {
// TODO
});
});
});
对于上述场景,由于两次操作存在依赖关系,函数嵌套的行为也许情有可原。在网页渲染过程中,通常需要数据,模版,资源文件,这三者互相不依赖,但是在最终渲染结果中,三者缺一不可。如果采用默认的异步方法调用,程序取下:
fs.readFile(template_path, 'utf8', function (err, template) {
db.query(sql, function (err, data) {
l10n.get(function (err, resources) {
// TODO
});
});
});
- 难点3: 阻塞代码
疑惑🤔为什么没有sleep()这样的线程沉睡功能,唯有用于延时的setInterval和setTimeout()这两个函数。这两个函数不能阻塞后续代码的持续执行。实现sleep(1000)的效果:
// TODO
var start = new Date();
while (new Date() - start < 1000) {
// TODO
}
// 需要阻塞的代码
实际是这段代码会支持占用CPU进行判断,与真正的线程沉睡相去甚远,完全破坏了事件循环的调度。由于Node单线程的原因,CPU资源全都会用于这段代码服务,导致其余任何请求都会得不到响应。 需要这样的需求时,在统一规划好业务逻辑后,调用setTimeout()的效果会更好。
- 难点4: 多线程编程 我们在谈论JavaScript的时候,通常谈的是单一线程上执行的代码,这在浏览器中指的是JavaScript执行线程与UI渲染共用一个线程,在Node中,只是没有渲染UI的部分,模型基本相同。对于服务端而言,如果服务器是多核CPU,单个Node进程实质上没有充分利用多核CPU
浏览器提出Web Workers,它通过将JavaScript执行与UI渲染分离,可以很好利用多核CPU为大量计算服务。同时前端Web Workers也是一个利用消息机制合理使用多核CPU的理想模型。
遗憾在于前端浏览器存在对标准的滞后性,Web Workers没有广泛应用起来。Web Workers能解决利用CPU和减少阻塞UI渲染,但是不能解决UI渲染的效率问题。Node借鉴这个模式,child_process是其基础API,cluster模块是更深层次的应用。
- 难点5: 异步转同步 Node提供了绝大部分的异步API和少量的同步API,偶尔出现的同步需求会因没有同步API让开发者无所适从。
# 03. 异步编程解决方案
前面列举了因异步编程带来的一些问题,与异步编程提升的性能成功相比,编程过程看起来咩有想象中那么美好。 目前异步编程的主要解决方案有三种。
- 事件发布/订阅模式
- Primise/Deferred模式
- 流程控制库
# 事件发布/订阅模式
事件监听器模式是一种广泛用于异步编程的模式,是回调函数的事件化,又称发布/订阅模式
。
Node自身提供的events
模块 (opens new window)是发布/订阅模式的一个简单实现。Node中大部分模块都继承自它,这个模块比前端浏览器中大量DOM事件简单,不存在事件冒泡,也不存在preventDefault(),stopPropagation()和stopImmediatePropagation()
()等控制事件传递的方法。它具有addListener/on(), once(), removeListener(), removeAllListener()和emit()等基本的事件监听模式的方法实现。
// 订阅
emitter.on("event1", function (message) {
console.log(message);
});
// 发布
emitter.emit('event1', "I am message!");
可以看到订阅事件是一个高阶函数的应用。事件发布/订阅模式可以实现一个事件与多个回调函数(事件侦听器)的关联。通过 emit
发布事件后,消息会立即传递给当前节点事件的所有侦听器。侦听器可以灵活的添加和删除,使得事件与具体处理逻辑之间可以很轻松的关联与接耦。
事件发布/订阅模式自身并无同步与异步调用的问题,但是在Node中,emit()调用多半是伴随事件循环而异步触发的。所以我们说事件发布/订阅模式广泛应用于异步编程。
事件发布/订阅模式常常用来接耦业务逻辑,事件发布者无须关注订阅的侦听器如何实现业务逻辑,数据通过消息的方式可以很灵活的传递。
可以通过事件发布/订阅模式进行组件分装,将不变的部分分装在组件内部,将容易变化,需自定义的部分通过事件暴露给外部处理。这是一种典型的逻辑分离方式。
从另一个角度来看,事件侦听器模式也是一种钩子(hook)机制,利用钩子导出内部数据或状态给外部的调用者,Node中很多对象都具有黑盒子状态,功能点较少,如果不通过事件钩子的形式,很难获得对象在运行期间的中间值或内部状态。 Http请求就是典型的场景:
var options = {
host: 'www.google.com',
port: 80,
path: '/upload',
method: 'POST'
};
var req = http.request(options, function (res) {
console.log('STATUS: ' + res.statusCode);
console.log('HEADERS: ' + JSON.stringify(res.headers));
res.setEncoding('utf8');
res.on('data', function (chunk) {
console.log('BODY: ' + chunk);
});
res.on('end', function () {
// TODO
});
});
req.on('error', function (e) {
console.log('problem with request: ' + e.message);
});
// write data to request body
req.write('data\n');
req.write('data\n')
在这段Http请求中,程序员只需要将视线放在error,data,end这些业务事件点上,至于内部的流程如何,无须关注。
Node对事件发布/订阅做了额外处理:
- 如果对一个事件添加了超过10个侦听器,将会得到一个警告。这一处设计与Node自身单线程运行有关,设计者认为侦听器太多可能会导致内存泄露。所以存在这样一个警告。调用
emitter.setMaxListeners(0)
可以将这个限制去掉。另外,由于事件发布会引起一系列侦听器执行,如果事件相关的侦听器过多,可能会存在过多占用CPU的场景。 - 为了处理异常,
EventEmitter
对象对error事件进行了特殊对待,如果运行期间的错误触发了error事件,EventEmitter
会检查是否有多error事件添加过侦听器。如果添加了,这个错误将会交给侦听器处理,否则这个错误将会作为异常抛出。如果外部没有捕获这个异常,将会引起线程退出。
- 继承events模块 实现一个继承EventEmitter的类
var events = require('events');
function Stream() {
events.EventEmitter.call(this);
}
util.inherits(Stream, events.EventEmitter);
Node在util模块中封装了继承的方法,所以此处可以很遍历的调用。开发者可以通过这样的方式轻松的继承EventEmitter
,利用事件机制解决业务问题。在Node提供的核心模块中,有近半数都继承自EventEmitter
- 利用事件队列解决雪崩问题 在事件订阅/发布模式中,通常有一个once()方法,通过它添加的侦听器只执行一次,在执行之后就会将它与事件的关联移除。 如何采用once()解决雪崩问题 在计算机中,缓存由于存放在内存中,访问速度十分快,常常用于加速数据访问,让绝大多数的请求不必重复去做一些低效率的数据读取。
雪崩问题:就是在高访问量,大并发量的情况下缓存失效的情景此时大量的请求同时涌入数据库中,数据库无法同时承受如此大的查询请求,进而影响到网站整体的响应速度。 数据库查询语句:
var select = function (callback) {
db.select("SQL", function (results) {
callback(results);
});
};
如果站点刚好启动,这时缓存汇总是不存在数据的,如果访问量大,同一句sql会被发送到数据库中反复查询,会影响服务的整体性能,一种改进方案是添加一个状态锁。
var status = "ready";
var select = function (callback) {
if (status === "ready") {
status = "pending";
db.select("SQL", function (results) {
status = "ready";
callback(results);
});
}
};
在这种场景下,连续多次的调用select()时,只有第一次调用时生效的,后续的select()是没有数据服务的,这个时候可以引入事件队列
var proxy = new events.EventEmitter();
var status = "ready";
var select = function (callback) {
proxy.once("selected", callback);
if (status === "ready") {
status = "pending";
db.select("SQL", function (results) {
proxy.emit("selected", results);
status = "ready";
});
}
};
这里我们利用once()方法,将所有的请求回调都压入到事件队列中,利用其执行一次就会将监视器移除的特点,保证每一个回调只会被执行一次。
此处可能因为存在侦听器过多引发的警告,需要调用setMaxListeners(0)移除掉警告,或者设更大的警告阀值。
- 多异步之间的协作方案 事件发布/订阅有着它的优点,利用高阶函数的优势,侦听器作为回调函数可以随意添加和删除,它帮助开发者轻松处理随时可能添加的业务逻辑,也可以隔离业务逻辑,保持业务逻辑单元的职业单一。 使用原生代码解决难点2(函数嵌套过深 在Node中,事务中存在多个异常调用的场景)
var count = 0;
var results = {};
var done = function (key, value) {
results[key] = value;
count++;
if (count === 3) {
// 渲染页面
render(results);
}
};
fs.readFile(template_path, "utf8", function (err, template) {
done("template", template);
});
db.query(sql, function (err, data) {
done("data", data);
});
l10n.get(function (err, resources) {
done("resources", resources);
});
由于多个异步场景中回调函数的执行并不能保证顺序,且回调函数相互之间没有任何交集,所以需要借助一个第三方函数和第三方变量来处理异步协作的结果。
通常,我们将这个用于检测次数的变量叫做哨兵变量
。
可以利用偏函数来处理哨兵变量与第三方函数的关系
var after = function (times, callback) {
var count = 0, results = {};
return function (key, value) {
results[key] = value;
count++;
if (count === times) {
callback(results);
}
};
};
var done = after(times, render);
上述方案实现了多对一的目的。如果业务继续增长,我们依然可以利用发布/订阅的方式来完成多对多的方案
var emitter = new events.Emitter();
var done = after(times, render);
emitter.on("done", done);
emitter.on("done", other);
fs.readFile(template_path, "utf8", function (err, template) {
emitter.emit("done", "template", template);
});
db.query(sql, function (err, data) {
emitter.emit("done", "data", data);
});
l10n.get(function (err, resources) {
emitter.emit("done", "resources", resources);
});
这个方案结合了前者用简单的偏函数完成多对一的收敛与事件订阅/发布模式中一对多的发散。 这个缺点在于:调用者要去准备着done()函数,以及在回调函数中需要从结果中把数据一个一个提取出来,再进行维护。
另外一个方案:
var proxy = new EventProxy();
proxy.all("template", "data", "resources", function (template, data, resources) {
// TODO
});
fs.readFile(template_path, "utf8", function (err, template) {
proxy.emit("template", template);
});
db.query(sql, function (err, data) {
proxy.emit("data", data);
});
l10n.get(function (err, resources) {
proxy.emit("resources", resources);
});
EventProxy提供了一个all()方法来订阅多个事件,当每个事件都被触发以后,侦听器才会执行。 另外一个就是tail,他与all()方法的区别在于all()方法的侦听器在满足条件后只会执行一次,tail()方法的侦听器在满足条件时执行一次之后,如果组合事件的某个事件再次被触发,侦听器会用最新的数据继续执行。 all()方法的改进:在侦听器中返回数据的参数列表与订阅组合事件的事件列表时一致对应的。
在异步场景下,我们需要从一个接口多次读取数据,此时触发的事件名或许是相同的,EventProxy提供了after()方法来实现事件在执行多次后执行侦听器的单一事件组合订阅方式。
var proxy = new EventProxy();
proxy.after("data", 10, function (datas) {
// TODO
});
执行0次data事件后执行侦听器。这个侦听器的数据为10次按事件触发次序排序的数组。
- EventProxy原理
trigger: function (eventName) {
var list, calls, ev, callback, args;
var both = 2;
if (!(calls = this._callbacks)) return this;
while (both--) {
ev = both ? eventName : "all";
if ((list = calls[ev])) {
for (var i = 0, l = list.length; i < l; i++) {
if (!(callback = list[i])) {
list.splice(i, 1);
i--;
l--;
} else {
args = both ? Array.prototype.slice.call(arguments, 1) : arguments;
callback[0].apply(callback[1] || this, args);
}
}
}
}
return this;
},
EventProxy是将all作为一个事件流的拦截层,在其中注入一些业务来处理单一事件无法解决的异步处理问题。类似的扩展还有all(), tail(), after(), not()和any()等。
- EventProxy异常处理
通过额外添加error事件来进行异常统一处理。
exports.getContent = function (callback) {
var ep = new EventProxy();
ep.all('tpl', 'data', function (tpl, data) {
// 成功回调
callback(null, {
template: tpl,
data: data
});
});
// 侦听error事件
ep.bind('error', function (err) {
// ႂ载ۖ有处理函数
ep.unbind();
// 异常回调
callback(err);
});
fs.readFile('template.tpl', 'utf-8', function (err, content) {
if (err) {
//一旦发生异常,一律交给error事件的处理函数处理
return ep.emit('error', err);
}
ep.emit('tpl', content);
});
db.get('some sql', function (err, result) {
if (err) {
// 一旦发生异常,一律交给error事件的处理函数处理
return ep.emit('error', err);
}
ep.emit('data', result);
});
}
改进:
exports.getContent = function (callback) {
var ep = new EventProxy();
ep.all("tpl", "data", function (tpl, data) {
// 成功回调
callback(null, {
template: tpl,
data: data,
});
});
//绑定错误处理函数
ep.fail(callback);
fs.readFile("template.tpl", "utf-8", ep.done("tpl"));
db.get("some sql", ep.done("data"));
};
在上述代码中EventProxy提供了fail和done两个实例方法来优化异常处理。使得开发者将精力关注在业务开发上,而不是异常捕获上。 fail()的实现,参见一下变换:
ep.fail(callback);
等价于:
ep.fail(function (err) {
callback(err);
});
又等价于:
ep.bind('error', function (err) {
// ႂ卸载掉所有有处理函数
ep.unbind();
// 异常回调
callback(err);
});
done()的实现,参见一下变换:
ep.done('tpl');
等价于:
function (err, content) {
if (err) {
// 一旦发生异常,一律交给error事件处理函数处理
return ep.emit('error', err);
}
ep.emit('tpl', content);
}
同时done()方法接受一个函数作为参数:
ep.done(function (content) {
// TODO
// 无须关注异常
ep.emit('tpl', content);
});
这段代码等价于:
function (err, content) {
if (err) {
// 一旦发生异常,一律交给error事件的处理函数处理
return ep.emit('error', err);
}
(function (content) {
// TODO
// 无须关注异常
ep.emit('tpl', content);
}(content));
}
# Promise/Deferred模式
先执行异步调用,延迟传递处理。
# Promises/A
Promise/Deferred模式其实只包含两个部分,即Promise和Deferred。
Promise的状态转化示意图:
一个Promise对象只要具备了then() 方法即可。
then()方法定义如下:
then(fulfilledHandler, errorHandler, progressHandler)
通过Node的events模块来完成一个简单实现。
const events = require('events');
var EventEmitter = new events.EventEmitter();
var Promise = function () {
EventEmitter.call(this);
};
util.inherits(Promise, EventEmitter);
Promise.prototype.then = function (fulfilledHandler, errorHandler, progressHandler) {
if (typeof fulfilledHandler === 'function') {
// 利用once()方法,保证成功回调函数只执行一次。
this.once('success', fulfilledHandler);
}
if (typeof errorHandler === 'function') {
// 利用once()方法,保证异常回调函数只执行一次。
this.once('error', errorHandler);
}
if (typeof progressHandler === 'function') {
this.on('progress', progressHandler);
}
return this;
};
触发执行这些回调函数的地方,实现这些功能的对象通常被称为Deferred
,即延迟对象。
var Deferred = function () {
this.state = "unfulfilled";
this.promise = new Promise();
};
Deferred.prototype.resolve = function (obj) {
this.state = "fulfilled";
this.promise.emit("success", obj);
};
Deferred.prototype.reject = function (err) {
this.state = "failed";
this.promise.emit("error", err);
};
Deferred.prototype.progress = function (data) {
this.promise.emit("progress", data);
};
var promisify = function (res) { var deferred = new Deferred(); var result = '';
res.on('data', function (chunk) {
result += chunk;
deferred.progress(chunk); });
res.on('end', function () { deferred.resolve(result);
});
res.on('error', function (err) {
deferred.reject(err); });
return deferred.promise; };
如此就得到简单的结果。这里返回deferred.promise
的目的是为了·不让外部程序调用resolve和reject(),更改内部状态的行为交给定义者处理。
示例:
promisify(res).then(
function () {
// Done
},
function (err) {
// Error
},
function (chunk) {
// progress
console.log("BODY: " + chunk);
}
);
Promise和Deferred整体关系示意图:
与事件发布/订阅模式相比,Promise和Deferred模式的API接口和抽象模型都十分简洁。从上面代码可以看出,将业务中不可变的部分封装在了Deferred中,将可变的部分交给了Promise。 Q模块是Promise/A规范的一个实现。 它对Node中常见回调函数的Promise实现如下:
/**
* Creates a Node-style callback that will resolve or reject the deferred
* promise.
* @returns a nodeback
*/
defer.prototype.makeNodeResolver = function () {
var self = this;
return function (error, value) {
if (error) {
self.reject(error);
} else if (arguments.length > 2) {
self.resolve(array_slice(arguments, 1));
} else {
self.resolve(value);
}
};
};
makeNodeResolver
返回一个Node风格的回调函数。
对于fs.readFile()的调用,演化为如下形式:
var readFile = function (file, encoding) {
var deferred = Q.defer();
fs.readFile(file, encoding, deferred.makeNodeResolver());
return deferred.promise;
};
// 定义之后的调用示例如下:
readFile("foo.txt", "utf-8").then(
function (data) {
// Success case
},
function (err) {
// Failed case
}
);
- Promise 中多异步协作 Promise主要解决的是单个异步操作中存在的问题。当我们需要处理多个异步调用,该如何?
类似于EventProxy,原型实现:
Deferred.prototype.all = function (promises) {
var count = promises.length;
var that = this;
var results = [];
promises.forEach(function (promise, i) {
promise.then(
function (data) {
count--;
results[i] = data;
if (count === 0) {
that.resolve(results);
}
},
function (err) {
that.reject(err);
}
);
});
return this.promise;
};
对于多次文件的读取场景,以下面的代码为例,all()方法将两个单独的Promise重新抽象组合成一个新的Promise:
var promise1 = readFile("foo.txt", "utf-8");
var promise2 = readFile("bar.txt", "utf-8");
var deferred = new Deferred();
deferred.all([promise1, promise2]).then(function (results) {
// TODO
}, function (err) {
// TODO
});
- Promise的进阶知识。
恶魔金字塔:
obj.api1(function (value1) {
obj.api2(value1, function (value2) {
obj.api3(value2, function (value3) {
obj.api4(value3, function (value4) {
callback(value4);
});
});
});
});
拆分上述代码:
var handler1 = function (value1) {
obj.api2(value1, handler2);
};
var handler2 = function (value2) {
obj.api3(value2, handler3);
};
var handler3 = function (value3) {
obj.api4(value3, hander4);
};
var handler4 = function (value4) {
callback(value4);
});
obj.api1(handler1);
利用事件的开发者:
var emitter = new event.Emitter();
emitter.on("step1", function () {
obj.api1(function (value1) {
emitter.emit("step2", value1);
});
});
emitter.on("step2", function (value1) {
obj.api2(value1, function (value2) {
emitter.emit("step3", value2);
});
});
emitter.on("step3", function (value2) {
obj.api3(value2, function (value3) {
emitter.emit("step4", value3);
});
});
emitter.on("step4", function (value3) {
obj.api4(value3, function (value4) {
callback(value4);
});
});
emitter.emit("step1");
- 支持序列执行的Promise 链式调用:
promise()
.then(obj.api1)
.then(obj.api2)
.then(obj.api3)
.then(obj.api4)
.then(function (value4) {
// Do something with value4
}, function (error) {
// Handle any error from step1 through step4
})
.done();
优化后:
var Deferred = function () {
this.promise = new Promise();
};
// 完成态
Deferred.prototype.resolve = function (obj) {
var promise = this.promise;
var handler;
while ((handler = promise.queue.shift())) {
if (handler && handler.fulfilled) {
var ret = handler.fulfilled(obj);
if (ret && ret.isPromise) {
ret.queue = promise.queue;
this.promise = ret;
return;
}
}
}
};
// 失败态
Deferred.prototype.reject = function (err) {
var promise = this.promise;
var handler;
while ((handler = promise.queue.shift())) {
if (handler && handler.error) {
var ret = handler.error(err);
if (ret && ret.isPromise) {
ret.queue = promise.queue;
this.promise = ret;
return;
}
}
}
};
// 生成回调函数
Deferred.prototype.callback = function () {
var that = this;
return function (err, file) {
if (err) {
return that.reject(err);
}
that.resolve(file);
};
};
var Promise = function () {
// 队列用于存储执行的回调函数
this.queue = [];
this.isPromise = true;
};
Promise.prototype.then = function (
fulfilledHandler,
errorHandler,
progressHandler
) {
var handler = {};
if (typeof fulfilledHandler === "function") {
handler.fulfilled = fulfilledHandler;
}
if (typeof errorHandler === "function") {
handler.error = errorHandler;
}
this.queue.push(handler);
return this;
};
这里我们以两次文件读取作为例子,以验证该设计的可行性。这里假设读取第二个文件是依 赖于第一个文件中的内容的,相关代码如下:
var readFile1 = function (file, encoding) {
var deferred = new Deferred();
fs.readFile(file, encoding, deferred.callback());
return deferred.promise;
};
var readFile2 = function (file, encoding) {
var deferred = new Deferred();
fs.readFile(file, encoding, deferred.callback());
return deferred.promise;
};
readFile1('file1.txt', 'utf8').then(function (file1) {
return readFile2(file1.trim(), 'utf8');
}).then(function (file2) {
console.log(file2);
});
# 流程控制库
最为主流的模式——事件发布/订阅模式和Promise/Deferred模式,
- 尾触发与Next
除了事件和Promise外,还有一类方法是需要手工调用才能持续执行后续调用的,我们将此类方法叫做尾触发
,常见的关键词是next
。事实上,尾触发目前应用最多的地方是Connect的中间件。
var app = connect();
// Middleware
app.use(connect.staticCache());
app.use(connect.static(__dirname + '/public'));
app.use(connect.cookieParser());
app.use(connect.session());
app.use(connect.query());
app.use(connect.bodyParser());
app.use(connect.csrf());
app.listen(3001);
在通过use()方法注册好一系列中间件后,监听端口上的请求。中间件利用了尾触发的机制, 最简单的中间件如下:
function (req, res, next) {
// 中间件
}
每个中间件传递请求对象、响应对象和尾触发函数,通过队列形成一个处理流
中间件机制使得在处理网络请求时,可以像面向切面编程一样进行过滤、验证、日志等功能,而不与具体业务逻辑产生关联,以致产生耦合。 Connect的核心实现
function createServer() {
// 创建了HTTP服务器的request事件处理函数:
function app(req, res){ app.handle(req, res); }
utils.merge(app, proto);
utils.merge(app, EventEmitter.prototype);
app.route = '/';
// stack属性是这个服务器内部维护的中间件队列。通过调用use()方法我们可以将中间件放进队列中
app.stack = [];
for (var i = 0; i < arguments.length; ++i) {
app.use(arguments[i]);
}
return app;
};
use()方法的重要部分
app.use = function(route, fn){
// some code
this.stack.push({ route: route, handle: fn });
return this;
监听函数的实现如下:
app.listen = function(){
var server = http.createServer(this);
return server.listen.apply(server, arguments);
};
最终回到app.handle()方法,每一个监听到的网络请求都将从这里开始处理。该方法的代码如下:
app.handle = function(req, res, out) {
// some code
next();
};
原始的next()方法较为复杂,下面是简化后的内容,其原理十分简单,取出队列中的中间件并执行,同时传入当前方法以实现递归调用,达到持续触发的目的:
function next(err) {
// some code
// next callback
layer = stack[index++];
layer.handle(req, res, next);
}
- 流程控制模块async
- 异步的串行执行 async是如何解决“恶魔金字塔”问题的。 async提供了series()方法来实现一组任务的串行执行,示例代码如下:
var fs = require('fs');
var async = require("async");
async.series([
function (callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function (callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], function (err, results) {
// results => [file1.txt, file2.txt]
});
这段代码等价于:
var fs = require('fs');
var async = require("async");
fs.readFile('file1.txt', 'utf-8', function (err, content) {
if (err) {
return callback(err);
}
fs.readFile('file2.txt ', 'utf-8', function (err, data) {
if (err) {
return callback(err);
}
callback(null, [content, data]);
});
});
series()方法中传入的函数callback()并非由使用者指定。事实上,此处的回调函数由async通过高阶函数的方式注入,这里隐含了特殊的逻辑。每个callback()执行时会将结果保存起来,然后执行下一个调用,直到结束所有调用。最终 的回调函数执行时,队列里的异步调用保存的结果以数组的方式传入。这里的异常处理规则是一 旦出现异常,就结束所有调用,并将异常传递给最终回调函数的第一个参数。
- 异步的并行执行 当我们需要通过并行来提升性能时,async提供了parallel()方法,用以并行执行一些异步操作。以下为读取两个文件的并行版本:
async.parallel([
function (callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function (callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], function (err, results) {
// results => [file1.txt, file2.txt]
});
等价于:
var counter = 2;
var results = [];
var done = function (index, value) {
results[index] = value;
counter--;
if (counter === 0) {
callback(null, results);
}
};
// 只传递第一个异常
var hasErr = false;
var fail = function (err) {
if (!hasErr) {
hasErr = true;
callback(err);
}
};
fs.readFile('file1.txt', 'utf-8', function (err, content) {
if (err) {
return fail(err);
}
done(0, content);
});
fs.readFile('file2.txt', 'utf-8', function (err, data) {
if (err) {
return fail(err);
}
done(1, data);
});
- 异步调用的依赖处理
series()适合无依赖的异步串行执行,但当前一个的结果是后一个调用的输入时,series()方法就无法满足需求了。所幸,这种典型场景的需求,async提供了
waterfall()
方法来满足,相关代码如下:
async.waterfall([
function (callback) {
fs.readFile('file1.txt', 'utf-8', function (err, content) {
callback(err, content);
});
},
function (arg1, callback) {
// arg1 => file2.txt
fs.readFile(arg1, 'utf-8', function (err, content) {
callback(err, content);
});
},
function(arg1, callback){
// arg1 => file3.txt
fs.readFile(arg1, 'utf-8', function (err, content) {
callback(err, content);
});
}
], function (err, result) {
// result => file4.txt
});
等价于:
fs.readFile('file1.txt', 'utf-8', function (err, data1) {
if (err) {
return callback(err);
}
fs.readFile(data1, 'utf-8', function (err, data2) {
if (err) {
return callback(err);
}
fs.readFile(data2, 'utf-8', function (err, data3) {
if (err) {
return callback(err);
}
callback(null, data3);
});
});
});
- 自动依赖处理(auto) auto()方法能根据依赖关系自动分析,以最佳的顺序执行以上业务: async.auto(deps);
- Step Step与前面介绍的事件模式、Promise甚至async都不同的一点在于Step用到了this关键字。事实上,它是Step内部的一个next()方法,将异步调用的结果传递给下一个任务作为参数,并调用执行。 通过npm install step即可安装使用。示例代码如下
Step(task1, task2, task3);
Step接受任意数量的任务,所有的任务都将会串行依次执行。下面的示例代码将依次读取文件:
Step(
function readFile1() {
fs.readFile('file1.txt', 'utf-8', this);
},
function readFile2(err, content) {
fs.readFile('file2.txt', 'utf-8', this);
},
function done(err, content) {
console.log(content);
}
);
- 并行任务执行 Step如何实现多个异步任务并行执行呢?this具有一个parallel()方法,它告诉Step,需要等所有任务完成时才进行下一个任务,相关代码如下:
Step(
function readFile1() {
fs.readFile('file1.txt', 'utf-8', this.parallel());
fs.readFile('file2.txt', 'utf-8', this.parallel());
},
function done(err, content1, content2) {
// content1 => file1
// content2 => file2
console.log(arguments);
}
);
使用parallel()的时候需要小心的是,如果异步方法的结果传回的是多个参数,Step将只会取前两个参数,相关代码如下:
var asyncCall = function (callback) {
process.nextTick(function () {
// 在调用parallel()时,result2将会被丢弃。
callback(null, 'result1', 'result2');
});
};
Step提供的另外一个方法是group(),它类似于parallel()的效果,但是在结果传递上略有不同。下面的代码用于读取一个目录,然后迭代其中文件的操作:
Step(
function readDir() {
fs.readdir(__dirname, this);
},
function readFiles(err, results) {
if (err) throw err;
// Create a new group
var group = this.group();
results.forEach(function (filename) {
if (/\.js$/.test(filename)) {
fs.readFile(__dirname + "/" + filename, 'utf8', group());
}
});
},
function showAll(err, files) {
if (err) throw err;
console.dir(files);
}
);
有两次group()的调用。第一次调用是告知Step要并行执行,第二次调用的结果将会生成一个回调函数,而回调函数接受的返回值将会按组存储。parallel()传递给下一个任务的结果是如下形式:
function (err, result1, result2, ...);
group()传递的结果是:
function (err, results);
这个函数返回的数据保存在数组中。
# 04. 异步并发控制
在Node中,我们可以十分方便地利用异步发起并行调用。使用下面的代码,我们可以轻松发起100次异步调用:
for (var i = 0, i < 100; i++) {
async();
}
但是如果并发量过大,我们的下层服务器将会吃不消。如果是对文件系统进行大量并发调用,操作系统的文件描述符数量将会被瞬间用光,抛出如下错误:
Error: EMFILE, too many open files
# async的解决方案
async也提供了一个方法用于处理异步调用的限制:parallelLimit()。如下是async的示例代码:
async.parallelLimit([
function (callback) {
fs.readFile('file1.txt', 'utf-8', callback);
},
function (callback) {
fs.readFile('file2.txt', 'utf-8', callback);
}
], 1, function (err, results) {
// TODO
});
parallelLimit()与parallel()类似,但多了一个用于限制并发数量的参数,使得任务只能同时并发一定数量,而不是无限制并发。