|
|
|
package queue
|
|
|
|
|
|
|
|
import (
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/rqlite/rqlite/command"
|
|
|
|
)
|
|
|
|
|
|
|
|
type Queue struct {
|
|
|
|
maxSize int
|
|
|
|
batchSize int
|
|
|
|
timeout time.Duration
|
|
|
|
|
|
|
|
batchCh chan *command.Statement
|
|
|
|
sendCh chan []*command.Statement
|
|
|
|
C <-chan []*command.Statement
|
|
|
|
|
|
|
|
done chan struct{}
|
|
|
|
closed chan struct{}
|
|
|
|
flush chan struct{}
|
|
|
|
}
|
|
|
|
|
|
|
|
func New(maxSize, batchSize int, t time.Duration) *Queue {
|
|
|
|
q := &Queue{
|
|
|
|
maxSize: maxSize,
|
|
|
|
batchSize: batchSize,
|
|
|
|
timeout: t,
|
|
|
|
batchCh: make(chan *command.Statement, maxSize),
|
|
|
|
sendCh: make(chan []*command.Statement, maxSize),
|
|
|
|
done: make(chan struct{}),
|
|
|
|
closed: make(chan struct{}),
|
|
|
|
flush: make(chan struct{}),
|
|
|
|
}
|
|
|
|
|
|
|
|
q.C = q.sendCh
|
|
|
|
go q.run()
|
|
|
|
return q
|
|
|
|
}
|
|
|
|
|
|
|
|
// Requests or ExecuteRequests? Gotta be requests, and merge inside single ER. Maybe just
|
|
|
|
// needs to be Statements
|
|
|
|
func (q *Queue) Write(stmt *command.Statement) error {
|
|
|
|
if stmt == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
q.batchCh <- stmt
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *Queue) Flush() error {
|
|
|
|
q.flush <- struct{}{}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *Queue) Close() error {
|
|
|
|
close(q.done)
|
|
|
|
<-q.closed
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *Queue) Depth() int {
|
|
|
|
return len(q.batchCh)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (q *Queue) run() {
|
|
|
|
defer close(q.closed)
|
|
|
|
var stmts []*command.Statement
|
|
|
|
timer := time.NewTimer(q.timeout)
|
|
|
|
timer.Stop()
|
|
|
|
|
|
|
|
writeFn := func(stmts []*command.Statement) {
|
|
|
|
newStmts := make([]*command.Statement, len(stmts))
|
|
|
|
copy(newStmts, stmts)
|
|
|
|
q.sendCh <- newStmts
|
|
|
|
|
|
|
|
stmts = nil
|
|
|
|
timer.Stop()
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case s := <-q.batchCh:
|
|
|
|
stmts = append(stmts, s)
|
|
|
|
if len(stmts) == 1 {
|
|
|
|
timer.Reset(q.timeout)
|
|
|
|
}
|
|
|
|
if len(stmts) >= q.batchSize {
|
|
|
|
writeFn(stmts)
|
|
|
|
}
|
|
|
|
case <-timer.C:
|
|
|
|
writeFn(stmts)
|
|
|
|
case <-q.flush:
|
|
|
|
writeFn(stmts)
|
|
|
|
case <-q.done:
|
|
|
|
timer.Stop()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|