Go: Encapsulating Synchronous and Asynchronous Multi-Task Execution with Runner and RunnerAsync
- 2020-06-15 09:14:07
- OfStack
preface
Synchronous execution suits a sequence of consecutive steps in which each step depends on the one before it. Asynchronous execution suits tasks that do not depend on the order in which they run (such as fetching data from 10 sites).
Synchronous execution: the RunnerAsync class
Supports timeout detection and operating-system interrupt detection.
Error constant definition
// Sentinel errors describing why a run ended early.
var (
	// ErrTimeout reports that the timeout elapsed before the tasks finished.
	ErrTimeout = errors.New("received timeout")
	// ErrInterrupt reports that an operating-system interrupt was received.
	ErrInterrupt = errors.New("received interrupt")
)
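The implementation code is as follows. (Note: the listing below defines Runner and NewRunner and is identical to the asynchronous listing later in this article, whereas the test sample for this section calls NewRunnerAsync, which is not shown.)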
package task
import (
"os"
"time"
"os/signal"
"sync"
)
// Runner executes a set of registered tasks concurrently and reports
// completion, timeout, or operating-system interruption.
type Runner struct {
	// tasks holds every registered task in the order it was added.
	tasks []func(id int) error
	// errs collects the value returned by each executed task.
	errs []error

	// complete carries the result of Run back to Start.
	complete chan error
	// interrupt receives operating-system signals.
	interrupt chan os.Signal
	// timeout fires once the configured duration has elapsed.
	timeout <-chan time.Time

	// waitGroup tracks the task goroutines that are still running.
	waitGroup sync.WaitGroup
	// lock guards errs.
	lock sync.Mutex
}
// NewRunner creates a Runner whose tasks must finish within d.
//
// The timeout clock starts when NewRunner is called, not when Start is
// called, because the timer is created here.
func NewRunner(d time.Duration) *Runner {
	return &Runner{
		interrupt: make(chan os.Signal, 1),
		// Buffered so the goroutine that reports Run's result can always
		// deliver it and exit, even when Start has already returned because
		// of a timeout and nobody is receiving any more.
		complete: make(chan error, 1),
		timeout:  time.After(d),
		// waitGroup and lock are ready to use as zero values.
	}
}
// Add registers one or more tasks, keeping them in the order given.
func (r *Runner) Add(tasks ...func(id int) error) {
	r.tasks = append(r.tasks, tasks...)
}
// Start runs all registered tasks and blocks until Run returns or the
// timeout elapses.
//
// It returns whatever Run returns (nil on completion, ErrInterrupt when an
// interrupt was seen while launching tasks), or ErrTimeout when the timeout
// fires first.
func (r *Runner) Start() error {
	// Route interrupt signals (e.g. Ctrl+C) to the runner, and always
	// unregister on return so later signals get their default handling
	// instead of being silently swallowed.
	signal.Notify(r.interrupt, os.Interrupt)
	defer signal.Stop(r.interrupt)

	// done tells the reporting goroutine that Start has returned, so it does
	// not block forever sending on complete after a timeout.
	done := make(chan struct{})
	defer close(done)

	// Run the tasks in the background and hand the result back.
	go func() {
		err := r.Run()
		select {
		case r.complete <- err:
		case <-done:
		}
	}()

	select {
	// Run finished: pass its result through.
	case err := <-r.complete:
		return err
	// The deadline passed before Run finished.
	case <-r.timeout:
		return ErrTimeout
	}
}
// Run launches every registered task in its own goroutine, waits for all of
// them to finish, and returns nil.
//
// Before each launch it polls for an operating-system interrupt; if one has
// arrived it returns ErrInterrupt immediately, without launching the
// remaining tasks and without waiting for tasks that are already running.
// Each task's return value is appended to the result set in completion
// order, so the position of a result does not identify its task.
func (r *Runner) Run() error {
	for id, task := range r.tasks {
		if r.gotInterrupt() {
			return ErrInterrupt
		}
		r.waitGroup.Add(1)
		// Pass id and task as arguments so each goroutine works on its own
		// copies rather than on the shared loop variables.
		go func(id int, task func(id int) error) {
			defer r.waitGroup.Done()
			// Execute outside the lock so the tasks really run concurrently.
			err := task(id)
			// The lock only protects the shared result slice.
			r.lock.Lock()
			r.errs = append(r.errs, err)
			r.lock.Unlock()
		}(id, task)
	}
	r.waitGroup.Wait()
	return nil
}
// gotInterrupt reports, without blocking, whether an operating-system
// interrupt signal is pending. When one is, it also stops further signal
// delivery to the runner.
func (r *Runner) gotInterrupt() bool {
	interrupted := false
	select {
	case <-r.interrupt:
		// A signal arrived: stop listening for more.
		signal.Stop(r.interrupt)
		interrupted = true
	default:
		// Nothing pending; carry on as normal.
	}
	return interrupted
}
// GetErrs returns a snapshot of the values returned by the tasks that have
// finished so far, in completion order.
//
// The slice is copied under the lock because task goroutines may still be
// appending results after Start has returned on timeout or interrupt.
func (r *Runner) GetErrs() []error {
	r.lock.Lock()
	defer r.lock.Unlock()
	return append([]error(nil), r.errs...)
}
Method of use
Add registers one or more tasks; each task is a closure that receives an int id.
Start begins executing the tasks and returns an error value: nil means execution completed, ErrTimeout means execution timed out, and ErrInterrupt means execution was interrupted (for example by Ctrl + C).
Test sample code
package task
import (
"testing"
"time"
"fmt"
"os"
"runtime"
)
// TestRunnerAsync_Start registers thirteen tasks on a runner with an
// eight-second timeout and starts it. The process exits with status 1 on
// timeout and status 2 on interrupt.
func TestRunnerAsync_Start(t *testing.T) {
	// Use every available CPU core.
	runtime.GOMAXPROCS(runtime.NumCPU())

	// Build the runner with its timeout.
	runner := NewRunnerAsync(8 * time.Second)

	// Register the tasks to run.
	runner.Add(
		createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(),
		createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(),
		createTaskAsync(), createTaskAsync(), createTaskAsync(), createTaskAsync(),
		createTaskAsync(),
	)

	fmt.Println(" Perform tasks synchronously ")

	// Run the tasks and translate the sentinel errors into exit codes.
	err := runner.Start()
	switch {
	case err == nil:
		// Completed normally.
	case err == ErrTimeout:
		fmt.Println(" timeouts ")
		os.Exit(1)
	case err == ErrInterrupt:
		fmt.Println(" Task interrupted ")
		os.Exit(2)
	}

	t.Log(" Perform the end ")
}
// createTaskAsync builds a task that prints the id it is invoked with.
func createTaskAsync() func(id int) {
	task := func(id int) {
		fmt.Printf(" Being performed %v A task \n", id)
		// Uncomment to simulate a slow task:
		// time.Sleep(1 * time.Second)
	}
	return task
}
The execution result
Perform tasks synchronously
Being performed 0 A task
Being performed 1 A task
Being performed 2 A task
Being performed 3 A task
Being performed 4 A task
Being performed 5 A task
Being performed 6 A task
Being performed 7 A task
Being performed 8 A task
Being performed 9 A task
Being performed 10 A task
Being performed 11 A task
Being performed 12 A task
runnerAsync_test.go:49: Perform the end
Asynchronous execution: the Runner class
Supports timeout detection and operating-system interrupt detection.
The implementation code is as follows
package task
import (
"os"
"time"
"os/signal"
"sync"
)
// Runner executes a set of registered tasks concurrently and reports
// completion, timeout, or operating-system interruption.
type Runner struct {
	// tasks holds every registered task in the order it was added.
	tasks []func(id int) error
	// errs collects the value returned by each executed task.
	errs []error

	// complete carries the result of Run back to Start.
	complete chan error
	// interrupt receives operating-system signals.
	interrupt chan os.Signal
	// timeout fires once the configured duration has elapsed.
	timeout <-chan time.Time

	// waitGroup tracks the task goroutines that are still running.
	waitGroup sync.WaitGroup
	// lock guards errs.
	lock sync.Mutex
}
// NewRunner creates a Runner whose tasks must finish within d.
//
// The timeout clock starts when NewRunner is called, not when Start is
// called, because the timer is created here.
func NewRunner(d time.Duration) *Runner {
	return &Runner{
		interrupt: make(chan os.Signal, 1),
		// Buffered so the goroutine that reports Run's result can always
		// deliver it and exit, even when Start has already returned because
		// of a timeout and nobody is receiving any more.
		complete: make(chan error, 1),
		timeout:  time.After(d),
		// waitGroup and lock are ready to use as zero values.
	}
}
// Add registers one or more tasks, keeping them in the order given.
func (r *Runner) Add(tasks ...func(id int) error) {
	r.tasks = append(r.tasks, tasks...)
}
// Start runs all registered tasks and blocks until Run returns or the
// timeout elapses.
//
// It returns whatever Run returns (nil on completion, ErrInterrupt when an
// interrupt was seen while launching tasks), or ErrTimeout when the timeout
// fires first.
func (r *Runner) Start() error {
	// Route interrupt signals (e.g. Ctrl+C) to the runner, and always
	// unregister on return so later signals get their default handling
	// instead of being silently swallowed.
	signal.Notify(r.interrupt, os.Interrupt)
	defer signal.Stop(r.interrupt)

	// done tells the reporting goroutine that Start has returned, so it does
	// not block forever sending on complete after a timeout.
	done := make(chan struct{})
	defer close(done)

	// Run the tasks in the background and hand the result back.
	go func() {
		err := r.Run()
		select {
		case r.complete <- err:
		case <-done:
		}
	}()

	select {
	// Run finished: pass its result through.
	case err := <-r.complete:
		return err
	// The deadline passed before Run finished.
	case <-r.timeout:
		return ErrTimeout
	}
}
// Run launches every registered task in its own goroutine, waits for all of
// them to finish, and returns nil.
//
// Before each launch it polls for an operating-system interrupt; if one has
// arrived it returns ErrInterrupt immediately, without launching the
// remaining tasks and without waiting for tasks that are already running.
// Each task's return value is appended to the result set in completion
// order, so the position of a result does not identify its task.
func (r *Runner) Run() error {
	for id, task := range r.tasks {
		if r.gotInterrupt() {
			return ErrInterrupt
		}
		r.waitGroup.Add(1)
		// Pass id and task as arguments so each goroutine works on its own
		// copies rather than on the shared loop variables.
		go func(id int, task func(id int) error) {
			defer r.waitGroup.Done()
			// Execute outside the lock so the tasks really run concurrently.
			err := task(id)
			// The lock only protects the shared result slice.
			r.lock.Lock()
			r.errs = append(r.errs, err)
			r.lock.Unlock()
		}(id, task)
	}
	r.waitGroup.Wait()
	return nil
}
// gotInterrupt reports, without blocking, whether an operating-system
// interrupt signal is pending. When one is, it also stops further signal
// delivery to the runner.
func (r *Runner) gotInterrupt() bool {
	interrupted := false
	select {
	case <-r.interrupt:
		// A signal arrived: stop listening for more.
		signal.Stop(r.interrupt)
		interrupted = true
	default:
		// Nothing pending; carry on as normal.
	}
	return interrupted
}
// GetErrs returns a snapshot of the values returned by the tasks that have
// finished so far, in completion order.
//
// The slice is copied under the lock because task goroutines may still be
// appending results after Start has returned on timeout or interrupt.
func (r *Runner) GetErrs() []error {
	r.lock.Lock()
	defer r.lock.Unlock()
	return append([]error(nil), r.errs...)
}
Method of use
Add registers one or more tasks; each task is a closure that receives an int id and returns an error.
Start begins executing the tasks and returns an error value: nil means execution completed, ErrTimeout means execution timed out, and ErrInterrupt means execution was interrupted (for example by Ctrl + C).
GetErrs returns the results of all executed tasks.
Test sample code
package task
import (
"testing"
"time"
"fmt"
"os"
"runtime"
)
// TestRunner_Start registers fourteen tasks on a Runner with an
// eighteen-second timeout, starts it, and logs the collected task results.
// The process exits with status 1 on timeout and status 2 on interrupt.
func TestRunner_Start(t *testing.T) {
	// Use every available CPU core.
	runtime.GOMAXPROCS(runtime.NumCPU())

	// Build the runner with its timeout.
	runner := NewRunner(18 * time.Second)

	// Register the tasks to run, one at a time.
	const taskCount = 14
	for i := 0; i < taskCount; i++ {
		runner.Add(createTask())
	}

	fmt.Println(" Asynchronous execution of tasks ")

	// Run the tasks and translate the sentinel errors into exit codes.
	err := runner.Start()
	switch {
	case err == nil:
		// Completed normally.
	case err == ErrTimeout:
		fmt.Println(" timeouts ")
		os.Exit(1)
	case err == ErrInterrupt:
		fmt.Println(" Task interrupted ")
		os.Exit(2)
	}

	t.Log(" Perform the end ")
	t.Log(runner.GetErrs())
}
// createTask builds a task that prints the id it is invoked with and always
// succeeds.
func createTask() func(id int) error {
	task := func(id int) error {
		fmt.Printf(" Being performed %v A task \n", id)
		// Uncomment to simulate a slow task:
		// time.Sleep(1 * time.Second)
		return nil
	}
	return task
}
The execution result
Asynchronous execution of tasks
Being performed 2 A task
Being performed 1 A task
Being performed 4 A task
Being performed 3 A task
Being performed 6 A task
Being performed 5 A task
Being performed 9 A task
Being performed 7 A task
Being performed 10 A task
Being performed 13 A task
Being performed 8 A task
Being performed 11 A task
Being performed 12 A task
Being performed 0 A task
runner_test.go:49: Perform the end
runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]
conclusion