1
0
Fork 0

Complete batched writer

master
Philip O'Toole 2 years ago
parent 58ccf4faf4
commit 3ee16ce511

@ -16,7 +16,8 @@ type Queue struct {
timeout time.Duration
store Execer
c chan *command.Statement
c chan *command.Statement
done chan struct{}
closed chan struct{}
flush chan struct{}
@ -31,13 +32,19 @@ func New(maxSize, batchSize int, t time.Duration, e Execer) *Queue {
c: make(chan *command.Statement, maxSize),
done: make(chan struct{}),
closed: make(chan struct{}),
flush: make(chan struct{}),
}
go q.run()
return q
}
// Requests or ExecuteRequests? Gotta be requests, and merge inside single ER. Maybe just
// needs to be Statements
func (q *Queue) Write(stmt *command.Statement) error {
if stmt == nil {
return nil
}
q.c <- stmt
return nil
}
@ -53,28 +60,38 @@ func (q *Queue) Close() error {
return nil
}
func (q *Queue) Depth() int {
return len(q.c)
}
func (q *Queue) run() {
defer close(q.closed)
stmts := make([]*command.Statement, 0)
//ticker := time.NewTicker(q.timeout)
var stmts []*command.Statement
timer := time.NewTimer(q.timeout)
timer.Stop()
writeFn := func(stmts []*command.Statement) {
q.exec(stmts)
stmts = make([]*command.Statement, 0)
stmts = nil
timer.Stop()
}
for {
select {
case s := <-q.c:
stmts = append(stmts, s)
if len(stmts) == 1 {
timer.Reset(q.timeout)
}
if len(stmts) == q.batchSize {
writeFn(stmts)
}
// case <-ticker.C:
// // check batch, flush if empty. Not quite right. Timeout should expire when first item added?
case <-timer.C:
writeFn(stmts)
case <-q.flush:
writeFn(stmts)
case <-q.done:
timer.Stop()
return
}
}

@ -1,13 +1,122 @@
package queue
import (
"sync"
"testing"
"time"
"github.com/rqlite/rqlite/command"
)
var testStmt = &command.Statement{
Sql: "SELECT * FROM foo",
}
func Test_NewQueue(t *testing.T) {
q := New(1, 1, 100*time.Millisecond, nil)
if q == nil {
t.Fatalf("failed to create new Queue")
}
defer q.Close()
}
func Test_NewQueueWriteNil(t *testing.T) {
m := &MockExecer{}
q := New(1, 1, 60*time.Second, m)
defer q.Close()
if err := q.Write(nil); err != nil {
t.Fatalf("failing to write nil: %s", err.Error())
}
}
func Test_NewQueueWriteBatchSize(t *testing.T) {
m := &MockExecer{}
q := New(1024, 1, 60*time.Second, m)
defer q.Close()
var wg sync.WaitGroup
var numExecs int
wg.Add(1)
m.execFn = func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
wg.Done()
numExecs++
return nil, nil
}
if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
wg.Wait()
if exp, got := 1, numExecs; exp != got {
t.Fatalf("exec not called correct number of times, exp %d got %d", exp, got)
}
}
func Test_NewQueueWriteFlush(t *testing.T) {
m := &MockExecer{}
q := New(1024, 10, 60*time.Second, m)
defer q.Close()
var wg sync.WaitGroup
var numExecs int
wg.Add(1)
m.execFn = func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
wg.Done()
numExecs++
return nil, nil
}
if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
time.Sleep(1 * time.Second)
if err := q.Flush(); err != nil {
t.Fatalf("failed to flush: %s", err.Error())
}
wg.Wait()
if exp, got := 1, numExecs; exp != got {
t.Fatalf("exec not called correct number of times, exp %d got %d", exp, got)
}
}
func Test_NewQueueWriteTimeout(t *testing.T) {
m := &MockExecer{}
q := New(1024, 10, 1*time.Second, m)
defer q.Close()
var wg sync.WaitGroup
var numExecs int
wg.Add(1)
m.execFn = func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
wg.Done()
numExecs++
return nil, nil
}
if err := q.Write(testStmt); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
time.Sleep(time.Second)
wg.Wait()
if exp, got := 1, numExecs; exp != got {
t.Fatalf("exec not called correct number of times, exp %d got %d", exp, got)
}
}
type MockExecer struct {
execFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
}
func (m *MockExecer) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
if m.execFn != nil {
return m.execFn(er)
}
return nil, nil
}

Loading…
Cancel
Save