diff --git a/store/store.go b/store/store.go index 43a8e400..1ccd3554 100644 --- a/store/store.go +++ b/store/store.go @@ -315,14 +315,15 @@ func (s *Store) Close(wait bool) error { // WaitForAppliedFSM waits until the currently applied logs (at the time this // function is called) are actually reflected by the FSM, or the timeout expires. -func (s *Store) WaitForAppliedFSM(timeout time.Duration) error { +func (s *Store) WaitForAppliedFSM(timeout time.Duration) (uint64, error) { if timeout == 0 { - return nil + return 0, nil } - if err := s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout); err != nil { - return ErrOpenTimeout + if fsmIdx, err := s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout); err != nil { + return 0, ErrOpenTimeout + } else { + return fsmIdx, nil } - return nil } // WaitForInitialLogs waits for logs that were in the Store at time of open @@ -489,27 +490,24 @@ func (s *Store) SetRequestCompression(batch, size int) { // WaitForFSMIndex blocks until a given log index has been applied to the // state machine or the timeout expires. -func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) error { +func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error) { tck := time.NewTicker(appliedWaitDelay) defer tck.Stop() tmr := time.NewTimer(timeout) defer tmr.Stop() + var fsmIdx uint64 for { select { case <-tck.C: - if func() bool { - s.fsmIndexMu.RLock() - defer s.fsmIndexMu.RUnlock() - if s.fsmIndex >= idx { - return true - } - return false - }() == true { - return nil + s.fsmIndexMu.RLock() + fsmIdx = s.fsmIndex + s.fsmIndexMu.RUnlock() + if fsmIdx >= idx { + return fsmIdx, nil } case <-tmr.C: - return fmt.Errorf("timeout expired") + return 0, fmt.Errorf("timeout expired") } } } diff --git a/store/store_test.go b/store/store_test.go index f0cf372c..226852f7 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -674,6 +674,11 @@ func Test_MultiNodeExecuteQuery(t *testing.T) { if err != nil { t.Fatalf("failed to execute on single node: %s", err.Error()) } + s0FsmIdx, err := s0.WaitForAppliedFSM(5 * time.Second) + if err != nil { + t.Fatalf("failed to wait for fsmIndex: %s", err.Error()) + } + qr := queryRequestFromString("SELECT * FROM foo", false, false) qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE r, err := s0.Query(qr) @@ -687,9 +692,9 @@ func Test_MultiNodeExecuteQuery(t *testing.T) { t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) } - // Wait until the 3 log entries have been applied to the voting follower, + // Wait until the log entries have been applied to the voting follower, // and then query. - if err := s1.WaitForAppliedIndex(3, 5*time.Second); err != nil { + if _, err := s1.WaitForFSMIndex(s0FsmIdx, 5*time.Second); err != nil { t.Fatalf("error waiting for follower to apply index: %s:", err.Error()) }