五、nginx的进程间通信之socketpair(参考《深入剖析
(一)父子进程通信
nginx父子进程之间或子进程之间势必涉及到进程间通信,这里采用了socketpair进行通信。在Linux下,可使用socketpair函数创造一对的、相互连接的域套接字。套接字对建立的通道是双向的,每一端都可以进行读写
// socketpair — create a pair of connected sockets
int socketpair(int domain, int type, int protocol, int *sv);
前一篇文章(nginx启动过程中的进程创建)中提到了nginx启动子进程的函数,ngx_spawn_process,nginx进程间通信的套接字就是在这个函数中创建的。其主要代码如下:
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"socketpair() failed while spawning \"%s\"", name);
return NGX_INVALID_PID;
}
/* ...*/
pid = fork();
switch (pid) {
/* ... */
}
重点在这一句:
socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel)
AF_UNIX用于同一台机器上的进程间通信;SOCK_STREAM提供的稳定数据传输,即TCP协议; ngx_processes是全局变量,ngx_processes[s].channel用于指定存储套接字的容器。
由于socketpair函数是在fork之前调用的,所以在fork之后,父子进程都会拥有该套接字。那么,只要父进程使用channel[0],子进程使用channel[1],就能实现父子进程之间的通信。
(二)子进程间通信
若是不同子进程之间想要通信,又该如何呢?既然套接字存储在ngx_processes[s]
.channel中,而ngx_processes又是全局变量,只要子进程的ngx_processes中存储着其它所有子进程的channel信息,就能给任意一个子进程发送消息。
显然,ngx_processes是从父进程处继承而来的,虽然父进程中的ngx_processes始终是最新最全,但子进程之间是有先后顺序的。比如说,在用户自定义工作进程为5个时,nginx的master进程将for循环5次产生5个子进程,则第5个子进程可以从父进程处获得前四个子进程的channel信息,而第4个子进程继承父进程时,由于第5个子进程还未产生,自然无法获得第5个子进程的channel信息。
如此一来,后产生的子进程拥有其“哥哥”们的channel信息,可以给“哥哥”们发消息,而“哥哥”们没有后产生的子进程的channel信息,便无法给“弟弟”们发消息。解决办法很简单,只需把“弟弟”们的相关信息发送给“哥哥”们即可。
执行这个任务的,正是父进程(master)。在ngx_start_worker_processes中的定义如下:
for (i = 0; i < n; i++) {
cpu_affinity = ngx_get_cpu_affinity(i);
ngx_spawn_process(cycle, ngx_worker_process_cycle, NULL,
"worker process", type);
ch.pid = ngx_processes[ngx_process_slot].pid;
ch.slot = ngx_process_slot;
ch.fd = ngx_processes[ngx_process_slot].channel[0];
ngx_pass_open_channel(cycle, &ch);
}
ngx_spawn_process函数用于产生子进程,然后将子进程的信息存储在结构体变量ch中,最后用 ngx_pass_open_channel函数将存储了新子进程相关信息的结构体ch发送给其它子进程。ngx_pass_open_channel的定义如下:
static void
ngx_pass_open_channel(ngx_cycle_t *cycle, ngx_channel_t *ch)
{
ngx_int_t i;
for (i = 0; i < ngx_last_process; i++) {
/* ... */
ngx_write_channel(ngx_processes[i].channel[0],
ch, sizeof(ngx_channel_t), cycle->log);
}
}
代码很清晰,就是不断地把子进程的信息通过第i个子进程的channel[0]发送,当第i个进程接收到结构体ch(存储着新子进程的pid和channel信息)后,再进行相关处理即可。
那么当子进程收到其它新子进程的信息时,具体是怎么处理的呢?
在上一篇文章中,已经知道在子进程产生后,会执行ngx_worker_process_cycle函数。此函数的开头将调用ngx_worker_process_init函数初始化子进程,而在初始化过程中,将把自己从父进程继承的ngx_channel[1](channel[0]被用于父进程或其它子进程写消息)加入到读事件监听集里。
下面先看一看ngx_worker_process_init的定义:
if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
ngx_channel_handler) == NGX_ERROR)
{
/* fatal */
exit(2);
}
其中,ngx_channel 是全局变量,在ngx_spawn_process中被赋值为:
ngx_channel = ngx_processes[s].channel[1];
ngx_channel_handler是对应的处理函数,在该函数中,对此事件的处理方式为:
case NGX_CMD_OPEN_CHANNEL:
/*...*/
ngx_processes[ch.slot].pid = ch.pid;
ngx_processes[ch.slot].channel[0] = ch.fd;
break;
也就是将接收到的新子进程的pid和channel信息存储到全局变量ngx_processes的相应位置中,如此一来,进程之间都互相有了彼此的channel信息和pid号,也就可以互相通信了。
(三)关于nginx_channel.c
这里关注一下nginx_channel.c。该文件定义了有关nginx利用channel通信的函数。其结构如下:
nginx_channel结构.png
可以看到,只有四个函数,分别对应着写、读、添加事件监听和关闭channel的功能。
-
关于写消息的函数,其定义如下。
ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log)
该函数通过socket s发送大小为size字节的消息ch。
-
关于读消息的函数,其定义如下。
ngx_int_t ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size, ngx_log_t *log)
该函数通过socket s读取大小为size字节的消息ch。
-
关于关闭channel的函数,其定义如下。
void ngx_close_channel(ngx_fd_t *fd, ngx_log_t *log) { if (close(fd[0]) == -1) { ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "close() channel failed"); } if (close(fd[1]) == -1) { ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "close() channel failed"); } }
该函数将socket_pair的两个文件描述符依次关闭。
-
关于ngx_add_channel_event,其定义为
ngx_int_t ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd, ngx_int_t event, ngx_event_handler_pt handler)
这里大概是将某文件描述符添加到某事件集中,涉及到nginx的事件监听和处理,其具体的运行原理,待我下一章详述。