Implementation of nginx worker process loop

  • 2020-05-24 06:52:35
  • OfStack

When a worker process starts, it first initializes the environment it needs and then enters a loop in which it repeatedly checks for pending events and processes them. Along the way, the worker also has to interact with the master process. In addition, as a child process, the worker can receive external commands sent from the command line (for example, signals delivered with kill) and must respond to them. So how does the worker process interact with the master process and with command-line instructions? This article first explains the principles behind both kinds of interaction, and then walks through the source code to show the complete workflow.

1. Interaction between worker and master processes

First, it is worth stating that nginx handles every instruction, whether it comes from the master process or from an external command, by setting flags. When a worker receives an instruction, the corresponding callback method simply sets the flag that matches that instruction. Then, after the worker has handled the events of one iteration of its loop, it checks each flag in turn; if a flag is set, the worker executes the logic associated with that flag.

The worker process and the master process interact through a socket pair. The ngx_process.h file declares an ngx_process_t structure; here we focus on its channel field:


typedef struct {
  // other fields omitted ...
  
  ngx_socket_t channel[2];
} ngx_process_t;

The ngx_process_t structure stores information about one process, such as its pid, channel, and status. Every process holds an ngx_processes array whose elements are ngx_process_t structures, so each process keeps basic information about the other processes in this array. It is declared as follows:

// array recording all child processes of nginx; each one is described by an ngx_process_t structure
extern ngx_process_t ngx_processes[NGX_MAX_PROCESSES];

As shown above, each process has a corresponding channel array of length 2, which is the socket pair used to interact with the master process. Before the master process creates each child process, it creates the channel array by calling:

int socketpair(int domain, int type, int protocol, int sv[2]);

This function creates a pair of anonymous, connected sockets: data written to one socket can be read from the other. So if the parent process writes data to one end, the child process can read it from the other end, which gives the parent and child a way to communicate.

After the master process starts a child process, the child inherits a copy of the master's data, including the channel array. The master process can therefore communicate with the child process through the channel array.
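To make the parent-to-child path concrete, here is a minimal sketch that is not nginx code. The helper name channel_roundtrip() is hypothetical, and the 128-byte buffer and the choice of which end each side keeps are assumptions made only for illustration. The parent writes a message to sv[0], and the forked child reads it from sv[1].

```c
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical helper (not part of nginx): the parent sends `msg` through
 * sv[0]; the forked child reads it from sv[1] and exits with status 0 only
 * if it received exactly the expected bytes.
 * Returns 0 on success and -1 on any failure.
 */
int channel_roundtrip(const char *msg)
{
    int     sv[2];
    int     status;
    pid_t   pid;
    size_t  len = strlen(msg) + 1;   /* include the terminating NUL */

    if (len > 128 || socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
        return -1;
    }

    pid = fork();

    if (pid == -1) {
        close(sv[0]);
        close(sv[1]);
        return -1;
    }

    if (pid == 0) {
        /* child: keep sv[1] and close the end that belongs to the parent */
        char     buf[128];
        ssize_t  n;

        close(sv[0]);
        n = read(sv[1], buf, sizeof(buf));
        _exit(n == (ssize_t) len && memcmp(buf, msg, len) == 0 ? 0 : 1);
    }

    /* parent: keep sv[0] and close the end that belongs to the child */
    close(sv[1]);

    if (write(sv[0], msg, len) != (ssize_t) len) {
        close(sv[0]);
        waitpid(pid, &status, 0);
        return -1;
    }

    close(sv[0]);

    if (waitpid(pid, &status, 0) == -1) {
        return -1;
    }

    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

Calling channel_roundtrip("quit") returns 0 when the child received the same bytes the parent wrote. Because the pair is bidirectional, the same descriptors could also carry data from the child back to the parent.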

2. How the worker handles external commands

External commands are essentially handled through the signals and callback methods defined in the signals array. When the master process initializes its base environment, it registers the callback method specified in the signals array for each signal. Because the worker process inherits the master's base environment, it invokes the same callback when it receives one of those signals. The main logic of the callback simply sets the corresponding flag. For how nginx sets the flag after receiving a signal, please refer to my previous article on the nginx master working cycle; I won't go into details here.

3. Source code explanation

The master process starts each child process through the ngx_start_worker_processes() method. Its source code is as follows:


/**
 * Start n worker child processes and, for each one, set up the socket-based communication
 * channel between it and the master (parent) process, created with the socketpair() system call.
 */
static void ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type) {
 ngx_int_t i;
 ngx_channel_t ch;
 
 ngx_memzero(&ch, sizeof(ngx_channel_t));
 ch.command = NGX_CMD_OPEN_CHANNEL;

 for (i = 0; i < n; i++) {

  // "spawn" here means producing a child process. The event loop of the child process is
  // ngx_worker_process_cycle(), the loop in which a worker process handles events.
  // Inside an infinite for loop, the worker keeps checking the event model for ready events,
  // places accept events and read/write events in two separate queues, and processes them in the event loop
  ngx_spawn_process(cycle, ngx_worker_process_cycle, 
           (void *) (intptr_t) i, "worker process", type);

  // The code below notifies the other processes about the new process.
  // In ch.command = NGX_CMD_OPEN_CHANNEL, NGX_CMD_OPEN_CHANNEL means that a new process has been created,
  // and ngx_process_slot is the index in the ngx_processes array where the new process is stored.
  // The broadcast is needed because each child's memory is copied from the parent when it is created,
  // and every process has its own copy of the ngx_processes array. A child created earlier therefore
  // has no data about children created later, whereas the master process has data for all of them.
  // So after creating a child, the master writes the current event (ch) to channel[0] of each process
  // in the ngx_processes array. On receiving the event, each child updates the
  // ngx_processes data it holds
  ch.pid = ngx_processes[ngx_process_slot].pid;
  ch.slot = ngx_process_slot;
  ch.fd = ngx_processes[ngx_process_slot].channel[0];

  //  Broadcast events 
  ngx_pass_open_channel(cycle, &ch);
 }
}

The key point here is the call that creates the child process, ngx_spawn_process(). Its second parameter is a function: once the child process has been started, it enters the loop implemented by that function. Inside ngx_spawn_process(), the master process creates a channel array for the new child process so that it can communicate with that child. The following is the source code of the ngx_spawn_process() method:


ngx_pid_t ngx_spawn_process(ngx_cycle_t *cycle, ngx_spawn_proc_pt proc, void *data, char *name, ngx_int_t respawn) {
 u_long on;
 ngx_pid_t pid;
 ngx_int_t s;

 if (respawn >= 0) {
  s = respawn;

 } else {
  // The ngx_processes array stores all processes created so far, and ngx_last_process is the index
  // just past the last recorded process. Some of the recorded processes may
  // no longer be alive, so this loop scans from the beginning for a dead process and reuses its slot;
  // otherwise the slot that ngx_last_process points to is used
  for (s = 0; s < ngx_last_process; s++) {
   if (ngx_processes[s].pid == -1) {
    break;
   }
  }

  // the maximum number of processes has been reached
  if (s == NGX_MAX_PROCESSES) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
          "no more than %d processes can be spawned",
          NGX_MAX_PROCESSES);
   return NGX_INVALID_PID;
  }
 }

 // NGX_PROCESS_DETACHED means the forked process is unrelated to the original parent process. For example, during an nginx upgrade,
 // the newly started master process is unrelated to the old master process
 if (respawn != NGX_PROCESS_DETACHED) {

  /* Solaris 9 still has no AF_LOCAL */

  // socketpair() creates a pair of connected sockets for communication between the master process and the child process.
  // The pair is stored in ngx_processes[s].channel, which is essentially an integer array of length 2.
  // One end is used by the master and the other by the child; the end a process does not use is closed,
  // and the process reads from or writes to the remaining descriptor to communicate.
  // AF_UNIX selects the UNIX domain (local) socket address family.
  // SOCK_STREAM makes the sockets a connected, bidirectional byte stream,
  // so both ends can read and write.
  // The third parameter, protocol, is 0 here, which selects the default protocol
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, ngx_processes[s].channel) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "socketpair() failed while spawning \"%s\"", name);
   return NGX_INVALID_PID;
  }

  ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
          "channel %d:%d",
          ngx_processes[s].channel[0],
          ngx_processes[s].channel[1]);

  // set ngx_processes[s].channel[0] to non-blocking mode
  if (ngx_nonblocking(ngx_processes[s].channel[0]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          ngx_nonblocking_n
            " failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // set ngx_processes[s].channel[1] to non-blocking mode
  if (ngx_nonblocking(ngx_processes[s].channel[1]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          ngx_nonblocking_n
            " failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  on = 1;
  // enable asynchronous (signal-driven) I/O on ngx_processes[s].channel[0]
  if (ioctl(ngx_processes[s].channel[0], FIOASYNC, &on) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "ioctl(FIOASYNC) failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // We are still in the master process here, and ngx_pid holds the master's process id. F_SETOWN makes the master
  // the owner of ngx_processes[s].channel[0], i.e. the process that receives the I/O signals for it; the master
  // communicates with the child by writing to and reading from ngx_processes[s].channel[0]
  if (fcntl(ngx_processes[s].channel[0], F_SETOWN, ngx_pid) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(F_SETOWN) failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // FD_CLOEXEC: the descriptor stays usable in the forked child process, but is closed automatically in any program started through an exec() call
  if (fcntl(ngx_processes[s].channel[0], F_SETFD, FD_CLOEXEC) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // FD_CLOEXEC: the descriptor stays usable in the forked child process, but is closed automatically in any program started through an exec() call
  if (fcntl(ngx_processes[s].channel[1], F_SETFD, FD_CLOEXEC) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fcntl(FD_CLOEXEC) failed while spawning \"%s\"",
          name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;
  }

  // ngx_processes[s].channel[1] is the end on which the child process listens for events: when the parent
  // writes an event to ngx_processes[s].channel[0], it arrives on ngx_processes[s].channel[1],
  // and the child processes it accordingly
  ngx_channel = ngx_processes[s].channel[1];

 } else {
  // NGX_PROCESS_DETACHED means a separate new master process is being started, so its channel descriptors are set to -1
  ngx_processes[s].channel[0] = -1;
  ngx_processes[s].channel[1] = -1;
 }

 ngx_process_slot = s;


 // fork() creates a new process whose memory is an exact copy of the parent's.
 // Note that the child process resumes execution right after the fork() call. In the parent process,
 // fork() returns the pid of the newly created child; in the child process it returns 0. The switch statement
 // below uses this to make the parent and the child run different code
 pid = fork();

 switch (pid) {

  case -1:
   // fork error 
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "fork() failed while spawning \"%s\"", name);
   ngx_close_channel(ngx_processes[s].channel, cycle->log);
   return NGX_INVALID_PID;

  case 0:
   // Branch executed by the child process. proc() is passed in by the caller: this method only creates the new process,
   // and the caller's function defines what the process does. ngx_getpid() returns the pid of the newly created child
   ngx_pid = ngx_getpid();
   proc(cycle, data);
   break;

  default:
   //  The parent process will go here 
   break;
 }

 ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start %s %P", name, pid);

 // The parent process continues here; pid is the id of the newly created child, as returned by fork()
 ngx_processes[s].pid = pid;
 ngx_processes[s].exited = 0;

 if (respawn >= 0) {
  return pid;
 }

 // record the attributes of the new process in its slot of the ngx_processes array
 ngx_processes[s].proc = proc;
 ngx_processes[s].data = data;
 ngx_processes[s].name = name;
 ngx_processes[s].exiting = 0;

 switch (respawn) {

  case NGX_PROCESS_NORESPAWN:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_JUST_SPAWN:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 1;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_RESPAWN:
   ngx_processes[s].respawn = 1;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_JUST_RESPAWN:
   ngx_processes[s].respawn = 1;
   ngx_processes[s].just_spawn = 1;
   ngx_processes[s].detached = 0;
   break;

  case NGX_PROCESS_DETACHED:
   ngx_processes[s].respawn = 0;
   ngx_processes[s].just_spawn = 0;
   ngx_processes[s].detached = 1;
   break;
 }

 if (s == ngx_last_process) {
  ngx_last_process++;
 }

 return pid;
}

Finally, the ngx_spawn_process() method forks a child process, which executes the callback specified by the second argument. Before that, however, note that it calls socketpair() to create a pair of anonymous sockets and stores them in the channel array of the new process's slot, which completes the creation of the channel array.
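The descriptor preparation that ngx_spawn_process() performs on the new pair (non-blocking mode and close-on-exec) can be sketched in isolation. This is only an approximation under assumptions: the demo_ helpers are hypothetical, plain fcntl() calls are used throughout, and nginx's own ngx_nonblocking() may be implemented differently depending on the platform. The FIOASYNC and F_SETOWN steps are left out.

```c
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical helper: make one descriptor non-blocking and close-on-exec. */
static int demo_prepare_fd(int fd)
{
    int flags;

    /* file status flags: add O_NONBLOCK so reads and writes never block */
    flags = fcntl(fd, F_GETFL);
    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
        return -1;
    }

    /* descriptor flags: add FD_CLOEXEC so an exec()'d program does not inherit it */
    flags = fcntl(fd, F_GETFD);
    if (flags == -1 || fcntl(fd, F_SETFD, flags | FD_CLOEXEC) == -1) {
        return -1;
    }

    return 0;
}

/*
 * Create a channel pair and prepare both ends.
 * On failure both entries are set to -1 and nothing is left open.
 */
int demo_open_channel(int channel[2])
{
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, channel) == -1) {
        channel[0] = -1;
        channel[1] = -1;
        return -1;
    }

    if (demo_prepare_fd(channel[0]) == -1
        || demo_prepare_fd(channel[1]) == -1)
    {
        close(channel[0]);
        close(channel[1]);
        channel[0] = -1;
        channel[1] = -1;
        return -1;
    }

    return 0;
}
```

With both ends non-blocking, a read on an empty channel returns -1 immediately instead of stalling the event loop, which is what lets the channel be driven by the same event model as ordinary connections.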

After the worker process starts, it executes the ngx_worker_process_cycle() method, which first initializes the worker, including the inherited channel array. Because the master process and the worker process both hold the socket descriptors in the channel array, each of them only needs the descriptor at one end of the pair, so the worker closes the end it does not use during initialization. In nginx, the master process uses the descriptor at index 0 of the channel array, while the worker process closes the descriptor at index 0 and keeps the one at index 1. When the master needs to talk to a worker, it only has to write data to channel[0], and the worker listens on channel[1] to receive what the master wrote. Let us first look at the source of the worker initialization method, ngx_worker_process_init():


/**
 * Initializes the current process: sets parameters such as its priority and open-file limit.
 * Finally, it registers an event on channel[1] of the current process so that it keeps reading messages from the master process and handles them
 */
static void ngx_worker_process_init(ngx_cycle_t *cycle, ngx_int_t worker) {
 sigset_t set;
 ngx_int_t n;
 ngx_time_t *tp;
 ngx_uint_t i;
 ngx_cpuset_t *cpu_affinity;
 struct rlimit rlmt;
 ngx_core_conf_t *ccf;
 ngx_listening_t *ls;

 // set up the process environment variables (including TZ, the time zone)
 if (ngx_set_environment(cycle, NULL) == NULL) {
  /* fatal */
  exit(2);
 }

 ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module);

 //  Sets the priority of the current process 
 if (worker >= 0 && ccf->priority != 0) {
  if (setpriority(PRIO_PROCESS, 0, ccf->priority) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "setpriority(%d) failed", ccf->priority);
  }
 }

 //  Sets the number of file handles that the current process can open 
 if (ccf->rlimit_nofile != NGX_CONF_UNSET) {
  rlmt.rlim_cur = (rlim_t) ccf->rlimit_nofile;
  rlmt.rlim_max = (rlim_t) ccf->rlimit_nofile;

  if (setrlimit(RLIMIT_NOFILE, &rlmt) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "setrlimit(RLIMIT_NOFILE, %i) failed",
          ccf->rlimit_nofile);
  }
 }

 // Changes the limit on the largest size of a core file(RLIMIT_CORE) for worker processes.
 //  In short, set the maximum size the core file can use 
 if (ccf->rlimit_core != NGX_CONF_UNSET) {
  rlmt.rlim_cur = (rlim_t) ccf->rlimit_core;
  rlmt.rlim_max = (rlim_t) ccf->rlimit_core;

  if (setrlimit(RLIMIT_CORE, &rlmt) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "setrlimit(RLIMIT_CORE, %O) failed",
          ccf->rlimit_core);
  }
 }

 // geteuid() returns the effective user id of the current process; 0 means the process is running as root
 if (geteuid() == 0) {
  // setgid() changes the group id of the process
  if (setgid(ccf->group) == -1) {
   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
          "setgid(%d) failed", ccf->group);
   /* fatal */
   exit(2);
  }

  // initgroups() initializes the supplementary group list
  if (initgroups(ccf->username, ccf->group) == -1) {
   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
          "initgroups(%s, %d) failed",
          ccf->username, ccf->group);
  }

  //  Change the user's id
  if (setuid(ccf->user) == -1) {
   ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
          "setuid(%d) failed", ccf->user);
   /* fatal */
   exit(2);
  }
 }

 // Note that for the cache manager and cache loader processes, the worker argument passed in is -1,
 // which means those two processes do not need CPU affinity to be set
 if (worker >= 0) {
  // get the CPU affinity configured for the current worker
  cpu_affinity = ngx_get_cpu_affinity(worker);

  if (cpu_affinity) {
   // bind the worker to those CPU cores
   ngx_setaffinity(cpu_affinity, cycle->log);
  }
 }

#if (NGX_HAVE_PR_SET_DUMPABLE)
 if (prctl(PR_SET_DUMPABLE, 1, 0, 0, 0) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "prctl(PR_SET_DUMPABLE) failed");
 }

#endif

 if (ccf->working_directory.len) {
  // chdir() changes the current working directory to the path passed as its argument
  if (chdir((char *) ccf->working_directory.data) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "chdir(\"%s\") failed", ccf->working_directory.data);
   /* fatal */
   exit(2);
  }
 }

 // initialize set as an empty signal set
 sigemptyset(&set);

 // The first argument of sigprocmask() selects the operation:
 //  SIG_BLOCK:   add the signals in set to the signal mask.
 //  SIG_UNBLOCK: remove the signals in set from the signal mask.
 //  SIG_SETMASK: replace the signal mask with set.
 // Here the mask is replaced with the empty set, so no signals are blocked
 if (sigprocmask(SIG_SETMASK, &set, NULL) == -1) {
  ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
         "sigprocmask() failed");
 }

 tp = ngx_timeofday();
 srandom(((unsigned) ngx_pid << 16) ^ tp->sec ^ tp->msec);

 ls = cycle->listening.elts;
 for (i = 0; i < cycle->listening.nelts; i++) {
  ls[i].previous = NULL;
 }

 // call each module's init_process() method to perform per-process module initialization
 for (i = 0; cycle->modules[i]; i++) {
  if (cycle->modules[i]->init_process) {
   if (cycle->modules[i]->init_process(cycle) == NGX_ERROR) {
    /* fatal */
    exit(2);
   }
  }
 }

 // close the channel[1] descriptors that the current process holds for the other processes
 for (n = 0; n < ngx_last_process; n++) {

  if (ngx_processes[n].pid == -1) {
   continue;
  }

  if (n == ngx_process_slot) {
   continue;
  }

  if (ngx_processes[n].channel[1] == -1) {
   continue;
  }

  if (close(ngx_processes[n].channel[1]) == -1) {
   ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
          "close() channel failed");
  }
 }

 // close the channel[0] descriptor of the current process
 if (close(ngx_processes[ngx_process_slot].channel[0]) == -1) {
  ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
         "close() channel failed");
 }

#if 0
 ngx_last_process = 0;
#endif

 // ngx_channel is the channel[1] descriptor of the current process, i.e. the descriptor on which it listens for messages from the master process.
 // ngx_add_channel_event() first creates a connection object for this descriptor, wraps it in an event, and then adds the event to
 // the event model in use so that the descriptor is monitored. The event is handled by the ngx_channel_handler()
 // method, whose main logic is to set a flag of the current process according to the received message,
 // or to update some cached data. The flags are then checked continuously in the event loop,
 // where the real work is done, which keeps ngx_channel_handler itself very cheap
 if (ngx_add_channel_event(cycle, ngx_channel, NGX_READ_EVENT,
              ngx_channel_handler)
   == NGX_ERROR) {
  /* fatal */
  exit(2);
 }
}

This method mainly initializes the worker process. The part to focus on is the traversal of the ngx_processes array, which holds information about all current nginx processes. During the traversal, the worker closes the channel[1] descriptors it holds for the other processes but keeps their channel[0] descriptors, so that if it needs to communicate with another process, it only has to write data to that process's channel[0]. After the traversal, the worker closes its own channel[0] descriptor and keeps its channel[1] descriptor. Finally, ngx_add_channel_event() registers a read event for the current process on channel[1]. The second argument passed to ngx_add_channel_event() is ngx_channel, which was assigned in ngx_spawn_process() and refers to the channel[1] descriptor of the current process.
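The descriptor bookkeeping in that traversal can be sketched on its own. The structure and function names below are hypothetical stand-ins for ngx_process_t and the loop in ngx_worker_process_init(). As a deliberate difference from the listing above, this sketch also resets each closed entry to -1 so the result is easy to inspect.

```c
#include <sys/socket.h>
#include <unistd.h>

typedef struct {
    int pid;          /* -1 marks an unused slot */
    int channel[2];   /* [0]: end that others write to, [1]: end the owner reads */
} demo_process_t;

/* Fill one slot with a fresh socket pair; returns 0 on success, -1 on failure. */
int demo_open_slot(demo_process_t *p, int pid)
{
    p->pid = pid;
    return socketpair(AF_UNIX, SOCK_STREAM, 0, p->channel);
}

/*
 * Called in a freshly forked worker that occupies `my_slot`.
 * Closes every descriptor this worker will never use and returns the
 * descriptor it should listen on, or -1 if `my_slot` is not a live slot.
 */
int demo_keep_own_channel(demo_process_t *procs, int last, int my_slot)
{
    int n;

    if (my_slot < 0 || my_slot >= last || procs[my_slot].pid == -1) {
        return -1;
    }

    for (n = 0; n < last; n++) {
        if (n == my_slot || procs[n].pid == -1
            || procs[n].channel[1] == -1)
        {
            continue;
        }

        /* another process reads from this end; this worker never will */
        close(procs[n].channel[1]);
        procs[n].channel[1] = -1;
    }

    /* this worker's own writing end is only needed by other processes */
    close(procs[my_slot].channel[0]);
    procs[my_slot].channel[0] = -1;

    return procs[my_slot].channel[1];
}
```

After the call, the worker holds exactly one readable descriptor (its own channel[1]) plus the channel[0] of every other live slot, which matches the layout described in the paragraph above.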

The ngx_add_channel_event() method essentially creates an event of type ngx_event_t and adds it to the event model currently in use (such as epoll). Its implementation is not covered here; what matters is the callback invoked when the event fires, namely the ngx_channel_handler() method passed as the fourth argument to ngx_add_channel_event(). Here is the source of that method:


static void ngx_channel_handler(ngx_event_t *ev) {
 ngx_int_t n;
 ngx_channel_t ch;
 ngx_connection_t *c;

 if (ev->timedout) {
  ev->timedout = 0;
  return;
 }

 c = ev->data;

 for (;;) {

  // keep reading the messages sent by the master process in an infinite for loop
  n = ngx_read_channel(c->fd, &ch, sizeof(ngx_channel_t), ev->log);

  // if reading fails, the descriptor may be invalid and the current connection must be closed
  if (n == NGX_ERROR) {
   if (ngx_event_flags & NGX_USE_EPOLL_EVENT) {
    ngx_del_conn(c, 0);
   }

   ngx_close_connection(c);
   return;
  }

  if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {
   if (ngx_add_event(ev, NGX_READ_EVENT, 0) == NGX_ERROR) {
    return;
   }
  }

  if (n == NGX_AGAIN) {
   return;
  }

  // process the received message
  switch (ch.command) {
   // quit message: set the quit flag
   case NGX_CMD_QUIT:
    ngx_quit = 1;
    break;

   // terminate message: set the terminate flag
   case NGX_CMD_TERMINATE:
    ngx_terminate = 1;
    break;

   // reopen message: set the reopen flag
   case NGX_CMD_REOPEN:
    ngx_reopen = 1;
    break;

   // new-process message: update the corresponding slot of the ngx_processes array
   case NGX_CMD_OPEN_CHANNEL:
    ngx_processes[ch.slot].pid = ch.pid;
    ngx_processes[ch.slot].channel[0] = ch.fd;
    break;

   // close-channel message: close the descriptor stored in the corresponding slot of the ngx_processes array
   case NGX_CMD_CLOSE_CHANNEL:
    if (close(ngx_processes[ch.slot].channel[0]) == -1) {
     ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno,
            "close() channel failed");
    }

    ngx_processes[ch.slot].channel[0] = -1;
    break;
  }
 }
}

The ngx_channel_handler() method mainly reads data from the monitored socket descriptor; the data is carried in an ngx_channel_t structure. This is the structure the nginx master process uses to communicate with worker processes. It specifies the type of event that occurred and the process the event concerns. The following is the declaration of the ngx_channel_t structure:


typedef struct {
  // type of the event (command) being reported
  ngx_uint_t command;
  // pid of the process the event concerns
  ngx_pid_t pid;
  // index of that process in the ngx_processes array
  ngx_int_t slot;
  // value of that process's channel[0] descriptor
  ngx_fd_t fd;
} ngx_channel_t;

After reading an ngx_channel_t structure from its channel[1], the ngx_channel_handler() method sets the corresponding flag according to the type of event, or updates the entry in its ngx_processes array for the process the event concerns.
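The read-then-dispatch pattern can be reduced to a small self-contained sketch. Everything prefixed with demo_ or DEMO_ is hypothetical, and the message here is sent with plain write() and read(). The real channel also has to hand a descriptor (the fd field) to another process, which requires sendmsg()/recvmsg() with ancillary data and is omitted here.

```c
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

enum { DEMO_CMD_QUIT = 1, DEMO_CMD_TERMINATE, DEMO_CMD_REOPEN };

/* Fixed-size message, loosely modelled on ngx_channel_t without the fd field. */
typedef struct {
    unsigned  command;
    pid_t     pid;
    int       slot;
} demo_channel_t;

/* Flags that a worker loop would poll after event processing. */
typedef struct {
    int quit;
    int terminate;
    int reopen;
} demo_flags_t;

/* Sender side: write one whole message; returns 0 on success, -1 on failure. */
int demo_write_channel(int fd, const demo_channel_t *ch)
{
    return write(fd, ch, sizeof(*ch)) == (ssize_t) sizeof(*ch) ? 0 : -1;
}

/* Receiver side: read one message and only record what was requested. */
int demo_handle_channel(int fd, demo_flags_t *flags)
{
    demo_channel_t ch;

    if (read(fd, &ch, sizeof(ch)) != (ssize_t) sizeof(ch)) {
        return -1;
    }

    switch (ch.command) {
    case DEMO_CMD_QUIT:
        flags->quit = 1;
        break;
    case DEMO_CMD_TERMINATE:
        flags->terminate = 1;
        break;
    case DEMO_CMD_REOPEN:
        flags->reopen = 1;
        break;
    default:
        return -1;   /* unknown command */
    }

    return 0;
}

/* Round trip over a private socket pair: send `command`, then dispatch it. */
int demo_send_command(unsigned command, demo_flags_t *flags)
{
    int             sv[2], rc;
    demo_channel_t  ch = { 0, 0, 0 };

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) == -1) {
        return -1;
    }

    ch.command = command;
    rc = demo_write_channel(sv[0], &ch);
    if (rc == 0) {
        rc = demo_handle_channel(sv[1], flags);
    }

    close(sv[0]);
    close(sv[1]);
    return rc;
}
```

The handler never performs the shutdown or reopen itself. It only records the request, which keeps the time spent inside the event callback short and predictable.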

After handling the event sent by the master process, the worker process continues its loop, in which it checks the flags it cares about and executes the corresponding logic based on their state. The following is the source of the worker process loop:


/**
 * Enter the work loop of the worker process
 */
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data) {
 ngx_int_t worker = (intptr_t) data;

 ngx_process = NGX_PROCESS_WORKER;
 ngx_worker = worker;

 // initialize the worker process; the source of this method was explained above
 ngx_worker_process_init(cycle, worker);

 ngx_setproctitle("worker process");

 for (;;) {

  if (ngx_exiting) {
   // Check whether any non-cancelable timer events remain, that is, whether all remaining events can be canceled; if so,
   // NGX_OK is returned. In other words, once ngx_exiting is set, any events that cannot be canceled
   // are still handled by ngx_process_events_and_timers() below,
   // and when the loop comes back to this point with none left, the if condition is true and the worker process exits
   if (ngx_event_no_timers_left() == NGX_OK) {
    ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
    ngx_worker_process_exit(cycle);
   }
  }

  ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "worker cycle");

  // check the event model for ready events and queue them for processing;
  // this is the core method with which the worker process handles events
  ngx_process_events_and_timers(cycle);

  // ngx_terminate is the forced-shutdown flag: if nginx was told to stop immediately, the current process exits right away
  if (ngx_terminate) {
   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "exiting");
   ngx_worker_process_exit(cycle);
  }

  // ngx_quit is the graceful-shutdown flag. Its main effect is to set ngx_exiting to 1, marking that the current process needs to exit,
  // and then to perform the following 3 tasks:
  // 1. add a shutdown timer event to the event queue; when it fires, the close flag of each still-active connection is set to 1 and the connection's
  //   current handler is invoked so that the connection finishes as soon as possible;
  // 2. close the listening sockets of the current cycle;
  // 3. set the close flag of all currently idle connections to 1 and invoke their connection handlers.
  if (ngx_quit) {
   ngx_quit = 0;
   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "gracefully shutting down");
   ngx_setproctitle("worker process is shutting down");

   if (!ngx_exiting) {
    ngx_exiting = 1;
    ngx_set_shutdown_timer(cycle);
    ngx_close_listening_sockets(cycle);
    ngx_close_idle_connections(cycle);
   }
  }

  // ngx_reopen requests that nginx reopen all of its files, for example to switch to new log files
  if (ngx_reopen) {
   ngx_reopen = 0;
   ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "reopening logs");
   ngx_reopen_files(cycle, -1);
  }
 }
}

As you can see, the worker process mainly checks the flags that control whether nginx exits (ngx_terminate, ngx_quit, and ngx_exiting) and the ngx_reopen flag, which tells it to reopen its files, such as the log files.
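The order in which the loop consults these flags matters, and a small model makes it easier to follow. The sketch below is a simplification under assumptions: demo_worker_t and both functions are hypothetical, the pending counter stands in for events that cannot be canceled, and one unit of pending work is finished per pass in place of ngx_process_events_and_timers().

```c
typedef struct {
    int terminate;   /* forced shutdown requested */
    int quit;        /* graceful shutdown requested */
    int exiting;     /* graceful shutdown in progress */
    int pending;     /* outstanding work that must finish before exit */
} demo_worker_t;

/* One pass of the loop body. Returns 1 when the worker should exit, else 0. */
int demo_worker_step(demo_worker_t *w)
{
    /* graceful exit completes only once nothing is left to finish */
    if (w->exiting && w->pending == 0) {
        return 1;
    }

    /* stand-in for event processing: finish one unit of work */
    if (w->pending > 0) {
        w->pending--;
    }

    /* forced shutdown: leave immediately, even with work outstanding */
    if (w->terminate) {
        return 1;
    }

    /* graceful shutdown: consume the request and start draining */
    if (w->quit) {
        w->quit = 0;
        w->exiting = 1;
    }

    return 0;
}

/*
 * Run the loop for at most `max_passes` passes.
 * Returns the number of passes taken until exit, or -1 if still running.
 */
int demo_worker_run(demo_worker_t *w, int max_passes)
{
    int passes;

    for (passes = 1; passes <= max_passes; passes++) {
        if (demo_worker_step(w)) {
            return passes;
        }
    }

    return -1;
}
```

In this model, a worker with two units of pending work that receives a quit request exits on the third pass, while a terminate request ends the loop on the first pass regardless of outstanding work.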

4. Summary

This article first explained the basic principles of master-worker interaction and then went into the source code to explain how the nginx master and worker processes communicate with each other.

