Source code analysis of Redis network model

  • 2020-10-23 21:16:35
  • OfStack

Preface

The Redis network model is built on I/O multiplexing. The source contains four multiplexing backends: epoll, select, evport, and kqueue. One of the four is selected automatically at compile time, depending on what the target system supports. The following uses epoll as an example to analyze the source code of the I/O module.

The epoll system calls

The Redis network event handling code is written around three epoll system calls. Once these three calls are understood, the rest is not hard to follow.

[

epfd = epoll_create(1024);

]

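Creates an epoll instance.

Parameter: a hint for the number of file descriptors the instance is expected to monitor. Since Linux 2.6.8 the kernel ignores the value, but it must still be greater than zero.

Returns: a file descriptor that refers to the new epoll instance, or -1 on error.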

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

Manages the events of an epoll instance: registering, modifying, and deleting them.

[

Parameters:
epfd: file descriptor of the epoll instance;
op: one of three values: EPOLL_CTL_ADD (register), EPOLL_CTL_MOD (modify), EPOLL_CTL_DEL (delete);
fd: the file descriptor to monitor, here a socket;
event: pointer to an epoll_event describing the events to monitor.

]

An epoll_event describes one event registration, loosely similar to a channel in Java NIO. The structure of epoll_event is as follows:


typedef union epoll_data {
  void *ptr;
  int fd; /* Socket file descriptor */
  __uint32_t u32;
  __uint64_t u64;
} epoll_data_t;

struct epoll_event {
  __uint32_t events; /* Epoll events: a bitmask of the event types to monitor, for example EPOLLIN (fd readable) and EPOLLOUT (fd writable) */
  epoll_data_t data; /* User data variable */
};
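
Because epoll_data is a union, its members share the same storage, so one registration carries either an fd or a pointer, not both. The following minimal sketch fills in an epoll_event the same way the Redis code later in this article does: clear the widest member, then store the fd. The helper name sk_make_event is hypothetical and is not part of Redis or of the epoll API.

```c
#include <stdint.h>
#include <sys/epoll.h>

/* Hypothetical helper (not part of Redis or epoll): build an epoll_event
 * that asks for the given event bits and carries fd as its user data. */
struct epoll_event sk_make_event(int fd, uint32_t events) {
    struct epoll_event ee;
    ee.events = events;
    ee.data.u64 = 0;   /* clear the whole union first; fd covers only part of it */
    ee.data.fd = fd;   /* only one union member is meaningful at a time */
    return ee;
}
```

A caller would pass the result to epoll_ctl, for example with sk_make_event(fd, EPOLLIN) to watch a descriptor for readability.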

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

Waits for events to become ready, similar to the select method in Java NIO. When events are ready, they are stored in the events array and the call returns how many there are.

[

Parameters:
epfd: file descriptor of the epoll instance;
events: array that receives the ready events;
maxevents: maximum number of events returned per call;
timeout: maximum time to block, in milliseconds, while waiting for an event to become ready; -1 blocks indefinitely and 0 returns immediately.

]
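
The three calls fit together as in the following minimal sketch. It uses a pipe instead of a socket so that it needs no network setup. The function name wait_for_readable_pipe, the array size of 8, and the one-second timeout are assumptions made for illustration only.

```c
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical demo: returns 0 if epoll reports the read end of a pipe
 * as readable after one byte is written to it, -1 otherwise. */
int wait_for_readable_pipe(void) {
    int pipefd[2];
    int result = -1;

    if (pipe(pipefd) == -1) return -1;

    /* 1. Create the epoll instance (the size argument is only a hint). */
    int epfd = epoll_create(1024);
    if (epfd != -1) {
        /* 2. Register interest in "readable" on the read end. */
        struct epoll_event ee;
        ee.events = EPOLLIN;
        ee.data.u64 = 0;
        ee.data.fd = pipefd[0];

        if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ee) == 0 &&
            write(pipefd[1], "x", 1) == 1) {
            /* 3. Wait up to 1000 ms for ready events. */
            struct epoll_event ready[8];
            int n = epoll_wait(epfd, ready, 8, 1000);
            if (n == 1 && (ready[0].events & EPOLLIN) &&
                ready[0].data.fd == pipefd[0])
                result = 0;
        }
        close(epfd);
    }
    close(pipefd[0]);
    close(pipefd[1]);
    return result;
}
```

Redis follows the same create, register, wait sequence, only spread across several functions and with sockets as the monitored descriptors.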

Source code analysis

Events

The Redis event system divides events into two types:

File events: events on network sockets;
Time events: timed operations in Redis, such as the serverCron function.

The source code is analyzed below along two processes: event registration and event triggering.

Binding events

Creating the eventLoop

In the initServer function (called by the main function in redis.c), the redisDb objects are created and an eventLoop object is initialized, which I call the event handler object. The key members of the structure are as follows:


struct aeEventLoop {
  aeFileEvent *events;        // Array of registered file events
  aeFiredEvent *fired;        // Array of ready (fired) file events
  aeTimeEvent *timeEventHead; // Head of the time event linked list
  ...
};

The eventLoop is initialized in the aeCreateEventLoop function in ae.c. Besides initializing the eventLoop itself, this function creates an epoll instance by calling the following function.


/*
 * ae_epoll.c
 * Create a new epoll instance and assign it to eventLoop
 */
static int aeApiCreate(aeEventLoop *eventLoop) {

  aeApiState *state = zmalloc(sizeof(aeApiState));

  if (!state) return -1;

  // Allocate the event slot space
  state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
  if (!state->events) {
    zfree(state);
    return -1;
  }

  // Create the epoll instance
  state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
  if (state->epfd == -1) {
    zfree(state->events);
    zfree(state);
    return -1;
  }

  // Assign the state to eventLoop
  eventLoop->apidata = state;
  return 0;
}

This is where the epoll_create system call is made. Here state is an aeApiState structure, shown below:


/*
 * Event state
 */
typedef struct aeApiState {

  // epoll instance descriptor
  int epfd;

  // Event slots
  struct epoll_event *events;

} aeApiState;

This state is recorded in eventLoop->apidata.

Binding IP ports to file descriptors

The TCP port is opened with the listenToPort function. Each IP address and port pair gets its own file descriptor in ipfd (the server may have multiple IP addresses).


// Open the TCP listening port and wait for command requests from clients
if (server.port != 0 &&
  listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
  exit(1);

Note: eventLoop and ipfd are referenced through server.el and server.ipfd[], respectively. server is an instance of the redisServer structure and is a global variable in Redis.

Registering events

The code below binds an event handler function to each file descriptor:


// In initServer:
for (j = 0; j < server.ipfd_count; j++) {
  if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
    acceptTcpHandler,NULL) == AE_ERR)
    {
      redisPanic(
        "Unrecoverable error creating server.ipfd file event.");
    }
}
// aeCreateFileEvent in ae.c
/*
 * Monitor the state of fd according to the mask parameter;
 * when fd becomes available, call proc
 */
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
    aeFileProc *proc, void *clientData)
{
  if (fd >= eventLoop->setsize) {
    errno = ERANGE;
    return AE_ERR;
  }

  // Fetch the file event structure for this fd
  aeFileEvent *fe = &eventLoop->events[fd];

  // Monitor the given events on the given fd
  if (aeApiAddEvent(eventLoop, fd, mask) == -1)
    return AE_ERR;

  // Set the file event type and the event handler
  fe->mask |= mask;
  if (mask & AE_READABLE) fe->rfileProc = proc;
  if (mask & AE_WRITABLE) fe->wfileProc = proc;

  // Private data passed back to the handler
  fe->clientData = clientData;

  // If necessary, update the highest fd known to the event loop
  if (fd > eventLoop->maxfd)
    eventLoop->maxfd = fd;

  return AE_OK;
}

The aeCreateFileEvent function makes one notable call, to aeApiAddEvent, shown below:


/*
 * ae_epoll.c
 * Associate the given events with fd
 */
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
  aeApiState *state = eventLoop->apidata;
  struct epoll_event ee;

  /* If the fd was already monitored for some event, we need a MOD
   * operation. Otherwise we need an ADD operation. */
  int op = eventLoop->events[fd].mask == AE_NONE ?
      EPOLL_CTL_ADD : EPOLL_CTL_MOD;

  // Register the events with epoll
  ee.events = 0;
  mask |= eventLoop->events[fd].mask; /* Merge old events */
  if (mask & AE_READABLE) ee.events |= EPOLLIN;
  if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
  ee.data.u64 = 0; /* avoid valgrind warning */
  ee.data.fd = fd;

  if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

  return 0;
}

What actually happens here is a call to the epoll_ctl system call, which registers the event (file descriptor) with epoll. The function first fills in an epoll_event structure, ee, and then registers it with epoll through epoll_ctl.
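
The choice between ADD and MOD matters because epoll rejects an ADD for a descriptor that is already registered (errno is set to EEXIST). The following is a minimal sketch of the same decision outside Redis. The names sk_add_event and sk_add_event_demo and the SK_ constants are hypothetical stand-ins for the Redis ones, and the caller passes the previously registered mask explicitly instead of reading it from eventLoop.

```c
#include <errno.h>
#include <sys/epoll.h>
#include <unistd.h>

#define SK_NONE     0
#define SK_READABLE 1
#define SK_WRITABLE 2

/* Hypothetical stand-in for aeApiAddEvent: old_mask is what the caller has
 * already registered for fd. Returns the merged mask, or -1 on error. */
int sk_add_event(int epfd, int fd, int old_mask, int mask) {
    struct epoll_event ee;
    int op = (old_mask == SK_NONE) ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    mask |= old_mask;                 /* merge old events */
    ee.events = 0;
    if (mask & SK_READABLE) ee.events |= EPOLLIN;
    if (mask & SK_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0;
    ee.data.fd = fd;

    if (epoll_ctl(epfd, op, fd, &ee) == -1) return -1;
    return mask;
}

/* Hypothetical demo: returns 0 if ADD, MOD, and a wrong ADD behave as described. */
int sk_add_event_demo(void) {
    int p[2];
    if (pipe(p) == -1) return -1;
    int epfd = epoll_create(1024);
    if (epfd == -1) { close(p[0]); close(p[1]); return -1; }

    int ok = 1;

    /* First registration: nothing registered yet, so this is an ADD. */
    int m = sk_add_event(epfd, p[1], SK_NONE, SK_WRITABLE);
    ok = ok && (m == SK_WRITABLE);

    /* Second registration: the old mask is non-empty, so this is a MOD. */
    m = sk_add_event(epfd, p[1], m, SK_READABLE);
    ok = ok && (m == (SK_READABLE | SK_WRITABLE));

    /* Pretending nothing is registered forces an ADD, which epoll rejects. */
    errno = 0;
    int r = sk_add_event(epfd, p[1], SK_NONE, SK_READABLE);
    ok = ok && (r == -1) && (errno == EEXIST);

    close(epfd);
    close(p[0]);
    close(p[1]);
    return ok ? 0 : -1;
}
```

Keeping the previously registered mask next to each descriptor, as Redis does in eventLoop->events[fd].mask, is what makes the ADD or MOD decision possible without asking the kernel.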

In addition, aeCreateFileEvent does two important things:

It stores the event handler acceptTcpHandler in eventLoop, where it is referenced as eventLoop->events[fd].rfileProc or eventLoop->events[fd].wfileProc (for read events and write events, respectively);
It adds the requested event type to eventLoop->events[fd].mask (mask is similar to the ops value in Java NIO and represents the event type).

Event listening and execution

The main function in redis.c calls the aeMain function in ae.c, shown below:


/*
 * The main loop of the event handler
 */
void aeMain(aeEventLoop *eventLoop) {

  eventLoop->stop = 0;

  while (!eventLoop->stop) {

    // If a function must run before events are processed, run it
    if (eventLoop->beforesleep != NULL)
      eventLoop->beforesleep(eventLoop);

    // Start processing events
    aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  }
}

The code above calls the aeProcessEvents function to handle events, shown below:


/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 *
 * Processes all time events that are due, as well as all ready file events.
 * The return value is the number of events processed.
 */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
  int processed = 0, numevents;

  /* Nothing to do? return ASAP */
  if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

  if (eventLoop->maxfd != -1 ||
    ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
    int j;
    aeTimeEvent *shortest = NULL;
    struct timeval tv, *tvp;

    // Get the nearest time event
    if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
      shortest = aeSearchNearestTimer(eventLoop);
    if (shortest) {
      // A time event exists, so derive the blocking time for file events
      // from the gap between the nearest time event and the current time
      long now_sec, now_ms;

      /* Calculate the time missing for the nearest
       * timer to fire, and store the interval in tv. */
      aeGetTime(&now_sec, &now_ms);
      tvp = &tv;
      tvp->tv_sec = shortest->when_sec - now_sec;
      if (shortest->when_ms < now_ms) {
        tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
        tvp->tv_sec --;
      } else {
        tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
      }

      // A negative value means the event is already due, so clamp the field to 0 (do not block)
      if (tvp->tv_sec < 0) tvp->tv_sec = 0;
      if (tvp->tv_usec < 0) tvp->tv_usec = 0;
    } else {
      
      // Reaching this branch means there is no time event.
      // Whether to block, and for how long, depends on whether AE_DONT_WAIT is set

      /* If we have to check for events but need to return
       * ASAP because of AE_DONT_WAIT we need to set the timeout
       * to zero */
      if (flags & AE_DONT_WAIT) {
        // Do not block while polling file events
        tv.tv_sec = tv.tv_usec = 0;
        tvp = &tv;
      } else {
        /* Otherwise we can block */
        // Polling may block until an event arrives
        tvp = NULL; /* wait forever */
      }
    }

    // Poll file events; the blocking time is determined by tvp
    numevents = aeApiPoll(eventLoop, tvp);
    for (j = 0; j < numevents; j++) {
      // Get the event from the fired array
      aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];

      int mask = eventLoop->fired[j].mask;
      int fd = eventLoop->fired[j].fd;
      int rfired = 0;

      /* note the fe->mask & mask & ... code: maybe an already processed
       * event removed an element that fired and we still didn't
       * processed, so we check if the event is still valid. */
      // Read event
      if (fe->mask & mask & AE_READABLE) {
        // rfired records that the read handler ran, so the same handler
        // is not called twice when it serves both reads and writes
        rfired = 1;
        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
      }
      // Write event
      if (fe->mask & mask & AE_WRITABLE) {
        if (!rfired || fe->wfileProc != fe->rfileProc)
          fe->wfileProc(eventLoop,fd,fe->clientData,mask);
      }

      processed++;
    }
  }

  /* Check time events */
  // Execute time events
  if (flags & AE_TIME_EVENTS)
    processed += processTimeEvents(eventLoop);

  return processed; 
}

The code in this function is roughly divided into three main steps:

Determine the blocking time tvp from the gap between the nearest time event and the current time;
Call aeApiPoll, which writes all ready events to eventLoop->fired[] and returns the number of ready events;
Traverse eventLoop->fired[] and, for each ready event, execute the previously bound rfileProc or wfileProc.
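
The first step, deriving the blocking time from the nearest timer, can be sketched on its own. This is a simplified version that works in whole milliseconds rather than reproducing the struct timeval arithmetic from the listing, and it clamps an overdue timer to zero. For timers in the future, and assuming both millisecond fields lie in the range 0 to 999, it yields the same value that the listing's arithmetic produces once aeApiPoll converts tvp to milliseconds. The function name sk_ms_until_timer is hypothetical.

```c
/* Hypothetical helper: how many milliseconds epoll_wait may block before a
 * timer due at (when_sec, when_ms) fires, given the current time
 * (now_sec, now_ms). Returns 0 if the timer is already due. */
long long sk_ms_until_timer(long now_sec, long now_ms,
                            long when_sec, long when_ms) {
    long long diff = (long long)(when_sec - now_sec) * 1000
                   + (when_ms - now_ms);
    return diff < 0 ? 0 : diff;
}
```

For example, with the current time at 10 s 900 ms and the timer due at 11 s 100 ms, the sketch returns 200, which is the value that would be handed to epoll_wait as its timeout.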

The aeApiPoll function in ae_epoll.c is as follows:


/*
 * Get the events that are ready
 */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
  aeApiState *state = eventLoop->apidata;
  int retval, numevents = 0;

  // Wait for events, blocking for at most the time given by tvp
  retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
      tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

  // Is at least one event ready?
  if (retval > 0) {
    int j;

    // Compute the mask for each ready event
    // and add the event to the fired array of eventLoop
    numevents = retval;
    for (j = 0; j < numevents; j++) {
      int mask = 0;
      struct epoll_event *e = state->events+j;

      if (e->events & EPOLLIN) mask |= AE_READABLE;
      if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
      if (e->events & EPOLLERR) mask |= AE_WRITABLE;
      if (e->events & EPOLLHUP) mask |= AE_WRITABLE;

      eventLoop->fired[j].fd = e->data.fd;
      eventLoop->fired[j].mask = mask;
    }
  }
  
  // Return the number of ready events
  return numevents;
}

After epoll_wait returns, the ready events have been written to the event slots in eventLoop->apidata->events. The loop that follows copies the events from the slots to eventLoop->fired[]. Specifically, each event is an epoll_event structure, referred to as e: e->data.fd is the file descriptor and e->events is its event bitmask. The bitmask is converted to mask, and finally both fd and mask are written to eventLoop->fired[j].
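
The conversion from epoll event bits to a mask can be isolated as a small function. The sketch below mirrors the four checks in the listing; the name sk_epoll_to_mask and the SK_ constants are hypothetical stand-ins for the Redis ones. Note that error and hang-up conditions are reported as writable, so the write handler gets a chance to notice the failure.

```c
#include <stdint.h>
#include <sys/epoll.h>

#define SK_READABLE 1
#define SK_WRITABLE 2

/* Hypothetical helper: translate epoll event bits into a readable/writable
 * mask, following the same rules as the aeApiPoll listing. */
int sk_epoll_to_mask(uint32_t events) {
    int mask = 0;
    if (events & EPOLLIN)  mask |= SK_READABLE;
    if (events & EPOLLOUT) mask |= SK_WRITABLE;
    if (events & EPOLLERR) mask |= SK_WRITABLE;  /* errors surface via the write path */
    if (events & EPOLLHUP) mask |= SK_WRITABLE;  /* so do hang-ups */
    return mask;
}
```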

Then, back in the outer aeProcessEvents function, the function that rfileProc or wfileProc points to is executed, such as the acceptTcpHandler registered earlier.
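
Registration and dispatch can be seen together in the following minimal sketch, which leaves epoll out entirely and keeps only the handler table indexed by fd. All names (sk_file_event, sk_create_file_event, sk_dispatch, sk_count_handler) and the table size of 16 are hypothetical; the handler signature is also simplified compared with the Redis one.

```c
#include <stddef.h>

#define SK_NONE     0
#define SK_READABLE 1
#define SK_WRITABLE 2
#define SK_SETSIZE  16

typedef void sk_file_proc(int fd, void *client_data, int mask);

typedef struct {
    int mask;              /* event types registered for this fd */
    sk_file_proc *rproc;   /* handler for read events */
    sk_file_proc *wproc;   /* handler for write events */
    void *client_data;     /* private data passed back to the handler */
} sk_file_event;

static sk_file_event sk_events[SK_SETSIZE];  /* indexed by fd */

/* Store the handler for fd, as aeCreateFileEvent does. Returns 0 or -1. */
int sk_create_file_event(int fd, int mask, sk_file_proc *proc, void *client_data) {
    if (fd < 0 || fd >= SK_SETSIZE) return -1;
    sk_file_event *fe = &sk_events[fd];
    fe->mask |= mask;
    if (mask & SK_READABLE) fe->rproc = proc;
    if (mask & SK_WRITABLE) fe->wproc = proc;
    fe->client_data = client_data;
    return 0;
}

/* Run the handlers for one fired event, as the loop in aeProcessEvents
 * does. Returns the number of handler calls made. */
int sk_dispatch(int fd, int fired_mask) {
    if (fd < 0 || fd >= SK_SETSIZE) return 0;
    sk_file_event *fe = &sk_events[fd];
    int calls = 0, rfired = 0;

    if (fe->mask & fired_mask & SK_READABLE) {
        rfired = 1;
        fe->rproc(fd, fe->client_data, fired_mask);
        calls++;
    }
    if (fe->mask & fired_mask & SK_WRITABLE) {
        /* Skip the write call if the same handler already ran for the read. */
        if (!rfired || fe->wproc != fe->rproc) {
            fe->wproc(fd, fe->client_data, fired_mask);
            calls++;
        }
    }
    return calls;
}

/* Example handler: counts its invocations through client_data. */
void sk_count_handler(int fd, void *client_data, int mask) {
    (void)fd;
    (void)mask;
    (*(int *)client_data)++;
}
```

Registering sk_count_handler for both reads and writes on one fd and then dispatching an event with both bits set calls the handler once, not twice, which is the effect of the rfired check in the listing.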

Conclusion

Redis's network module is essentially a simple Reactor pattern. This article follows the path "server registers events --> accept client connection --> listen for ready events --> execute events" to analyze the Redis source code and to describe how Redis accepts client connections. The ideas behind Java NIO are basically the same.

