NODEJS硬实战笔记(多进程)
利用NODE整合外部应用程序
执行外部应用程序
- execFile:执行外部程序,并且需要提供一组参数,以及一个在进程退出后的缓冲输出的回调。
- spawn:执行外部程序,并且需要提供一组参数,以及一个在进程退出后的输入输出和时间的数据流接口。
- exec:在一个命令行窗口中执行一个或多个命令,以及一个在进程退出后缓冲输出的回调。
- fork:在一个独立的进程中执行一个Node模块,并且需要提供一组参数,以及一个类似spawn方法里的数据流和事件式的接口,同时设置好父进程和子进程之间的进程间通信。
execFile
这是一个非常通用的方法,运行一个外部程序并且得到相应的输出结果。该方法是一个异步方法,在外部应用的输出内部使用buffer存放起来直到外部应用退出时,回调被调用传入对应的输出数据。
var cp = require('child_process')
cp.execFile('echo', ['hello', 'world'],
function(err, stdout, stderr) {
if (err) console.error(err);
console.log('stdout', stdout);
console.log('stderr', stderr);
});
Windows/UNIX操作系统中都有一个PATH的环境变量,PATH包含了一组可执行程序的执行目录列表。在Node中,当后台运行execvp时,当没有提供绝对或者相对路径时,它会基于PATH里定义的路径搜索所有相关的程序。
如果想要快速检查PATH路径包含哪一些目录,可以在Node的交互式命令解析器里输入:
$ node
> console.log(process.env.PATH.split(':').join('\n'))
/usr/local/bin
/usr/bin/bin
...
当然你可以继续向PATH路径当中添加你的路径,但是必须保证这个设置是在你执行execFile之前。
process.env.PATH += ':/a/new/path/to/executable';
执行外部程序时出现的异常
主要的异常分为两种,一种是提供的路径或文件名称不存在,一种是提供的应用路径被锁定(执行应用的权限不足)。
- 提供的路径或文件名不存在时:通常会报错ENOENT
- 执行应用的权限不足:通常会报错EACCESS或者EPERM
- 该程序不能在当前的平台下执行时:外部程序退出返回的状态码非零(即err.code!=0)
spawn
对于调用一个你可能预期有大数据量输出的外部应用,流确实是一个很好的选择。当外部程序输出的数据可用时,此时你可以选择马上将数据输出到内部应用,通过流。相反,而不是等到将所有数据缓存好之后再将其输出。
var cp = require('child_process');
var child = cp.spawn('echo', ['hello', 'world']);
child.on('error', console.error);
child.stdout.pipe(process.stdout);
child.stderr.pipe(process.stderr);
实例化一个spawn后,会返回一个ChildProccess对象,该对象中包含了stdin、stdout和stderr流对象,并且因为其流的性质,可以很好的进行无缝的处理。
var cp = require('child_process');
var cat = cp.spawn('cat', ['messy.txt']);
var sort = cp.spawn('sort');
var uniq = cp.spawn('uniq');
cat.stdout.pipe(sort.stdin);
sort.stdout.pipe(uniq.stdin);
uniq.stdout.pipe(process.stdout);
exec
如果需要在命令解析器里执行命令,你可以选择使用exec。exec方法实际上也是调用/bin/sh(在UNIX/Linux下)或者cmd.exe(在Windows下)来执行命令的。当然,这种方法可行的前提是,你必须拥有其他需要被执行的命令的权限(如管道、重定向和后台命令)。
cp.exec('cat messy.txt | sort | uniq',
function(err, stdout, stderr) {
console.log(stdout);
})
分离子进程
-
形式上的分离
当在进程里开启该进程的子进程以后,子进程会依赖于父进程。当父进程关闭的时候,子进程也会随着关闭,并且子进程是没有自己的独立I/O。那么如果想让子进程脱离父进程,就需要使用spawn方法,从而使得子进程拥有和父进程一样的级别,即成为一个进程组的头。
var child = cp.spawn('./', [], {detached: true});
但是此时,子进程和父进程之间还是通过I/O互相连接的,所以如果不强制性终结正在运行的Node程序,就会发现父进程会一直保持活跃状态,直到子进程结束。但是强制性终结Node程序以后,longrun会继续执行,直到它自己终结。
-
I/O分离
而stdio选项就是来控制子进程的I/O连接到一个具体的地方,
stdio:['pipe', 'pipe', 'pipe']
三个流分别对应child.stdin、child.stdout和child.stderr。默认这些流都是开放的,所以父进程能够与子进程之间进行通信。当然你可以使用很暴力的方式关闭掉这些流,阻止父进程与子进程进行通信。child.stdin.destroy(); child.stdout.destroy(); child.stderr.destroy();
但是既然我们根本不需要这些流,那就应该在源头上去放弃掉这些流或者重新赋值将I/O指向别的地方。
var fs = require('fs'); var cp = require('child_process'); var outFd = fs.openSync('./longrun.out', 'a'); var errFd = fs.openSync('./longrun.err', 'a'); var child = cp.spawn('./longrun', [], { detached: true, stdio: ['ignore', outFd, errFd] });
ignore关键词就是用来放弃相对应的流。
-
引用分离
尽管子进程被分离了并且它和父进程的I/O也被中断了,但是父进程仍然会有一个堆子进程的内部引用,并且只要子进程没有终结且这个引用没有被移除,父进程都不会终结。所以可以通过child.unref()方法告诉Node不要将子进程的引用进行计数。下面的代码就会再子进程执行spawn方法之后退出。
var fs = require('fs'); var cp = require('child_process'); var outFd = fs.openSync('./longrun.out', 'a'); var errFd = fs.openSync('./longrun.err', 'a'); var child = cp.spawn('./longrun', [], { detached: true, stdio: ['ignore', outFd, errFd] }); child.unref();
fork(操作一个独立的Node进程)
var cp = require('child_process');
var child = cp.fork('./myChild');
默认情况下,通过fork创建的子进程所有的输入输出都是继承自父进程的,并不会有child.stdin、child.stdout或child.stderr。
如果想提供像spawn一样的默认的I/O配置,那么可以使用slient选项。
var cp = require('child_process');
var child = cp.fork('./myChild', { silent: true });
使用fork方法会开放一个IPC通道,使得不同的Node进程之间进行消息传送。而Node进程之间主要是使用Event进行通信,在子进程这边会暴露process.on('message')和process.send()来接收和发送消息,在父进程这边使用child.on('message')和child.send()。
var cp = require('child_process');
var child = cp.fork('./child');
child.on('message', function(msg) {
console.log('got a message from child', msg);
});
child.send('sending a string');
因为我们打开了一个父进程和子进程间的一个IPC通道,只要子进程不中断,父进程也就会保持活动状态。如果需要中断IPC通信连接,可以在父进程中显式的实现:child.disconnect();
一个较好的父进程例子,考虑了多次调用以及子进程出现问题的情况:
function doWork(job, cb) {
var child = cp.fork('./worker');
var cbTriggered = false;
child
.once('error', function(err) {
if (!cbTriggered) {
cb(err);
cbTriggered = true;
}
// 子进程出现了异常则杀死子进程
child.kill();
})
.once('exit', function(code, signal) {
if (!cbTriggered)
cb(new Error('Child exited with code: ' + code));
})
.once('message', function(result) {
cb(null, result);
cbTriggered = true;
})
.send(job);
}
工作池
在Node的官方文档中有述:
这些子节点仍然是一个V8的新实例。预计每一个节点需要耗时30毫秒的启动时间和10MB的内存。
也就是说,你不能创建太多了,因为这些并不是没有代价开销的。
所以说在实现的过程当中,与其使用多个短时间的子进程,还不如维护一个工作池,池中存放了一些可以长时间运行的进程。
那么我们就在上面doWork的基础之上,做一些优化,完成我们在工作池上的一个作业分配以及发送。
var cp = require('child_process');
var cpus = require('os').cpus().length;
module.exports = function(workModule) {
// 等待的作业
var awaiting = [];
// 空闲的子进程
var readyPool = [];
// 总子进程的个数
var poolSize = 0;
return function doWork(job, cb) {
// 如果现在没有准备好的子进程,并且总子进程数已经超过cpu的个数了,就让作业先排队等待
if (!readyPool.length && poolSize > cpus)
return awaiting.push([ doWork, job, cb ]);
// 如果有空闲的子进程则取出第一个使用,没有的话就新建一个子进程
var child = readyPool.length
? readyPool.shift()
: (poolSize++, cp.fork(workModule));
var cbTriggered = false;
child
// 先删除原来的监听
.removeAllListeners()
.once('error', function(err) {
if (!cbTriggered) {
cb(err);
cbTriggered = true;
}
child.kill();
})
.once('exit', function() {
if (!cbTriggered)
cb(new Error('Child exited with code: ' + code));
// 进程关闭的时候将它从队列中踢出
poolSize--;
var childIdx = readyPool.indexOf(child);
if (childIdx > -1) readyPool.splice(childIdx, 1);
})
.once('message', function(msg) {
cb(null, msg);
cbTriggered = true;
// 子进程再次就绪,将其加回readPool
readyPool.push(child);
// 如果现在有等待的任务,运行之
if (awaiting.length) setImmediate.apply(null, awaiting.shift());
})
}
}
在这段代码的最后我是有一些疑问的,因为当子进程收到message的时候,进行了异步回调。因为当时没太能分清异步与多线程的区别。所以我当时就不太懂回调都还没有结束,为什么就能回收这个进程了?
那么这边也区分一下异步与多线程。
- 异步:首先说一下DMA,DMA就是直接内存访问的意思,也就是说,拥有DMA功能的硬件在和内存进行数据交换的时候可以不消耗CPU资源。只要CPU在发起数据传输时发送一个指令,硬件就开始自己和内存交换数据,在传输完成之后硬件会触发一个中断来通知操作完成。这些无须消耗CPU时间的I/O操作正是异步操作的硬件基础。所以即使在DOS这样的单进程(而且无线程概念)系统中也同样可以发起异步的DMA操作。因为异步操作无须额外的线程负担,并且使用回调的方式进行处理,在设计良好的情况下,处理函数可以不必使用共享变量(即使无法完全不用,最起码可以减少 共享变量的数量),减少了死锁的可能。当然异步操作也并非完美无暇。编写异步操作的复杂程度较高,程序主要使用回调方式进行处理,与普通人的思维方式有些出入,而且难以调试。
- 多线程:线程不是一个计算机硬件的功能,而是操作系统提供的一种逻辑功能,线程本质上是进程中一段并发运行的代码,所以线程需要操作系统投入CPU资源来运行和调度。多线程的优点很明显,线程中的处理程序依然是顺序执行,符合普通人的思维习惯,所以编程简单。但是多线程的缺点也同样明显,线程的使用(滥用)会给系统带来上下文切换的额外负担。并且线程间的共享变量可能造成死锁的出现。
详细可见:浅谈多线程和异步
同步运行
其实在异步的API介绍完以后,同步的API就显得很简单了。因为它和异步的API基本上是一样的,只不过在实现的过程当中会阻塞掉主线程,直到子进程模块完成。
-
如果只想同步执行一个单独的命令,并且得到输出,那么可以使用execFileSync
var ex = require('child_process').execFileSync; var stdout = ex('echo', ['hello']).toString(); console.log(stdout);
-
如果想程序式同步执行多个命令,并且命令之间的结果存在相互依赖的关系,可以使用spawnSync
var sp = require('child_process').spawnSync; var ps = sp('ps', ['aux']); var grep = sp('grep', ['node']) { input: ps.stdout; encoding: 'utf-8' }); console.log(grep);
同步子进程得到的结果包含了很多的细节,这也是使用spawnSync的另外一个好处。
-
当然execSync也是同样的使用方法,这里不再赘述
同步子进程中的异常处理
如果在execSync或execFileSync执行的结果中返回的是一个非零状态,这种情况下,将会有异常抛出。这个抛出的异常对象将会包含我们在使用spawnExec返回的结果里的所有东西。我们可以访问状态编码里的重要信息和stderr流。
var ex = require('child-process').execFileSync;
try {
ex('cd', ['non-existent-dir'], {
encoding: 'utf-8'
});
} catch(err) {
console.error('exit status was', err.status);
console.error('stderr', err.stderr);
}