1
0
Fork 0

Improve Store restart test

master
Philip O'Toole 8 months ago
parent 0cbbb44b48
commit 3ddeebbba4

@ -644,20 +644,9 @@ func (s *Store) WaitForAppliedFSM(timeout time.Duration) (uint64, error) {
return s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout) return s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout)
} }
// WaitForInitialLogs waits for logs that were in the Store at time of open
// to be applied to the state machine.
func (s *Store) WaitForInitialLogs(timeout time.Duration) error {
if timeout == 0 {
return nil
}
s.logger.Printf("waiting for up to %s for application of initial logs (lcIdx=%d)",
timeout, s.lastCommandIdxOnOpen)
return s.WaitForApplied(timeout)
}
// WaitForApplied waits for all Raft log entries to be applied to the // WaitForApplied waits for all Raft log entries to be applied to the
// underlying database. // underlying database.
func (s *Store) WaitForApplied(timeout time.Duration) error { func (s *Store) WaitForAllApplied(timeout time.Duration) error {
if timeout == 0 { if timeout == 0 {
return nil return nil
} }
@ -884,7 +873,7 @@ func (s *Store) SetRequestCompression(batch, size int) {
s.reqMarshaller.SizeThreshold = size s.reqMarshaller.SizeThreshold = size
} }
// WaitForFSMIndex blocks until a given log index has been applied to the // WaitForFSMIndex blocks until a given log index has been applied to our
// state machine or the timeout expires. // state machine or the timeout expires.
func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error) { func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error) {
tck := time.NewTicker(appliedWaitDelay) tck := time.NewTicker(appliedWaitDelay)

@ -9,7 +9,10 @@ import (
command "github.com/rqlite/rqlite/v8/command/proto" command "github.com/rqlite/rqlite/v8/command/proto"
) )
func test_OpenStoreCloseStartup(t *testing.T, s *Store) { // Test_OpenStoreCloseStartupSingleNode tests various restart scenarios.
func Test_OpenStoreCloseStartupSingleNode(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
if err := s.Open(); err != nil { if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error()) t.Fatalf("failed to open single-node store: %s", err.Error())
} }
@ -27,12 +30,6 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) {
if err != nil { if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error()) t.Fatalf("failed to execute on single node: %s", err.Error())
} }
fsmIdx, err := s.WaitForAppliedFSM(5 * time.Second)
if err != nil {
t.Fatalf("failed to wait for fsmIndex: %s", err.Error())
}
if err := s.Close(true); err != nil { if err := s.Close(true); err != nil {
t.Fatalf("failed to close single-node store: %s", err.Error()) t.Fatalf("failed to close single-node store: %s", err.Error())
} }
@ -45,25 +42,12 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) {
t.Fatalf("Error waiting for leader: %s", err) t.Fatalf("Error waiting for leader: %s", err)
} }
// Wait until the log entries have been applied to the voting follower, testPoll(t, func() bool {
// and then query. qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
if _, err := s.WaitForFSMIndex(fsmIdx, 5*time.Second); err != nil { qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
t.Fatalf("error waiting for follower to apply index: %s:", err.Error()) r, err := s.Query(qr)
} return err == nil && asJSON(r) == `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[1]]}]`
}, 100*time.Millisecond, 5*time.Second)
qr := queryRequestFromString("SELECT * FROM foo", false, false)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err := s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if err := s.Close(true); err != nil { if err := s.Close(true); err != nil {
t.Fatalf("failed to close single-node store: %s", err.Error()) t.Fatalf("failed to close single-node store: %s", err.Error())
} }
@ -114,12 +98,8 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) {
} }
} }
fsmIdx, err = s.WaitForAppliedFSM(5 * time.Second) // Close and re-open to make sure all data is there after starting up
if err != nil { // with a snapshot.
t.Fatalf("failed to wait for fsmIndex: %s", err.Error())
}
// Close and re-open to make sure all continues to work with recovery from snapshot.
if err := s.Close(true); err != nil { if err := s.Close(true); err != nil {
t.Fatalf("failed to close single-node store: %s", err.Error()) t.Fatalf("failed to close single-node store: %s", err.Error())
} }
@ -129,23 +109,12 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) {
if _, err := s.WaitForLeader(10 * time.Second); err != nil { if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err) t.Fatalf("Error waiting for leader: %s", err)
} }
testPoll(t, func() bool {
// Wait until the log entries have been applied to the voting follower, qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
// and then query. qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE
if _, err := s.WaitForFSMIndex(fsmIdx, 5*time.Second); err != nil { r, err := s.Query(qr)
t.Fatalf("error waiting for follower to apply index: %s:", err.Error()) return err == nil && asJSON(r) == `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[10]]}]`
} }, 100*time.Millisecond, 5*time.Second)
qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[10]]}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if err := s.Close(true); err != nil { if err := s.Close(true); err != nil {
t.Fatalf("failed to close single-node store: %s", err.Error()) t.Fatalf("failed to close single-node store: %s", err.Error())
} }
@ -165,19 +134,13 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) {
if err != nil { if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error()) t.Fatalf("failed to execute on single node: %s", err.Error())
} }
qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[11]]}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
fsmIdx, err = s.WaitForAppliedFSM(5 * time.Second) testPoll(t, func() bool {
if err != nil { qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
t.Fatalf("failed to wait for fsmIndex: %s", err.Error()) qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE
} r, err := s.Query(qr)
return err == nil && asJSON(r) == `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[11]]}]`
}, 100*time.Millisecond, 5*time.Second)
if err := s.Close(true); err != nil { if err := s.Close(true); err != nil {
t.Fatalf("failed to close single-node store: %s", err.Error()) t.Fatalf("failed to close single-node store: %s", err.Error())
@ -189,30 +152,6 @@ func test_OpenStoreCloseStartup(t *testing.T, s *Store) {
if _, err := s.WaitForLeader(10 * time.Second); err != nil { if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err) t.Fatalf("Error waiting for leader: %s", err)
} }
// Wait until the log entries have been applied to the voting follower,
// and then query.
if _, err := s.WaitForFSMIndex(fsmIdx, 5*time.Second); err != nil {
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
}
qr = queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[11]]}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
// Test_OpenStoreCloseStartupSingleNode tests that on-disk
// works fine during various restart scenarios.
func Test_OpenStoreCloseStartupSingleNode(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
test_OpenStoreCloseStartup(t, s)
} }
func test_SnapshotStress(t *testing.T, s *Store) { func test_SnapshotStress(t *testing.T, s *Store) {

Loading…
Cancel
Save