|
|
@ -6,6 +6,7 @@ import (
|
|
|
|
"github.com/rqlite/rqlite/command"
|
|
|
|
"github.com/rqlite/rqlite/command"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Queue is a batching queue with a timeout.
|
|
|
|
type Queue struct {
|
|
|
|
type Queue struct {
|
|
|
|
maxSize int
|
|
|
|
maxSize int
|
|
|
|
batchSize int
|
|
|
|
batchSize int
|
|
|
@ -20,6 +21,7 @@ type Queue struct {
|
|
|
|
flush chan struct{}
|
|
|
|
flush chan struct{}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// New returns a instance of a Queue
|
|
|
|
func New(maxSize, batchSize int, t time.Duration) *Queue {
|
|
|
|
func New(maxSize, batchSize int, t time.Duration) *Queue {
|
|
|
|
q := &Queue{
|
|
|
|
q := &Queue{
|
|
|
|
maxSize: maxSize,
|
|
|
|
maxSize: maxSize,
|
|
|
@ -37,8 +39,7 @@ func New(maxSize, batchSize int, t time.Duration) *Queue {
|
|
|
|
return q
|
|
|
|
return q
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Requests or ExecuteRequests? Gotta be requests, and merge inside single ER. Maybe just
|
|
|
|
// Write queues a request.
|
|
|
|
// needs to be Statements
|
|
|
|
|
|
|
|
func (q *Queue) Write(stmt *command.Statement) error {
|
|
|
|
func (q *Queue) Write(stmt *command.Statement) error {
|
|
|
|
if stmt == nil {
|
|
|
|
if stmt == nil {
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
@ -47,17 +48,20 @@ func (q *Queue) Write(stmt *command.Statement) error {
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Flush flushes the queue
|
|
|
|
func (q *Queue) Flush() error {
|
|
|
|
func (q *Queue) Flush() error {
|
|
|
|
q.flush <- struct{}{}
|
|
|
|
q.flush <- struct{}{}
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Close closes the queue. A closed queue should not be used.
|
|
|
|
func (q *Queue) Close() error {
|
|
|
|
func (q *Queue) Close() error {
|
|
|
|
close(q.done)
|
|
|
|
close(q.done)
|
|
|
|
<-q.closed
|
|
|
|
<-q.closed
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Depth returns the number of queue requests
|
|
|
|
func (q *Queue) Depth() int {
|
|
|
|
func (q *Queue) Depth() int {
|
|
|
|
return len(q.batchCh)
|
|
|
|
return len(q.batchCh)
|
|
|
|
}
|
|
|
|
}
|
|
|
|