From b923c738389ec7606a35d96ecee0eb5a12d9b084 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 29 Mar 2023 22:44:55 -0400 Subject: [PATCH] Handle queue timers better No need to the queuedStmts == nil check now. --- queue/queue.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/queue/queue.go b/queue/queue.go index 76cc9215..39e2f594 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -174,17 +174,11 @@ func (q *Queue) run() { defer close(q.closed) queuedStmts := make([]*queuedStatements, 0) - timer := time.NewTimer(q.timeout) - timer.Stop() + // Create an initial timer, in the stopped state. + timer := time.NewTimer(0) + <-timer.C writeFn := func() { - timer.Stop() - if queuedStmts == nil { - // Batch size was met, but timer expired before it could be - // stopped, so this function was called again. Possibly. - return - } - // mergeQueued returns a new object, ownership will pass // implicitly to the other side of sendCh. req := mergeQueued(queuedStmts) @@ -198,9 +192,14 @@ func (q *Queue) run() { case s := <-q.batchCh: queuedStmts = append(queuedStmts, s) if len(queuedStmts) == 1 { + // First item in queue, start the timer so that if + // we don't get in a batch, we'll still write. timer.Reset(q.timeout) } if len(queuedStmts) == q.batchSize { + if !timer.Stop() { + <-timer.C + } writeFn() } case <-timer.C: