|
|
|
@ -1569,27 +1569,32 @@ func (s *Service) runQueue() {
|
|
|
|
|
if er.Request.Statements != nil {
|
|
|
|
|
for {
|
|
|
|
|
_, err = s.store.Execute(er)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err == store.ErrNotLeader {
|
|
|
|
|
addr, err := s.store.LeaderAddr()
|
|
|
|
|
if err != nil || addr == "" {
|
|
|
|
|
s.logger.Printf("execute queue can't find leader for sequence number %d",
|
|
|
|
|
req.SequenceNumber)
|
|
|
|
|
stats.Add(numQueuedExecutionsNoLeader, 1)
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if err == nil {
|
|
|
|
|
// Success!
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if err == store.ErrNotLeader {
|
|
|
|
|
addr, err := s.store.LeaderAddr()
|
|
|
|
|
if err != nil || addr == "" {
|
|
|
|
|
s.logger.Printf("execute queue can't find leader for sequence number %d",
|
|
|
|
|
req.SequenceNumber)
|
|
|
|
|
stats.Add(numQueuedExecutionsNoLeader, 1)
|
|
|
|
|
} else {
|
|
|
|
|
_, err = s.cluster.Execute(er, addr, nil, defaultTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.logger.Printf("execute queue write failed for sequence number %d: %s",
|
|
|
|
|
req.SequenceNumber, err.Error())
|
|
|
|
|
stats.Add(numQueuedExecutionsFailed, 1)
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
continue
|
|
|
|
|
} else {
|
|
|
|
|
// Success!
|
|
|
|
|
stats.Add(numRemoteExecutions, 1)
|
|
|
|
|
break
|
|
|
|
|
}
|
|
|
|
|
stats.Add(numRemoteExecutions, 1)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
stats.Add(numQueuedExecutionsFailed, 1)
|
|
|
|
|
time.Sleep(retryDelay)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|