From 481bb0937c1907bd18379bf34f33d2df413aca56 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 13 Jan 2024 12:10:17 -0500 Subject: [PATCH] Unit test DB applied index --- store/store.go | 8 ++++++ store/store_test.go | 65 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/store/store.go b/store/store.go index 81cdb605..88f1aa69 100644 --- a/store/store.go +++ b/store/store.go @@ -683,6 +683,14 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error { } } +// DBAppliedIndex returns the index of the last Raft log that changed the +// underlying database. +func (s *Store) DBAppliedIndex() uint64 { + s.dbAppliedIdxMu.RLock() + defer s.dbAppliedIdxMu.RUnlock() + return s.dbAppliedIdx +} + // IsLeader is used to determine if the current node is cluster leader func (s *Store) IsLeader() bool { return s.raft.State() == raft.Leader diff --git a/store/store_test.go b/store/store_test.go index 4aeaba8a..12982d57 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -1751,6 +1751,71 @@ func Test_SingleNodeWaitForRemove(t *testing.T) { } } +func Test_MultiNodeDBAppliedIndex(t *testing.T) { + s0, ln0 := mustNewStore(t) + defer ln0.Close() + if err := s0.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s0.Close(true) + if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + if _, err := s0.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + // Join a second node to the first. + s1, ln1 := mustNewStore(t) + defer ln1.Close() + if err := s1.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s1.Close(true) + if err := s1.Bootstrap(NewServer(s1.ID(), s1.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil { + t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error()) + } + _, err := s1.WaitForLeader(10 * time.Second) + if err != nil { + t.Fatalf("failed to wait for leader on follower: %s", err.Error()) + } + + // Check that the DBAppliedIndex is the same on both nodes. + if s0.DBAppliedIndex() != s1.DBAppliedIndex() { + t.Fatalf("applied index mismatch") + } + + // Write some data, and check that the DBAppliedIndex remains the same on both nodes. + er := executeRequestFromStrings([]string{ + `CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`, + `INSERT INTO foo(id, name) VALUES(1, "fiona")`, + }, false, false) + _, err = s0.Execute(er) + if err != nil { + t.Fatalf("failed to execute on single node: %s", err.Error()) + } + testPoll(t, func() bool { + // wait until the SELECT count(*) returns 1 on s1 + qr := queryRequestFromString("SELECT count(*) FROM foo", false, true) + qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE + r, err := s1.Query(qr) + if err != nil { + return false + } + if exp, got := `[[1]]`, asJSON(r[0].Values); exp != got { + return false + } + return true + }, 250*time.Millisecond, 3*time.Second) + + if s0.DBAppliedIndex() != s1.DBAppliedIndex() { + t.Fatalf("applied index mismatch (%d, %d)", s0.DBAppliedIndex(), s1.DBAppliedIndex()) + } +} + func Test_MultiNodeJoinRemove(t *testing.T) { s0, ln0 := mustNewStore(t) defer ln0.Close()