Nodejs

Node.js之Cluster模块分析与学习

2019-08-05  本文已影响0人  西葫芦炒胖子

Cluster


源码地址

常见问题


    问:cluster模块如何区分子进程和主进程
    答:判断环境变量是否含有NODE_UNQUE_ID,有则为子进程,没有则为主进程(源码 node/lib/cluster.js中可以查看)如下:

    const childOrMaster = 'NODE_UNIQUE_ID' in process.env ? 'child' : 'master';
    module.exports = require(`internal/cluster/${childOrMaster}`);

    在(internal/cluster/master.js)文件搜索NODE_UNIQUE_ID ==》 上层为createWorkerProcess函数 ==》 上层为cluster.fork函数

    变量NODE_UNIQUE_ID 是在主进程fork子进程时传递进去的参数,因此采用cluster.fork 创建的主进程一定包含NDOE_UNIQUE_ID,而直接使用child_process.fork 的子进程没有NODE_UNIQUE_ID

    NODE_UNIQUE_ID 将作为主进程存储活跃的工作进程对象的键值

    问:主进程中农是否存在TCP服务器,什么时候创建的
    答:主进程fork子进程,子进程中有显式创建服务器的操作,但实际上在cluster模块下,子进程是把创建服务器所需要的数据发送给主进程,主进程来隐式创建TCP服务器。

    当子进程创建HTTP服务器时,http模块会调用net模块(http.Server继承net.Server),创建net.Server对象,同时侦听端口。创建net.Server实例,调用构造函数返回。创建的net.Server实例调用listen(),等待accpet连接。

    源码库(https://github.com/nodejs/node/blob/master/lib/internal/cluster/child.js)

    这是一个hack函数,当cluster fork 出来子进程只要调用listen方法,就屏蔽掉了,child源码如下:
    function listen(backlog){
        // TODO(bnoordhuis) Send a message to the master that tells it to
        // update the backlog size. The actual backlog should probably be
        // the largest requested size by any worker.
        return 0;
    }

    backlog 是已经连接但未进行accept处理的socket队列大小。node默认在socket层设置backlog默认值为511,这是因为nginx和redis默认设置的backlog值也为此。

    问:多个子进程与端口复用
    答:子进程中确实创建了net.Server对象,但它并没有像主进程在libuv层构建socket句柄,子进程的net.Server对象使用的是一个人为fake出的一个假句柄来'欺骗'使用者端口已侦听,这样做的目的是为了集群的负载均衡。   

    根据源码分析(child.js)当主进程发送创建服务器成功的消息后,子进程执行如下回调函数,它会根据主进程是否返回了handle句柄来选择函数执行。由于cluster默认采用RoundRobin调度策略,因此主进程返回的handle为null,执行rr函数,做了上文提到的hack操作,fake了一个假的handle对象,handle.listen 并没有调用libuv层的listen方法。子进程没有创建底层的服务端socket做侦听,所以在子进程创建的HTTP服务器侦听的端口根本不会出现端口复用的情况。

    send(message, (reply, handle) => {
        if (typeof obj._setServerData === 'function')
            obj._setServerData(reply.data);

        if (handle)
            shared(reply, handle, indexesKey, cb);  // Shared listen socket.
        else
            rr(reply, indexesKey, cb);              // Round-robin.
    });


    function rr(message, indexesKey, cb) {
        if (message.errno)
            return cb(message.errno, null);

        var key = message.key;

        function listen(backlog) {
            // TODO(bnoordhuis) Send a message to the master that tells it to
            // update the backlog size. The actual backlog should probably be
            // the largest requested size by any worker.
            return 0;
        }
        ...

        const handle = { close, listen, ref: noop, unref: noop };

        if (message.sockname) {
            handle.getsockname = getsockname;  // TCP handles only.
        }

        assert(handles.has(key) === false);
        handles.set(key, handle);
        cb(0, handle);
    }

    问:如何将请求分发个多个worker
    答:主进程的服务器中会创建RoundRobinHandle 决定分发请求给哪一个子进程,筛选出子进程后发送newconn消息给对应的子进程。子进程接收到newconn消息后,会调用内部的oncennectiono函数,先向主进程发送开始处理请求的消息,然后执行业务处理函数server.onconnection(node在C++层执行的js回调函数)

    源码(lib/internal/cluster/round_robin_handle.js)

    const message = { act: 'newconn', key: this.key };

    sendHelper(worker.process, message, handle, (reply) => {
        if (reply.accepted)
        handle.close();
        else
        this.distribute(0, handle);  // Worker is shutting down. Send to another.

        this.handoff(worker);
    });

    源码(lib/internal/cluster/child.js)

    function onmessage(message, handle) {
        if (message.act === 'newconn')
            onconnection(message, handle);
        else if (message.act === 'disconnect')
            _disconnect.call(worker, true);
    }

    // Round-robin connection.
    function onconnection(message, handle) {
        const key = message.key;
        const server = handles.get(key);
        const accepted = server !== undefined;

        send({ ack: message.seq, accepted });

        if (accepted)
            server.onconnection(0, handle);
    }

    请求分发策略:
    handloff 函数获取排队中的客户端请求,并通过IPC发送句柄handle和newconn消息,等待子进程返回。当子进程返回正在处理请求的消息时,执行handoff函数,继续分配请求给子进程,不管该子进程上次请求是否处理完成(node的异步特性和事件循环可以让单进程处理多请求).
    按照这样的策略,主进程的服务器每接受一个req请求,执行修改后的onconnection回调,执行distribute方法,在其内部调用handoff函数,进入该子进程的处理循环中。一旦主进程没有缓存的客户端请求时(this.handles为空),便会将当前子进程加入free空闲队列,等待主进程的下一步调度。这就是cluster的RoundRobin调度策略,每个子进程的处理逻辑都是一个闭环,直到主进程缓存的客户端请求处理完毕时,该子进程的处理闭环才被打开。

    源码:(https://github.com/nodejs/node/blob/master/lib/internal/cluster/round_robin_handle.js)
    
    //负责筛选出处理请求的子进程
    //this.free 数组存储空闲的子进程
    //this.handles 数组存放待处理的用户请求
    RoundRobinHandle.prototype.distribute = function(err, handle) {
        this.handles.push(handle);
        const worker = this.free.shift();

        if (worker)
            this.handoff(worker);
    };

    //发送消息和handle给对应的worker进程,处理业务逻辑
    RoundRobinHandle.prototype.handoff = function(worker) {
        if (this.all.has(worker.id) === false) {
            return;  // Worker is closing (or has closed) the server.
        }

        const handle = this.handles.shift();

        if (handle === undefined) {
            this.free.push(worker);  // Add to ready queue again.
            return;
        }

        const message = { act: 'newconn', key: this.key };

        sendHelper(worker.process, message, handle, (reply) => {
            if (reply.accepted)
            handle.close();
            else
            this.distribute(0, handle);  // Worker is shutting down. Send to another.

            this.handoff(worker);
        });
    };
const cluster = require('cluster');                                      //  | |
const http = require('http');                                            //  | |   都执行了
const numCpus = require('os').cpus().length;                             //  | |------------

if(cluster.isMaster){                                                    //    |
    console.log(`主进程 ${process.pid} 正在运行`);                                //    |
    for(var i = 0;i < numCpus; i++){                                           //    |
        cluster.fork();                                                  //    |
    }                                                                    //    |

    for(const id in cluster.workers){                                    //    |
        cluster.workers[id].on('message', messageHandler);               //    |  仅父进程执行
    }                                                                    //    |

    function messageHandler(msg) {                                       //    |
        if(msg.cmd && msg.cmd == 'notifyRequest'){                       //    |
            console.log("就是这么玩");                                    //    |
        }                                                                //    |
    }                                                                    //    |----------------------
}
else{
    //当子进程执行
    http.Server((req, res) => {                                          //  |
        res.writeHead(200);                                              //  |
        res.end("Hello world");                                          //  |   仅子进程执行
        process.send({cmd:"notifyRequest"});                             //  |
    }).listen(8000);    

    console.log(`工作进程 ${process.pid} 已启动`);
}

process.on("uncaughtException", (err) => {
    process.exit(1);
})
上一篇下一篇

猜你喜欢

热点阅读