NODEJS硬实战笔记程序员开源工具技巧

NODEJS硬实战笔记(多进程)

2017-03-31  本文已影响538人  77即是正义

利用NODE整合外部应用程序

执行外部应用程序

execFile

这是一个非常通用的方法,运行一个外部程序并且得到相应的输出结果。该方法是一个异步方法,在外部应用的输出内部使用buffer存放起来直到外部应用退出时,回调被调用传入对应的输出数据。

var cp = require('child_process')

cp.execFile('echo', ['hello', 'world'],
    function(err, stdout, stderr) {
        if (err) console.error(err);
        console.log('stdout', stdout);
        console.log('stderr', stderr);
    });

Windows/UNIX操作系统中都有一个PATH的环境变量,PATH包含了一组可执行程序的执行目录列表。在Node中,当后台运行execvp时,当没有提供绝对或者相对路径时,它会基于PATH里定义的路径搜索所有相关的程序。

如果想要快速检查PATH路径包含哪一些目录,可以在Node的交互式命令解析器里输入:

$ node
> console.log(process.env.PATH.split(':').join('\n'))
/usr/local/bin
/usr/bin/bin
...

当然你可以继续向PATH路径当中添加你的路径,但是必须保证这个设置是在你执行execFile之前。

process.env.PATH += ':/a/new/path/to/executable';

执行外部程序时出现的异常

主要的异常分为两种,一种是提供的路径或文件名称不存在,一种是提供的应用路径被锁定(执行应用的权限不足)

spawn

对于调用一个你可能预期有大数据量输出的外部应用,流确实是一个很好的选择。当外部程序输出的数据可用时,此时你可以选择马上将数据输出到内部应用,通过流。相反,而不是等到将所有数据缓存好之后再将其输出。

var cp = require('child_process');

var child = cp.spawn('echo', ['hello', 'world']);
child.on('error', console.error);
child.stdout.pipe(process.stdout);
child.stderr.pipe(process.stderr);

实例化一个spawn后,会返回一个ChildProccess对象,该对象中包含了stdin、stdout和stderr流对象,并且因为其流的性质,可以很好的进行无缝的处理。

var cp = require('child_process');
var cat = cp.spawn('cat', ['messy.txt']);
var sort = cp.spawn('sort');
var uniq = cp.spawn('uniq');

cat.stdout.pipe(sort.stdin);
sort.stdout.pipe(uniq.stdin);
uniq.stdout.pipe(process.stdout);

exec

如果需要在命令解析器里执行命令,你可以选择使用exec。exec方法实际上也是调用/bin/sh(在UNIX/Linux下)或者cmd.exe(在Windows下)来执行命令的。当然,这种方法可行的前提是,你必须拥有其他需要被执行的命令的权限(如管道、重定向和后台命令)。

cp.exec('cat messy.txt | sort | uniq', 
    function(err, stdout, stderr) {
        console.log(stdout);
    })

分离子进程

fork(操作一个独立的Node进程)

var cp = require('child_process');
var child = cp.fork('./myChild');

默认情况下,通过fork创建的子进程所有的输入输出都是继承自父进程的,并不会有child.stdinchild.stdoutchild.stderr

如果想提供像spawn一样的默认的I/O配置,那么可以使用slient选项。

var cp = require('child_process');
var child = cp.fork('./myChild', { silent: true });

使用fork方法会开放一个IPC通道,使得不同的Node进程之间进行消息传送。而Node进程之间主要是使用Event进行通信,在子进程这边会暴露process.on('message')process.send()来接收和发送消息,在父进程这边使用child.on('message')child.send()

var cp = require('child_process');
var child = cp.fork('./child');
child.on('message', function(msg) {
    console.log('got a message from child', msg);
});
child.send('sending a string');

因为我们打开了一个父进程和子进程间的一个IPC通道,只要子进程不中断,父进程也就会保持活动状态。如果需要中断IPC通信连接,可以在父进程中显式的实现:child.disconnect();

一个较好的父进程例子,考虑了多次调用以及子进程出现问题的情况:

function doWork(job, cb) {
    var child = cp.fork('./worker');
    var cbTriggered = false;
    
    child
        .once('error', function(err) {
            if (!cbTriggered) {
                cb(err);
                cbTriggered = true;
            }
            // 子进程出现了异常则杀死子进程
            child.kill();
        })
        .once('exit', function(code, signal) {
            if (!cbTriggered)
                cb(new Error('Child exited with code: ' + code));
        })
        .once('message', function(result) {
            cb(null, result);
            cbTriggered = true;
        })
        .send(job);
}

工作池

在Node的官方文档中有述:

这些子节点仍然是一个V8的新实例。预计每一个节点需要耗时30毫秒的启动时间和10MB的内存。
也就是说,你不能创建太多了,因为这些并不是没有代价开销的。

所以说在实现的过程当中,与其使用多个短时间的子进程,还不如维护一个工作池,池中存放了一些可以长时间运行的进程。

那么我们就在上面doWork的基础之上,做一些优化,完成我们在工作池上的一个作业分配以及发送。

var cp = require('child_process');
var cpus = require('os').cpus().length;

module.exports = function(workModule) {
    // 等待的作业
    var awaiting = [];
    // 空闲的子进程
    var readyPool = [];
    // 总子进程的个数
    var poolSize = 0;
    
    return function doWork(job, cb) {
        // 如果现在没有准备好的子进程,并且总子进程数已经超过cpu的个数了,就让作业先排队等待
        if (!readyPool.length && poolSize > cpus) 
            return awaiting.push([ doWork, job, cb ]);
        
        // 如果有空闲的子进程则取出第一个使用,没有的话就新建一个子进程
        var child = readyPool.length
            ? readyPool.shift()
            : (poolSize++, cp.fork(workModule));
        var cbTriggered = false;
        
        child
            // 先删除原来的监听
            .removeAllListeners()
            .once('error', function(err) {
                if (!cbTriggered) {
                    cb(err);
                    cbTriggered = true;
                }
                child.kill();
            })
            .once('exit', function() {
                if (!cbTriggered)
                    cb(new Error('Child exited with code: ' + code));
                // 进程关闭的时候将它从队列中踢出
                poolSize--;
                var childIdx = readyPool.indexOf(child);
                if (childIdx > -1) readyPool.splice(childIdx, 1); 
            })
            .once('message', function(msg) {
                cb(null, msg);
                cbTriggered = true;
                // 子进程再次就绪,将其加回readPool
                readyPool.push(child);
                // 如果现在有等待的任务,运行之
                if (awaiting.length) setImmediate.apply(null, awaiting.shift());
            })
    }
}

在这段代码的最后我是有一些疑问的,因为当子进程收到message的时候,进行了异步回调。因为当时没太能分清异步与多线程的区别。所以我当时就不太懂回调都还没有结束,为什么就能回收这个进程了?

那么这边也区分一下异步与多线程。

详细可见:浅谈多线程和异步

同步运行

其实在异步的API介绍完以后,同步的API就显得很简单了。因为它和异步的API基本上是一样的,只不过在实现的过程当中会阻塞掉主线程,直到子进程模块完成。

同步子进程中的异常处理

如果在execSync或execFileSync执行的结果中返回的是一个非零状态,这种情况下,将会有异常抛出。这个抛出的异常对象将会包含我们在使用spawnExec返回的结果里的所有东西。我们可以访问状态编码里的重要信息stderr流

var ex = require('child-process').execFileSync;
try {
    ex('cd', ['non-existent-dir'], {
        encoding: 'utf-8'
    });
} catch(err) {
    console.error('exit status was', err.status);
    console.error('stderr', err.stderr);
}
上一篇下一篇

猜你喜欢

热点阅读