diff --git a/queue/queue.go b/queue/queue.go index 76cc9215..39e2f594 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -174,17 +174,11 @@ func (q *Queue) run() { defer close(q.closed) queuedStmts := make([]*queuedStatements, 0) - timer := time.NewTimer(q.timeout) - timer.Stop() + // Create an initial timer, in the stopped state. + timer := time.NewTimer(0) + <-timer.C writeFn := func() { - timer.Stop() - if queuedStmts == nil { - // Batch size was met, but timer expired before it could be - // stopped, so this function was called again. Possibly. - return - } - // mergeQueued returns a new object, ownership will pass // implicitly to the other side of sendCh. req := mergeQueued(queuedStmts) @@ -198,9 +192,14 @@ func (q *Queue) run() { case s := <-q.batchCh: queuedStmts = append(queuedStmts, s) if len(queuedStmts) == 1 { + // First item in queue, start the timer so that if + // we don't get in a batch, we'll still write. timer.Reset(q.timeout) } if len(queuedStmts) == q.batchSize { + if !timer.Stop() { + <-timer.C + } writeFn() } case <-timer.C: