1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

156 lines
3.6 KiB
Go

package pool
import (
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
)
// channelPool implements the Pool interface based on buffered channels.
type channelPool struct {
// storage for our net.Conn connections
mu sync.RWMutex
conns chan net.Conn
// net.Conn generator
factory Factory
nOpenConns int64
}
// Factory is a function to create new connections.
type Factory func() (net.Conn, error)
// NewChannelPool returns a new pool based on buffered channels with an initial
// capacity and maximum capacity. Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
return nil, errors.New("invalid capacity settings")
}
c := &channelPool{
conns: make(chan net.Conn, maxCap),
factory: factory,
}
// create initial connections, if something goes wrong,
// just close the pool error out.
for i := 0; i < initialCap; i++ {
conn, err := factory()
if err != nil {
c.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
atomic.AddInt64(&c.nOpenConns, 1)
c.conns <- conn
}
return c, nil
}
func (c *channelPool) getConnsAndFactory() (chan net.Conn, Factory) {
c.mu.RLock()
conns := c.conns
factory := c.factory
c.mu.RUnlock()
return conns, factory
}
// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *channelPool) Get() (net.Conn, error) {
conns, factory := c.getConnsAndFactory()
if conns == nil {
return nil, ErrClosed
}
// wrap our connections without custom net.Conn implementation (wrapConn
// method) that puts the connection back to the pool if it's closed.
select {
case conn := <-conns:
if conn == nil {
return nil, ErrClosed
}
return c.wrapConn(conn), nil
default:
conn, err := factory()
if err != nil {
return nil, err
}
atomic.AddInt64(&c.nOpenConns, 1)
return c.wrapConn(conn), nil
}
}
// put puts the connection back to the pool. If the pool is full or closed,
// conn is simply closed. A nil conn will be rejected.
func (c *channelPool) put(conn net.Conn) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.RLock()
defer c.mu.RUnlock()
if c.conns == nil {
// pool is closed, close passed connection
atomic.AddInt64(&c.nOpenConns, -1)
return conn.Close()
}
// put the resource back into the pool. If the pool is full, this will
// block and the default case will be executed.
select {
case c.conns <- conn:
return nil
default:
// pool is full, close passed connection
atomic.AddInt64(&c.nOpenConns, -1)
return conn.Close()
}
}
// Close closes every connection in the pool.
func (c *channelPool) Close() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
c.mu.Unlock()
if conns == nil {
return
}
close(conns)
for conn := range conns {
conn.Close()
}
atomic.AddInt64(&c.nOpenConns, 0)
}
// Len returns the number of idle connections.
func (c *channelPool) Len() int {
conns, _ := c.getConnsAndFactory()
return len(conns)
}
// Stats returns stats for the pool.
func (c *channelPool) Stats() (map[string]interface{}, error) {
conns, _ := c.getConnsAndFactory()
return map[string]interface{}{
"idle": len(conns),
"open_connections": c.nOpenConns,
"max_open_connections": cap(conns),
}, nil
}