Details of how golang converts multiplexed asynchronous I/O into blocking-style I/O


preface

This article explains how golang turns multiplexed asynchronous I/O into what looks like blocking I/O, and is shared for your reference and study. Without further ado, let's look at the details:


package main

import (
    "net"
)

func handleConnection(c net.Conn) {
    defer c.Close()
    // Read and write data.
    buffer := make([]byte, 1024)
    if _, err := c.Read(buffer); err != nil {
        return
    }
    c.Write([]byte("Hello from server"))
}

func main() {
    l, err := net.Listen("tcp", "host:port")
    if err != nil {
        return
    }
    defer l.Close()
    for {
        c, err := l.Accept()
        if err != nil {
            return
        }
        go handleConnection(c)
    }
}

The code we write is very simple, because Go's networking layer hides a great deal from us: we do not have to call the underlying socket functions the way we would in C/C++, nor do we need to write complicated epoll-based I/O multiplexing logic ourselves. But does the code above really block in Accept and Read the way it appears to?


// Multiple goroutines may invoke methods on a Conn simultaneously.
// The line above is the official comment: multiple goroutines may call methods
// on the same connection at the same time. My understanding is that this relates
// to the so-called thundering-herd effect: if multiple goroutines wait on the same
// connection for the same event, all of them would be woken up.
// This is just my guess and remains to be verified.
type Conn interface {
    Read(b []byte) (n int, err error)
    Write(b []byte) (n int, err error)
    Close() error
    LocalAddr() Addr
    RemoteAddr() Addr
    SetDeadline(t time.Time) error
    SetReadDeadline(t time.Time) error
    SetWriteDeadline(t time.Time) error
}

type conn struct {
    fd *netFD
}

Conn here is an interface; the unexported conn struct implements it and has a single field, a *netFD.
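
To see the delegation, here is a hedged, simplified sketch of how conn.Read forwards to netFD (the error wrapping of the real source is omitted, and details vary between Go versions):

// Simplified sketch: the exported methods on conn just delegate to the netFD.
func (c *conn) ok() bool { return c != nil && c.fd != nil }

func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    // All the real work, including the EAGAIN/waitRead loop shown later,
    // happens inside netFD.Read.
    return c.fd.Read(b)
}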


// Network file descriptor.
type netFD struct {
    // locking/lifetime of sysfd + serialize access to Read and Write methods
    fdmu fdMutex

    // immutable until Close
    sysfd       int
    family      int
    sotype      int
    isConnected bool
    net         string
    laddr       Addr
    raddr       Addr

    // wait server
    pd pollDesc
}

func (fd *netFD) accept() (netfd *netFD, err error) {
    //................
    for {
        s, rsa, err = accept(fd.sysfd)
        if err != nil {
            nerr, ok := err.(*os.SyscallError)
            if !ok {
                return nil, err
            }
            switch nerr.Err {
            /* If the error is EAGAIN, the socket has nothing pending to
               accept yet, so call fd.pd.waitRead and park until it does. */
            case syscall.EAGAIN:
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            case syscall.ECONNABORTED:
                continue
            }
            return nil, err
        }
        break
    }
    //.........
    // The rest of the code is omitted here; if you are interested, see fd_unix.go in the Go source.
    return netfd, nil
}

The snippet above is the accept path. Note that when accept returns an error, we check whether it is syscall.EAGAIN; if so, waitRead is called, which parks the goroutine currently waiting on this fd until a read event occurs on the fd again. When something new arrives on the socket, the waitRead call returns and the for loop continues, which is what makes the call behave like a synchronous "block". Those interested can look at the Read and Write methods of netFD, which are implemented the same way.
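
For reference, here is a hedged, simplified sketch of what netFD.Read looks like in the fd_unix.go of this era (error wrapping omitted; details vary between Go versions). It loops on syscall.Read and parks on waitRead whenever the kernel reports EAGAIN:

func (fd *netFD) Read(p []byte) (n int, err error) {
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    if err := fd.pd.prepareRead(); err != nil {
        return 0, err
    }
    for {
        n, err = syscall.Read(fd.sysfd, p)
        if err != nil {
            n = 0
            // Nothing to read yet: park this goroutine until epoll reports
            // the fd readable, then retry the syscall.
            if err == syscall.EAGAIN {
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            }
        }
        err = fd.eofError(n, err)
        break
    }
    return
}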

All the questions now come down to pollDesc. What is it?


const (
    pdReady uintptr = 1
    pdWait  uintptr = 2
)

// Network poller descriptor.
type pollDesc struct {
    link    *pollDesc // in pollcache, protected by pollcache.lock
    lock    mutex     // protects the following fields
    fd      uintptr
    closing bool
    seq     uintptr // protects from stale timers and ready notifications
    rg      uintptr // pdReady, pdWait, G waiting for read or nil
    rt      timer   // read deadline timer (set if rt.f != nil)
    rd      int64   // read deadline
    wg      uintptr // pdReady, pdWait, G waiting for write or nil
    wt      timer   // write deadline timer
    wd      int64   // write deadline
    user    uint32  // user settable cookie
}

type pollCache struct {
    lock  mutex
    first *pollDesc
}

pollDesc, the network poller descriptor, is Golang's per-socket polling handle. "Polling" here is not polling in the usual sense: the Go runtime calls epoll_wait, after goroutine scheduling or GC work has finished (or within a specified time), to fetch all socket file descriptors that have pending I/O events. Of course, before the runtime can poll, the socket file descriptor together with information about the current goroutine has to be added to the data structure maintained by epoll, and the current goroutine has to be parked. When I/O becomes ready, the runtime uses the file descriptor returned by epoll and the goroutine information attached to it to resume the parked goroutine. Notice the two fields wg and rg in pollDesc; we can think of them as semaphores. Each of them can be in one of several states:

pdReady: I/O is ready.
pdWait: the current goroutine is about to park itself on this semaphore, but has not parked yet.
G pointer: the semaphore holds a pointer to the goroutine parked on it; setting the field to this pointer is what parks the goroutine.
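
As mentioned above, before any of this waiting and waking can happen, the fd and its pollDesc must first be registered with epoll. A hedged sketch based on the runtime's netpoll_epoll.go of this era (edge-triggered registration, with the *pollDesc stashed in the event's data field, which is exactly what netpoll recovers later):

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    // Register for both read and write events, edge-triggered.
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    // Store the *pollDesc in the event's user data so that netpoll can
    // map the readiness notification back to it.
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}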

Continuing from the waitRead call above: what does Go actually do to park the current goroutine?
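
Note that the pd field on netFD is the net package's pollDesc, which is only a thin handle onto the runtime's pollDesc shown above. A hedged sketch of that glue, loosely following net's fd_poll_runtime.go of this era (simplified): waitRead just forwards into the runtime and lands in net_runtime_pollWait below.

// net package side (simplified sketch): a thin handle to the runtime poller.
type pollDesc struct {
    runtimeCtx uintptr // opaque pointer to the runtime's pollDesc
}

// Provided by the runtime (via linkname); ends up in net_runtime_pollWait below.
func runtime_pollWait(ctx uintptr, mode int) int

func (pd *pollDesc) wait(mode int) error {
    if pd.runtimeCtx == 0 {
        return syscall.EINVAL
    }
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res) // map the integer result to nil / closing / timeout errors
}

func (pd *pollDesc) waitRead() error { return pd.wait('r') }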


func net_runtime_pollWait(pd *pollDesc, mode int) int {
    err := netpollcheckerr(pd, int32(mode))
    if err != 0 {
        return err
    }
    // As for now only Solaris uses level-triggered IO.
    if GOOS == "solaris" {
        netpollarm(pd, mode)
    }
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return 0
}


// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // Pick the read or write semaphore of this pollDesc according to mode.
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := *gpp
        // Already ready: consume the notification and return true immediately.
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("netpollblock: double wait")
        }
        // Try to move gpp from 0 to pdWait.
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    if waitio || netpollcheckerr(pd, mode) == 0 {
        gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
    }

    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("netpollblock: corrupted state")
    }
    return old == pdReady
}

When waitRead is invoked, it passes through a small amount of glue (and assembly) and ultimately calls net_runtime_pollWait above, which in turn calls netpollblock; a return value of true means I/O is ready, while false means an error or a timeout. Inside netpollblock, gopark is called; gopark in turn uses mcall, an assembly routine whose job is to park the current goroutine and switch to running some other runnable goroutine. At this point the whole process of parking the goroutine is complete. So how is the goroutine notified when the fd becomes readable? That is where epoll comes in.
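
One detail worth noting before moving on: the unlock callback passed to gopark is netpollblockcommit. A hedged, simplified sketch based on the runtime source of this era follows; it is the step that swaps the semaphore from pdWait to the G pointer, i.e. the third state listed earlier. If the CAS fails because the poller has already flipped the state, the park is abandoned and the goroutine simply keeps running.

// Runs while the goroutine is being parked: publish the G pointer
// into the pollDesc semaphore (pdWait -> *g).
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    // If this CAS loses a race with the poller (the fd became ready in the
    // meantime), return false and gopark lets the goroutine continue.
    return atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
}

With the G pointer published, the goroutine sleeps until the epoll side below finds the fd ready.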


func netpoll(block bool) *g {
    if epfd == -1 {
        return nil
    }
    waitms := int32(-1)
    if !block {
        waitms = 0
    }
    var events [128]epollevent
retry:
    // Wait for at most 128 events per call.
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("epollwait failed")
        }
        goto retry
    }
    var gp guintptr
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        var mode int32
        // Readable event.
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        // Writable event.
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // Recover the *pollDesc stored in the epoll event's data field.
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            netpollready(&gp, pd, mode)
        }
    }
    if block && gp == 0 {
        goto retry
    }
    return gp.ptr()
}

Here, finally, is the familiar epoll code, which looks much more approachable. The key line is pd := *(**pollDesc)(unsafe.Pointer(&ev.data)): it recovers the pollDesc of the fd that just became readable or writable. As noted above, a goroutine parks once the read/write semaphore of its pollDesc has been set to a G pointer. Here netpollready is called; inside it, the G pointer stored in the corresponding read/write semaphore is cleared and replaced with pdReady, and the goroutine is put onto the runnable queue, so the goroutine is woken up.
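
A hedged sketch of that unblocking side, simplified from the netpoll.go of this era: netpollready calls netpollunblock for the read and/or write semaphore, which swaps the stored G pointer out, leaves pdReady behind, and returns the goroutine so the scheduler can make it runnable again.

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := *gpp
        if old == pdReady {
            return nil // already marked ready, nobody to wake
        }
        if old == 0 && !ioready {
            // Nobody is waiting and this is not a readiness event
            // (e.g. a deadline); leave the semaphore untouched.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0
            }
            // If old held a G pointer, return that goroutine to the caller
            // so it can be pushed onto the run queue.
            return (*g)(unsafe.Pointer(old))
        }
    }
}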

We can see that although the TCP server we write appears to use a blocking network model, underneath it is actually built on an asynchronous multiplexing mechanism; the runtime merely wraps that asynchronous I/O in a blocking-style programming model, so that we do not have to deal with concepts like multiplexing or a tangle of callback functions.
