1
0
Fork 0

Merge pull request #1269 from rqlite/add-wait-for-remove

Add wait for remove
master
Philip O'Toole 1 year ago committed by GitHub
commit dae39a4914
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,3 +1,7 @@
## 7.18.2 (unreleased)
### Implementation changes and bug fixes
- [PR #1269](https://github.com/rqlite/rqlite/pull/1269): Add WaitForRemoval() to Store.
## 7.18.1 (May 20th 2023)
This release also includes some small logging improvements, related to node-shutdown.

@ -44,6 +44,21 @@ func (s Servers) IsReadOnly(id string) (readOnly bool, found bool) {
return
}
// Contains returns whether the given node, as specified by its Raft ID,
// is a member of the set of servers.
func (s Servers) Contains(id string) bool {
if s == nil || id == "" {
return false
}
for _, n := range s {
if n != nil && n.ID == id {
return true
}
}
return false
}
func (s Servers) Less(i, j int) bool { return s[i].ID < s[j].ID }
func (s Servers) Len() int { return len(s) }
func (s Servers) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

@ -64,3 +64,50 @@ func Test_IsReadOnly(t *testing.T) {
})
}
}
func Test_Contains(t *testing.T) {
testCases := []struct {
name string
servers Servers
nodeID string
expected bool
}{
{
name: "EmptyServers",
servers: nil,
nodeID: "1",
expected: false,
},
{
name: "EmptyNodeID",
servers: Servers(make([]*Server, 1)),
nodeID: "",
expected: false,
},
{
name: "NonExistentNode",
servers: Servers([]*Server{
{ID: "node1", Addr: "localhost:4002", Suffrage: "Voter"},
}),
nodeID: "node2",
expected: false,
},
{
name: "ExistingNode",
servers: Servers([]*Server{
{ID: "node1", Addr: "localhost:4002", Suffrage: "Voter"},
}),
nodeID: "node1",
expected: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := tc.servers.Contains(tc.nodeID)
if actual != tc.expected {
t.Fatalf("Contains for %s returned %t, expected %t", tc.name, actual, tc.expected)
}
})
}
}

@ -696,6 +696,30 @@ func (s *Store) Nodes() ([]*Server, error) {
return servers, nil
}
// WaitForRemoval blocks until a node with the given ID is removed from the
// cluster or the timeout expires.
func (s *Store) WaitForRemoval(id string, timeout time.Duration) error {
tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
for {
select {
case <-tck.C:
nodes, err := s.Nodes()
if err == nil {
servers := Servers(nodes)
if !servers.Contains(id) {
return nil
}
}
case <-tmr.C:
return fmt.Errorf("timeout expired")
}
}
}
// WaitForLeader blocks until a leader is detected, or the timeout expires.
func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
tck := time.NewTicker(leaderWaitDelay)

@ -1629,6 +1629,37 @@ func Test_SingleNodeStepdownNoWaitOK(t *testing.T) {
}
}
func Test_SingleNodeWaitForRemove(t *testing.T) {
s, ln := mustNewStore(t, true)
defer ln.Close()
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
// Should timeout waiting for removal of ourselves
err := s.WaitForRemoval(s.ID(), time.Second)
// if err is nil then fail the test
if err == nil {
t.Fatalf("no error waiting for removal of non-existent node")
}
if !strings.Contains(err.Error(), "timeout expired") {
t.Fatalf("waiting for removal resulted in wrong error: %s", err.Error())
}
// should be no error waiting for removal of non-existent node
err = s.WaitForRemoval("non-existent-node", time.Second)
if err != nil {
t.Fatalf("error waiting for removal of non-existent node: %s", err.Error())
}
}
func Test_MultiNodeJoinRemove(t *testing.T) {
s0, ln0 := mustNewStore(t, true)
defer ln0.Close()
@ -1690,6 +1721,16 @@ func Test_MultiNodeJoinRemove(t *testing.T) {
t.Fatalf("cluster does not have correct nodes")
}
// Should timeout waiting for removal of other node
err = s0.WaitForRemoval(s1.ID(), time.Second)
// if err is nil then fail the test
if err == nil {
t.Fatalf("no error waiting for removal of non-existent node")
}
if !strings.Contains(err.Error(), "timeout expired") {
t.Fatalf("waiting for removal resulted in wrong error: %s", err.Error())
}
// Remove a node.
if err := s0.Remove(removeNodeRequest(s1.ID())); err != nil {
t.Fatalf("failed to remove %s from cluster: %s", s1.ID(), err.Error())
@ -1705,6 +1746,13 @@ func Test_MultiNodeJoinRemove(t *testing.T) {
if s0.ID() != nodes[0].ID {
t.Fatalf("cluster does not have correct nodes post remove")
}
// Should be no error now waiting for removal of other node
err = s0.WaitForRemoval(s1.ID(), time.Second)
// if err is nil then fail the test
if err != nil {
t.Fatalf("error waiting for removal of removed node")
}
}
func Test_MultiNodeStepdown(t *testing.T) {

Loading…
Cancel
Save