diff --git a/CHANGELOG.md b/CHANGELOG.md
index 40490f8c..d7ae9fe0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,7 @@
+## 7.18.2 (unreleased)
+### Implementation changes and bug fixes
+- [PR #1269](https://github.com/rqlite/rqlite/pull/1269): Add WaitForRemoval() to Store.
+
 ## 7.18.1 (May 20th 2023)
 This release also includes some small logging improvements, related to node-shutdown.
 
diff --git a/store/server.go b/store/server.go
index dd8029e5..f2b3ef21 100644
--- a/store/server.go
+++ b/store/server.go
@@ -44,6 +44,22 @@ func (s Servers) IsReadOnly(id string) (readOnly bool, found bool) {
 	return
 }
 
+// Contains reports whether the node with the given Raft ID is a member of
+// the set of servers. An empty ID never matches, and nil entries in the set
+// are skipped.
+func (s Servers) Contains(id string) bool {
+	if s == nil || id == "" {
+		return false
+	}
+
+	for _, n := range s {
+		if n != nil && n.ID == id {
+			return true
+		}
+	}
+	return false
+}
+
 func (s Servers) Less(i, j int) bool { return s[i].ID < s[j].ID }
 func (s Servers) Len() int           { return len(s) }
 func (s Servers) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
diff --git a/store/server_test.go b/store/server_test.go
index 586684f3..e967569e 100644
--- a/store/server_test.go
+++ b/store/server_test.go
@@ -64,3 +64,56 @@ func Test_IsReadOnly(t *testing.T) {
 		})
 	}
 }
+
+func Test_Contains(t *testing.T) {
+	testCases := []struct {
+		name     string
+		servers  Servers
+		nodeID   string
+		expected bool
+	}{
+		{
+			name:     "EmptyServers",
+			servers:  nil,
+			nodeID:   "1",
+			expected: false,
+		},
+		{
+			name:     "EmptyNodeID",
+			servers:  Servers(make([]*Server, 1)),
+			nodeID:   "",
+			expected: false,
+		},
+		{
+			name:     "NilEntry",
+			servers:  Servers(make([]*Server, 1)),
+			nodeID:   "node1",
+			expected: false,
+		},
+		{
+			name: "NonExistentNode",
+			servers: Servers([]*Server{
+				{ID: "node1", Addr: "localhost:4002", Suffrage: "Voter"},
+			}),
+			nodeID:   "node2",
+			expected: false,
+		},
+		{
+			name: "ExistingNode",
+			servers: Servers([]*Server{
+				{ID: "node1", Addr: "localhost:4002", Suffrage: "Voter"},
+			}),
+			nodeID:   "node1",
+			expected: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			actual := tc.servers.Contains(tc.nodeID)
+			if actual != tc.expected {
+				t.Fatalf("Contains for %s returned %t, expected %t", tc.name, actual, tc.expected)
+			}
+		})
+	}
+}
diff --git a/store/store.go b/store/store.go
index 51bb5b9c..656ee5a2 100644
--- a/store/store.go
+++ b/store/store.go
@@ -696,6 +696,39 @@ func (s *Store) Nodes() ([]*Server, error) {
 	return servers, nil
 }
 
+// WaitForRemoval blocks until a node with the given ID is removed from the
+// cluster or the timeout expires. It returns nil once the node is absent from
+// the servers reported by Nodes(), and a "timeout expired" error otherwise.
+// Errors returned by Nodes() are not fatal; the check is retried on the next
+// tick until the timeout fires.
+func (s *Store) WaitForRemoval(id string, timeout time.Duration) error {
+	removed := func() bool {
+		nodes, err := s.Nodes()
+		return err == nil && !Servers(nodes).Contains(id)
+	}
+
+	// Fast path: don't wait for the first tick if the node is already gone.
+	if removed() {
+		return nil
+	}
+
+	tck := time.NewTicker(appliedWaitDelay)
+	defer tck.Stop()
+	tmr := time.NewTimer(timeout)
+	defer tmr.Stop()
+
+	for {
+		select {
+		case <-tck.C:
+			if removed() {
+				return nil
+			}
+		case <-tmr.C:
+			return fmt.Errorf("timeout expired")
+		}
+	}
+}
+
 // WaitForLeader blocks until a leader is detected, or the timeout expires.
 func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
 	tck := time.NewTicker(leaderWaitDelay)
diff --git a/store/store_test.go b/store/store_test.go
index c184ec86..d101f109 100644
--- a/store/store_test.go
+++ b/store/store_test.go
@@ -1629,6 +1629,37 @@ func Test_SingleNodeStepdownNoWaitOK(t *testing.T) {
 	}
 }
 
+func Test_SingleNodeWaitForRemove(t *testing.T) {
+	s, ln := mustNewStore(t, true)
+	defer ln.Close()
+	if err := s.Open(); err != nil {
+		t.Fatalf("failed to open single-node store: %s", err.Error())
+	}
+	defer s.Close(true)
+	if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
+		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
+	}
+	if _, err := s.WaitForLeader(10 * time.Second); err != nil {
+		t.Fatalf("Error waiting for leader: %s", err)
+	}
+
+	// Should timeout waiting for removal of ourselves
+	err := s.WaitForRemoval(s.ID(), time.Second)
+	// if err is nil then fail the test
+	if err == nil {
+		t.Fatalf("no error waiting for removal of ourselves")
+	}
+	if !strings.Contains(err.Error(), "timeout expired") {
+		t.Fatalf("waiting for removal resulted in wrong error: %s", err.Error())
+	}
+
+	// should be no error waiting for removal of non-existent node
+	err = s.WaitForRemoval("non-existent-node", time.Second)
+	if err != nil {
+		t.Fatalf("error waiting for removal of non-existent node: %s", err.Error())
+	}
+}
+
 func Test_MultiNodeJoinRemove(t *testing.T) {
 	s0, ln0 := mustNewStore(t, true)
 	defer ln0.Close()
@@ -1690,6 +1721,16 @@ func Test_MultiNodeJoinRemove(t *testing.T) {
 		t.Fatalf("cluster does not have correct nodes")
 	}
 
+	// Should timeout waiting for removal of other node
+	err = s0.WaitForRemoval(s1.ID(), time.Second)
+	// if err is nil then fail the test
+	if err == nil {
+		t.Fatalf("no error waiting for removal of node still in cluster")
+	}
+	if !strings.Contains(err.Error(), "timeout expired") {
+		t.Fatalf("waiting for removal resulted in wrong error: %s", err.Error())
+	}
+
 	// Remove a node.
 	if err := s0.Remove(removeNodeRequest(s1.ID())); err != nil {
 		t.Fatalf("failed to remove %s from cluster: %s", s1.ID(), err.Error())
@@ -1705,6 +1746,13 @@ func Test_MultiNodeJoinRemove(t *testing.T) {
 	if s0.ID() != nodes[0].ID {
 		t.Fatalf("cluster does not have correct nodes post remove")
 	}
+
+	// Should be no error now waiting for removal of other node
+	err = s0.WaitForRemoval(s1.ID(), time.Second)
+	// if err is not nil then fail the test
+	if err != nil {
+		t.Fatalf("error waiting for removal of removed node: %s", err.Error())
+	}
 }
 
 func Test_MultiNodeStepdown(t *testing.T) {