Go: encapsulating synchronous and asynchronous multi-task execution with Runner and RunnerAsync

  • 2020-06-15 09:14:07
  • OfStack

preface

Synchronous execution suits a sequence of tasks in which each step depends on the previous one; asynchronous execution suits tasks whose order of execution does not matter (such as fetching data from 10 sites).

Synchronous execution: the RunnerAsync class

Supports timeout detection and operating-system interrupt detection.

Error variable definitions


// Sentinel errors reported by the runner.
var (
	// ErrTimeout is returned when the timeout fires before the tasks finish.
	ErrTimeout = errors.New("received timeout")
	// ErrInterrupt is returned when an operating system interrupt signal is received.
	ErrInterrupt = errors.New("received interrupt")
)

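The implementation code is as follows. (Note: the listing below declares Runner and NewRunner, whereas the test sample further down calls NewRunnerAsync and passes tasks that return no value.)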


package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)
 
// Runner executes a list of tasks in separate goroutines and records the
// value each one returns, while watching for a timeout and for operating
// system interrupt signals.
type Runner struct {
	// tasks holds the functions to execute, in the order they were added.
	tasks []func(id int) error
	// errs accumulates the value returned by every executed task.
	errs []error

	// interrupt receives operating system signals.
	interrupt chan os.Signal
	// timeout delivers a value once the allotted duration has elapsed.
	timeout <-chan time.Time
	// complete carries the result of Run back to Start.
	complete chan error

	// lock guards errs.
	lock sync.Mutex
	// waitGroup tracks the task goroutines that are still running.
	waitGroup sync.WaitGroup
}
 
// NewRunner returns a Runner whose timeout fires d after this call.
// The countdown starts here, at construction, not when Start is invoked.
func NewRunner(d time.Duration) *Runner {
	r := new(Runner)
	// One slot of buffer: the signal package does not block when delivering.
	r.interrupt = make(chan os.Signal, 1)
	r.complete = make(chan error)
	r.timeout = time.After(d)
	// waitGroup and lock are usable at their zero values.
	return r
}
 
// Add appends the given tasks to the runner's task list. When the tasks are
// run, each one is called with its index in that list as id.
func (r *Runner) Add(tasks ...func(id int) error) {
	r.tasks = append(r.tasks, tasks...)
}
 
// Start runs all queued tasks and blocks until the tasks finish, the runner's
// timeout fires, or an interrupt signal arrives, whichever happens first.
//
// It returns the result of Run when the tasks finish, ErrTimeout when the
// timeout fires first, and ErrInterrupt when an interrupt is received.
// Returning early does not cancel tasks that are already running.
func (r *Runner) Start() error {
	// Route interrupt signals to the runner for the duration of this call and
	// deregister on return so the channel does not stay subscribed
	// process-wide.
	signal.Notify(r.interrupt, os.Interrupt)
	defer signal.Stop(r.interrupt)

	// done is closed on return so the goroutine below can exit even when
	// nobody is left to receive from r.complete.
	done := make(chan struct{})
	defer close(done)

	go func() {
		err := r.Run()
		select {
		case r.complete <- err:
		case <-done:
		}
	}()

	select {
	case err := <-r.complete:
		return err
	case <-r.timeout:
		return ErrTimeout
	case <-r.interrupt:
		// Run only polls for interrupts while it is launching tasks; watching
		// here keeps a later Ctrl+C from being silently swallowed.
		return ErrInterrupt
	}
}
 
// Run launches every queued task in its own goroutine and waits for all of
// them to finish. Each task is called with its index in the task list, and
// the value it returns is appended to the runner's error list as the task
// completes.
//
// Before launching each task Run polls for an interrupt signal; if one has
// arrived it returns ErrInterrupt immediately, without waiting for tasks that
// were already launched. Otherwise it returns nil once all tasks are done.
func (r *Runner) Run() error {
	for id, task := range r.tasks {
		if r.gotInterrupt() {
			return ErrInterrupt
		}
		r.waitGroup.Add(1)
		// Pass id and task as arguments so every goroutine works on its own
		// copy; before Go 1.22 a captured loop variable is shared by all
		// iterations.
		go func(id int, task func(id int) error) {
			defer r.waitGroup.Done()
			// Run the task outside the lock so tasks execute concurrently.
			err := task(id)
			// Hold the lock only while touching the shared result slice.
			r.lock.Lock()
			r.errs = append(r.errs, err)
			r.lock.Unlock()
		}(id, task)
	}
	r.waitGroup.Wait()
	return nil
}
 
// gotInterrupt reports, without blocking, whether an interrupt signal is
// pending on the runner's signal channel. When one is found it also stops
// further signal delivery to that channel.
func (r *Runner) gotInterrupt() bool {
	interrupted := false
	select {
	case <-r.interrupt:
		// No further signals are needed once an interrupt has been seen.
		signal.Stop(r.interrupt)
		interrupted = true
	default:
		// Nothing pending; carry on normally.
	}
	return interrupted
}
 
// GetErrs returns the values returned by the tasks that have finished so far,
// one entry per task (nil for a task that returned no error). It returns nil
// when nothing has been recorded yet.
//
// The result is a copy taken while holding the runner's lock, so it does not
// race with task goroutines that are still appending (for example after Start
// has returned ErrTimeout) and the caller may modify it freely. Acquiring the
// lock means the call can block while another goroutine holds it.
func (r *Runner) GetErrs() []error {
	r.lock.Lock()
	defer r.lock.Unlock()
	return append([]error(nil), r.errs...)
}

Method of use

Add adds tasks; each task is a closure that takes an int argument.

Start begins execution and returns an error: nil means execution completed, ErrTimeout means it timed out, and ErrInterrupt means it was interrupted (for example by Ctrl+C).

Test sample code


package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)
 
// TestRunnerAsync_Start queues thirteen tasks on a runner created with an
// 8-second timeout and starts it. On ErrTimeout it exits the process with
// status 1, on ErrInterrupt with status 2; otherwise it logs completion.
func TestRunnerAsync_Start(t *testing.T) {
	// Use every available CPU core.
	runtime.GOMAXPROCS(runtime.NumCPU())

	// Create the runner and give it its timeout.
	runner := NewRunnerAsync(8 * time.Second)

	// Queue the tasks to run.
	runner.Add(
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
		createTaskAsync(),
	)

	fmt.Println(" Perform tasks synchronously ")

	// Start the runner and react to the way it ended.
	err := runner.Start()
	if err == ErrTimeout {
		fmt.Println(" timeouts ")
		os.Exit(1)
	}
	if err == ErrInterrupt {
		fmt.Println(" Task interrupted ")
		os.Exit(2)
	}
	t.Log(" Perform the end ")
}
 
// createTaskAsync builds a task that prints the id it is invoked with.
func createTaskAsync() func(id int) {
	task := func(id int) {
		fmt.Printf(" Being performed %v A task \n", id)
		// To simulate a slow task, enable the one-second sleep below.
		//time.Sleep(1 * time.Second)
	}
	return task
}

The execution result


 Perform tasks synchronously 
 Being performed 0 A task 
 Being performed 1 A task 
 Being performed 2 A task 
 Being performed 3 A task 
 Being performed 4 A task 
 Being performed 5 A task 
 Being performed 6 A task 
 Being performed 7 A task 
 Being performed 8 A task 
 Being performed 9 A task 
 Being performed 10 A task 
 Being performed 11 A task 
 Being performed 12 A task 
 runnerAsync_test.go:49:  Perform the end 

Asynchronous execution: the Runner class

Supports timeout detection and operating-system interrupt detection.

The implementation code is as follows


package task
import (
 "os"
 "time"
 "os/signal"
 "sync"
)
 
// Runner executes a list of tasks in separate goroutines and records the
// value each one returns, while watching for a timeout and for operating
// system interrupt signals.
type Runner struct {
	// tasks holds the functions to execute, in the order they were added.
	tasks []func(id int) error
	// errs accumulates the value returned by every executed task.
	errs []error

	// interrupt receives operating system signals.
	interrupt chan os.Signal
	// timeout delivers a value once the allotted duration has elapsed.
	timeout <-chan time.Time
	// complete carries the result of Run back to Start.
	complete chan error

	// lock guards errs.
	lock sync.Mutex
	// waitGroup tracks the task goroutines that are still running.
	waitGroup sync.WaitGroup
}
 
// NewRunner returns a Runner whose timeout fires d after this call.
// The countdown starts here, at construction, not when Start is invoked.
func NewRunner(d time.Duration) *Runner {
	r := new(Runner)
	// One slot of buffer: the signal package does not block when delivering.
	r.interrupt = make(chan os.Signal, 1)
	r.complete = make(chan error)
	r.timeout = time.After(d)
	// waitGroup and lock are usable at their zero values.
	return r
}
 
// Add appends the given tasks to the runner's task list. When the tasks are
// run, each one is called with its index in that list as id.
func (r *Runner) Add(tasks ...func(id int) error) {
	r.tasks = append(r.tasks, tasks...)
}
 
// Start runs all queued tasks and blocks until the tasks finish, the runner's
// timeout fires, or an interrupt signal arrives, whichever happens first.
//
// It returns the result of Run when the tasks finish, ErrTimeout when the
// timeout fires first, and ErrInterrupt when an interrupt is received.
// Returning early does not cancel tasks that are already running.
func (r *Runner) Start() error {
	// Route interrupt signals to the runner for the duration of this call and
	// deregister on return so the channel does not stay subscribed
	// process-wide.
	signal.Notify(r.interrupt, os.Interrupt)
	defer signal.Stop(r.interrupt)

	// done is closed on return so the goroutine below can exit even when
	// nobody is left to receive from r.complete.
	done := make(chan struct{})
	defer close(done)

	go func() {
		err := r.Run()
		select {
		case r.complete <- err:
		case <-done:
		}
	}()

	select {
	case err := <-r.complete:
		return err
	case <-r.timeout:
		return ErrTimeout
	case <-r.interrupt:
		// Run only polls for interrupts while it is launching tasks; watching
		// here keeps a later Ctrl+C from being silently swallowed.
		return ErrInterrupt
	}
}
 
// Run launches every queued task in its own goroutine and waits for all of
// them to finish. Each task is called with its index in the task list, and
// the value it returns is appended to the runner's error list as the task
// completes.
//
// Before launching each task Run polls for an interrupt signal; if one has
// arrived it returns ErrInterrupt immediately, without waiting for tasks that
// were already launched. Otherwise it returns nil once all tasks are done.
func (r *Runner) Run() error {
	for id, task := range r.tasks {
		if r.gotInterrupt() {
			return ErrInterrupt
		}
		r.waitGroup.Add(1)
		// Pass id and task as arguments so every goroutine works on its own
		// copy; before Go 1.22 a captured loop variable is shared by all
		// iterations.
		go func(id int, task func(id int) error) {
			defer r.waitGroup.Done()
			// Run the task outside the lock so tasks execute concurrently.
			err := task(id)
			// Hold the lock only while touching the shared result slice.
			r.lock.Lock()
			r.errs = append(r.errs, err)
			r.lock.Unlock()
		}(id, task)
	}
	r.waitGroup.Wait()
	return nil
}
 
// gotInterrupt reports, without blocking, whether an interrupt signal is
// pending on the runner's signal channel. When one is found it also stops
// further signal delivery to that channel.
func (r *Runner) gotInterrupt() bool {
	interrupted := false
	select {
	case <-r.interrupt:
		// No further signals are needed once an interrupt has been seen.
		signal.Stop(r.interrupt)
		interrupted = true
	default:
		// Nothing pending; carry on normally.
	}
	return interrupted
}
 
// GetErrs returns the values returned by the tasks that have finished so far,
// one entry per task (nil for a task that returned no error). It returns nil
// when nothing has been recorded yet.
//
// The result is a copy taken while holding the runner's lock, so it does not
// race with task goroutines that are still appending (for example after Start
// has returned ErrTimeout) and the caller may modify it freely. Acquiring the
// lock means the call can block while another goroutine holds it.
func (r *Runner) GetErrs() []error {
	r.lock.Lock()
	defer r.lock.Unlock()
	return append([]error(nil), r.errs...)
}

Method of use

Add adds tasks; each task is a closure that takes an int argument and returns an error.

Start begins execution and returns an error: nil means execution completed, ErrTimeout means it timed out, and ErrInterrupt means it was interrupted (for example by Ctrl+C).

GetErrs returns the results (the returned errors) of all executed tasks.

Test sample code


package task
import (
 "testing"
 "time"
 "fmt"
 "os"
 "runtime"
)
 
// TestRunner_Start queues fourteen tasks on a Runner created with an
// 18-second timeout and starts it. On ErrTimeout it exits the process with
// status 1, on ErrInterrupt with status 2; otherwise it logs completion and
// the collected task results.
func TestRunner_Start(t *testing.T) {
	// Use every available CPU core.
	runtime.GOMAXPROCS(runtime.NumCPU())

	// Create the runner and give it its timeout.
	runner := NewRunner(18 * time.Second)

	// Queue the tasks to run, one at a time.
	const taskCount = 14
	for i := 0; i < taskCount; i++ {
		runner.Add(createTask())
	}

	fmt.Println(" Asynchronous execution of tasks ")

	// Start the runner and react to the way it ended.
	err := runner.Start()
	if err == ErrTimeout {
		fmt.Println(" timeouts ")
		os.Exit(1)
	}
	if err == ErrInterrupt {
		fmt.Println(" Task interrupted ")
		os.Exit(2)
	}
	t.Log(" Perform the end ")
	t.Log(runner.GetErrs())
}
 
// createTask builds a task that prints the id it is invoked with and reports
// success by returning nil.
func createTask() func(id int) error {
	task := func(id int) error {
		fmt.Printf(" Being performed %v A task \n", id)
		// To simulate a slow task, enable the one-second sleep below.
		//time.Sleep(1 * time.Second)
		return nil
	}
	return task
}

The execution result


 Asynchronous execution of tasks 
 Being performed 2 A task 
 Being performed 1 A task 
 Being performed 4 A task 
 Being performed 3 A task 
 Being performed 6 A task 
 Being performed 5 A task 
 Being performed 9 A task 
 Being performed 7 A task 
 Being performed 10 A task 
 Being performed 13 A task 
 Being performed 8 A task 
 Being performed 11 A task 
 Being performed 12 A task 
 Being performed 0 A task 
 runner_test.go:49:  Perform the end 
 runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]

conclusion


Related articles: