|
|
@ -1,12 +1,37 @@
|
|
|
|
package queue
|
|
|
|
package queue
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
|
|
|
|
"expvar"
|
|
|
|
"sync"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
|
|
"github.com/rqlite/rqlite/command"
|
|
|
|
"github.com/rqlite/rqlite/command"
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// stats captures stats for the Queue.
|
|
|
|
|
|
|
|
var stats *expvar.Map
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
|
|
|
|
numStatementsRx = "statements_rx"
|
|
|
|
|
|
|
|
numStatementsTx = "statements_tx"
|
|
|
|
|
|
|
|
numTimeout = "num_timeout"
|
|
|
|
|
|
|
|
numFlush = "num_flush"
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
|
|
|
|
stats = expvar.NewMap("queue")
|
|
|
|
|
|
|
|
ResetStats()
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// ResetStats resets the expvar stats for this module. Mostly for test purposes.
|
|
|
|
|
|
|
|
func ResetStats() {
|
|
|
|
|
|
|
|
stats.Init()
|
|
|
|
|
|
|
|
stats.Add(numStatementsRx, 0)
|
|
|
|
|
|
|
|
stats.Add(numStatementsTx, 0)
|
|
|
|
|
|
|
|
stats.Add(numTimeout, 0)
|
|
|
|
|
|
|
|
stats.Add(numFlush, 0)
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// FlushChannel is the type passed to the Queue, if caller wants
|
|
|
|
// FlushChannel is the type passed to the Queue, if caller wants
|
|
|
|
// to know when a specific set of statements has been processed.
|
|
|
|
// to know when a specific set of statements has been processed.
|
|
|
|
type FlushChannel chan bool
|
|
|
|
type FlushChannel chan bool
|
|
|
@ -110,6 +135,7 @@ func (q *Queue) Write(stmts []*command.Statement, c FlushChannel) (int64, error)
|
|
|
|
Statements: stmts,
|
|
|
|
Statements: stmts,
|
|
|
|
flushChan: c,
|
|
|
|
flushChan: c,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
stats.Add(numStatementsRx, int64(len(stmts)))
|
|
|
|
return q.seqNum, nil
|
|
|
|
return q.seqNum, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -159,7 +185,11 @@ func (q *Queue) run() {
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
q.sendCh <- mergeQueued(queuedStmts)
|
|
|
|
// mergeQueued returns a new object, ownership will pass
|
|
|
|
|
|
|
|
// implicitly to the other side of sendCh.
|
|
|
|
|
|
|
|
req := mergeQueued(queuedStmts)
|
|
|
|
|
|
|
|
q.sendCh <- req
|
|
|
|
|
|
|
|
stats.Add(numStatementsTx, int64(len(req.Statements)))
|
|
|
|
queuedStmts = nil
|
|
|
|
queuedStmts = nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -174,9 +204,11 @@ func (q *Queue) run() {
|
|
|
|
writeFn()
|
|
|
|
writeFn()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
case <-timer.C:
|
|
|
|
case <-timer.C:
|
|
|
|
|
|
|
|
stats.Add(numTimeout, 1)
|
|
|
|
q.numTimeouts++
|
|
|
|
q.numTimeouts++
|
|
|
|
writeFn()
|
|
|
|
writeFn()
|
|
|
|
case <-q.flush:
|
|
|
|
case <-q.flush:
|
|
|
|
|
|
|
|
stats.Add(numFlush, 1)
|
|
|
|
writeFn()
|
|
|
|
writeFn()
|
|
|
|
case <-q.done:
|
|
|
|
case <-q.done:
|
|
|
|
timer.Stop()
|
|
|
|
timer.Stop()
|
|
|
|