diff --git a/store/store.go b/store/store.go index d58a5519..9e1dcf47 100644 --- a/store/store.go +++ b/store/store.go @@ -644,20 +644,9 @@ func (s *Store) WaitForAppliedFSM(timeout time.Duration) (uint64, error) { return s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout) } -// WaitForInitialLogs waits for logs that were in the Store at time of open -// to be applied to the state machine. -func (s *Store) WaitForInitialLogs(timeout time.Duration) error { - if timeout == 0 { - return nil - } - s.logger.Printf("waiting for up to %s for application of initial logs (lcIdx=%d)", - timeout, s.lastCommandIdxOnOpen) - return s.WaitForApplied(timeout) -} - // WaitForApplied waits for all Raft log entries to be applied to the // underlying database. -func (s *Store) WaitForApplied(timeout time.Duration) error { +func (s *Store) WaitForAllApplied(timeout time.Duration) error { if timeout == 0 { return nil } @@ -884,7 +873,7 @@ func (s *Store) SetRequestCompression(batch, size int) { s.reqMarshaller.SizeThreshold = size } -// WaitForFSMIndex blocks until a given log index has been applied to the +// WaitForFSMIndex blocks until a given log index has been applied to our // state machine or the timeout expires. func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error) { tck := time.NewTicker(appliedWaitDelay) diff --git a/store/store_restart_test.go b/store/store_restart_test.go index c1a1b2d2..0ab70b70 100644 --- a/store/store_restart_test.go +++ b/store/store_restart_test.go @@ -9,7 +9,10 @@ import ( command "github.com/rqlite/rqlite/v8/command/proto" ) -func test_OpenStoreCloseStartup(t *testing.T, s *Store) { +// Test_OpenStoreCloseStartupSingleNode tests various restart scenarios. +func Test_OpenStoreCloseStartupSingleNode(t *testing.T) { + s, ln := mustNewStore(t) + defer ln.Close() if err := s.Open(); err != nil { t.Fatalf("failed to open single-node store: %s", err.Error()) } @@ -27,12 +30,6 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) { if err != nil { t.Fatalf("failed to execute on single node: %s", err.Error()) } - - fsmIdx, err := s.WaitForAppliedFSM(5 * time.Second) - if err != nil { - t.Fatalf("failed to wait for fsmIndex: %s", err.Error()) - } - if err := s.Close(true); err != nil { t.Fatalf("failed to close single-node store: %s", err.Error()) } @@ -45,25 +42,12 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) { t.Fatalf("Error waiting for leader: %s", err) } - // Wait until the log entries have been applied to the voting follower, - // and then query. - if _, err := s.WaitForFSMIndex(fsmIdx, 5*time.Second); err != nil { - t.Fatalf("error waiting for follower to apply index: %s:", err.Error()) - } - - qr := queryRequestFromString("SELECT * FROM foo", false, false) - qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG - r, err := s.Query(qr) - if err != nil { - t.Fatalf("failed to query single node: %s", err.Error()) - } - if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got { - t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) - } - if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got { - t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) - } - + testPoll(t, func() bool { + qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, false) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err := s.Query(qr) + return err == nil && asJSON(r) == `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[1]]}]` + }, 100*time.Millisecond, 5*time.Second) if err := s.Close(true); err != nil { t.Fatalf("failed to close single-node store: %s", err.Error()) } @@ -114,12 +98,8 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) { } } - fsmIdx, err = s.WaitForAppliedFSM(5 * time.Second) - if err != nil { - t.Fatalf("failed to wait for fsmIndex: %s", err.Error()) - } - - // Close and re-open to make sure all continues to work with recovery from snapshot. + // Close and re-open to make sure all data is there after starting up + // with a snapshot. if err := s.Close(true); err != nil { t.Fatalf("failed to close single-node store: %s", err.Error()) } @@ -129,23 +109,12 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) { if _, err := s.WaitForLeader(10 * time.Second); err != nil { t.Fatalf("Error waiting for leader: %s", err) } - - // Wait until the log entries have been applied to the voting follower, - // and then query. - if _, err := s.WaitForFSMIndex(fsmIdx, 5*time.Second); err != nil { - t.Fatalf("error waiting for follower to apply index: %s:", err.Error()) - } - - qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, false) - qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG - r, err = s.Query(qr) - if err != nil { - t.Fatalf("failed to query single node: %s", err.Error()) - } - if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[10]]}]`, asJSON(r); exp != got { - t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) - } - + testPoll(t, func() bool { + qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, false) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE + r, err := s.Query(qr) + return err == nil && asJSON(r) == `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[10]]}]` + }, 100*time.Millisecond, 5*time.Second) if err := s.Close(true); err != nil { t.Fatalf("failed to close single-node store: %s", err.Error()) } @@ -165,19 +134,13 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) { if err != nil { t.Fatalf("failed to execute on single node: %s", err.Error()) } - qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, false) - r, err = s.Query(qr) - if err != nil { - t.Fatalf("failed to query single node: %s", err.Error()) - } - if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[11]]}]`, asJSON(r); exp != got { - t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) - } - fsmIdx, err = s.WaitForAppliedFSM(5 * time.Second) - if err != nil { - t.Fatalf("failed to wait for fsmIndex: %s", err.Error()) - } + testPoll(t, func() bool { + qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, false) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE + r, err := s.Query(qr) + return err == nil && asJSON(r) == `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[11]]}]` + }, 100*time.Millisecond, 5*time.Second) if err := s.Close(true); err != nil { t.Fatalf("failed to close single-node store: %s", err.Error()) @@ -189,30 +152,6 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) { if _, err := s.WaitForLeader(10 * time.Second); err != nil { t.Fatalf("Error waiting for leader: %s", err) } - - // Wait until the log entries have been applied to the voting follower, - // and then query. - if _, err := s.WaitForFSMIndex(fsmIdx, 5*time.Second); err != nil { - t.Fatalf("error waiting for follower to apply index: %s:", err.Error()) - } - - qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, false) - qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG - r, err = s.Query(qr) - if err != nil { - t.Fatalf("failed to query single node: %s", err.Error()) - } - if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[11]]}]`, asJSON(r); exp != got { - t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) - } -} - -// Test_OpenStoreCloseStartupSingleNode tests that on-disk -// works fine during various restart scenarios. -func Test_OpenStoreCloseStartupSingleNode(t *testing.T) { - s, ln := mustNewStore(t) - defer ln.Close() - test_OpenStoreCloseStartup(t, s) } func test_SnapshotStress(t *testing.T, s *Store) {