1
0
Fork 0

Clearer runQueue logic

master
Philip O'Toole 2 years ago
parent bf1f79e9c5
commit cf3fa8eedb

@ -174,26 +174,27 @@ func NewResponse() *Response {
var stats *expvar.Map
const (
numLeaderNotFound = "leader_not_found"
numExecutions = "executions"
numQueuedExecutions = "queued_executions"
numQueuedExecutionsOK = "queued_executions_ok"
numQueuedExecutionsFailed = "queued_executions_failed"
numQueuedExecutionsWait = "queued_executions_wait"
numQueries = "queries"
numRemoteExecutions = "remote_executions"
numRemoteQueries = "remote_queries"
numRemoteBackups = "remote_backups"
numRemoteLoads = "remote_loads"
numRemoteRemoveNode = "remte_remove_node"
numReadyz = "num_readyz"
numStatus = "num_status"
numBackups = "backups"
numLoad = "loads"
numJoins = "joins"
numNotifies = "notifies"
numAuthOK = "authOK"
numAuthFail = "authFail"
numLeaderNotFound = "leader_not_found"
numExecutions = "executions"
numQueuedExecutions = "queued_executions"
numQueuedExecutionsOK = "queued_executions_ok"
numQueuedExecutionsNoLeader = "queued_executions_no_leader"
numQueuedExecutionsFailed = "queued_executions_failed"
numQueuedExecutionsWait = "queued_executions_wait"
numQueries = "queries"
numRemoteExecutions = "remote_executions"
numRemoteQueries = "remote_queries"
numRemoteBackups = "remote_backups"
numRemoteLoads = "remote_loads"
numRemoteRemoveNode = "remte_remove_node"
numReadyz = "num_readyz"
numStatus = "num_status"
numBackups = "backups"
numLoad = "loads"
numJoins = "joins"
numNotifies = "notifies"
numAuthOK = "authOK"
numAuthFail = "authFail"
// Default timeout for cluster communications.
defaultTimeout = 30 * time.Second
@ -213,6 +214,7 @@ func init() {
stats.Add(numExecutions, 0)
stats.Add(numQueuedExecutions, 0)
stats.Add(numQueuedExecutionsOK, 0)
stats.Add(numQueuedExecutionsNoLeader, 0)
stats.Add(numQueuedExecutionsFailed, 0)
stats.Add(numQueuedExecutionsWait, 0)
stats.Add(numQueries, 0)
@ -1561,10 +1563,11 @@ func (s *Service) runQueue() {
Transaction: s.DefaultQueueTx,
},
}
for {
// Nil statements are valid, as clients may want to just send
// a "checkpoint" through the queue.
if er.Request.Statements != nil {
// Nil statements are valid, as clients may want to just send
// a "checkpoint" through the queue.
if er.Request.Statements != nil {
for {
_, err = s.store.Execute(er)
if err != nil {
if err == store.ErrNotLeader {
@ -1572,7 +1575,7 @@ func (s *Service) runQueue() {
if err != nil || addr == "" {
s.logger.Printf("execute queue can't find leader for sequence number %d",
req.SequenceNumber)
stats.Add(numQueuedExecutionsFailed, 1)
stats.Add(numQueuedExecutionsNoLeader, 1)
time.Sleep(retryDelay)
continue
}
@ -1580,6 +1583,7 @@ func (s *Service) runQueue() {
if err != nil {
s.logger.Printf("execute queue write failed for sequence number %d: %s",
req.SequenceNumber, err.Error())
stats.Add(numQueuedExecutionsFailed, 1)
time.Sleep(retryDelay)
continue
}
@ -1587,14 +1591,14 @@ func (s *Service) runQueue() {
}
}
}
s.seqNumMu.Lock()
s.seqNum = req.SequenceNumber
s.seqNumMu.Unlock()
req.Close()
stats.Add(numQueuedExecutionsOK, 1)
break
}
// Perform post-write processing.
s.seqNumMu.Lock()
s.seqNum = req.SequenceNumber
s.seqNumMu.Unlock()
req.Close()
stats.Add(numQueuedExecutionsOK, 1)
}
}
}

Loading…
Cancel
Save