diff --git a/queue/queue.go b/queue/queue.go index b1df5238..76cc9215 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -1,12 +1,37 @@ package queue import ( + "expvar" "sync" "time" "github.com/rqlite/rqlite/command" ) +// stats captures stats for the Queue. +var stats *expvar.Map + +const ( + numStatementsRx = "statements_rx" + numStatementsTx = "statements_tx" + numTimeout = "num_timeout" + numFlush = "num_flush" +) + +func init() { + stats = expvar.NewMap("queue") + ResetStats() +} + +// ResetStats resets the expvar stats for this module. Mostly for test purposes. +func ResetStats() { + stats.Init() + stats.Add(numStatementsRx, 0) + stats.Add(numStatementsTx, 0) + stats.Add(numTimeout, 0) + stats.Add(numFlush, 0) +} + // FlushChannel is the type passed to the Queue, if caller wants // to know when a specific set of statements has been processed. type FlushChannel chan bool @@ -110,6 +135,7 @@ func (q *Queue) Write(stmts []*command.Statement, c FlushChannel) (int64, error) Statements: stmts, flushChan: c, } + stats.Add(numStatementsRx, int64(len(stmts))) return q.seqNum, nil } @@ -159,7 +185,11 @@ func (q *Queue) run() { return } - q.sendCh <- mergeQueued(queuedStmts) + // mergeQueued returns a new object, ownership will pass + // implicitly to the other side of sendCh. + req := mergeQueued(queuedStmts) + q.sendCh <- req + stats.Add(numStatementsTx, int64(len(req.Statements))) queuedStmts = nil } @@ -174,9 +204,11 @@ func (q *Queue) run() { writeFn() } case <-timer.C: + stats.Add(numTimeout, 1) q.numTimeouts++ writeFn() case <-q.flush: + stats.Add(numFlush, 1) writeFn() case <-q.done: timer.Stop()