1
0
Fork 0

Batcher emits using channel

master
Philip O'Toole 2 years ago
parent 3ee16ce511
commit ba0be0dd41

@ -6,35 +6,33 @@ import (
"github.com/rqlite/rqlite/command" "github.com/rqlite/rqlite/command"
) )
type Execer interface {
Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
}
type Queue struct { type Queue struct {
maxSize int maxSize int
batchSize int batchSize int
timeout time.Duration timeout time.Duration
store Execer
c chan *command.Statement batchCh chan *command.Statement
sendCh chan []*command.Statement
C <-chan []*command.Statement
done chan struct{} done chan struct{}
closed chan struct{} closed chan struct{}
flush chan struct{} flush chan struct{}
} }
func New(maxSize, batchSize int, t time.Duration, e Execer) *Queue { func New(maxSize, batchSize int, t time.Duration) *Queue {
q := &Queue{ q := &Queue{
maxSize: maxSize, maxSize: maxSize,
batchSize: batchSize, batchSize: batchSize,
timeout: t, timeout: t,
store: e, batchCh: make(chan *command.Statement, maxSize),
c: make(chan *command.Statement, maxSize), sendCh: make(chan []*command.Statement, maxSize),
done: make(chan struct{}), done: make(chan struct{}),
closed: make(chan struct{}), closed: make(chan struct{}),
flush: make(chan struct{}), flush: make(chan struct{}),
} }
q.C = q.sendCh
go q.run() go q.run()
return q return q
} }
@ -45,7 +43,7 @@ func (q *Queue) Write(stmt *command.Statement) error {
if stmt == nil { if stmt == nil {
return nil return nil
} }
q.c <- stmt q.batchCh <- stmt
return nil return nil
} }
@ -61,7 +59,7 @@ func (q *Queue) Close() error {
} }
func (q *Queue) Depth() int { func (q *Queue) Depth() int {
return len(q.c) return len(q.batchCh)
} }
func (q *Queue) run() { func (q *Queue) run() {
@ -71,19 +69,22 @@ func (q *Queue) run() {
timer.Stop() timer.Stop()
writeFn := func(stmts []*command.Statement) { writeFn := func(stmts []*command.Statement) {
q.exec(stmts) newStmts := make([]*command.Statement, len(stmts))
copy(newStmts, stmts)
q.sendCh <- newStmts
stmts = nil stmts = nil
timer.Stop() timer.Stop()
} }
for { for {
select { select {
case s := <-q.c: case s := <-q.batchCh:
stmts = append(stmts, s) stmts = append(stmts, s)
if len(stmts) == 1 { if len(stmts) == 1 {
timer.Reset(q.timeout) timer.Reset(q.timeout)
} }
if len(stmts) == q.batchSize { if len(stmts) >= q.batchSize {
writeFn(stmts) writeFn(stmts)
} }
case <-timer.C: case <-timer.C:
@ -96,20 +97,3 @@ func (q *Queue) run() {
} }
} }
} }
func (q *Queue) exec(stmts []*command.Statement) error {
if stmts == nil || len(stmts) == 0 {
return nil
}
er := &command.ExecuteRequest{
Request: &command.Request{
Statements: stmts,
},
}
// Doesn't handle leader-redirect, transparent forwarding, etc.
// Would need a "wrapped" store which handles it.
_, err := q.store.Execute(er)
return err
}

@ -1,7 +1,6 @@
package queue package queue
import ( import (
"sync"
"testing" "testing"
"time" "time"
@ -13,7 +12,7 @@ var testStmt = &command.Statement{
} }
func Test_NewQueue(t *testing.T) { func Test_NewQueue(t *testing.T) {
q := New(1, 1, 100*time.Millisecond, nil) q := New(1, 1, 100*time.Millisecond)
if q == nil { if q == nil {
t.Fatalf("failed to create new Queue") t.Fatalf("failed to create new Queue")
} }
@ -21,8 +20,7 @@ func Test_NewQueue(t *testing.T) {
} }
func Test_NewQueueWriteNil(t *testing.T) { func Test_NewQueueWriteNil(t *testing.T) {
m := &MockExecer{} q := New(1, 1, 60*time.Second)
q := New(1, 1, 60*time.Second, m)
defer q.Close() defer q.Close()
if err := q.Write(nil); err != nil { if err := q.Write(nil); err != nil {
@ -30,93 +28,79 @@ func Test_NewQueueWriteNil(t *testing.T) {
} }
} }
func Test_NewQueueWriteBatchSize(t *testing.T) { func Test_NewQueueWriteBatchSizeSingle(t *testing.T) {
m := &MockExecer{} q := New(1024, 1, 60*time.Second)
q := New(1024, 1, 60*time.Second, m)
defer q.Close() defer q.Close()
var wg sync.WaitGroup
var numExecs int
wg.Add(1)
m.execFn = func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
wg.Done()
numExecs++
return nil, nil
}
if err := q.Write(testStmt); err != nil { if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error()) t.Fatalf("failed to write: %s", err.Error())
} }
wg.Wait()
if exp, got := 1, numExecs; exp != got { select {
t.Fatalf("exec not called correct number of times, exp %d got %d", exp, got) case stmts := <-q.C:
if len(stmts) != 1 {
t.Fatalf("received wrong length slice")
}
if stmts[0].Sql != "SELECT * FROM foo" {
t.Fatalf("received wrong SQL")
}
case <-time.NewTimer(5 * time.Second).C:
t.Fatalf("timed out waiting for statement")
} }
} }
func Test_NewQueueWriteFlush(t *testing.T) { func Test_NewQueueWriteBatchSizeMulti(t *testing.T) {
m := &MockExecer{} q := New(1024, 5, 60*time.Second)
q := New(1024, 10, 60*time.Second, m)
defer q.Close() defer q.Close()
var wg sync.WaitGroup // Write a batch size and wait for it.
var numExecs int for i := 0; i < 5; i++ {
wg.Add(1) if err := q.Write(testStmt); err != nil {
m.execFn = func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { t.Fatalf("failed to write: %s", err.Error())
wg.Done() }
numExecs++
return nil, nil
} }
select {
if err := q.Write(testStmt); err != nil { case stmts := <-q.C:
t.Fatalf("failed to write: %s", err.Error()) if len(stmts) != 5 {
t.Fatalf("received wrong length slice")
}
case <-time.NewTimer(5 * time.Second).C:
t.Fatalf("timed out waiting for first statements")
} }
time.Sleep(1 * time.Second) // Write one more than a batch size, should still get a batch.
for i := 0; i < 6; i++ {
if err := q.Flush(); err != nil { if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to flush: %s", err.Error()) t.Fatalf("failed to write: %s", err.Error())
}
} }
wg.Wait() select {
case stmts := <-q.C:
if exp, got := 1, numExecs; exp != got { if len(stmts) < 5 {
t.Fatalf("exec not called correct number of times, exp %d got %d", exp, got) t.Fatalf("received too-short slice")
}
case <-time.NewTimer(5 * time.Second).C:
t.Fatalf("timed out waiting for second statements")
} }
} }
func Test_NewQueueWriteTimeout(t *testing.T) { func Test_NewQueueWriteTimeout(t *testing.T) {
m := &MockExecer{} q := New(1024, 10, 1*time.Second)
q := New(1024, 10, 1*time.Second, m)
defer q.Close() defer q.Close()
var wg sync.WaitGroup
var numExecs int
wg.Add(1)
m.execFn = func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
wg.Done()
numExecs++
return nil, nil
}
if err := q.Write(testStmt); err != nil { if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error()) t.Fatalf("failed to write: %s", err.Error())
} }
time.Sleep(time.Second) select {
case stmts := <-q.C:
wg.Wait() if len(stmts) != 1 {
if exp, got := 1, numExecs; exp != got { t.Fatalf("received wrong length slice")
t.Fatalf("exec not called correct number of times, exp %d got %d", exp, got) }
} if stmts[0].Sql != "SELECT * FROM foo" {
} t.Fatalf("received wrong SQL")
}
type MockExecer struct { case <-time.NewTimer(5 * time.Second).C:
execFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) t.Fatalf("timed out waiting for statement")
}
func (m *MockExecer) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
if m.execFn != nil {
return m.execFn(er)
} }
return nil, nil
} }

Loading…
Cancel
Save