Linux I/O multiplexing: details and examples

  • 2020-05-12 06:44:25
  • OfStack

Linux I/O multiplexing

In Linux, everything is a file: character files stored on disk, executables, and the I/O devices we access are all abstracted into files by the VFS. For example, the standard input device is by default the keyboard, so when we operate on standard input we are actually operating on file descriptor 0, which is open by default. For software to operate hardware it must go through the OS, and for the OS to operate a piece of hardware it needs the corresponding driver, which encodes the hardware's configuration and methods of use. Linux I/O comes in four kinds: blocking I/O, non-blocking I/O, I/O multiplexing, and signal-driven I/O. Drivers for I/O devices generally provide both blocking and non-blocking configurations; the driver for our most common I/O device, the keyboard (the standard input device), blocks by default.

The purpose of multiplexing is to let a process obtain the data it wants from any of several I/O channels and move on to its next task. The main idea is to monitor several file descriptors at the same time: if the monitored condition of any descriptor triggers, the process continues executing; if none triggers, the process goes to sleep.

One of the main uses of multiplexing is to implement an "I/O multiplexing concurrent server". Compared with multi-threaded or multi-process concurrency, such a server has lower system overhead and is better suited to web serving.

Blocking I/O

Blocking I/O means that when a process tries to access an I/O device that is not ready, the device's driver, through the kernel, puts the accessing process to sleep. One advantage of blocking I/O is that it can greatly save CPU time: once a process tries to access an unready blocking I/O device it goes to sleep, and a sleeping process is off the kernel's scheduling list until the target I/O becomes ready and wakes it up to rejoin the list. Of course, blocking I/O also has an inherent disadvantage: if a process tries to access a blocking I/O device, but the success of that access is not decisive for its next task, putting it straight to sleep obviously delays the completion of its work.
Typical default-blocking I/O includes standard input devices, sockets, pipes, and so on. When we use gets(), scanf(), read() and the like to request data from such I/O while no data is flowing in, the process is put to sleep.

Suppose a process wants to read data from any one of three pipes and display it. The pseudocode is as follows:


read(pipe_0,buf,sizeof(buf));    //sleep
print buf;
read(pipe_1,buf,sizeof(buf));
print buf;
read(pipe_2,buf,sizeof(buf));
print buf;

Since pipes are blocking I/O, if pipe_0 has no data inflow, the process goes to sleep at the first read() and reads nothing even if pipe_1 and pipe_2 do have data flowing in.
If we use the following code to clear the pipes' blocking attribute then, obviously, when none of the three pipes has data, the process cannot obtain the requested data yet keeps executing. If that data is important (which is exactly why we wanted to avoid blocking I/O here), the outcome is even worse, and falling back to polling instead wastes a great deal of CPU time.

int fl = fcntl(pipe_fd, F_GETFL);
fcntl(pipe_fd, F_SETFL, fl | O_NONBLOCK);

How to let a process monitor three pipes at the same time, continuing to execute as soon as any one of them has data and sleeping only when none of them does, is the problem that multiplexing technology solves.

Non-blocking I/O

Non-blocking I/O means that when a process tries to access an I/O device, the call returns immediately whether or not the requested data was obtained, and the process continues with the tasks that follow. This is well suited to I/O requests whose outcome has little impact on the next task. However, if a non-blocking access fails and the failure is fatal to the process's subsequent tasks, the crudest remedy is to poll with while(1){ read(); }, which obviously wastes a lot of CPU time.

select mechanism

select is a very "old" synchronous I/O interface, but it illustrates the idea of I/O multiplexing well.

model


fd_set      // declare an fd_set object; the fds to monitor are added to and removed from it
FD_ZERO()   // empty the fd_set object
FD_SET()    // add an fd to the fd_set object
select()    // monitor the file descriptors in the fd_set
pselect()   // set a signal mask first, then monitor
FD_ISSET()  // test whether an fd belongs to the fd_set
FD_CLR()    // remove an fd from the fd_set object

Note:

The first parameter of select, nfds, is the largest file descriptor in the set plus 1. Since select traverses the file descriptor table indiscriminately up to that bound, and file descriptors start at 0, the bound must be the largest descriptor in the set + 1.

This leads to one inefficiency of the mechanism: if the file descriptors being monitored are 0 and 100, every call still traverses all 101 entries.

select() modifies the fd_set each time it returns, so if you call select() in a loop you must save the initial fd_set and restore it before every call.

Example: an I/O-multiplexing concurrent server

For the programming model of the server itself, see the TCP/IP protocol server model and the UDP/IP protocol server model.


#define BUFSIZE 100
#define MAXNFD 1024 

int main()
{
  /* ... the server's listenfd, clientaddr and len are assumed ready ... */
  fd_set readfds;
  fd_set writefds;
  FD_ZERO(&readfds);
  FD_ZERO(&writefds);
  FD_SET(listenfd, &readfds);

  fd_set temprfds = readfds;
  fd_set tempwfds = writefds;
  int maxfd = listenfd;


  int nready;
  // a 2-D buffer: each fd gets its own row, so data does not interfere
  char buf[MAXNFD][BUFSIZE] = {0};
  while(1){
    temprfds = readfds;
    tempwfds = writefds;

    nready = select(maxfd+1, &temprfds, &tempwfds, NULL, NULL);
    if(FD_ISSET(listenfd, &temprfds)) {      
      // listenfd is readable: a new connection is pending, so accept it
      int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);
      
      // add the newly accepted sockfd to the read set and keep maxfd the largest fd
      FD_SET(sockfd, &readfds);
      maxfd = maxfd>sockfd?maxfd:sockfd;
      
      // if all nready triggered fds have been handled, start the next loop
      if(--nready==0)
        continue;
    }
    
    int fd = 0;
    // traverse the file descriptor table, processing received messages
    for(;fd<=maxfd; fd++) {   
      if(fd == listenfd)
        continue;

      if(FD_ISSET(fd, &temprfds)){
        int ret = read(fd, buf[fd], sizeof buf[0]);
        if(0 == ret) {   // the client has closed the connection
          close(fd);
          FD_CLR(fd, &readfds);
          if(maxfd==fd) 
            --maxfd;
          continue;
        }
        // add fd to the write set; it will be monitored for writability
        // starting from the next while() iteration
        FD_SET(fd, &writefds); 
      }
      // once the socket that received a message becomes writable,
      // echo its buffer back and stop monitoring it for writes
      if(FD_ISSET(fd, &tempwfds)){
        int ret = write(fd, buf[fd], sizeof buf[0]);
        printf("ret %d: %d\n", fd, ret);
        FD_CLR(fd, &writefds);
      }
    }
  }
  close(listenfd);
}

poll mechanism

poll is an improved mechanism proposed by System V on the basis of select. It was redesigned to remove several obvious defects of select, such as directly returning the number of triggered file descriptors and not destroying the monitored set, so no backup copy of it is needed.

model


struct pollfd fds[]  // declare an array of struct pollfd
fds[0].fd            // put an fd to be monitored into fds[0]
fds[0].events        // put the events to monitor on that fd into fds[0]
  POLLIN             // there is data to read
  POLLPRI            // there is urgent data to read
  POLLOUT            // writing is now possible
  POLLRDHUP          // the peer closed the connection, or shut down its writing half
  POLLERR            // error condition (output only)
  POLLHUP            // hang up (output only)
  POLLNVAL           // invalid request: fd not open (output only)

Example: an I/O-multiplexing concurrent server


/* ... */

int main()
{
  /* ... */
  struct pollfd myfds[MAXNFD] = {0};
  myfds[0].fd = listenfd;
  myfds[0].events = POLLIN;
  int maxnum = 1;
  
  int nready;
  // a 2-D buffer: each fd gets its own row, so data does not interfere
  char buf[MAXNFD][BUFSIZE] = {0};
  while(1){
    // poll directly returns the number of fds whose events were triggered
    nready = poll(myfds, maxnum, -1);
    int i = 0;
    for(;i<maxnum; i++){
      // poll reports a triggered event by setting the corresponding bit;
      // if the condition below is true, the POLLIN bit in revents is set
      if(myfds[i].revents & POLLIN){
        if(myfds[i].fd == listenfd){
          int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);
          // add the newly accepted sockfd to the monitored set
          myfds[maxnum].fd = sockfd;
          myfds[maxnum].events = POLLIN;
          maxnum++;
          
          // if all nready triggered fds have been handled, start the next loop
          if(--nready==0)
            continue;
        }
        else{
          int ret = read(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);
          if(0 == ret) {   // the connection has been closed
            close(myfds[i].fd);
            
            // a real server would initialize every entry's fd to -1,
            // mark closed entries as -1 as well, and give each new
            // descriptor the first -1 slot (just as open() always
            // returns the smallest free fd); that mechanism is
            // omitted here for demonstration purposes
            myfds[i].fd = -1; 
            continue;
          }
          myfds[i].events = POLLOUT;
        }
      }
      else if(myfds[i].revents & POLLOUT){
        int ret = write(myfds[i].fd, buf[myfds[i].fd], sizeof buf[0]);
        myfds[i].events = POLLIN;
      }
    }
  }
  close(listenfd);
}

epoll

epoll is a more robust interface built on the ideas of poll and is now the mainstream multiplexing mechanism used by web servers. One notable feature of epoll is its two trigger modes, EPOLLET (edge-triggered) and level-triggered (the default). In edge-triggered mode, an event is delivered only when new data arrives: if a read leaves data in the buffer, that data is not lost, but no further event is reported until more data comes in. In level-triggered mode, as long as data remains in the buffer, epoll_wait keeps reporting the descriptor as ready.

model


epoll_create()       // create an epoll object
struct epoll_event   // prepare an event struct and an array of event structs
  event.events
  event.data.fd ...
epoll_ctl()          // configure the epoll object
epoll_wait()         // monitor the fds in the epoll object and their events

Example: an I/O-multiplexing concurrent server


/* ... */

int main()
{
  /* ... */
  /* create the epoll object */
  int epoll_fd = epoll_create(1024);
  
  // prepare an event struct
  struct epoll_event event = {0};
  event.events = EPOLLIN;
  event.data.fd = listenfd;  // data is a union; besides fd it can carry other data
  
  // ctl tells epoll to monitor listenfd for this event;
  // when the event occurs it is reported back through wait,
  // and if the event did not record the fd we would not know
  // which descriptor to service later
  epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listenfd, &event);
  
  struct epoll_event revents[MAXNFD] = {0};
  int nready;
  char buf[MAXNFD][BUFSIZE] = {0};
  while(1){
    // wait returns the number of events that occurred
    // and fills the corresponding entries of the event array
    nready = epoll_wait(epoll_fd, revents, MAXNFD, -1);
    int i = 0;
    for(;i<nready; i++){
      // wait indicates which event occurred through the events field;
      // if input is available, the condition below is true
      if(revents[i].events & EPOLLIN){
        // a readable listenfd means a new connection is pending
        if(revents[i].data.fd == listenfd){
          int sockfd = accept(listenfd, (struct sockaddr*)&clientaddr, &len);
          struct epoll_event event = {0};
          event.events = EPOLLIN;
          event.data.fd = sockfd;
          epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &event);
        }
        else{
          int ret = read(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);
          if(0 == ret){
            close(revents[i].data.fd);
            epoll_ctl(epoll_fd, EPOLL_CTL_DEL, revents[i].data.fd, &revents[i]);
            continue;   // the fd is closed; do not try to modify it below
          }
          
          revents[i].events = EPOLLOUT;
          epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
        }
      }
      else if(revents[i].events & EPOLLOUT){
        int ret = write(revents[i].data.fd, buf[revents[i].data.fd], sizeof buf[0]);
        revents[i].events = EPOLLIN;
        epoll_ctl(epoll_fd, EPOLL_CTL_MOD, revents[i].data.fd, &revents[i]);
      }
    }
  }
  close(listenfd);
}



Thank you for reading. I hope this helps, and thanks for your support of this site!

