Go language synchronization tutorial: condition variables

  • 2020-06-15 09:18:17
  • OfStack

Go's standard library provides a condition variable type: sync.Cond. Unlike mutexes and read-write locks, it cannot be used on its own; it must be created together with a lock (a Locker):


// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {
 return &Cond{L: l}
}

// A Locker represents an object that can be locked and unlocked.
type Locker interface {
 Lock()
 Unlock()
}
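
As a minimal sketch of how a Cond is paired with a lock, the following builds condition variables on the three Locker implementations found in the sync package. The helper name newConds is hypothetical and exists only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// newConds (a hypothetical helper) builds condition variables on the
// three Locker implementations that the sync package provides.
func newConds() (onMutex, onWrite, onRead *sync.Cond) {
	var mu sync.Mutex
	var rw sync.RWMutex
	onMutex = sync.NewCond(&mu)         // *sync.Mutex implements Locker
	onWrite = sync.NewCond(&rw)         // *sync.RWMutex: Lock/Unlock take the write lock
	onRead = sync.NewCond(rw.RLocker()) // RLocker maps Lock/Unlock to RLock/RUnlock
	return
}

func main() {
	a, b, c := newConds()
	for _, cond := range []*sync.Cond{a, b, c} {
		cond.L.Lock() // the Locker passed to NewCond is exposed as the field L
		cond.L.Unlock()
	}
	fmt.Println("ok")
}
```

Whichever Locker is chosen, it is the lock that must be held when Wait is called.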

The NewCond function returns a value of type *sync.Cond. We mainly work with its three methods:

  • Wait(): waits for a notification
  • Signal(): unicast notification (wakes one waiting goroutine)
  • Broadcast(): broadcast notification (wakes all waiting goroutines)

The specific functions are described as follows:


// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
// c.L.Lock()
// for !condition() {
//  c.Wait()
// }
// ... make use of condition ...
// c.L.Unlock()
//
func (c *Cond) Wait() {
 c.checker.check()
 t := runtime_notifyListAdd(&c.notify)
 c.L.Unlock()
 runtime_notifyListWait(&c.notify, t)
 c.L.Lock()
}

// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal() {
 c.checker.check()
 runtime_notifyListNotifyOne(&c.notify)
}

// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast() {
 c.checker.check()
 runtime_notifyListNotifyAll(&c.notify)
}
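
The "Wait in a loop" advice from the doc comment above can be sketched with a small blocking queue. The Queue type and its Push and Pop methods are assumptions made for this illustration, not part of the standard library.

```go
package main

import (
	"fmt"
	"sync"
)

// Queue is a hypothetical FIFO used only to illustrate Wait and Signal.
type Queue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []int
}

func NewQueue() *Queue {
	q := &Queue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

// Push appends v and wakes one goroutine blocked in Pop, if there is any.
func (q *Queue) Push(v int) {
	q.mu.Lock()
	q.items = append(q.items, v)
	q.mu.Unlock()
	q.cond.Signal() // holding the lock here is allowed but not required
}

// Pop blocks until an item is available, then removes and returns it.
func (q *Queue) Pop() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 { // re-check the condition after every wakeup
		q.cond.Wait()
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v
}

func main() {
	q := NewQueue()
	done := make(chan int)
	go func() {
		sum := 0
		for i := 0; i < 3; i++ {
			sum += q.Pop()
		}
		done <- sum
	}()
	for i := 1; i <= 3; i++ {
		q.Push(i)
	}
	fmt.Println(<-done) // prints 6
}
```

Signal is enough here because each Push makes exactly one item available; the loop in Pop protects against a waiter that wakes up after another goroutine has already taken the item.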

The condition variable sync.Cond is essentially a synchronization mechanism for goroutines that are waiting for some condition to hold.

sync.Cond implements a condition variable. Suppose goroutine A must wait for a notification from goroutine B before it can proceed. Every waiting goroutine such as A is kept in a notification list; in other words, a goroutine that needs some state to change calls Wait and blocks there. When the state changes, goroutine B, which is responsible for the notification, calls Signal or Broadcast on the condition variable to wake the goroutines waiting on it. This gives us a synchronization mechanism based on "message notification".
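
The A-waits-for-B relationship described above might look like the following sketch. The function waitForReady and the ready flag are hypothetical names chosen for the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// waitForReady starts n waiting goroutines (the "A" side) and then plays
// the notifier (the "B" side). It returns how many waiters saw ready == true.
func waitForReady(n int) int {
	var (
		mu    sync.Mutex
		ready bool
		seen  int
		wg    sync.WaitGroup
	)
	cond := sync.NewCond(&mu)

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			for !ready { // A: block until the state has changed
				cond.Wait()
			}
			seen++
			mu.Unlock()
		}()
	}

	// B: change the state under the lock, then notify every waiter.
	mu.Lock()
	ready = true
	mu.Unlock()
	cond.Broadcast()

	wg.Wait()
	return seen
}

func main() {
	fmt.Println(waitForReady(3)) // prints 3
}
```

A waiter that starts only after ready has been set never calls Wait at all, which is why the state is checked before waiting rather than relying on the notification alone.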

Take Go's HTTP handling as an example. As the server part of Go's net/http source code shows, when a new connection is handled and the underlying connection is a *tls.Conn, the client and the server first perform a TLS handshake by calling Handshake(). The entry code is as follows:


if tlsConn, ok := c.rwc.(*tls.Conn); ok {
  if d := c.server.ReadTimeout; d != 0 {
   c.rwc.SetReadDeadline(time.Now().Add(d))
  }
  if d := c.server.WriteTimeout; d != 0 {
   c.rwc.SetWriteDeadline(time.Now().Add(d))
  }
  if err := tlsConn.Handshake(); err != nil {
   c.server.logf("http: TLS handshake error from %s: %v", c.rwc.RemoteAddr(), err)
   return
  }
  c.tlsState = new(tls.ConnectionState)
  *c.tlsState = tlsConn.ConnectionState()
  if proto := c.tlsState.NegotiatedProtocol; validNPN(proto) {
   if fn := c.server.TLSNextProto[proto]; fn != nil {
    h := initNPNRequest{tlsConn, serverHandler{c.server}}
    fn(c.server, tlsConn, h)
   }
   return
  }
 }

The Handshake function uses a condition variable to synchronize concurrent handshake calls on the same connection:


func (c *Conn) Handshake() error {
 c.handshakeMutex.Lock()
 defer c.handshakeMutex.Unlock()

 for {
  if err := c.handshakeErr; err != nil {
   return err
  }
  if c.handshakeComplete {
   return nil
  }
  if c.handshakeCond == nil {
   break
  }

  c.handshakeCond.Wait()
 }

 c.handshakeCond = sync.NewCond(&c.handshakeMutex)
 c.handshakeMutex.Unlock()

 c.in.Lock()
 defer c.in.Unlock()

 c.handshakeMutex.Lock()

 if c.handshakeErr != nil || c.handshakeComplete {
  panic("handshake should not have been able to complete after handshakeCond was set")
 }

 if c.isClient {
  c.handshakeErr = c.clientHandshake()
 } else {
  c.handshakeErr = c.serverHandshake()
 }
 if c.handshakeErr == nil {
  c.handshakes++
 } else {
  c.flush()
 }

 if c.handshakeErr == nil && !c.handshakeComplete {
  panic("handshake should have had a result.")
 }

 c.handshakeCond.Broadcast()
 c.handshakeCond = nil

 return c.handshakeErr
}
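
Stripped of the TLS details, the pattern in Handshake is "the first caller does the work, concurrent callers wait for its Broadcast". The following is a simplified sketch of that idea only; lazyConn, Setup, runs and runConcurrently are hypothetical names, and the sleep merely stands in for the real handshake.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// lazyConn is a hypothetical stand-in for a connection, reduced to the
// fields needed to show the pattern.
type lazyConn struct {
	mu       sync.Mutex
	cond     *sync.Cond // non-nil while some goroutine is running the setup
	complete bool
	runs     int // how many times the expensive step actually ran
}

// Setup runs the expensive step at most once. Concurrent callers block
// until the goroutine that started it has finished.
func (c *lazyConn) Setup() {
	c.mu.Lock()
	defer c.mu.Unlock()

	for {
		if c.complete {
			return
		}
		if c.cond == nil {
			break // nobody is working on it yet: this caller does the work
		}
		c.cond.Wait() // someone else is working: wait for its Broadcast
	}

	c.cond = sync.NewCond(&c.mu)
	c.mu.Unlock()
	time.Sleep(time.Millisecond) // placeholder for the real work, done without the lock
	c.mu.Lock()

	c.runs++
	c.complete = true
	c.cond.Broadcast() // wake every caller that was waiting for the result
	c.cond = nil
}

// runConcurrently calls Setup from n goroutines and reports how many
// times the expensive step ran.
func runConcurrently(n int) int {
	c := &lazyConn{}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Setup()
		}()
	}
	wg.Wait()
	return c.runs
}

func main() {
	fmt.Println(runConcurrently(8)) // prints 1
}
```

The nil check on cond tells a caller whether a setup is already in progress, which is the same role handshakeCond plays in the listing above.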

We can also get familiar with the use of sync.Cond through another example:

We want to implement an example of read-write synchronization. The requirement is that there are several readers and several writers, and a reader can only read data from the cache after a writer has written it there. Let's think about how to do this without using a channel.

Every time a writer completes a write, it needs some kind of notification mechanism to tell the blocked readers that they can now access the data. With such a broadcast mechanism, we can implement this example via sync.Cond:


package main

import (
 "bytes"
 "fmt"
 "io"
 "sync"
 "time"
)

type MyDataBucket struct {
 br  *bytes.Buffer
 gmutex *sync.RWMutex
 rcond *sync.Cond // The conditional variables required by the read operation 
}

func NewDataBucket() *MyDataBucket {
 buf := make([]byte, 0)
 db := &MyDataBucket{
  br:  bytes.NewBuffer(buf),
  gmutex: new(sync.RWMutex),
 }
 db.rcond = sync.NewCond(db.gmutex.RLocker())
 return db
}

func (db *MyDataBucket) Read(i int) {
 db.gmutex.RLock()
 defer db.gmutex.RUnlock()
 var data []byte
 var d byte
 var err error
 for {
  // read one byte
  if d, err = db.br.ReadByte(); err != nil {
   if err == io.EOF {
    if string(data) != "" {
     fmt.Printf("reader-%d: %s\n", i, data)
    }
    db.rcond.Wait()
    data = data[:0]
    continue
   }
  }
  data = append(data, d)
 }
}

func (db *MyDataBucket) Put(d []byte) (int, error) {
 db.gmutex.Lock()
 defer db.gmutex.Unlock()
 // write one data block
 n, err := db.br.Write(d)
 db.rcond.Broadcast()
 return n, err
}

func main() {
 db := NewDataBucket()

 go db.Read(1)

 go db.Read(2)

 for i := 0; i < 10; i++ {
  go func(i int) {
   d := fmt.Sprintf("data-%d", i)
   db.Put([]byte(d))
  }(i)
  time.Sleep(100 * time.Millisecond)
 }
}
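
Two details of the listing above are worth handling with care: bytes.Buffer.ReadByte modifies the buffer, so two readers that hold the read lock at the same time can race on it, and main returns without waiting for the readers. The following is one possible variant, offered as a sketch under different assumptions rather than as the original design: it uses a plain sync.Mutex, checks the condition in a loop, and adds a hypothetical Close method so that readers can exit. The names Bucket, Next and Close are invented for this illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

// Bucket is a hypothetical variant of MyDataBucket guarded by a plain mutex.
type Bucket struct {
	mu     sync.Mutex
	cond   *sync.Cond
	buf    bytes.Buffer
	closed bool
}

func NewBucket() *Bucket {
	b := &Bucket{}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// Put appends d and wakes every blocked reader.
func (b *Bucket) Put(d []byte) {
	b.mu.Lock()
	b.buf.Write(d)
	b.mu.Unlock()
	b.cond.Broadcast()
}

// Close marks the bucket as finished so that blocked readers can return.
func (b *Bucket) Close() {
	b.mu.Lock()
	b.closed = true
	b.mu.Unlock()
	b.cond.Broadcast()
}

// Next blocks until data is available or the bucket is closed. It returns
// the buffered bytes, or ok == false once the bucket is closed and drained.
func (b *Bucket) Next() (data []byte, ok bool) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.buf.Len() == 0 && !b.closed {
		b.cond.Wait()
	}
	if b.buf.Len() == 0 {
		return nil, false
	}
	data = append(data, b.buf.Bytes()...) // copy before resetting the buffer
	b.buf.Reset()
	return data, true
}

func main() {
	b := NewBucket()
	var wg sync.WaitGroup
	for r := 1; r <= 2; r++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				data, ok := b.Next()
				if !ok {
					return
				}
				fmt.Printf("reader-%d: %s\n", id, data)
			}
		}(r)
	}
	for i := 0; i < 10; i++ {
		b.Put([]byte(fmt.Sprintf("data-%d", i)))
	}
	b.Close()
	wg.Wait() // wait for both readers instead of relying on sleeps
}
```

Which reader prints which chunk depends on scheduling, so the output order is not fixed.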

There are two points to note when using sync.Cond:

  • Be sure to lock the lock associated with the Cond (here, the read-write lock) before calling cond.Wait.
  • Do not forget to unlock that lock once cond.Wait has returned and the data has been processed.

As the source code of Wait() below shows, cond.Wait automatically releases the lock while it waits for a signal. When the signal arrives, the woken Wait call re-acquires the lock and then returns:


func (c *Cond) Wait() {
 c.checker.check()
 t := runtime_notifyListAdd(&c.notify)
 c.L.Unlock()
 runtime_notifyListWait(&c.notify, t)
 c.L.Lock()
}

If the woken goroutine does not release the lock when it is done, the other goroutines that received the signal stay blocked in Wait, because they cannot re-acquire the lock.
