1
0
Fork 0

Merge pull request #847 from rqlite/use-fsm-idx

Use FSM index in tests
master
Philip O'Toole 3 years ago committed by GitHub
commit 0fbd16bd3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -315,14 +315,15 @@ func (s *Store) Close(wait bool) error {
// WaitForAppliedFSM waits until the currently applied logs (at the time this
// function is called) are actually reflected by the FSM, or the timeout expires.
func (s *Store) WaitForAppliedFSM(timeout time.Duration) error {
func (s *Store) WaitForAppliedFSM(timeout time.Duration) (uint64, error) {
if timeout == 0 {
return nil
return 0, nil
}
if err := s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout); err != nil {
return ErrOpenTimeout
if fsmIdx, err := s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout); err != nil {
return 0, ErrOpenTimeout
} else {
return fsmIdx, nil
}
return nil
}
// WaitForInitialLogs waits for logs that were in the Store at time of open
@ -489,27 +490,24 @@ func (s *Store) SetRequestCompression(batch, size int) {
// WaitForFSMIndex blocks until a given log index has been applied to the
// state machine or the timeout expires.
func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) error {
func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error) {
tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
var fsmIdx uint64
for {
select {
case <-tck.C:
if func() bool {
s.fsmIndexMu.RLock()
defer s.fsmIndexMu.RUnlock()
if s.fsmIndex >= idx {
return true
}
return false
}() == true {
return nil
s.fsmIndexMu.RLock()
fsmIdx = s.fsmIndex
s.fsmIndexMu.RUnlock()
if fsmIdx >= idx {
return fsmIdx, nil
}
case <-tmr.C:
return fmt.Errorf("timeout expired")
return 0, fmt.Errorf("timeout expired")
}
}
}

@ -674,6 +674,11 @@ func Test_MultiNodeExecuteQuery(t *testing.T) {
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
s0FsmIdx, err := s0.WaitForAppliedFSM(5 * time.Second)
if err != nil {
t.Fatalf("failed to wait for fsmIndex: %s", err.Error())
}
qr := queryRequestFromString("SELECT * FROM foo", false, false)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE
r, err := s0.Query(qr)
@ -687,9 +692,9 @@ func Test_MultiNodeExecuteQuery(t *testing.T) {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Wait until the 3 log entries have been applied to the voting follower,
// Wait until the log entries have been applied to the voting follower,
// and then query.
if err := s1.WaitForAppliedIndex(3, 5*time.Second); err != nil {
if _, err := s1.WaitForFSMIndex(s0FsmIdx, 5*time.Second); err != nil {
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
}

Loading…
Cancel
Save