Concurrency limits and timeout control in Golang explained

  • 2020-06-15 09:13:08
  • OfStack

Preface

As I mentioned last time, when writing a lightweight SSH batch tool in Go, concurrent Go code needs two safeguards: a limit on how much runs concurrently, and timeout control for each goroutine. I didn't go into detail then, so let's talk about it a little here.

All of the sample code below can be run directly on The Go Playground.

Concurrency

Let's start with a simple concurrency test:


package main

import (
  "fmt"
  "time"
)

func run(task_id, sleeptime int, ch chan string) {

  time.Sleep(time.Duration(sleeptime) * time.Second)
  ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)
  return
}

func main() {
  input := []int{3, 2, 1}
  ch := make(chan string)
  startTime := time.Now()
  fmt.Println("Multirun start")
  for i, sleeptime := range input {
    go run(i, sleeptime, ch)
  }

  for range input {
    fmt.Println(<-ch)
  }

  endTime := time.Now()
  fmt.Printf("Multissh finished. Process time %s. Number of tasks is %d", endTime.Sub(startTime), len(input))
}

The function run() sleeps for the number of seconds given by its sleeptime parameter and then sends a message on the channel. Each call runs concurrently because it is started with the go keyword, and the results come back over the channel.

As the name implies, a channel is a conduit for communication between goroutines. Sending a value on a channel hands it from one goroutine to another, so goroutines can exchange data by communicating instead of by sharing memory directly.


ch <- xxx // write data to the channel
<-ch      // read data from the channel
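To make the "conduit" idea concrete, here is a minimal sketch (not from the original article) of one goroutine handing values to another over an unbuffered channel. The names produce and collect are hypothetical, and closing the channel so the receiver can range over it is an addition that the examples in this article do not use.

```go
package main

import "fmt"

// produce sends each word on out, then closes out so the receiver knows
// no more values are coming.
func produce(words []string, out chan<- string) {
	for _, w := range words {
		out <- w // blocks until the receiving goroutine takes the value
	}
	close(out)
}

// collect runs produce in its own goroutine and gathers everything it sends.
func collect(words []string) []string {
	ch := make(chan string) // unbuffered
	go produce(words, ch)

	var got []string
	for w := range ch { // the loop ends once produce closes ch
		got = append(got, w)
	}
	return got
}

func main() {
	fmt.Println(collect([]string{"a", "b", "c"})) // [a b c]
}
```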

Channels come in two kinds: unbuffered and buffered. The channel we created above is unbuffered:


ch := make(chan string)

We'll come back to channel buffers in a moment. First, let's look at the output:


Multirun start
task id 2 , sleep 1 second
task id 1 , sleep 2 second
task id 0 , sleep 3 second
Multissh finished. Process time 3s. Number of tasks is 3
Program exited.

The three goroutines sleep for 3, 2 and 1 seconds respectively, yet the total time is only 3 seconds rather than 6. The tasks really did run concurrently; concurrency in Go is that simple.
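For comparison, here is a sketch (not from the article) that runs the same three sleeps sequentially and then concurrently. It assumes sync.WaitGroup rather than a channel to wait for the goroutines, and milliseconds rather than seconds to keep it quick; the names sequential and concurrent are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// sequential sleeps for each value (in milliseconds) one after another,
// so its total is roughly the sum of the inputs.
func sequential(ms []int) time.Duration {
	start := time.Now()
	for _, m := range ms {
		time.Sleep(time.Duration(m) * time.Millisecond)
	}
	return time.Since(start)
}

// concurrent sleeps for each value in its own goroutine and waits for all
// of them, so its total is roughly the largest input.
func concurrent(ms []int) time.Duration {
	start := time.Now()
	var wg sync.WaitGroup
	for _, m := range ms {
		wg.Add(1)
		go func(m int) {
			defer wg.Done()
			time.Sleep(time.Duration(m) * time.Millisecond)
		}(m)
	}
	wg.Wait()
	return time.Since(start)
}

func main() {
	input := []int{300, 200, 100} // milliseconds instead of seconds
	fmt.Println("sequential:", sequential(input))
	fmt.Println("concurrent:", concurrent(input))
}
```

Expect roughly 600ms for the sequential run and roughly 300ms for the concurrent one; exact timings vary from run to run.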

Returning results in order

In the previous example, the tasks were started in the order 0, 1, 2, but their results came out of the channel in the order 2, 1, 0. That makes sense: task 2 is the fastest, so its result reaches the channel first, followed by task 1, with task 0 the slowest.

What if we want the results in the order the tasks were started? One way is to give each task its own channel and keep the channels in an array (well, a slice):


package main

import (
  "fmt"
  "time"
)

func run(task_id, sleeptime int, ch chan string) {

  time.Sleep(time.Duration(sleeptime) * time.Second)
  ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)
  return
}

func main() {
  input := []int{3, 2, 1}
  chs := make([]chan string, len(input))
  startTime := time.Now()
  fmt.Println("Multirun start")
  for i, sleeptime := range input {
    chs[i] = make(chan string)
    go run(i, sleeptime, chs[i])
  }

  for _, ch := range chs {
    fmt.Println(<-ch)
  }

  endTime := time.Now()
  fmt.Printf("Multissh finished. Process time %s. Number of tasks is %d", endTime.Sub(startTime), len(input))
}

The output now comes back in the same order as the input:

Multirun start
task id 0 , sleep 3 second
task id 1 , sleep 2 second
task id 2 , sleep 1 second
Multissh finished. Process time 3s. Number of tasks is 3
Program exited.
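If all you need is the results in input order, another common approach, swapped in here and not used in the article, is to let each goroutine write to its own index of a results slice and wait with sync.WaitGroup. A minimal sketch, with runAll as a hypothetical name and milliseconds in place of seconds:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runAll starts one goroutine per task. Each goroutine writes only to its
// own slot results[i], so no two goroutines touch the same element and no
// mutex is needed; wg.Wait() makes the writes visible to the caller.
func runAll(sleeps []int) []string {
	results := make([]string, len(sleeps))
	var wg sync.WaitGroup
	for i, s := range sleeps {
		wg.Add(1)
		go func(i, s int) {
			defer wg.Done()
			time.Sleep(time.Duration(s) * time.Millisecond)
			results[i] = fmt.Sprintf("task id %d , sleep %d ms", i, s)
		}(i, s)
	}
	wg.Wait()
	return results
}

func main() {
	for _, r := range runAll([]int{300, 200, 100}) {
		fmt.Println(r)
	}
}
```

The trade-off is that the caller gets nothing until every task has finished, whereas the channel-per-task version can print task 0's result as soon as it is ready.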

Timeout control

The examples so far ignore timeouts. But if a goroutine takes too long, the main goroutine, which is waiting for its result, stays blocked and the whole program hangs. So we need timeout control.

A common way to check for timeouts is select combined with time.After. For example, we can add a wrapper function Run() that starts go run() and then uses select with time.After to decide whether the task finished before the timeout.


package main

import (
  "fmt"
  "time"
)

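func Run(task_id, sleeptime, timeout int, ch chan string) {
  // Buffered with capacity 1 so that run() can still deliver its result and
  // exit after a timeout; with an unbuffered channel it would block forever.
  ch_run := make(chan string, 1)
  go run(task_id, sleeptime, ch_run)
  select {
  case re := <-ch_run: // the task finished in time
    ch <- re
  case <-time.After(time.Duration(timeout) * time.Second): // the timeout fired first
    re := fmt.Sprintf("task id %d , timeout", task_id)
    ch <- re
  }
}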

func run(task_id, sleeptime int, ch chan string) {

  time.Sleep(time.Duration(sleeptime) * time.Second)
  ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)
  return
}

func main() {
  input := []int{3, 2, 1}
  timeout := 2
  chs := make([]chan string, len(input))
  startTime := time.Now()
  fmt.Println("Multirun start")
  for i, sleeptime := range input {
    chs[i] = make(chan string)
    go Run(i, sleeptime, timeout, chs[i])
  }

  for _, ch := range chs {
    fmt.Println(<-ch)
  }
  endTime := time.Now()
  fmt.Printf("Multissh finished. Process time %s. Number of task is %d", endTime.Sub(startTime), len(input))
}

Task 0 and task 1 now time out:

Multirun start
task id 0 , timeout
task id 1 , timeout
task id 2 , sleep 1 second
Multissh finished. Process time 2s. Number of task is 3
Program exited.
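Since Go 1.7 the standard library's context package offers another way to express the same deadline. The sketch below swaps time.After for context.WithTimeout; runWithTimeout is a hypothetical name, and milliseconds replace seconds so it runs quickly.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runWithTimeout simulates a task that takes sleepMs milliseconds and stops
// waiting for it after timeoutMs milliseconds.
func runWithTimeout(taskID, sleepMs, timeoutMs int) string {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
	defer cancel() // release the context's resources as soon as we return

	// Capacity 1 lets the worker deliver its result and exit even if
	// nobody is receiving any more because the timeout already fired.
	done := make(chan string, 1)
	go func() {
		time.Sleep(time.Duration(sleepMs) * time.Millisecond) // stand-in for real work
		done <- fmt.Sprintf("task id %d , sleep %d ms", taskID, sleepMs)
	}()

	select {
	case re := <-done:
		return re
	case <-ctx.Done():
		return fmt.Sprintf("task id %d , timeout (%v)", taskID, ctx.Err())
	}
}

func main() {
	fmt.Println(runWithTimeout(0, 300, 200)) // task id 0 , timeout (context deadline exceeded)
	fmt.Println(runWithTimeout(1, 100, 200)) // task id 1 , sleep 100 ms
}
```

As with Run() above, only the caller stops waiting; the worker goroutine keeps sleeping until it is done. To actually stop the work, real code would pass ctx into it and have it check ctx.Done().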

Concurrency limit

If there are many tasks and we start a goroutine for each one without any limit, resource usage can grow out of control and overwhelm the server. So in practice, limiting concurrency is a must.

One common approach is to use the channel buffering mentioned at the beginning.

Let's compare an unbuffered and a buffered channel:


ch := make(chan string)    // an unbuffered channel, i.e. buffer length 0
ch := make(chan string, 1) // a buffered channel with buffer length 1

The difference: if a channel has no buffer, or its buffer is full, a goroutine that sends to it blocks until another goroutine reads from the channel. For example:


package main

import (
  "fmt"
)

func main() {
  ch := make(chan string)
  ch <- "123"
  fmt.Println(<-ch)
}

Running this code produces an error:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
/tmp/sandbox531498664/main.go:9 +0x60

Program exited.

This is because ch is an unbuffered channel. When execution reaches ch <- "123", the main goroutine blocks waiting for a receiver, so the following fmt.Println(<-ch) is never reached. Since no other goroutine exists to receive the value, the runtime detects that all goroutines are asleep and reports a deadlock.
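The deadlock comes from sending and receiving in the same goroutine. As a small sketch (sendAndReceive is a hypothetical name), the same unbuffered channel works once the send moves into a separate goroutine, because main is then free to reach the receive:

```go
package main

import "fmt"

// sendAndReceive sends on an unbuffered channel from a second goroutine
// and receives in the calling goroutine.
func sendAndReceive() string {
	ch := make(chan string) // unbuffered, as in the deadlocking example
	go func() {
		ch <- "123" // blocks only until the receive below is ready
	}()
	return <-ch
}

func main() {
	fmt.Println(sendAndReceive()) // 123
}
```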

If we instead give ch a buffer of 1, the single-goroutine program runs:


package main

import (
  "fmt"
)

func main() {
  ch := make(chan string, 1)
  ch <- "123"
  fmt.Println(<-ch)
}

Output:

123

Program exited.

But if we send twice:


package main

import (
  "fmt"
)

func main() {
  ch := make(chan string, 1)
  ch <- "123"
  ch <- "123"
  fmt.Println(<-ch)
  fmt.Println(<-ch)
}

Even though the program reads from the channel twice, it still deadlocks: the first send fills the buffer, so the second ch <- "123" blocks forever and neither read is ever reached.

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
/tmp/sandbox642690323/main.go:10 +0x80

Program exited.
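One way to observe a full buffer without deadlocking is a non-blocking send, written as select with a default case. This is a sketch with trySend as a hypothetical helper; the built-ins len and cap report how many values are queued and the buffer size.

```go
package main

import "fmt"

// trySend attempts a send without blocking: select takes the default branch
// when the buffer is full (or, for an unbuffered channel, when no receiver
// is waiting).
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "123"), len(ch), cap(ch)) // true 1 1
	fmt.Println(trySend(ch, "123"), len(ch), cap(ch)) // false 1 1 (buffer full)
	fmt.Println(<-ch)                                  // 123
}
```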

We can use this buffering behavior to limit concurrency. Before starting each goroutine, write a value (any value; it doesn't matter what) into a buffered channel, and have the goroutine read a value back out when it finishes. Because a send blocks once the buffer is full, the number of goroutines running at the same time can never exceed the channel's buffer size.

For example, we can use a buffered channel of type bool as a counter that limits concurrency:


chLimit := make(chan bool, 1)

Then, before starting each new goroutine, we send a value into chLimit:


for i, sleeptime := range input {
  chs[i] = make(chan string, 1)
  chLimit <- true
  go limitFunc(chLimit, chs[i], i, sleeptime, timeout)
}

The goroutine runs a new wrapper function, limitFunc. After calling the original Run(), it takes one value out of chLimit, freeing a slot:


limitFunc := func(chLimit chan bool, ch chan string, task_id, sleeptime, timeout int) {
  Run(task_id, sleeptime, timeout, ch)
  <-chLimit
}

Once the number of running goroutines reaches chLimit's buffer size, the next chLimit <- true blocks the main goroutine. It stays blocked until one of the running goroutines finishes and takes a value out of chLimit; only then does the loop go on to start the next goroutine. That gives us the concurrency limit we wanted.
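The same idea works with any element type. Here is a minimal sketch, assuming a channel of empty structs instead of bool (struct{} values take no space) and sync.WaitGroup to wait for completion. runLimited is a hypothetical name, and the atomic counters exist only to check that no more than limit tasks ever run at once.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs n simulated tasks with at most limit of them in flight at
// once, and returns the highest number it saw running at the same time.
func runLimited(n, limit int) int32 {
	sem := make(chan struct{}, limit) // buffer size = concurrency limit
	var wg sync.WaitGroup
	var running, peak int32

	for i := 0; i < n; i++ {
		sem <- struct{}{} // acquire a slot; blocks while limit tasks hold one
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // release the slot (runs before wg.Done)

			cur := atomic.AddInt32(&running, 1)
			for { // raise peak to cur if cur is larger, without a mutex
				p := atomic.LoadInt32(&peak)
				if cur <= p || atomic.CompareAndSwapInt32(&peak, p, cur) {
					break
				}
			}
			time.Sleep(10 * time.Millisecond) // simulated work
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	fmt.Println("peak concurrency:", runLimited(10, 3))
}
```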

The following is the complete code


package main

import (
  "fmt"
  "time"
)

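func Run(task_id, sleeptime, timeout int, ch chan string) {
  // Buffered with capacity 1 so that run() can still deliver its result and
  // exit after a timeout; with an unbuffered channel it would block forever.
  ch_run := make(chan string, 1)
  go run(task_id, sleeptime, ch_run)
  select {
  case re := <-ch_run: // the task finished in time
    ch <- re
  case <-time.After(time.Duration(timeout) * time.Second): // the timeout fired first
    re := fmt.Sprintf("task id %d , timeout", task_id)
    ch <- re
  }
}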

func run(task_id, sleeptime int, ch chan string) {

  time.Sleep(time.Duration(sleeptime) * time.Second)
  ch <- fmt.Sprintf("task id %d , sleep %d second", task_id, sleeptime)
  return
}

func main() {
  input := []int{3, 2, 1}
  timeout := 2
  chLimit := make(chan bool, 1)
  chs := make([]chan string, len(input))
  limitFunc := func(chLimit chan bool, ch chan string, task_id, sleeptime, timeout int) {
    Run(task_id, sleeptime, timeout, ch)
    <-chLimit
  }
  startTime := time.Now()
  fmt.Println("Multirun start")
  for i, sleeptime := range input {
    chs[i] = make(chan string, 1)
    chLimit <- true
    go limitFunc(chLimit, chs[i], i, sleeptime, timeout)
  }

  for _, ch := range chs {
    fmt.Println(<-ch)
  }
  endTime := time.Now()
  fmt.Printf("Multissh finished. Process time %s. Number of task is %d", endTime.Sub(startTime), len(input))
}

The results

Multirun start
task id 0 , timeout
task id 1 , timeout
task id 2 , sleep 1 second
Multissh finished. Process time 5s. Number of task is 3
Program exited.

chLimit has a buffer size of 1, so the tasks run one at a time: task 0 and task 1 each time out after 2 seconds and task 2 takes 1 second, for a total of 2 + 2 + 1 = 5 seconds. The concurrency limit is working.

If we raise the concurrency limit to 2:


chLimit := make(chan bool, 2)

The results

Multirun start
task id 0 , timeout
task id 1 , timeout
task id 2 , sleep 1 second
Multissh finished. Process time 3s. Number of task is 3
Program exited.

Task 0 and task 1 now run concurrently and both time out after 2 seconds; task 2 then takes 1 second, for a total of 3 seconds, as expected.

Note one difference in this code: each result channel is buffered.


chs[i] = make(chan string, 1)

Recall the earlier examples: when a channel is unbuffered, a goroutine that sends to it blocks until the value is received.

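Here, though, once the limit is reached chLimit blocks the main goroutine, so the code further down that consumes the results is never reached. Meanwhile each running goroutine is stuck trying to send its result on an unbuffered channel that nobody reads, so it never releases its chLimit slot. Deadlock!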


for _, ch := range chs {
  fmt.Println(<-ch)
}

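So just give each result channel a buffer of 1: a goroutine can then deposit its result, release its chLimit slot and exit without waiting for the main goroutine to read it.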
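A buffer is not the only fix. Another option, sketched below as a hypothetical variant rather than the article's code, is to acquire the chLimit-style slot inside each goroutine instead of in the main loop. runAcquireInside is a hypothetical name and milliseconds replace seconds.

```go
package main

import (
	"fmt"
	"time"
)

// runAcquireInside starts every goroutine immediately; each one waits for a
// slot in limit before doing its work. The main goroutine never blocks on
// limit, so it reaches the result loop right away and unbuffered result
// channels are safe.
func runAcquireInside(sleeps []int, maxConcurrent int) []string {
	limit := make(chan bool, maxConcurrent)
	chs := make([]chan string, len(sleeps))
	for i, s := range sleeps {
		chs[i] = make(chan string) // deliberately unbuffered
		go func(i, s int, ch chan string) {
			limit <- true // acquire a slot inside the goroutine
			time.Sleep(time.Duration(s) * time.Millisecond)
			// Release the slot before sending: a task that kept its slot
			// while waiting for main (which reads results in order) could
			// starve an earlier task of a slot and deadlock.
			<-limit
			ch <- fmt.Sprintf("task id %d , sleep %d ms", i, s)
		}(i, s, chs[i])
	}

	results := make([]string, 0, len(sleeps))
	for _, ch := range chs { // read results in input order
		results = append(results, <-ch)
	}
	return results
}

func main() {
	for _, r := range runAcquireInside([]int{300, 200, 100}, 2) {
		fmt.Println(r)
	}
}
```

The trade-off: every goroutine is created up front and parked until it gets a slot, so this caps the work in flight but not the number of goroutines, whereas acquiring in the main loop bounds both.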

