Using Go's singleflight to prevent cache breakdown

  • 2020-10-31 21:47:52
  • OfStack

Cache breakdown is a common failure mode when a cache sits in front of a database.

Cache breakdown: a hot key exists in the cache, and at the moment it expires a large number of requests for that key arrive simultaneously. All of them miss the cache and fall through to the DB, so the database suddenly receives a burst of identical queries and its load spikes.
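To make the problem concrete, here is a minimal sketch of the naive pattern (the helpers `getNaive`, `queryDB`, and `run` are hypothetical, not from the original article): every goroutine that misses the cache issues its own DB query, so one expired key can translate into many simultaneous DB reads.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var (
	dbCalls int64 // how many requests fell through to the "DB"
	mu      sync.RWMutex
	cache   = map[string]string{}
)

// queryDB stands in for a slow database read (hypothetical helper).
func queryDB(key string) string {
	atomic.AddInt64(&dbCalls, 1)
	return "VALUE"
}

// getNaive reads the cache and, on a miss, goes straight to the DB.
// Concurrent misses for the same key each issue their own DB query.
func getNaive(key string) string {
	mu.RLock()
	v, ok := cache[key]
	mu.RUnlock()
	if ok {
		return v
	}
	v = queryDB(key) // every concurrent miss lands here
	mu.Lock()
	cache[key] = v
	mu.Unlock()
	return v
}

// run fires 10 concurrent requests at an empty cache and reports
// how many of them reached the DB.
func run() int64 {
	atomic.StoreInt64(&dbCalls, 0)
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			getNaive("cacheKey")
		}()
	}
	wg.Wait()
	return atomic.LoadInt64(&dbCalls)
}

func main() {
	fmt.Printf("DB was queried %d times for a single key\n", run())
}
```

Depending on scheduling, anywhere from one to all ten goroutines can reach the DB; singleflight exists to guarantee it is exactly one.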

singleflight

Introduction


import "golang.org/x/sync/singleflight"

Create a singleflight.Group and wrap a function with its Do or DoChan method. For a given key, only one goroutine executes the wrapped function; all other goroutines block and receive the same result once that execution completes.

Group structure

Represents a class of work: within one group, the same key is executed at most once at any given time.

Do method


func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

key: for a given key, only one goroutine executes fn at a time.

fn: the wrapped function.

v: the return value, i.e. the result of the execution. All other waiting goroutines receive this same value.

shared: indicates whether the result v was shared with any other goroutines.

DoChan method


func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

Like Do, except it returns a channel: the execution result is sent on the channel, and each waiting goroutine receives the result from its channel.

ref: https://godoc.org/golang.org/x/sync/singleflight

Example

The following uses the Do method to simulate and resolve cache breakdown:


package main

import (
	"log"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var singleSetCache singleflight.Group

	getAndSetCache := func(requestID int, cacheKey string) (string, error) {
		log.Printf("request %v start to get and set cache...", requestID)
		// Use the cache key as the Do key: for the same key, only one
		// goroutine actually reads the DB.
		value, _, _ := singleSetCache.Do(cacheKey, func() (ret interface{}, err error) {
			log.Printf("request %v is setting cache...", requestID)
			time.Sleep(3 * time.Second) // simulate a slow DB read
			log.Printf("request %v set cache success!", requestID)
			return "VALUE", nil
		})
		return value.(string), nil
	}

	cacheKey := "cacheKey"
	for i := 1; i < 10; i++ { // simulate concurrent requests
		go func(requestID int) {
			value, _ := getAndSetCache(requestID, cacheKey)
			log.Printf("request %v get value: %v", requestID, value)
		}(i)
	}
	time.Sleep(20 * time.Second)
}

Output:

2020/04/12 18:18:40 request 4 start to get and set cache...
2020/04/12 18:18:40 request 4 is setting cache...
2020/04/12 18:18:40 request 2 start to get and set cache...
2020/04/12 18:18:40 request 7 start to get and set cache...
2020/04/12 18:18:40 request 5 start to get and set cache...
2020/04/12 18:18:40 request 1 start to get and set cache...
2020/04/12 18:18:40 request 6 start to get and set cache...
2020/04/12 18:18:40 request 3 start to get and set cache...
2020/04/12 18:18:40 request 8 start to get and set cache...
2020/04/12 18:18:40 request 9 start to get and set cache...
2020/04/12 18:18:43 request 4 set cache success!
2020/04/12 18:18:43 request 4 get value: VALUE
2020/04/12 18:18:43 request 9 get value: VALUE
2020/04/12 18:18:43 request 6 get value: VALUE
2020/04/12 18:18:43 request 3 get value: VALUE
2020/04/12 18:18:43 request 8 get value: VALUE
2020/04/12 18:18:43 request 1 get value: VALUE
2020/04/12 18:18:43 request 5 get value: VALUE
2020/04/12 18:18:43 request 2 get value: VALUE
2020/04/12 18:18:43 request 7 get value: VALUE

You can see that only one goroutine actually executed the wrapped function; all the others simply received its result.

Source code analysis

Let's see how the Do method is implemented.

First, the Group struct:


type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // per key: the in-flight execution state and its result
}

The Group struct is simple: a mutex for concurrency safety, and a map that holds, for each key, the variables of the in-flight function execution and its result.

Now the call struct:


type call struct {
	wg sync.WaitGroup // ensures only one goroutine runs fn; the others Wait on it

	val interface{} // result of the function execution
	err error

	forgotten bool // set when Forget(key) has been called

	dups  int // short for duplications: the number of goroutines that requested the same key concurrently
	chans []chan<- Result
}

Now the Do method:


func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock() // writing Group's m field, so lock for safety
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	// If the key already exists, another goroutine is executing fn:
	// increment dups, wait for that execution to complete, and return
	// its result, which is stored in the call's val field.
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		return c.val, c.err, true
	}
	// If the key does not exist, create a call, record it in m, and use
	// its WaitGroup to block other goroutines that arrive for the same key.
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	g.doCall(c, key, fn) // the first goroutine executes the function
	return c.val, c.err, c.dups > 0
}

Next, what does g.doCall do?


func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	c.val, c.err = fn() // execute the wrapped function
	c.wg.Done()         // execution finished; wake the waiting goroutines

	g.mu.Lock()
	// This ensures the key is removed once execution completes. Group has a
	// Forget(key string) method for deleting a key early; if it has been
	// called, forgotten is true and the entry is already gone, so the key
	// is only deleted here when Forget was not called.
	if !c.forgotten {
		delete(g.m, key)
	}
	for _, ch := range c.chans { // send the result to each channel; used by DoChan
		ch <- Result{c.val, c.err, c.dups > 0}
	}
	g.mu.Unlock()
}

Seen this way, the implementation is quite simple; it is impressive that the whole feature fits in about a hundred lines of code.

Other

An example using the DoChan method:


package main

import (
	"errors"
	"log"
	"time"

	"golang.org/x/sync/singleflight"
)

func main() {
	var singleSetCache singleflight.Group

	getAndSetCache := func(requestID int, cacheKey string) (string, error) {
		log.Printf("request %v start to get and set cache...", requestID)
		retChan := singleSetCache.DoChan(cacheKey, func() (ret interface{}, err error) {
			log.Printf("request %v is setting cache...", requestID)
			time.Sleep(3 * time.Second)
			log.Printf("request %v set cache success!", requestID)
			return "VALUE", nil
		})

		var ret singleflight.Result
		timeout := time.After(5 * time.Second)
		select { // DoChan lets us add a timeout mechanism
		case <-timeout:
			log.Printf("time out!")
			return "", errors.New("time out")
		case ret = <-retChan: // take the result from the channel
			return ret.Val.(string), ret.Err
		}
	}

	cacheKey := "cacheKey"

	for i := 1; i < 10; i++ {
		go func(requestID int) {
			value, _ := getAndSetCache(requestID, cacheKey)
			log.Printf("request %v get value: %v", requestID, value)
		}(i)
	}

	time.Sleep(20 * time.Second)
}

Now look at the DoChan source:


func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		// Each waiting goroutine gets its own result channel, and doCall
		// sends the result to every one of them. Why not share a single
		// channel among all goroutines? Because then it would have to be
		// loaded with at least dups+1 copies of the result, and there is
		// no guarantee that each goroutine reads exactly once.
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}
	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn)
	return ch
}
