Nginx高端成长之路linux程序员

nginx的daemon master worker

2016-12-28  本文已影响320人  烨哥

daemon:

Linux Daemon(守护进程)是运行在后台的一种特殊进程。它独立于控制终端并且周期性地执行某种任务或等待处理某些发生的事件。它不需要用户输入就能运行而且提供某种服务,不是对整个系统就是对某个用户程序提供服务。Linux系统的大多数服务器就是通过守护进程实现的。常见的守护进程包括系统日志进程syslogd、 web服务器httpd、邮件服务器sendmail和数据库服务器mysqld等。

守护进程一般在系统启动时开始运行,除非强行终止,否则直到系统关机都保持运行。守护进程经常以超级用户(root)权限运行,因为它们要使用特殊的端口(1-1024)或访问某些特殊的资源。

一个守护进程的父进程是init进程,因为它真正的父进程在fork出子进程后就先于子进程exit退出了,所以它是一个由init继承的孤儿进程。守护进程是非交互式程序,没有控制终端,所以任何输出,无论是向标准输出设备stdout还是标准出错设备stderr的输出都需要特殊处理。

守护进程的名称通常以d结尾,比如sshd、xinetd、crond等

二,创建守护进程步骤
首先我们要了解一些基本概念:

进程组 :

每个进程也属于一个进程组
每个进程主都有一个进程组号,该号等于该进程组组长的PID号 .
一个进程只能为它自己或子进程设置进程组ID号
会话期:

会话期(session)是一个或多个进程组的集合。

setsid()函数可以建立一个对话期:

如果,调用setsid的进程不是一个进程组的组长,此函数创建一个新的会话期。

(1)此进程变成该对话期的首进程

(2)此进程变成一个新进程组的组长进程。

(3)此进程没有控制终端,如果在调用setsid前,该进程有控制终端,那么与该终端的联系被解除。 如果该进程是一个进程组的组长,此函数返回错误。

(4)为了保证这一点,我们先调用fork()然后exit(),此时只有子进程在运行

现在我们来给出创建守护进程所需步骤:

编写守护进程的一般步骤步骤:

(1)在父进程中执行fork并exit推出;

(2)在子进程中调用setsid函数创建新的会话;

(3)在子进程中调用chdir函数,让根目录 ”/” 成为子进程的工作目录;

(4)在子进程中调用umask函数,设置进程的umask为0;

(5)在子进程中关闭任何不需要的文件描述符
摘抄于(linux系统编程之进程(八):守护进程详解及创建,daemon()使用)

然后我们来看下nginx的处理

ngx_int_t
ngx_daemon(ngx_log_t *log)
{
   int  fd;

   switch (fork()) {//fork进行进程创建
   case -1://失败了
       ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, "fork() failed");
       return NGX_ERROR;

   case 0://子进程
       break;

   default://pid>0 代表是父进程 
       exit(0);//调用exit
   }

   ngx_pid = ngx_getpid();//重新获取pid

/*
有必要先介绍一下Linux中的进程与控制终端,登录会话和进程组之间的关系:进程属于一个进程组,进程组号(GID)就是进程组长的进程号(PID)。
登录会话可以包含多个进程组。这些进程组共享一个控制终端。这个控制终端通常是创建进程的登录终端。 控制终端,登录会话和进程组通常是从父进程继承下来的。
我们的目的就是要摆脱它们,使之不受它们的影响。调用setsid()使进程成为会话组长: 
setsid()调用成功后,进程成为新的会话组长和新的进程组长,并与原来的登录会话和进程组脱离。由于会话过程对控制终端的独占性,进程同时与控制终端脱离。 
*/
   if (setsid() == -1) {
       ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, "setsid() failed");
       return NGX_ERROR;
   }

   umask(0);//进程从创建它的父进程那里继承了文件创建掩模。它可能修改守护进程所创建的文件的存取位。为防止这一点,将文件创建掩模清除

   fd = open("/dev/null", O_RDWR);
   if (fd == -1) {
       ngx_log_error(NGX_LOG_EMERG, log, ngx_errno,
                     "open(\"/dev/null\") failed");
       return NGX_ERROR;
   }

   if (dup2(fd, STDIN_FILENO) == -1) {
       ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, "dup2(STDIN) failed");
       return NGX_ERROR;
   }

   if (dup2(fd, STDOUT_FILENO) == -1) {
       ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, "dup2(STDOUT) failed");
       return NGX_ERROR;
   }

#if 0
   if (dup2(fd, STDERR_FILENO) == -1) {
       ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, "dup2(STDERR) failed");
       return NGX_ERROR;
   }
#endif
//进程从创建它的父进程那里继承了打开的文件描述符。如不关闭,将会浪费系统资源,造成进程所在的文件系统无法卸下以及引起无法预料的错误
   if (fd > STDERR_FILENO) {
       if (close(fd) == -1) {
           ngx_log_error(NGX_LOG_EMERG, log, ngx_errno, "close() failed");
           return NGX_ERROR;
       }
   }

   return NGX_OK;
}

到此,daemon模式启动完成了

master-worker

master-worker是一个nginx最常用的运行模式来看下master-worker的定义。
  1. master: master进程主要是用来管理worker进程,包括接收外界的信号,像worker进程发送信号,监控worker进程的运行状态,当worker进程异常退出的时候,重新启动worker进程,mater不进行事件处理了。
  2. worker: 工作进程,多个worker进程直接是对等的,他们同等竞争来自客户端的请求,一个请求只可能在一个进程被处理,worker进程的数目是可以设置,一个设置得和我们的cpu核心数一致。
master-worker之间的通信
worker-mater直接的通信实际上是通过socketpair来构造一对socket来进行通信的,master使用第一个套接字来发数据,worker使用第二个套接字来读数据。
这个构造在创建子进程之前调用,master-worker之间的通信封装为一个 ngx_channel_t 结构体
typedef struct {
    ngx_uint_t  command;//发送的命令
    ngx_pid_t   pid;//一般是发送方的命令id
    ngx_int_t   slot;//在全局进程表中的位置
    ngx_fd_t    fd;//套接字 
} ngx_channel_t;

ngx_channel_t提供四个方法的,读,写,关闭,增加事件侦听

ngx_int_t ngx_write_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size,
   ngx_log_t *log);//写数据(一般都是master往worker发送cmd)
ngx_int_t ngx_read_channel(ngx_socket_t s, ngx_channel_t *ch, size_t size,
   ngx_log_t *log);//读数据(一般都是worker读取master发送的命令)
ngx_int_t ngx_add_channel_event(ngx_cycle_t *cycle, ngx_fd_t fd,
   ngx_int_t event, ngx_event_handler_pt handler);//加入事件侦听(一般都是worker操作)
void ngx_close_channel(ngx_fd_t *fd, ngx_log_t *log);//套接字关闭
进程的创建
nginx提供了ngx_process_t来对进程信息的保存的结构体,用来保存子进程信息
定义了一个全局数组
ngx_process_t ngx_processes[NGX_MAX_PROCESSES];用来存放所有的子进程 NGX_MAX_PROCESSES = 1024
typedef struct {
   ngx_pid_t           pid;//进程id
   int                 status;//子进程退出后,父进程收到sigchld,父进程由waitpid系统调用去获得进程状态
   ngx_socket_t        channel[2];//socktpair产生的用于进程间通信的句柄

   ngx_spawn_proc_pt   proc;//启动子进程后的执行方法
   /*
    上面的ngx_spawn_proc_pt方法中第2个参数雷要传递1个指针,它是可选的。例如,worker子进程就不需要,而cache manage进程
    就需要ngx_cache_manager_ctx上下文成员。这时,data一般与ngx_spawn_proc_pt方法中第2个参数是等价的
    */
   void               *data;//
   char               *name;//进程的名字

   
   unsigned            respawn:1;//为1表示重新生成子进程
   unsigned            just_spawn:1;//表示正在生成子进程
   unsigned            detached:1;//表示父子进程分离
   unsigned            exiting:1;//表示进程正在退出
   unsigned            exited:1;//表示进程已经退出
} ngx_process_t;
进程的创建的方法

流程:

  1. 找出一个可用ngx_processes
  2. 如果不是热代码替换的话使用socketpair创建通信sock
  3. 设置sock的属性,非阻塞,异步等。
  4. 调用fork创建子进程
  5. 子进程中将调用proc方法,回不来了,主进程中保存子进程的信息,设置状态。
ngx_pid_t
ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data,
   char *name, ngx_int_t respawn)
{
/*
proc 创建完成之后的执行方法
data 方法需要的执行参数
name 进程的名字
respawn 创建方式
*/
   u_long     on;
   ngx_pid_t  pid;
   ngx_int_t  s;//将要创建的子进程在进程表中的位置
  // 如果respawn不小于0,则视为当前进程已经退出,需要重启
   if (respawn >= 0) {
       s = respawn;

   } else {
       for (s = 0; s < ngx_last_process; s++) {
           if (ngx_processes[s].pid == -1) {//找个可用的
               break;
           }
       }

       if (s == NGX_MAX_PROCESSES) {//最多只能创建1024个子进程
           ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                         "no more than %d processes can be spawned",
                         NGX_MAX_PROCESSES);
           return NGX_INVALID_PID;
       }
   }

   if (respawn != NGX_PROCESS_DETACHED) {//不是分离的子进程      /* 不是热代码替换 */

       /* Solaris 9 still has no AF_LOCAL */

       /*
       这里相当于Master进程调用socketpair()为新的worker进程创建一对全双工的socket
       
       实际上socketpair 函数跟pipe 函数是类似的,也只能在同个主机上具有亲缘关系的进程间通信,但pipe 创建的匿名管道是半双工的,
       而socketpair 可以认为是创建一个全双工的管道。*/
       if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1)
       {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "socketpair() failed while spawning \"%s\"", name);
           return NGX_INVALID_PID;
       }
       /*
        protocol必须传递0
         当socketpair执行成功时,sv[2]这两个套接字具备下列关系:向sv[0]套接字写入数据,将可以从sv[l]套接字中读取到刚写入的数据;
         同样,向sv[l]套接字写入数据,也可以从sv[0]中读取到写入的数据
        通常,在父、子进程通信前,会先调用socketpair方法创建这样一组
        套接字,在调用fork方法创建出子进程后,将会在父进程中关闭sv[l]套接字,仅使用sv[0]套接字用于向子进程发送数据以及接收子进程发
        送来的数据:
        而在子进程中则关闭sv[0]套接字,仅使用sv[l]套接字既可以接收父进程发来的数据,也可以向父进程发送数据。
        注意socketpair的协议族为AF_UNIX UNXI域
        */

       ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                      "channel %d:%d",
                      ngx_processes[s].channel[0],
                      ngx_processes[s].channel[1]);

       if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {//设置为非阻塞
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         ngx_nonblocking_n " failed while spawning \"%s\"",
                         name);
           ngx_close_channel(ngx_processes[s].channel, cycle->log);
           return NGX_INVALID_PID;
       }

       if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         ngx_nonblocking_n " failed while spawning \"%s\"",
                         name);
           ngx_close_channel(ngx_processes[s].channel, cycle->log);
           return NGX_INVALID_PID;
       }

       on = 1;
       /*
        设置异步模式: 
        
        */
       /*
        设置channel[0]的信号驱动异步I/O标志
        FIOASYNC:该状态标志决定是否收取针对socket的异步I/O信号(SIGIO)
        其与O_ASYNC文件状态标志等效,可通过fcntl的F_SETFL命令设置or清除
        */
       if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "ioctl(FIOASYNC) failed while spawning \"%s\"", name);
           ngx_close_channel(ngx_processes[s].channel, cycle->log);
           return NGX_INVALID_PID;
       }
       /* F_SETOWN:用于指定接收SIGIO和SIGURG信号的socket属主(进程ID或进程组ID)
        * 这里意思是指定Master进程接收SIGIO和SIGURG信号
        * SIGIO信号必须是在socket设置为信号驱动异步I/O才能产生,即上一步操作
        * SIGURG信号是在新的带外数据到达socket时产生的
        */
       if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "fcntl(F_SETOWN) failed while spawning \"%s\"", name);
           ngx_close_channel(ngx_processes[s].channel, cycle->log);
           return NGX_INVALID_PID;
       }
       
       /* FD_CLOEXEC:用来设置文件的close-on-exec状态标准
        *             在exec()调用后,close-on-exec标志为0的情况下,此文件不被关闭;非零则在exec()后被关闭
        *             默认close-on-exec状态为0,需要通过FD_CLOEXEC设置
        *     这里意思是当Master父进程执行了exec()调用后,关闭socket
        */
       if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                          name);
           ngx_close_channel(ngx_processes[s].channel, cycle->log);
           return NGX_INVALID_PID;
       }
       /* 同上,这里意思是当Worker子进程执行了exec()调用后,关闭socket */
       if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
                          name);
           ngx_close_channel(ngx_processes[s].channel, cycle->log);
           return NGX_INVALID_PID;
       }

       ngx_channel = ngx_processes[s].channel[1];

   } else {
       ngx_processes[s].channel[0] = -1;
       ngx_processes[s].channel[1] = -1;
   }

   ngx_process_slot = s;// 这一步将在ngx_pass_open_channel()中用到,就是设置下标,用于寻找本次创建的子进程

   pid = fork();//fork 创建子进程

   switch (pid) {

   case -1://创建失败了
       ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                     "fork() failed while spawning \"%s\"", name);
       ngx_close_channel(ngx_processes[s].channel, cycle->log);
       return NGX_INVALID_PID;

   case 0:
       ngx_pid = ngx_getpid();//在子进程了
       proc(cycle, data);//执行回调方法  出不来了 这里是子进程执行的方法
       break;

   default:
       break;
   }

   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

   ngx_processes[s].pid = pid;//保存pid
   ngx_processes[s].exited = 0;//没退出

   if (respawn >= 0) {//如果是大于0的操作 说明是重启子进程 下面不重复了
       return pid;
   }

   ngx_processes[s].proc = proc;
   ngx_processes[s].data = data;
   ngx_processes[s].name = name;
   ngx_processes[s].exiting = 0;

   switch (respawn) {//设置状态信息

   case NGX_PROCESS_NORESPAWN:
       ngx_processes[s].respawn = 0;
       ngx_processes[s].just_spawn = 0;
       ngx_processes[s].detached = 0;
       break;

   case NGX_PROCESS_JUST_SPAWN:
       ngx_processes[s].respawn = 0;
       ngx_processes[s].just_spawn = 1;
       ngx_processes[s].detached = 0;
       break;

   case NGX_PROCESS_RESPAWN:
       ngx_processes[s].respawn = 1;
       ngx_processes[s].just_spawn = 0;
       ngx_processes[s].detached = 0;
       break;

   case NGX_PROCESS_JUST_RESPAWN:
       ngx_processes[s].respawn = 1;
       ngx_processes[s].just_spawn = 1;
       ngx_processes[s].detached = 0;
       break;

   case NGX_PROCESS_DETACHED:
       ngx_processes[s].respawn = 0;
       ngx_processes[s].just_spawn = 0;
       ngx_processes[s].detached = 1;
       break;
   }

   if (s == ngx_last_process) {
       ngx_last_process++;
   }

   return pid;
}

在创建子进程中子进程将会调用一个proc方法这个方法,这个方法主要是进行worker进程的初始化和事件的循环,这个方法是
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data),他将会调用ngx_worker_process_init方法进行一些初始化操作。这些初始化操作主要是:(1)优先级设置;(2)文件最大描述符设置;(3)core文件尺寸设置;(4)用户信息设置;(5)工作目录的改变;(6):改变当前阻塞信号集(7)随机种子设置(8) init_process (9): 关闭其他进程的channel[1] 和自己进程的channel[0] ;(10):把channel[1]加入事件侦听

static void
ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker)
{
   sigset_t          set;
   uint64_t          cpu_affinity;
   ngx_int_t         n;
   ngx_uint_t        i;
   struct rlimit     rlmt;
   ngx_core_conf_t  *ccf;
   ngx_listening_t  *ls;

   if (ngx_set_environment(cycle, NULL) == NULL) {//环境变量
       /* fatal */
       exit(2);
   }

   ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

   if (worker >= 0 && ccf->priority != 0) {//优先级的设置
       if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "setpriority(%d) failed", ccf->priority);
       }
   }

   if (ccf->rlimit_nofile != NGX_CONF_UNSET) {
       rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile;
       rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile;
         //RLIMIT_NOFILE指定此进程可打开的最大文件描述词大一的值,超出此值,将会产生EMFILE错误。

       if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "setrlimit(RLIMIT_NOFILE, %i) failed",
                         ccf->rlimit_nofile);
       }
   }

   if (ccf->rlimit_core != NGX_CONF_UNSET) {
       rlmt.rlim_cur = (rlim_t) ccf->rlimit_core;
       rlmt.rlim_max = (rlim_t) ccf->rlimit_core;
//修改工作进程的core文件尺寸的最大值限制(RLIMIT_CORE),用于在不重启主进程的情况下增大该限制。
       if (setrlimit(RLIMIT_CORE, &rlmt) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "setrlimit(RLIMIT_CORE, %O) failed",
                         ccf->rlimit_core);
       }
   }

#ifdef RLIMIT_SIGPENDING
   if (ccf->rlimit_sigpending != NGX_CONF_UNSET) {
       rlmt.rlim_cur = (rlim_t) ccf->rlimit_sigpending;
       rlmt.rlim_max = (rlim_t) ccf->rlimit_sigpending;

       if (setrlimit(RLIMIT_SIGPENDING, &rlmt) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "setrlimit(RLIMIT_SIGPENDING, %i) failed",
                         ccf->rlimit_sigpending);
       }
   }
#endif

   //设置用户信息
   if (geteuid() == 0) {
       if (setgid(ccf->group) == -1) {
           ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                         "setgid(%d) failed", ccf->group);
           /* fatal */
           exit(2);
       }

       if (initgroups(ccf->username, ccf->group) == -1) {
           ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                         "initgroups(%s, %d) failed",
                         ccf->username, ccf->group);
       }

       if (setuid(ccf->user) == -1) {
           ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                         "setuid(%d) failed", ccf->user);
           /* fatal */
           exit(2);
       }
   }

   if (worker >= 0) {
       cpu_affinity = ngx_get_cpu_affinity(worker);

       if (cpu_affinity) {
           ngx_setaffinity(cpu_affinity, cycle->log);
       }
   }

#if (NGX_HAVE_PR_SET_DUMPABLE)

   /* allow coredump after setuid() in Linux 2.4.x */

   if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {
       ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                     "prctl(PR_SET_DUMPABLE) failed");
   }

#endif

   if (ccf->working_directory.len) {//更改当前工作目录
       if (chdir((char *) ccf->working_directory.data) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "chdir(\"%s\") failed", ccf->working_directory.data);
           /* fatal */
           exit(2);
       }
   }

   sigemptyset(&set);

   if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {//改变进程的当前阻塞信号集
       ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                     "sigprocmask() failed");
   }

   srandom((ngx_pid << 16) ^ ngx_time());//设置随机种子

   /*
    * disable deleting previous events for the listening sockets because
    * in the worker processes there are no events at all at this point
    */
   ls = cycle->listening.elts;
   for (i = 0; i < cycle->listening.nelts; i++) {
       ls[i].previous = NULL;
   }
//调用init_process
   for (i = 0; ngx_modules[i]; i++) {
       if (ngx_modules[i]->init_process) {
           if (ngx_modules[i]->init_process(cycle) == NGX_ERROR) {
               /* fatal */
               exit(2);
           }
       }
   }
//关闭其他进程的channel[1] 和自己进程的channel[0]   然主进程使用channel[0] 写数据自己使用channel[1]读数据
   for (n = 0; n < ngx_last_process; n++) {

       if (ngx_processes[n].pid == -1) {
           continue;
       }

       if (n == ngx_process_slot) {
           continue;
       }

       if (ngx_processes[n].channel[1] == -1) {
           continue;
       }

       if (close(ngx_processes[n].channel[1]) == -1) {
           ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                         "close() channel failed");
       }
   }

   if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
       ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                     "close() channel failed");
   }

#if 0
   ngx_last_process = 0;
#endif

   if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
                             ngx_channel_handler)//添加到事件侦听
       == NGX_ERROR)
   {
       /* fatal */
       exit(2);
   }
}

然后ngx_worker_process_cycle 进行事件的循环,如果收到了对应的信号信息,就进行对应的信号操作,比如quit reopen terminate操作,就不贴代码了自己参考代码。

我们发现我们增加了对channel[1]的事件侦听这个socket收到的消息就是我们的master发送的指令,具体的处理函数就是ngx_channel_handler里面,可以查看具体命令的处理。

master的信号处理和对子进程的管理和监控
master进程的主要作用就是对外界信号的处理和对子进程的监控,对外界信号的处理主要是通过信号量的侦听来实现的
下面来看下nginx对信号量的处理

在main函数中会调用一个ngx_init_signals方法来初始化信号量处理
nginx简单封装了下singal

typedef struct {
   int     signo;//需要处理的信号
   char   *signame;//信号对应的字符串名称
   char   *name;//信号对应的nginx命令
   void  (*handler)(int signo);//收到signo信号后的处理方法
} ngx_signal_t;

nginx定义了一个ngx_signal_t signals[]的数组,用来保存需要初始化的信号量

ngx_signal_t  signals[] = {
   { ngx_signal_value(NGX_RECONFIGURE_SIGNAL),
     "SIG" ngx_value(NGX_RECONFIGURE_SIGNAL),
     "reload",
     ngx_signal_handler },

   { ngx_signal_value(NGX_REOPEN_SIGNAL),
     "SIG" ngx_value(NGX_REOPEN_SIGNAL),
     "reopen",
     ngx_signal_handler },

   { ngx_signal_value(NGX_NOACCEPT_SIGNAL),
     "SIG" ngx_value(NGX_NOACCEPT_SIGNAL),
     "",
     ngx_signal_handler },

   { ngx_signal_value(NGX_TERMINATE_SIGNAL),
     "SIG" ngx_value(NGX_TERMINATE_SIGNAL),
     "stop",
     ngx_signal_handler },

   { ngx_signal_value(NGX_SHUTDOWN_SIGNAL),
     "SIG" ngx_value(NGX_SHUTDOWN_SIGNAL),
     "quit",
     ngx_signal_handler },

   { ngx_signal_value(NGX_CHANGEBIN_SIGNAL),
     "SIG" ngx_value(NGX_CHANGEBIN_SIGNAL),
     "",
     ngx_signal_handler },

   { SIGALRM, "SIGALRM", "", ngx_signal_handler },

   { SIGINT, "SIGINT", "", ngx_signal_handler },

   { SIGIO, "SIGIO", "", ngx_signal_handler },

   { SIGCHLD, "SIGCHLD", "", ngx_signal_handler },

   { SIGSYS, "SIGSYS, SIG_IGN", "", SIG_IGN },

   { SIGPIPE, "SIGPIPE, SIG_IGN", "", SIG_IGN },

   { 0, NULL, "", NULL }
};
init_signals的作用就是将signals里面的内容加入到系统侦听
当信号的到来就将会调用ngx_signal_handler方法来处理,转换成当前对应的状态(不贴代码)
当收到的信号是SIGCHLD的时候(子线程挂了)将还会调用ngx_process_get_status()来获取子线程信息。然后把对应挂了的子进程的exit设置为1
static void
ngx_process_get_status(void)
{
   int              status;
   char            *process;
   ngx_pid_t        pid;
   ngx_err_t        err;
   ngx_int_t        i;
   ngx_uint_t       one;

   one = 0;

   for ( ;; ) {
       pid = waitpid(-1, &status, WNOHANG);//获取挂了的子进程的id

       if (pid == 0) {
           return;
       }

       if (pid == -1) {
           err = ngx_errno;

           if (err == NGX_EINTR) {
               continue;
           }

           if (err == NGX_ECHILD && one) {
               return;
           }

           /*
            * Solaris always calls the signal handler for each exited process
            * despite waitpid() may be already called for this process.
            *
            * When several processes exit at the same time FreeBSD may
            * erroneously call the signal handler for exited process
            * despite waitpid() may be already called for this process.
            */

           if (err == NGX_ECHILD) {
               ngx_log_error(NGX_LOG_INFO, ngx_cycle->log, err,
                             "waitpid() failed");
               return;
           }

           ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
                         "waitpid() failed");
           return;
       }

       one = 1;
       process = "unknown process";

       for (i = 0; i < ngx_last_process; i++) {//找到子进程
           if (ngx_processes[i].pid == pid) {
               ngx_processes[i].status = status;
               ngx_processes[i].exited = 1;//设置退出
               process = ngx_processes[i].name;
               break;
           }
       }

       if (WTERMSIG(status)) {
#ifdef WCOREDUMP
           ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
                         "%s %P exited on signal %d%s",
                         process, pid, WTERMSIG(status),
                         WCOREDUMP(status) ? " (core dumped)" : "");
#else
           ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
                         "%s %P exited on signal %d",
                         process, pid, WTERMSIG(status));
#endif

       } else {
           ngx_log_error(NGX_LOG_NOTICE, ngx_cycle->log, 0,
                         "%s %P exited with code %d",
                         process, pid, WEXITSTATUS(status));
       }

       if (WEXITSTATUS(status) == 2 && ngx_processes[i].respawn) {//2 致命的错误 
           ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
                         "%s %P exited with fatal code %d "
                         "and cannot be respawned",
                         process, pid, WEXITSTATUS(status));
           ngx_processes[i].respawn = 0;//再打开也没用了
       }

       ngx_unlock_mutexes(pid);
   }
}
在master的主循环中会发现ngx_reap被设置为1,然后会被调用ngx_reap_childern来重启子进程
static ngx_uint_t
ngx_reap_children(ngx_cycle_t *cycle)
{
   ngx_int_t         i, n;
   ngx_uint_t        live;
   ngx_channel_t     ch;
   ngx_core_conf_t  *ccf;

   ngx_memzero(&ch, sizeof(ngx_channel_t));

   ch.command = NGX_CMD_CLOSE_CHANNEL;
   ch.fd = -1;

   live = 0;
   for (i = 0; i < ngx_last_process; i++) {

       ngx_log_debug7(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                      "child: %d %P e:%d t:%d d:%d r:%d j:%d",
                      i,
                      ngx_processes[i].pid,
                      ngx_processes[i].exiting,
                      ngx_processes[i].exited,
                      ngx_processes[i].detached,
                      ngx_processes[i].respawn,
                      ngx_processes[i].just_spawn);

       if (ngx_processes[i].pid == -1) {//当前在被用
           continue;
       }

       if (ngx_processes[i].exited) {//被退出了

           if (!ngx_processes[i].detached) {//不是分离的
               ngx_close_channel(ngx_processes[i].channel, cycle->log);//关闭通信channel

               ngx_processes[i].channel[0] = -1;
               ngx_processes[i].channel[1] = -1;

               ch.pid = ngx_processes[i].pid;
               ch.slot = i;

               for (n = 0; n < ngx_last_process; n++) {//扫描所有的
                   if (ngx_processes[n].exited
                       || ngx_processes[n].pid == -1
                       || ngx_processes[n].channel[0] == -1)
                   {
                       continue;
                   }

                   ngx_log_debug3(NGX_LOG_DEBUG_CORE, cycle->log, 0,
                                  "pass close channel s:%i pid:%P to:%P",
                                  ch.slot, ch.pid, ngx_processes[n].pid);

                   /* TODO: NGX_AGAIN */

                   ngx_write_channel(ngx_processes[n].channel[0],
                                     &ch, sizeof(ngx_channel_t), cycle->log);//关闭所有子进程和退出进程的通信通道
               }
           }

           if (ngx_processes[i].respawn
               && !ngx_processes[i].exiting
               && !ngx_terminate
               && !ngx_quit)//如果不是新开的进程就不启动了,如果是致命的错误,也不启动了
           {
               if (ngx_spawn_process(cycle, ngx_processes[i].proc,
                                     ngx_processes[i].data,
                                     ngx_processes[i].name, i)//重新启动这个进程
                   == NGX_INVALID_PID)
               {
                   ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
                                 "could not respawn %s",
                                 ngx_processes[i].name);
                   continue;
               }

               ch.command = NGX_CMD_OPEN_CHANNEL;
               ch.pid = ngx_processes[ngx_process_slot].pid;
               ch.slot = ngx_process_slot;
               ch.fd = ngx_processes[ngx_process_slot].channel[0];

               ngx_pass_open_channel(cycle, &ch);//通知其他进程新的联系方式

               live = 1;

               continue;
           }

           if (ngx_processes[i].pid == ngx_new_binary) {//热代码替换

               ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                                      ngx_core_module);

               if (ngx_rename_file((char *) ccf->oldpid.data,
                                   (char *) ccf->pid.data)
                   == NGX_FILE_ERROR)
               {
                   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                                 ngx_rename_file_n " %s back to %s failed "
                                 "after the new binary process \"%s\" exited",
                                 ccf->oldpid.data, ccf->pid.data, ngx_argv[0]);
               }

               ngx_new_binary = 0;
               if (ngx_noaccepting) {
                   ngx_restart = 1;
                   ngx_noaccepting = 0;
               }
           }

           if (i == ngx_last_process - 1) {
               ngx_last_process--;

           } else {
               ngx_processes[i].pid = -1;
           }

       } else if (ngx_processes[i].exiting || !ngx_processes[i].detached) {//正在退出
           live = 1;
       }
   }

   return live;
}

其中有一点就是当上一个进程是因为致命错误退出的exit(2)那就不用再打开了,反正也打不开

整体的连接起来
到这里每一个小项已经基本差不多了。来整体看来最后的启动
void
ngx_master_process_cycle(ngx_cycle_t *cycle)
{
   char              *title;
   u_char            *p;
   size_t             size;
   ngx_int_t          i;
   ngx_uint_t         n, sigio;
   sigset_t           set;
   struct itimerval   itv;
   ngx_uint_t         live;
   ngx_msec_t         delay;
   ngx_listening_t   *ls;
   ngx_core_conf_t   *ccf;

   sigemptyset(&set);
   sigaddset(&set, SIGCHLD);
   sigaddset(&set, SIGALRM);
   sigaddset(&set, SIGIO);
   sigaddset(&set, SIGINT);
   sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
   sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
   sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL));
   sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL));
   sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
   sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));

   if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) {//信号阻塞
       ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                     "sigprocmask() failed");
   }

   sigemptyset(&set);

   size = sizeof(master_process);

   for (i = 0; i < ngx_argc; i++) {
       size += ngx_strlen(ngx_argv[i]) + 1;
   }

   title = ngx_pnalloc(cycle->pool, size);

   p = ngx_cpymem(title, master_process, sizeof(master_process) - 1);
   for (i = 0; i < ngx_argc; i++) {
       *p++ = ' ';
       p = ngx_cpystrn(p, (u_char *) ngx_argv[i], size);
   }

   ngx_setproctitle(title);//设置进程的名字

   ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

   ngx_start_worker_processes(cycle, ccf->worker_processes,
                              NGX_PROCESS_RESPAWN);//启动worker
   ngx_start_cache_manager_processes(cycle, 0);//启动manager

   ngx_new_binary = 0;
   delay = 0;
   sigio = 0;
   live = 1;

   for ( ;; ) {
       /*
        delay用来等待子进程退出的时间,由于我们接受到SIGINT信号后,我们需要先发送信号给子进程,而子进程的退出需要一定的时间,
        超时时如果子进程已退出,我们父进程就直接退出,否则发送sigkill信号给子进程(强制退出),然后再退出。
        */
       if (delay) {
           if (ngx_sigalrm) {
               sigio = 0;
               delay *= 2;
               ngx_sigalrm = 0;
           }

           ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                          "termination cycle: %d", delay);

           itv.it_interval.tv_sec = 0;
           itv.it_interval.tv_usec = 0;
           itv.it_value.tv_sec = delay / 1000;
           itv.it_value.tv_usec = (delay % 1000 ) * 1000;
           /*
            setitimer(int which, const struct itimerval *value, struct itimerval *ovalue));
            setitimer()比alarm功能强大,支持3种类型的定时器:
            
            ITIMER_REAL: 设定绝对时间;经过指定的时间后,内核将发送SIGALRM信号给本进程;
            ITIMER_VIRTUAL 设定程序执行时间;经过指定的时间后,内核将发送SIGVTALRM信号给本进程;
            ITIMER_PROF 设定进程执行以及内核因本进程而消耗的时间和,经过指定的时间后,内核将发送ITIMER_VIRTUAL信号给本进程;
            
            */ //设置定时器,以系统真实时间来计算,送出SIGALRM信号,这个信号反过来会设置ngx_sigalrm为1,这样delay就会不断翻倍。
           if (setitimer(ITIMER_REAL, &itv, NULL) == -1) {
               ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
                             "setitimer() failed");
           }
       }

       ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "sigsuspend");

       sigsuspend(&set);//等待定时器超时,通过ngx_init_signals执行ngx_signal_handler中的SIGALRM信号,信号处理函数返回后,继续该函数后面的操作

       ngx_time_update();

       ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                      "wake up, sigio %i", sigio);

       if (ngx_reap) {//父进程收到一个子进程退出的信号
           ngx_reap = 0;
           ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "reap children");
           ///这个里面处理退出的子进程(有的worker异常退出,这时我们就需要重启这个worker ),如果所有子进程都退出则会返回0.
           live = ngx_reap_children(cycle);// 有子进程意外结束,这时需要监控所有的子进程
       }

        //如果没有存活的子进程,并且收到了ngx_terminate或者ngx_quit信号,则master退出。
       if (!live && (ngx_terminate || ngx_quit)) {
           ngx_master_process_exit(cycle);
       }

       if (ngx_terminate) {//收到了sigint信号。
           if (delay == 0) {//设置延时。
               delay = 50;
           }

           if (sigio) {
               sigio--;
               continue;
           }

           sigio = ccf->worker_processes + 2 /* cache processes */;

           if (delay > 1000) {//如果超时就强制杀死进程
               ngx_signal_worker_processes(cycle, SIGKILL);
           } else {
               ngx_signal_worker_processes(cycle,
                                      ngx_signal_value(NGX_TERMINATE_SIGNAL));//否则就发送退出信号
           }

           continue;
       }

       if (ngx_quit) {//如果收到了退出信号
           ngx_signal_worker_processes(cycle,
                                       ngx_signal_value(NGX_SHUTDOWN_SIGNAL));//发送信号

           ls = cycle->listening.elts;
           for (n = 0; n < cycle->listening.nelts; n++) {//关闭侦听数组
               if (ngx_close_socket(ls[n].fd) == -1) {
                   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_socket_errno,
                                 ngx_close_socket_n " %V failed",
                                 &ls[n].addr_text);
               }
           }
           cycle->listening.nelts = 0;

           continue;
       }

       if (ngx_reconfigure) {//收到reconfig信号
           ngx_reconfigure = 0;

           if (ngx_new_binary) {//判断是否热代码替换后的新的代码还在运行中(也就是还没退出当前的master)。如果还在运行中,则不需要重新初始化config。
               ngx_start_worker_processes(cycle, ccf->worker_processes,
                                          NGX_PROCESS_RESPAWN);//重启worker进程
               ngx_start_cache_manager_processes(cycle, 0);//重启cache进程
               ngx_noaccepting = 0;

               continue;
           }

           ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reconfiguring");

           cycle = ngx_init_cycle(cycle);//重新初始化config,并重新启动新的worker
           if (cycle == NULL) {
               cycle = (ngx_cycle_t *) ngx_cycle;
               continue;
           }

           ngx_cycle = cycle;
           ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx,
                                                  ngx_core_module);
           ngx_start_worker_processes(cycle, ccf->worker_processes,//重启worker进程
                                      NGX_PROCESS_JUST_RESPAWN);
           ngx_start_cache_manager_processes(cycle, 1);//重启cache进程

           /* allow new processes to start */
           ngx_msleep(100);

           live = 1;
           ngx_signal_worker_processes(cycle,
                                       ngx_signal_value(NGX_SHUTDOWN_SIGNAL));//发送退出命令
       }

       if (ngx_restart) {//重启
           ngx_restart = 0;
           ngx_start_worker_processes(cycle, ccf->worker_processes,
                                      NGX_PROCESS_RESPAWN);
           ngx_start_cache_manager_processes(cycle, 0);
           live = 1;
       }

       if (ngx_reopen) {//重新打开文件
           ngx_reopen = 0;
           ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
           ngx_reopen_files(cycle, ccf->user);
           ngx_signal_worker_processes(cycle,
                                       ngx_signal_value(NGX_REOPEN_SIGNAL));
       }
       /*
        检查ngx_change_binary标志位,如果ngx_change_binary为1,则表示需要平滑升级Nginx,这时将调用ngx_exec_new_binary方法用新的子
        进程启动新版本的Nginx程序, 同时将ngx_change_binary标志位置为0。
        */
       if (ngx_change_binary) {
           ngx_change_binary = 0;
           ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "changing binary");
           ngx_new_binary = ngx_exec_new_binary(cycle, ngx_argv);
       }

       if (ngx_noaccept) {//不进行accept
           ngx_noaccept = 0;
           ngx_noaccepting = 1;
           ngx_signal_worker_processes(cycle,
                                       ngx_signal_value(NGX_SHUTDOWN_SIGNAL));//退出子进程
       }
   }
}

static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
   ngx_int_t      i;
   ngx_channel_t  ch;

   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");

   ngx_memzero(&ch, sizeof(ngx_channel_t));

   ch.command = NGX_CMD_OPEN_CHANNEL;

   for (i = 0; i < n; i++) {//n是配置的worker 数量

       ngx_spawn_process(cycle, ngx_worker_process_cycle,
                         (void *) (intptr_t) i, "worker process", type);

       ch.pid = ngx_processes[ngx_process_slot].pid;//配置当前的channel
       ch.slot = ngx_process_slot;
       ch.fd = ngx_processes[ngx_process_slot].channel[0];

       // 把该子进程的相关channel信息传递给已经创建好的其他所有子进程
       ngx_pass_open_channel(cycle, &ch);
   }
}

/*
在Nginx中,如果启用了proxy(fastcgi) cache功能,master process会在启动的时候启动管理缓存的两个子进程(区别于处理请求的子进程)来管理内
存和磁盘的缓存个体。第一个进程的功能是定期检查缓存,并将过期的缓存删除;第二个进程的作用是在启动的时候将磁盘中已经缓存的个
体映射到内存中(目前Nginx设定为启动以后60秒),然后退出。

具体的,在这两个进程的ngx_process_events_and_timers()函数中,会调用ngx_event_expire_timers()。Nginx的ngx_event_timer_rbtree(红黑树)里
面按照执行的时间的先后存放着一系列的事件。每次取执行时间最早的事件,如果当前时间已经到了应该执行该事件,就会调用事件的handler。两个
进程的handler分别是ngx_cache_manager_process_handler和ngx_cache_loader_process_handler
*/
static void
ngx_start_cache_manager_processes(ngx_cycle_t *cycle, ngx_uint_t respawn)
{
   ngx_uint_t       i, manager, loader;
   ngx_path_t     **path;
   ngx_channel_t    ch;

   manager = 0;
   loader = 0;

   path = ngx_cycle->paths.elts;
   for (i = 0; i < ngx_cycle->paths.nelts; i++) {

       if (path[i]->manager) {
           manager = 1;
       }

       if (path[i]->loader) {
           loader = 1;
       }
   }

   if (manager == 0) {
       return;
   }

   ngx_spawn_process(cycle, ngx_cache_manager_process_cycle,
                     &ngx_cache_manager_ctx, "cache manager process",
                     respawn ? NGX_PROCESS_JUST_RESPAWN : NGX_PROCESS_RESPAWN);//第一个函数

   ngx_memzero(&ch, sizeof(ngx_channel_t));

   ch.command = NGX_CMD_OPEN_CHANNEL;
   ch.pid = ngx_processes[ngx_process_slot].pid;
   ch.slot = ngx_process_slot;
   ch.fd = ngx_processes[ngx_process_slot].channel[0];

   ngx_pass_open_channel(cycle, &ch);//通知

   if (loader == 0) {
       return;
   }

   ngx_spawn_process(cycle, ngx_cache_manager_process_cycle,
                     &ngx_cache_loader_ctx, "cache loader process",
                     respawn ? NGX_PROCESS_JUST_SPAWN : NGX_PROCESS_NORESPAWN);

   ch.command = NGX_CMD_OPEN_CHANNEL;
   ch.pid = ngx_processes[ngx_process_slot].pid;
   ch.slot = ngx_process_slot;
   ch.fd = ngx_processes[ngx_process_slot].channel[0];

   ngx_pass_open_channel(cycle, &ch);//通知
}

到这里我们所有关于master worker的东西都差不多了。补充一个外部像master发送命令 比如我们的nginx -s stop

像master发送命令
比如我们的 nginx -s stop 等等命令 其实这些命令在发送的时候是重新开了一个nginx的进程,在解析完命令的时候,发现这个是用来执行命令的 这个时候就会调用 ngx_signal_process 发送命令
其实很简单,就只是读取pidfile文件获取pid 然后通过kill像nginx发送命令。就不写了
上一篇下一篇

猜你喜欢

热点阅读