1
0
Fork 0

Add Connection Pool source

Many thanks to https://github.com/fatih/pool.
master
Philip O'Toole 3 years ago
parent f547695b2d
commit fd2c3a6314

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2013 Fatih Arslan
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

@ -0,0 +1,135 @@
package pool
import (
"errors"
"fmt"
"net"
"sync"
)
// channelPool implements the Pool interface based on buffered channels.
type channelPool struct {
// storage for our net.Conn connections
mu sync.RWMutex
conns chan net.Conn
// net.Conn generator
factory Factory
}
// Factory is a function to create new connections.
type Factory func() (net.Conn, error)
// NewChannelPool returns a new pool based on buffered channels with an initial
// capacity and maximum capacity. Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
return nil, errors.New("invalid capacity settings")
}
c := &channelPool{
conns: make(chan net.Conn, maxCap),
factory: factory,
}
// create initial connections, if something goes wrong,
// just close the pool error out.
for i := 0; i < initialCap; i++ {
conn, err := factory()
if err != nil {
c.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- conn
}
return c, nil
}
func (c *channelPool) getConnsAndFactory() (chan net.Conn, Factory) {
c.mu.RLock()
conns := c.conns
factory := c.factory
c.mu.RUnlock()
return conns, factory
}
// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *channelPool) Get() (net.Conn, error) {
conns, factory := c.getConnsAndFactory()
if conns == nil {
return nil, ErrClosed
}
// wrap our connections with out custom net.Conn implementation (wrapConn
// method) that puts the connection back to the pool if it's closed.
select {
case conn := <-conns:
if conn == nil {
return nil, ErrClosed
}
return c.wrapConn(conn), nil
default:
conn, err := factory()
if err != nil {
return nil, err
}
return c.wrapConn(conn), nil
}
}
// put puts the connection back to the pool. If the pool is full or closed,
// conn is simply closed. A nil conn will be rejected.
func (c *channelPool) put(conn net.Conn) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.RLock()
defer c.mu.RUnlock()
if c.conns == nil {
// pool is closed, close passed connection
return conn.Close()
}
// put the resource back into the pool. If the pool is full, this will
// block and the default case will be executed.
select {
case c.conns <- conn:
return nil
default:
// pool is full, close passed connection
return conn.Close()
}
}
func (c *channelPool) Close() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
c.mu.Unlock()
if conns == nil {
return
}
close(conns)
for conn := range conns {
conn.Close()
}
}
func (c *channelPool) Len() int {
conns, _ := c.getConnsAndFactory()
return len(conns)
}

@ -0,0 +1,292 @@
package pool
import (
"log"
"math/rand"
"net"
"sync"
"testing"
"time"
)
var (
InitialCap = 5
MaximumCap = 30
network = "tcp"
address = "127.0.0.1:7777"
factory = func() (net.Conn, error) { return net.Dial(network, address) }
)
func init() {
// used for factory function
go simpleTCPServer()
time.Sleep(time.Millisecond * 300) // wait until tcp server has been settled
rand.Seed(time.Now().UTC().UnixNano())
}
func TestNew(t *testing.T) {
_, err := newChannelPool()
if err != nil {
t.Errorf("New error: %s", err)
}
}
func TestPool_Get_Impl(t *testing.T) {
p, _ := newChannelPool()
defer p.Close()
conn, err := p.Get()
if err != nil {
t.Errorf("Get error: %s", err)
}
_, ok := conn.(*PoolConn)
if !ok {
t.Errorf("Conn is not of type poolConn")
}
}
func TestPool_Get(t *testing.T) {
p, _ := newChannelPool()
defer p.Close()
_, err := p.Get()
if err != nil {
t.Errorf("Get error: %s", err)
}
// after one get, current capacity should be lowered by one.
if p.Len() != (InitialCap - 1) {
t.Errorf("Get error. Expecting %d, got %d",
(InitialCap - 1), p.Len())
}
// get them all
var wg sync.WaitGroup
for i := 0; i < (InitialCap - 1); i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := p.Get()
if err != nil {
t.Errorf("Get error: %s", err)
}
}()
}
wg.Wait()
if p.Len() != 0 {
t.Errorf("Get error. Expecting %d, got %d",
(InitialCap - 1), p.Len())
}
_, err = p.Get()
if err != nil {
t.Errorf("Get error: %s", err)
}
}
func TestPool_Put(t *testing.T) {
p, err := NewChannelPool(0, 30, factory)
if err != nil {
t.Fatal(err)
}
defer p.Close()
// get/create from the pool
conns := make([]net.Conn, MaximumCap)
for i := 0; i < MaximumCap; i++ {
conn, _ := p.Get()
conns[i] = conn
}
// now put them all back
for _, conn := range conns {
conn.Close()
}
if p.Len() != MaximumCap {
t.Errorf("Put error len. Expecting %d, got %d",
1, p.Len())
}
conn, _ := p.Get()
p.Close() // close pool
conn.Close() // try to put into a full pool
if p.Len() != 0 {
t.Errorf("Put error. Closed pool shouldn't allow to put connections.")
}
}
func TestPool_PutUnusableConn(t *testing.T) {
p, _ := newChannelPool()
defer p.Close()
// ensure pool is not empty
conn, _ := p.Get()
conn.Close()
poolSize := p.Len()
conn, _ = p.Get()
conn.Close()
if p.Len() != poolSize {
t.Errorf("Pool size is expected to be equal to initial size")
}
conn, _ = p.Get()
if pc, ok := conn.(*PoolConn); !ok {
t.Errorf("impossible")
} else {
pc.MarkUnusable()
}
conn.Close()
if p.Len() != poolSize-1 {
t.Errorf("Pool size is expected to be initial_size - 1, %d, %d", p.Len(), poolSize-1)
}
}
func TestPool_UsedCapacity(t *testing.T) {
p, _ := newChannelPool()
defer p.Close()
if p.Len() != InitialCap {
t.Errorf("InitialCap error. Expecting %d, got %d",
InitialCap, p.Len())
}
}
func TestPool_Close(t *testing.T) {
p, _ := newChannelPool()
// now close it and test all cases we are expecting.
p.Close()
c := p.(*channelPool)
if c.conns != nil {
t.Errorf("Close error, conns channel should be nil")
}
if c.factory != nil {
t.Errorf("Close error, factory should be nil")
}
_, err := p.Get()
if err == nil {
t.Errorf("Close error, get conn should return an error")
}
if p.Len() != 0 {
t.Errorf("Close error used capacity. Expecting 0, got %d", p.Len())
}
}
func TestPoolConcurrent(t *testing.T) {
p, _ := newChannelPool()
pipe := make(chan net.Conn, 0)
go func() {
p.Close()
}()
for i := 0; i < MaximumCap; i++ {
go func() {
conn, _ := p.Get()
pipe <- conn
}()
go func() {
conn := <-pipe
if conn == nil {
return
}
conn.Close()
}()
}
}
func TestPoolWriteRead(t *testing.T) {
p, _ := NewChannelPool(0, 30, factory)
conn, _ := p.Get()
msg := "hello"
_, err := conn.Write([]byte(msg))
if err != nil {
t.Error(err)
}
}
func TestPoolConcurrent2(t *testing.T) {
p, _ := NewChannelPool(0, 30, factory)
var wg sync.WaitGroup
go func() {
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
conn, _ := p.Get()
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
conn.Close()
wg.Done()
}(i)
}
}()
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
conn, _ := p.Get()
time.Sleep(time.Millisecond * time.Duration(rand.Intn(100)))
conn.Close()
wg.Done()
}(i)
}
wg.Wait()
}
func TestPoolConcurrent3(t *testing.T) {
p, _ := NewChannelPool(0, 1, factory)
var wg sync.WaitGroup
wg.Add(1)
go func() {
p.Close()
wg.Done()
}()
if conn, err := p.Get(); err == nil {
conn.Close()
}
wg.Wait()
}
func newChannelPool() (Pool, error) {
return NewChannelPool(InitialCap, MaximumCap, factory)
}
func simpleTCPServer() {
l, err := net.Listen(network, address)
if err != nil {
log.Fatal(err)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
go func() {
buffer := make([]byte, 256)
conn.Read(buffer)
}()
}
}

@ -0,0 +1,43 @@
package pool
import (
"net"
"sync"
)
// PoolConn is a wrapper around net.Conn to modify the the behavior of
// net.Conn's Close() method.
type PoolConn struct {
net.Conn
mu sync.RWMutex
c *channelPool
unusable bool
}
// Close() puts the given connects back to the pool instead of closing it.
func (p *PoolConn) Close() error {
p.mu.RLock()
defer p.mu.RUnlock()
if p.unusable {
if p.Conn != nil {
return p.Conn.Close()
}
return nil
}
return p.c.put(p.Conn)
}
// MarkUnusable() marks the connection not usable any more, to let the pool close it instead of returning it to pool.
func (p *PoolConn) MarkUnusable() {
p.mu.Lock()
p.unusable = true
p.mu.Unlock()
}
// newConn wraps a standard net.Conn to a poolConn net.Conn.
func (c *channelPool) wrapConn(conn net.Conn) net.Conn {
p := &PoolConn{c: c}
p.Conn = conn
return p
}

@ -0,0 +1,10 @@
package pool
import (
"net"
"testing"
)
func TestConn_Impl(t *testing.T) {
var _ net.Conn = new(PoolConn)
}

@ -0,0 +1,28 @@
// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
package pool
import (
"errors"
"net"
)
var (
// ErrClosed is the error resulting if the pool is closed via pool.Close().
ErrClosed = errors.New("pool is closed")
)
// Pool interface describes a pool implementation. A pool should have maximum
// capacity. An ideal pool is threadsafe and easy to use.
type Pool interface {
// Get returns a new connection from the pool. Closing the connections puts
// it back to the Pool. Closing it when the pool is destroyed or full will
// be counted as an error.
Get() (net.Conn, error)
// Close closes the pool and all its connections. After Close() the pool is
// no longer usable.
Close()
// Len returns the current number of connections of the pool.
Len() int
}
Loading…
Cancel
Save