|
|
@ -1,6 +1,7 @@
|
|
|
|
package queue
|
|
|
|
package queue
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
|
|
|
|
"errors"
|
|
|
|
"expvar"
|
|
|
|
"expvar"
|
|
|
|
"sync"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
"time"
|
|
|
@ -58,16 +59,16 @@ type queuedStatements struct {
|
|
|
|
|
|
|
|
|
|
|
|
func mergeQueued(qs []*queuedStatements) *Request {
|
|
|
|
func mergeQueued(qs []*queuedStatements) *Request {
|
|
|
|
var o *Request
|
|
|
|
var o *Request
|
|
|
|
|
|
|
|
if len(qs) > 0 {
|
|
|
|
|
|
|
|
o = &Request{
|
|
|
|
|
|
|
|
SequenceNumber: qs[0].SequenceNumber,
|
|
|
|
|
|
|
|
flushChans: make([]FlushChannel, 0),
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
for i := range qs {
|
|
|
|
for i := range qs {
|
|
|
|
if o == nil {
|
|
|
|
if o.SequenceNumber < qs[i].SequenceNumber {
|
|
|
|
o = &Request{
|
|
|
|
o.SequenceNumber = qs[i].SequenceNumber
|
|
|
|
SequenceNumber: qs[i].SequenceNumber,
|
|
|
|
|
|
|
|
flushChans: make([]FlushChannel, 0),
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
if o.SequenceNumber < qs[i].SequenceNumber {
|
|
|
|
|
|
|
|
o.SequenceNumber = qs[i].SequenceNumber
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
o.Statements = append(o.Statements, qs[i].Statements...)
|
|
|
|
o.Statements = append(o.Statements, qs[i].Statements...)
|
|
|
|
if qs[i].flushChan != nil {
|
|
|
|
if qs[i].flushChan != nil {
|
|
|
@ -126,6 +127,12 @@ func New(maxSize, batchSize int, t time.Duration) *Queue {
|
|
|
|
// c is an optional channel. If non-nil, it will be closed when the Request
|
|
|
|
// c is an optional channel. If non-nil, it will be closed when the Request
|
|
|
|
// containing these statements is closed.
|
|
|
|
// containing these statements is closed.
|
|
|
|
func (q *Queue) Write(stmts []*command.Statement, c FlushChannel) (int64, error) {
|
|
|
|
func (q *Queue) Write(stmts []*command.Statement, c FlushChannel) (int64, error) {
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
|
|
|
case <-q.done:
|
|
|
|
|
|
|
|
return 0, errors.New("queue is closed")
|
|
|
|
|
|
|
|
default:
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
q.seqMu.Lock()
|
|
|
|
q.seqMu.Lock()
|
|
|
|
defer q.seqMu.Unlock()
|
|
|
|
defer q.seqMu.Unlock()
|
|
|
|
q.seqNum++
|
|
|
|
q.seqNum++
|
|
|
|