Implementing a generic channel-based connection pool in Go
- 2020-06-15 09:13:27
- OfStack
preface
Go's channels are useful for more than communication between goroutines. This article uses a channel to implement a generic connection pool.
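features
* Connections in the pool are held as an interface type (io.Closer in this article) rather than a concrete type, which keeps the pool generic
* Connections have a maximum idle time: a connection that has been idle for too long is closed and discarded, so the pool does not hand out connections that have silently expired (the code below reserves the maxLifetime field for this and leaves the check as a TODO)
* Idle connections are kept in a channel, which makes taking and returning them efficient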
What is universal?
The pool does not depend on a concrete connection type, only on an interface. This article uses the io.Closer interface, so any object that implements it can be managed by the pool.
You could also build the pool on interface{}, so that any object at all can be managed, at the cost of the pool no longer knowing how to close what it holds.
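As a minimal sketch of what "any object that implements the interface" means, the following defines a resource type and a helper that only knows about io.Closer. The names fakeConn and closeAll are hypothetical and exist only for this illustration; they are not part of the pool.

```go
package main

import (
	"fmt"
	"io"
)

// fakeConn is a hypothetical stand-in for a real connection.
type fakeConn struct {
	id     int
	closed bool
}

// Close makes *fakeConn satisfy io.Closer.
func (c *fakeConn) Close() error {
	c.closed = true
	return nil
}

// closeAll depends only on io.Closer, just as the pool does.
// It returns the number of resources that closed without error.
func closeAll(resources ...io.Closer) int {
	n := 0
	for _, r := range resources {
		if err := r.Close(); err == nil {
			n++
		}
	}
	return n
}

func main() {
	a, b := &fakeConn{id: 1}, &fakeConn{id: 2}
	fmt.Println(closeAll(a, b), a.closed, b.closed)
}
```

Because closeAll never mentions fakeConn, the same code would work for any other type with a Close() error method.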
implementation principle
Connection handles are stored in a buffered channel. If the pool holds an idle connection, it is handed out directly. If it does not, a new connection is created as long as the maximum has not been reached; otherwise the caller blocks until a connection is returned to the pool.
Because the pool is programmed against an interface, it does not know how to create a connection. The caller therefore passes in a factory function that returns an io.Closer.
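The "reuse if available, otherwise create" step can be sketched with a non-blocking receive on a buffered channel. This is a simplified illustration that leaves out the maximum limit and the blocking case; nopCloser, take, and demo are hypothetical names introduced here.

```go
package main

import (
	"fmt"
	"io"
)

// nopCloser is a hypothetical resource used only for this demo.
type nopCloser struct{ id int }

func (nopCloser) Close() error { return nil }

// take returns an idle resource from the buffered channel if one is
// available; otherwise it creates one with the factory. The boolean
// reports whether the resource was reused.
func take(idle chan io.Closer, factory func() (io.Closer, error)) (io.Closer, bool, error) {
	select {
	case c := <-idle:
		return c, true, nil
	default:
		c, err := factory()
		return c, false, err
	}
}

// demo calls take on an empty channel, hands the resource back,
// and then calls take again.
func demo() (bool, bool, error) {
	idle := make(chan io.Closer, 2)
	factory := func() (io.Closer, error) { return nopCloser{id: 1}, nil }

	c, first, err := take(idle, factory)
	if err != nil {
		return false, false, err
	}
	idle <- c // hand the resource back
	_, second, err := take(idle, factory)
	return first, second, err
}

func main() {
	first, second, err := demo()
	fmt.Println(first, second, err) // false true <nil>
}
```

The first call finds the channel empty and falls through to the factory; the second call reuses the resource that was handed back.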
implementation
Because the pool is used from multiple goroutines, its shared state (such as the open count and the closed flag) must be protected by a mutex.
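Before the full listing, here is a stripped-down sketch of the locking idea: the limit check and the counter increment happen under one lock, so concurrent callers can never exceed the maximum. The slots type and its methods are hypothetical and much simpler than the pool itself.

```go
package main

import (
	"fmt"
	"sync"
)

// slots is a hypothetical, reduced version of the pool's bookkeeping.
type slots struct {
	mu      sync.Mutex
	numOpen int
	maxOpen int
}

// reserve checks the limit and claims a slot in one critical section.
func (s *slots) reserve() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.numOpen >= s.maxOpen {
		return false
	}
	s.numOpen++
	return true
}

// reserveConcurrently starts n goroutines that each try to reserve
// once, and returns how many of them succeeded.
func reserveConcurrently(s *slots, n int) int {
	var wg sync.WaitGroup
	var okMu sync.Mutex
	ok := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if s.reserve() {
				okMu.Lock()
				ok++
				okMu.Unlock()
			}
		}()
	}
	wg.Wait()
	return ok
}

func main() {
	s := &slots{maxOpen: 3}
	fmt.Println(reserveConcurrently(s, 100)) // 3
}
```

Without the mutex, two goroutines could both see numOpen below the limit and both increment it. Note also that a lock should not be held while blocking on a channel, since the goroutine that would unblock the channel may need the same lock.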
package pool

import (
	"errors"
	"io"
	"sync"
	"time"
)

var (
	ErrInvalidConfig = errors.New("invalid pool config")
	ErrPoolClosed    = errors.New("pool closed")
)

// factory creates a new resource for the pool.
type factory func() (io.Closer, error)

type Pool interface {
	Acquire() (io.Closer, error) // Get a resource from the pool
	Release(io.Closer) error     // Return a resource to the pool
	Close(io.Closer) error       // Close a resource and drop it from the pool
	Shutdown() error             // Close the pool
}

type GenericPool struct {
	sync.Mutex
	pool        chan io.Closer // Idle resources
	maxOpen     int            // Maximum number of resources in the pool
	numOpen     int            // Number of resources currently open (idle or in use)
	minOpen     int            // Minimum number of resources in the pool
	closed      bool           // Whether the pool is closed
	maxLifetime time.Duration  // Maximum idle time of a resource (not yet enforced)
	factory     factory        // Function that creates a resource
}

func NewGenericPool(minOpen, maxOpen int, maxLifetime time.Duration, factory factory) (*GenericPool, error) {
	if maxOpen <= 0 || minOpen < 0 || minOpen > maxOpen || factory == nil {
		return nil, ErrInvalidConfig
	}
	p := &GenericPool{
		maxOpen:     maxOpen,
		minOpen:     minOpen,
		maxLifetime: maxLifetime,
		factory:     factory,
		pool:        make(chan io.Closer, maxOpen),
	}
	// Pre-create minOpen resources. A failed creation is skipped, so the
	// pool may start with fewer than minOpen resources.
	for i := 0; i < minOpen; i++ {
		closer, err := factory()
		if err != nil {
			continue
		}
		p.numOpen++
		p.pool <- closer
	}
	return p, nil
}
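
// Acquire returns an idle resource, or creates one if the limit allows
func (p *GenericPool) Acquire() (io.Closer, error) {
	// Read the closed flag under the lock to avoid a data race with Shutdown.
	p.Lock()
	closed := p.closed
	p.Unlock()
	if closed {
		return nil, ErrPoolClosed
	}
	// TODO: enforce maxLifetime by discarding resources that have been idle too long
	return p.getOrCreate()
}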
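
func (p *GenericPool) getOrCreate() (io.Closer, error) {
	// Fast path: reuse an idle resource without blocking.
	select {
	case closer, ok := <-p.pool:
		if !ok {
			return nil, ErrPoolClosed // The channel was closed by Shutdown
		}
		return closer, nil
	default:
	}
	p.Lock()
	if p.numOpen >= p.maxOpen {
		// At the limit: unlock before blocking. Release needs the same
		// lock, so waiting here while holding it would deadlock.
		p.Unlock()
		closer, ok := <-p.pool
		if !ok {
			return nil, ErrPoolClosed
		}
		return closer, nil
	}
	// Below the limit: create a new connection
	closer, err := p.factory()
	if err != nil {
		p.Unlock()
		return nil, err
	}
	p.numOpen++
	p.Unlock()
	return closer, nil
}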
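
// Release returns a single resource to the connection pool
func (p *GenericPool) Release(closer io.Closer) error {
	p.Lock()
	defer p.Unlock()
	// Check the flag under the lock: after Shutdown the channel is
	// closed, and sending on a closed channel would panic.
	if p.closed {
		return ErrPoolClosed
	}
	// The channel has capacity maxOpen, so this send does not block as
	// long as only resources obtained from this pool are released once.
	p.pool <- closer
	return nil
}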
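
// Close closes a single resource and removes it from the open count
func (p *GenericPool) Close(closer io.Closer) error {
	p.Lock()
	p.numOpen--
	p.Unlock()
	// Close outside the lock and report the error instead of dropping it.
	return closer.Close()
}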
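
// Shutdown closes the connection pool and all idle resources
func (p *GenericPool) Shutdown() error {
	p.Lock()
	defer p.Unlock()
	if p.closed {
		return ErrPoolClosed
	}
	p.closed = true
	close(p.pool)
	// Drain and close the idle resources. Resources that are still
	// checked out are not tracked here; their owners should call Close.
	for closer := range p.pool {
		closer.Close()
		p.numOpen--
	}
	return nil
}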
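
The listing leaves the maximum idle time as a TODO. One possible way to handle it, shown here only as a sketch and not as the author's method, is to wrap each idle resource with the time it was returned and to discard stale entries when taking from the channel. The names idleConn, expired, firstFresh, nopCloser, and demo are hypothetical, and treating a non-positive maxLifetime as "never expires" is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"io"
	"time"
)

// nopCloser is a hypothetical resource used only for this demo.
type nopCloser struct{}

func (nopCloser) Close() error { return nil }

// idleConn records when a resource was returned to the pool.
type idleConn struct {
	io.Closer
	returnedAt time.Time
}

// expired reports whether the resource has been idle longer than
// maxLifetime. A non-positive maxLifetime disables the check.
func (c idleConn) expired(now time.Time, maxLifetime time.Duration) bool {
	return maxLifetime > 0 && now.Sub(c.returnedAt) > maxLifetime
}

// firstFresh takes resources from idle until it finds one that has not
// expired, closing the expired ones. It returns nil if none is left,
// together with the number of resources it discarded.
func firstFresh(idle chan idleConn, now time.Time, maxLifetime time.Duration) (io.Closer, int) {
	discarded := 0
	for {
		select {
		case c := <-idle:
			if c.expired(now, maxLifetime) {
				c.Close() // error ignored in this sketch
				discarded++
				continue
			}
			return c.Closer, discarded
		default:
			return nil, discarded
		}
	}
}

// demo queues one stale and one fresh resource and takes the fresh one.
func demo() (bool, int) {
	now := time.Now()
	idle := make(chan idleConn, 2)
	idle <- idleConn{Closer: nopCloser{}, returnedAt: now.Add(-time.Hour)}
	idle <- idleConn{Closer: nopCloser{}, returnedAt: now}
	c, discarded := firstFresh(idle, now, time.Minute)
	return c != nil, discarded
}

func main() {
	fmt.Println(demo()) // true 1
}
```

In the pool itself, each discarded resource would also need to decrement numOpen under the lock, and the channel element type would change from io.Closer to the wrapper.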
conclusion
With this pool, any object that implements io.Closer can be managed, for example memcached or redis clients.
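Since Acquire returns an io.Closer, the caller has to recover the concrete type before using the resource. A minimal sketch of that step follows; cacheClient, its Ping method, newCacheClient, and asCacheClient are hypothetical names that stand in for a real client library, and newCacheClient stands in for a call to the pool's Acquire.

```go
package main

import (
	"fmt"
	"io"
)

// cacheClient is a hypothetical client type. A real one would wrap a
// memcached or redis connection.
type cacheClient struct{ addr string }

func (c *cacheClient) Close() error { return nil }

// Ping is a made-up method that stands in for real client calls.
func (c *cacheClient) Ping() string { return "PONG from " + c.addr }

// newCacheClient has the shape of the factory the pool expects.
func newCacheClient() (io.Closer, error) {
	return &cacheClient{addr: "127.0.0.1:6379"}, nil
}

// asCacheClient recovers the concrete type from the io.Closer that
// the pool hands out.
func asCacheClient(c io.Closer) (*cacheClient, error) {
	client, ok := c.(*cacheClient)
	if !ok {
		return nil, fmt.Errorf("unexpected resource type %T", c)
	}
	return client, nil
}

func main() {
	res, err := newCacheClient() // stands in for pool.Acquire()
	if err != nil {
		fmt.Println("acquire failed:", err)
		return
	}
	client, err := asCacheClient(res)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(client.Ping())
}
```

Using the checked form of the type assertion avoids a panic if a resource of another type ever ends up in the pool.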