Implementing a generic channel-based connection pool in golang

  • 2020-06-15 09:13:27
  • OfStack

preface

golang's channels are useful for more than communication between goroutines. This article uses a channel to implement a generic connection pool.

features

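* Connections are held as an interface type (io.Closer in this article) rather than a concrete type, which keeps the pool generic

* Connections have a maximum idle time; a connection that exceeds it is closed and discarded, so the pool does not hand out a connection that has gone stale while idle (the code below declares this as maxLifetime and leaves its handling as a TODO)

* Pooled connections are kept in a channel, for efficiency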

What does generic mean?

The pool does not depend on a concrete type, only on an interface. This article uses the io.Closer interface: any object that implements it can be managed by the pool.

You could also build the pool on interface{} so that any object at all can be managed, though the pool then has no Close method to call when it discards a resource.

How it works

Connection handles are stored in a buffered channel. If the pool holds an idle connection, it is taken from the channel. If it does not, a new connection is created as long as the maximum has not been reached; otherwise the caller blocks until a connection is released.

Because the pool is written against an interface, it does not know how to create a connection. The caller must pass in a factory function that returns an io.Closer.
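
The "take an idle connection if there is one" step can be sketched with a non-blocking receive on a buffered channel. This is only an illustration: tryGet is a hypothetical helper, and plain strings stand in for connections.

```go
package main

import "fmt"

// tryGet is a hypothetical helper: it returns an idle item if the
// buffered channel holds one, and reports false instead of blocking
// when the channel is empty.
func tryGet(pool chan string) (string, bool) {
	select {
	case c := <-pool:
		return c, true
	default:
		// Nothing idle: the caller may create a new item or wait.
		return "", false
	}
}

func main() {
	pool := make(chan string, 2) // the buffer size plays the role of maxOpen
	pool <- "conn-1"

	c, ok := tryGet(pool)
	fmt.Println(c, ok) // prints: conn-1 true

	c, ok = tryGet(pool)
	fmt.Printf("%q %v\n", c, ok) // prints: "" false
}
```

When the second call finds the channel empty, a real pool would go on to either create a connection or block on a plain receive until one is released.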
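
A factory of this kind might look like the sketch below. The factory type mirrors the one the pool declares; stubConn and newStubFactory are hypothetical names used only for illustration, and the address is an arbitrary example value.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// factory mirrors the function type the pool accepts.
type factory func() (io.Closer, error)

// stubConn is a hypothetical resource used only for illustration.
type stubConn struct{ addr string }

func (s *stubConn) Close() error { return nil }

// newStubFactory returns a factory that closes over the address, so the
// pool can create resources without knowing how they are built.
func newStubFactory(addr string) factory {
	return func() (io.Closer, error) {
		if addr == "" {
			return nil, errors.New("empty address")
		}
		// A real factory might call net.Dial("tcp", addr) here instead.
		return &stubConn{addr: addr}, nil
	}
}

func main() {
	create := newStubFactory("127.0.0.1:6379")
	conn, err := create()
	fmt.Println(conn.(*stubConn).addr, err) // prints: 127.0.0.1:6379 <nil>

	_, err = newStubFactory("")()
	fmt.Println(err) // prints: empty address
}
```

Because the configuration lives in the closure, the pool only ever sees a func() (io.Closer, error) and stays independent of the resource type.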

implementation

Because the pool is used from multiple goroutines, its shared state (such as the count of open resources) must be protected by a mutex.
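
The reason for the lock can be shown in isolation. In the sketch below, slots is a hypothetical, stripped-down version of the pool's bookkeeping: the limit check and the increment happen under one lock, so concurrent callers can never exceed the limit.

```go
package main

import (
	"fmt"
	"sync"
)

// slots is a hypothetical, stripped-down version of the pool's counters.
type slots struct {
	sync.Mutex
	numOpen int
	maxOpen int
}

// reserve claims one slot if the limit has not been reached.
// The check and the increment happen under the same lock, so two
// goroutines can never both take the last slot.
func (s *slots) reserve() bool {
	s.Lock()
	defer s.Unlock()
	if s.numOpen >= s.maxOpen {
		return false
	}
	s.numOpen++
	return true
}

// countReserved starts n goroutines that each try to reserve a slot
// and reports how many slots ended up taken.
func countReserved(n, maxOpen int) int {
	s := &slots{maxOpen: maxOpen}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.reserve()
		}()
	}
	wg.Wait() // all goroutines are done, so reading numOpen is safe
	return s.numOpen
}

func main() {
	fmt.Println(countReserved(100, 10)) // prints: 10
}
```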


package pool

import (
  "errors"
  "io"
  "sync"
  "time"
)

var (
  ErrInvalidConfig = errors.New("invalid pool config")
  ErrPoolClosed  = errors.New("pool closed")
)

type factory func() (io.Closer, error)

type Pool interface {
  Acquire() (io.Closer, error) // Acquire a resource
  Release(io.Closer) error     // Return a resource to the pool
  Close(io.Closer) error       // Close a resource and drop it from the pool
  Shutdown() error             // Close the pool and its idle resources
}

type GenericPool struct {
  sync.Mutex
  pool        chan io.Closer // Idle resources
  maxOpen     int            // Maximum number of open resources
  numOpen     int            // Number of resources currently open (idle or in use)
  minOpen     int            // Number of resources created up front
  closed      bool           // Whether the pool is closed
  maxLifetime time.Duration  // Maximum idle time of a resource (not yet enforced)
  factory     factory        // Function that creates a new resource
}

func NewGenericPool(minOpen, maxOpen int, maxLifetime time.Duration, factory factory) (*GenericPool, error) {
  // Reject a negative minimum and a missing factory as well
  if maxOpen <= 0 || minOpen < 0 || minOpen > maxOpen || factory == nil {
    return nil, ErrInvalidConfig
  }
  p := &GenericPool{
    maxOpen:     maxOpen,
    minOpen:     minOpen,
    maxLifetime: maxLifetime,
    factory:     factory,
    pool:        make(chan io.Closer, maxOpen),
  }

  // Pre-create minOpen resources. A failed creation is skipped here;
  // Acquire creates resources on demand later.
  for i := 0; i < minOpen; i++ {
    closer, err := factory()
    if err != nil {
      continue
    }
    p.numOpen++
    p.pool <- closer
  }
  return p, nil
}

func (p *GenericPool) Acquire() (io.Closer, error) {
  // Read closed under the lock to avoid a data race with Shutdown
  p.Lock()
  closed := p.closed
  p.Unlock()
  if closed {
    return nil, ErrPoolClosed
  }
  // TODO: maxLifetime is not enforced yet
  return p.getOrCreate()
}

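func (p *GenericPool) getOrCreate() (io.Closer, error) {
  // Fast path: take an idle resource if one is available
  select {
  case closer, ok := <-p.pool:
    if !ok {
      // The channel was closed by Shutdown
      return nil, ErrPoolClosed
    }
    return closer, nil
  default:
  }
  p.Lock()
  if p.numOpen >= p.maxOpen {
    // At the limit: unlock before blocking, otherwise Release could
    // never take the lock to return a resource (deadlock)
    p.Unlock()
    closer, ok := <-p.pool
    if !ok {
      return nil, ErrPoolClosed
    }
    return closer, nil
  }
  // Below the limit: create a new resource
  closer, err := p.factory()
  if err != nil {
    p.Unlock()
    return nil, err
  }
  p.numOpen++
  p.Unlock()
  return closer, nil
}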

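// Release returns a single resource to the connection pool
func (p *GenericPool) Release(closer io.Closer) error {
  p.Lock()
  defer p.Unlock()
  // Check closed under the lock: sending on a channel that
  // Shutdown has closed would panic
  if p.closed {
    return ErrPoolClosed
  }
  p.pool <- closer
  return nil
}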

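// Close closes a single resource and removes it from the open count
func (p *GenericPool) Close(closer io.Closer) error {
  p.Lock()
  p.numOpen--
  p.Unlock()
  // Close outside the lock and report the error instead of discarding it
  return closer.Close()
}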

// Shutdown closes the connection pool and releases all idle resources
func (p *GenericPool) Shutdown() error {
  p.Lock()
  defer p.Unlock()
  // Check and set closed under the lock so two Shutdown calls
  // cannot both close the channel
  if p.closed {
    return ErrPoolClosed
  }
  p.closed = true
  close(p.pool)
  for closer := range p.pool {
    closer.Close()
    p.numOpen--
  }
  return nil
}

conclusion

This pool can manage any object that implements io.Closer, such as memcached or redis client connections, which makes it convenient to reuse.
