
Reap nodes (#1114)

Support automatic reaping of nodes
master
Philip O'Toole 2 years ago committed by GitHub
parent c791461526
commit 07620cab41

@ -1,3 +1,7 @@
## 7.11.0 (unreleased)
### New features
- [PR #1114](https://github.com/rqlite/rqlite/pull/1114): Support automatically removing non-reachable nodes after a configurable period. Fixes [issue #728](https://github.com/rqlite/rqlite/issues/728)
## 7.10.1 (November 11th 2022)
### Implementation changes and bug fixes

@ -86,32 +86,17 @@ assuming `localhost` is the address of the cluster leader. If you do not do this
If you cannot bring sufficient nodes back online such that the cluster can elect a leader, follow the instructions in the section titled _Dealing with failure_.
## Examples
_Quorum is defined as (N/2)+1 where N is the size of the cluster._
### 2-node cluster
Quorum of a 2-node cluster is 2.
## Automatically removing failed nodes
> :warning: **This functionality was introduced in version 7.11.0. It does not exist in earlier releases.**
If 1 node fails, quorum can no longer be reached. The failing node must be recovered, since a failed node cannot be removed and a new node cannot be added to the cluster to take its place. This is why you shouldn't run 2-node clusters, except for testing purposes. In general it doesn't make much sense to run clusters with an even number of nodes at all.
If you remove a single node from a fully-functional 2-node cluster, quorum will be reduced to 1 since you will be left with a 1-node cluster.
rqlite supports automatically removing both voting and non-voting (read-only) nodes that have been non-reachable for a configurable period of time. A non-reachable node is defined as a node that the Leader cannot heartbeat with. To enable reaping, set `-raft-reap-nodes` when launching `rqlited`. The reaping timeout for each type of node can be set independently, and defaults to 72 hours. It is recommended that this timeout be set to at least double the maximum expected recoverable outage time for a node or network partition. Note that the timeout clock is reset if the cluster elects a new Leader.
### 3-node cluster
Quorum of a 3-node cluster is 2.
If 1 node fails, the cluster can still reach quorum. Remove the failing node, or restart it. If you remove the node, quorum remains at 2. You should add a new node to get the cluster back to 3 nodes in size. If 2 nodes fail, the cluster will not be able to reach quorum. You must instead restart at least one of the nodes.
If you remove a single node from a fully-functional 3-node cluster, quorum will be unchanged since you now have a 2-node cluster.
### 4-node cluster
Quorum of a 4-node cluster is 3.
The situation is similar to a 3-node cluster, in the sense that the cluster can only tolerate the failure of a single node. If you remove a single node from a fully-functional 4-node cluster, quorum will decrease to 2 since you now have a 3-node cluster.
### 5-node cluster
Quorum of a 5-node cluster is 3.
With a 5-node cluster, the cluster can tolerate the failure of 2 nodes. However, if 3 nodes fail, at least one of those nodes must be restarted before you can make any change. If you remove a single node from a fully-functional 5-node cluster, quorum will be unchanged since you now have a 4-node cluster.
### Example configuration
Enable reaping, instructing rqlite to reap non-reachable voting nodes after 2 days (48 hours), and non-reachable read-only nodes after 4 hours:
```bash
rqlited -node-id 1 -raft-reap-nodes -raft-reap-node-timeout=48h -raft-reap-read-only-node-timeout=4h data
```
For reaping to work properly you **must** set these flags on **every** voting node in the cluster, that is, on every node that could potentially become the Leader. To effectively disable reaping for one type of node, but not the other, simply set the relevant timeout to a very long time, as in the sketch below.
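For example, the following hypothetical invocation (the node ID, data directory, and the one-year timeout value are purely illustrative) reaps non-reachable voting nodes after 48 hours while, in practice, never reaping read-only nodes:
```bash
rqlited -node-id 1 -raft-reap-nodes -raft-reap-node-timeout=48h -raft-reap-read-only-node-timeout=8760h data
```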
# Dealing with failure
It is the nature of clustered systems that nodes can fail at any time. Depending on the size of your cluster, it will tolerate different amounts of failure. A 3-node cluster, for example, can tolerate the failure of a single node, including the leader.
@ -156,3 +141,30 @@ Below is an example, of bringing a 3-node cluster back online.
Next simply create entries for all the nodes you plan to bring up (in the example above that's 3 nodes). You must confirm that nodes you don't include here have indeed failed and will not later rejoin the cluster. Ensure that this file is the same across all remaining rqlite nodes. At this point, you can restart your rqlite cluster. In the example above, this means you'd start 3 nodes.
Once recovery is completed, the `peers.json` file is renamed to `peers.info`. `peers.info` will not trigger further recoveries, and simply acts as a record for future reference. It may be deleted at any time.
# Example Cluster Sizes
_Quorum is defined as (N/2)+1 where N is the size of the cluster._
## 2-node cluster
Quorum of a 2-node cluster is 2.
If 1 node fails, quorum can no longer be reached. The failing node must be recovered, since a failed node cannot be removed and a new node cannot be added to the cluster to take its place. This is why you shouldn't run 2-node clusters, except for testing purposes. In general it doesn't make much sense to run clusters with an even number of nodes at all.
If you remove a single node from a fully-functional 2-node cluster, quorum will be reduced to 1 since you will be left with a 1-node cluster.
## 3-node cluster
Quorum of a 3-node cluster is 2.
If 1 node fails, the cluster can still reach quorum. Remove the failing node, or restart it. If you remove the node, quorum remains at 2. You should add a new node to get the cluster back to 3 nodes in size. If 2 nodes fail, the cluster will not be able to reach quorum. You must instead restart at least one of the nodes.
If you remove a single node from a fully-functional 3-node cluster, quorum will be unchanged since you now have a 2-node cluster.
## 4-node cluster
Quorum of a 4-node cluster is 3.
The situation is similar to a 3-node cluster, in the sense that the cluster can only tolerate the failure of a single node. If you remove a single node from a fully-functional 4-node cluster, quorum will decrease to 2 since you now have a 3-node cluster.
## 5-node cluster
Quorum of a 5-node cluster is 3.
With a 5-node cluster, the cluster can tolerate the failure of 2 nodes. However, if 3 nodes fail, at least one of those nodes must be restarted before you can make any change. If you remove a single node from a fully-functional 5-node cluster, quorum will be unchanged since you now have a 4-node cluster.
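To make the quorum arithmetic above concrete, here is a minimal Go sketch (illustrative only, not part of rqlite) that applies the (N/2)+1 rule using integer division:
```go
package main

import "fmt"

// quorum returns the minimum number of nodes that must be reachable for a
// cluster of size n to make progress: (n/2)+1, using integer division.
func quorum(n int) int {
	return n/2 + 1
}

func main() {
	for _, n := range []int{2, 3, 4, 5} {
		fmt.Printf("%d-node cluster: quorum is %d\n", n, quorum(n))
	}
}
```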

@ -151,7 +151,7 @@ type Config struct {
// RaftLeaderLeaseTimeout sets the leader lease timeout.
RaftLeaderLeaseTimeout time.Duration
// RaftHeartbeatTimeout sets the heartbeast timeout.
// RaftHeartbeatTimeout sets the heartbeat timeout.
RaftHeartbeatTimeout time.Duration
// RaftElectionTimeout sets the election timeout.
@ -168,6 +168,17 @@ type Config struct {
// a full database re-sync during recovery.
RaftNoFreelistSync bool
// RaftReapNodes enables reaping of non-reachable nodes.
RaftReapNodes bool
// RaftReapNodeTimeout sets the duration after which a non-reachable voting node is
// reaped i.e. removed from the cluster.
RaftReapNodeTimeout time.Duration
// RaftReapReadOnlyNodeTimeout sets the duration after which a non-reachable non-voting node is
// reaped i.e. removed from the cluster.
RaftReapReadOnlyNodeTimeout time.Duration
// ClusterConnectTimeout sets the timeout when initially connecting to another node in
// the cluster, for non-Raft communications.
ClusterConnectTimeout time.Duration
@ -388,6 +399,9 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
flag.BoolVar(&config.RaftShutdownOnRemove, "raft-remove-shutdown", false, "Shutdown Raft if node removed")
flag.BoolVar(&config.RaftNoFreelistSync, "raft-no-freelist-sync", false, "Do not sync Raft log database freelist to disk")
flag.StringVar(&config.RaftLogLevel, "raft-log-level", "INFO", "Minimum log level for Raft module")
flag.BoolVar(&config.RaftReapNodes, "raft-reap-nodes", false, "Enable reaping of non-reachable nodes")
flag.DurationVar(&config.RaftReapNodeTimeout, "raft-reap-node-timeout", 72*time.Hour, "Time after which a non-reachable voting node will be reaped")
flag.DurationVar(&config.RaftReapReadOnlyNodeTimeout, "raft-reap-read-only-node-timeout", 72*time.Hour, "Time after which a non-reachable non-voting node will be reaped")
flag.DurationVar(&config.ClusterConnectTimeout, "cluster-connect-timeout", 30*time.Second, "Timeout for initial connection to other nodes")
flag.IntVar(&config.WriteQueueCap, "write-queue-capacity", 1024, "Write queue capacity")
flag.IntVar(&config.WriteQueueBatchSz, "write-queue-batch-size", 128, "Write queue batch size")

@ -197,6 +197,9 @@ func createStore(cfg *Config, ln *tcp.Layer) (*store.Store, error) {
str.ElectionTimeout = cfg.RaftElectionTimeout
str.ApplyTimeout = cfg.RaftApplyTimeout
str.BootstrapExpect = cfg.BootstrapExpect
str.ReapNodes = cfg.RaftReapNodes
str.ReapTimeout = cfg.RaftReapNodeTimeout
str.ReapReadOnlyTimeout = cfg.RaftReapReadOnlyNodeTimeout
isNew := store.IsNewNode(dataPath)
if isNew {

@ -26,3 +26,24 @@ type Servers []*Server
func (s Servers) Less(i, j int) bool { return s[i].ID < s[j].ID }
func (s Servers) Len() int { return len(s) }
func (s Servers) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// IsReadOnly returns whether the given node, as specified by its Raft ID,
// is a read-only (non-voting) node. If no node is found with the given ID
// then found will be false.
func IsReadOnly(servers []*Server, id string) (readOnly bool, found bool) {
readOnly = false
found = false
if servers == nil || id == "" {
return
}
for _, n := range servers {
if n != nil && n.ID == id {
readOnly = n.Suffrage == "Nonvoter"
found = true
return
}
}
return
}

@ -0,0 +1,45 @@
package store
import (
"testing"
)
func Test_IsReadOnly(t *testing.T) {
var servers []*Server
if _, found := IsReadOnly(nil, "1"); found {
t.Fatalf("found should be false")
}
servers = make([]*Server, 1)
if _, found := IsReadOnly(servers, ""); found {
t.Fatalf("found should be false")
}
if _, found := IsReadOnly(servers, "node1"); found {
t.Fatalf("found should be false")
}
servers[0] = &Server{
ID: "node1",
Addr: "localhost:4002",
Suffrage: "Voter",
}
if ro, found := IsReadOnly(servers, "node1"); ro || !found {
t.Fatalf("IsReadOnly returned ro: %t, found: %t", ro, found)
}
servers[0] = &Server{
ID: "node1",
Addr: "localhost:4002",
Suffrage: "Voter",
}
if ro, found := IsReadOnly(servers, "node2"); found {
t.Fatalf("IsReadOnly returned ro: %t, found: %t", ro, found)
}
servers[0] = &Server{
ID: "node1",
Addr: "localhost:4002",
Suffrage: "Nonvoter",
}
if ro, found := IsReadOnly(servers, "node1"); !ro || !found {
t.Fatalf("IsReadOnly returned ro: %t, found: %t", ro, found)
}
}

@ -60,6 +60,7 @@ const (
retainSnapshotCount = 2
applyTimeout = 10 * time.Second
openTimeout = 120 * time.Second
reapTimeout = 72 * time.Hour
sqliteFile = "db.sqlite"
leaderWaitDelay = 100 * time.Millisecond
appliedWaitDelay = 100 * time.Millisecond
@ -67,7 +68,7 @@ const (
connectionTimeout = 10 * time.Second
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 5 // Support any fast back-to-back leadership changes.
observerChanLen = 50
)
const (
@ -87,6 +88,8 @@ const (
snapshotDBOnDiskSize = "snapshot_db_ondisk_size"
leaderChangesObserved = "leader_changes_observed"
leaderChangesDropped = "leader_changes_dropped"
nodesReapedOK = "nodes_reaped_ok"
nodesReapedFailed = "nodes_reaped_failed"
)
// stats captures stats for the Store.
@ -109,6 +112,8 @@ func init() {
stats.Add(snapshotDBOnDiskSize, 0)
stats.Add(leaderChangesObserved, 0)
stats.Add(leaderChangesDropped, 0)
stats.Add(nodesReapedOK, 0)
stats.Add(nodesReapedFailed, 0)
}
// ClusterState defines the possible Raft states the current node can be in
@ -153,7 +158,7 @@ type Store struct {
raftStable raft.StableStore // Persistent k-v store.
boltStore *rlog.Log // Physical store.
// Leader change observer
// Raft changes observer
leaderObserversMu sync.RWMutex
leaderObservers []chan<- struct{}
observerClose chan struct{}
@ -196,6 +201,11 @@ type Store struct {
RaftLogLevel string
NoFreeListSync bool
// Node-reaping configuration
ReapNodes bool
ReapTimeout time.Duration
ReapReadOnlyTimeout time.Duration
numTrailingLogs uint64
// For whitebox testing
@ -234,18 +244,20 @@ func New(ln Listener, c *Config) *Store {
}
return &Store{
ln: ln,
raftDir: c.Dir,
peersPath: filepath.Join(c.Dir, peersPath),
peersInfoPath: filepath.Join(c.Dir, peersInfoPath),
raftID: c.ID,
dbConf: c.DBConf,
dbPath: dbPath,
leaderObservers: make([]chan<- struct{}, 0),
reqMarshaller: command.NewRequestMarshaler(),
logger: logger,
notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout,
ln: ln,
raftDir: c.Dir,
peersPath: filepath.Join(c.Dir, peersPath),
peersInfoPath: filepath.Join(c.Dir, peersInfoPath),
raftID: c.ID,
dbConf: c.DBConf,
dbPath: dbPath,
leaderObservers: make([]chan<- struct{}, 0),
reqMarshaller: command.NewRequestMarshaler(),
logger: logger,
notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout,
ReapTimeout: reapTimeout,
ReapReadOnlyTimeout: reapTimeout,
}
}
@ -373,8 +385,9 @@ func (s *Store) Open() (retErr error) {
// Open the observer channels.
s.observerChan = make(chan raft.Observation, observerChanLen)
s.observer = raft.NewObserver(s.observerChan, false, func(o *raft.Observation) bool {
_, ok := o.Data.(raft.LeaderObservation)
return ok
_, isLeaderChange := o.Data.(raft.LeaderObservation)
_, isFailedHeartBeat := o.Data.(raft.FailedHeartbeatObservation)
return isLeaderChange || (isFailedHeartBeat && s.ReapNodes)
})
// Register and listen for leader changes.
@ -700,20 +713,23 @@ func (s *Store) Stats() (map[string]interface{}, error) {
"observed": s.observer.GetNumObserved(),
"dropped": s.observer.GetNumDropped(),
},
"startup_on_disk": s.StartupOnDisk,
"apply_timeout": s.ApplyTimeout.String(),
"heartbeat_timeout": s.HeartbeatTimeout.String(),
"election_timeout": s.ElectionTimeout.String(),
"snapshot_threshold": s.SnapshotThreshold,
"snapshot_interval": s.SnapshotInterval,
"no_freelist_sync": s.NoFreeListSync,
"trailing_logs": s.numTrailingLogs,
"request_marshaler": s.reqMarshaller.Stats(),
"nodes": nodes,
"dir": s.raftDir,
"dir_size": dirSz,
"sqlite3": dbStatus,
"db_conf": s.dbConf,
"startup_on_disk": s.StartupOnDisk,
"apply_timeout": s.ApplyTimeout.String(),
"heartbeat_timeout": s.HeartbeatTimeout.String(),
"election_timeout": s.ElectionTimeout.String(),
"snapshot_threshold": s.SnapshotThreshold,
"snapshot_interval": s.SnapshotInterval,
"reap_nodes": s.ReapNodes,
"reap_timeout": s.ReapTimeout,
"reap_read_only_timeout": s.ReapReadOnlyTimeout,
"no_freelist_sync": s.NoFreeListSync,
"trailing_logs": s.numTrailingLogs,
"request_marshaler": s.reqMarshaller.Stats(),
"nodes": nodes,
"dir": s.raftDir,
"dir_size": dirSz,
"sqlite3": dbStatus,
"db_conf": s.dbConf,
}
return status, nil
}
@ -1332,17 +1348,49 @@ func (s *Store) observe() (closeCh, doneCh chan struct{}) {
defer close(doneCh)
for {
select {
case <-s.observerChan:
s.leaderObserversMu.RLock()
for i := range s.leaderObservers {
select {
case s.leaderObservers[i] <- struct{}{}:
stats.Add(leaderChangesObserved, 1)
default:
stats.Add(leaderChangesDropped, 1)
case o := <-s.observerChan:
switch signal := o.Data.(type) {
case raft.FailedHeartbeatObservation:
nodes, err := s.Nodes()
if err != nil {
s.logger.Printf("failed to get nodes configuration during reap check: %s", err.Error())
}
id := string(signal.PeerID)
dur := time.Since(signal.LastContact)
isReadOnly, found := IsReadOnly(nodes, id)
if !found {
s.logger.Printf("node %s is not present in configuration", id)
break
}
if (isReadOnly && dur > s.ReapReadOnlyTimeout) || (!isReadOnly && dur > s.ReapTimeout) {
pn := "voting node"
if isReadOnly {
pn = "non-voting node"
}
if err := s.remove(id); err != nil {
stats.Add(nodesReapedFailed, 1)
s.logger.Printf("failed to reap %s %s: %s", pn, id, err.Error())
} else {
stats.Add(nodesReapedOK, 1)
s.logger.Printf("successfully reaped %s %s", pn, id)
}
}
case raft.LeaderObservation:
s.leaderObserversMu.RLock()
for i := range s.leaderObservers {
select {
case s.leaderObservers[i] <- struct{}{}:
stats.Add(leaderChangesObserved, 1)
default:
stats.Add(leaderChangesDropped, 1)
}
}
s.leaderObserversMu.RUnlock()
}
s.leaderObserversMu.RUnlock()
case <-closeCh:
return
}

@ -1503,3 +1503,154 @@ func Test_MultiNodeClusterRecoverFull(t *testing.T) {
t.Fatalf("got incorrect results from recovered node: %s", rows)
}
}
// Test_MultiNodeClusterReapNodes tests that unreachable nodes are reaped.
func Test_MultiNodeClusterReapNodes(t *testing.T) {
cfgStoreFn := func(n *Node) {
n.Store.ReapNodes = true
n.Store.ReapTimeout = time.Second
n.Store.ReapReadOnlyTimeout = time.Second
}
node1 := mustNewLeaderNode()
defer node1.Deprovision()
cfgStoreFn(node1)
_, err := node1.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for leader: %s", err.Error())
}
node2 := mustNewNode(false)
defer node2.Deprovision()
cfgStoreFn(node2)
if err := node2.Join(node1); err != nil {
t.Fatalf("node failed to join leader: %s", err.Error())
}
_, err = node2.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for leader: %s", err.Error())
}
// Get the new leader, in case it changed.
c := Cluster{node1, node2}
leader, err := c.Leader()
if err != nil {
t.Fatalf("failed to find cluster leader: %s", err.Error())
}
node3 := mustNewNode(false)
defer node3.Deprovision()
cfgStoreFn(node3)
if err := node3.Join(leader); err != nil {
t.Fatalf("node failed to join leader: %s", err.Error())
}
_, err = node3.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for leader: %s", err.Error())
}
// Get the new leader, in case it changed.
c = Cluster{node1, node2, node3}
leader, err = c.Leader()
if err != nil {
t.Fatalf("failed to find cluster leader: %s", err.Error())
}
nonVoter := mustNewNode(false)
defer nonVoter.Deprovision()
cfgStoreFn(nonVoter)
if err := nonVoter.JoinAsNonVoter(leader); err != nil {
t.Fatalf("non-voting node failed to join leader: %s", err.Error())
}
_, err = nonVoter.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for leader: %s", err.Error())
}
c = Cluster{node1, node2, node3, nonVoter}
// Confirm non-voter node is in the cluster config.
nodes, err := leader.Nodes(true)
if err != nil {
t.Fatalf("failed to get nodes: %s", err.Error())
}
if !nodes.HasAddr(nonVoter.RaftAddr) {
t.Fatalf("nodes do not contain non-voter node")
}
// Kill non-voter node, confirm it's removed.
nonVoter.Deprovision()
tFn := func() bool {
nodes, _ = leader.Nodes(true)
return !nodes.HasAddr(nonVoter.RaftAddr)
}
if !trueOrTimeout(tFn, 20*time.Second) {
t.Fatalf("timed out waiting for non-voting node to be reaped")
}
// Confirm voting node is in the cluster config.
nodes, err = leader.Nodes(true)
if err != nil {
t.Fatalf("failed to get nodes: %s", err.Error())
}
if !nodes.HasAddr(node3.RaftAddr) {
t.Fatalf("nodes do not contain non-voter node")
}
// Kill voting node, confirm it's removed.
node3.Deprovision()
tFn = func() bool {
nodes, _ = leader.Nodes(true)
return !nodes.HasAddr(node3.RaftAddr)
}
if !trueOrTimeout(tFn, 20*time.Second) {
t.Fatalf("timed out waiting for voting node to be reaped")
}
}
// Test_MultiNodeClusterNoReap tests that a node is not reaped before
// its time.
func Test_MultiNodeClusterNoReap(t *testing.T) {
cfgStoreFn := func(n *Node) {
n.Store.ReapNodes = true
n.Store.ReapReadOnlyTimeout = 120 * time.Second
}
node1 := mustNewLeaderNode()
defer node1.Deprovision()
cfgStoreFn(node1)
_, err := node1.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for leader: %s", err.Error())
}
nonVoter := mustNewNode(false)
defer nonVoter.Deprovision()
cfgStoreFn(nonVoter)
if err := nonVoter.JoinAsNonVoter(node1); err != nil {
t.Fatalf("non-voting node failed to join leader: %s", err.Error())
}
_, err = nonVoter.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for leader: %s", err.Error())
}
// Confirm non-voter node is in the cluster config.
nodes, err := node1.Nodes(true)
if err != nil {
t.Fatalf("failed to get nodes: %s", err.Error())
}
if !nodes.HasAddr(nonVoter.RaftAddr) {
t.Fatalf("nodes do not contain non-voter node")
}
// Kill non-voter node, confirm it's not removed.
nonVoter.Deprovision()
tFn := func() bool {
nodes, _ = node1.Nodes(true)
return !nodes.HasAddr(nonVoter.RaftAddr)
}
if trueOrTimeout(tFn, 20*time.Second) {
t.Fatalf("didn't time out waiting for node to be removed")
}
}

@ -230,6 +230,16 @@ type NodesStatus map[string]struct {
Leader bool `json:"leader,omitempty"`
}
// HasAddr returns whether any node in the NodeStatus has the given Raft address.
func (n NodesStatus) HasAddr(addr string) bool {
for i := range n {
if n[i].Addr == addr {
return true
}
}
return false
}
// Nodes returns the sNodes endpoint output for node.
func (n *Node) Nodes(includeNonVoters bool) (NodesStatus, error) {
v, _ := url.Parse("http://" + n.APIAddr + "/nodes")
@ -915,3 +925,23 @@ func (m *mockCredentialStore) AA(username, password, perm string) bool {
func mustNewMockCredentialStore() *mockCredentialStore {
return &mockCredentialStore{HasPermOK: true}
}
// trueOrTimeout returns true if the given function returns true
// within the timeout. Returns false otherwise.
func trueOrTimeout(fn func() bool, dur time.Duration) bool {
timer := time.NewTimer(dur)
defer timer.Stop()
ticker := time.NewTicker(1000 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-timer.C:
return false
case <-ticker.C:
if fn() {
return true
}
}
}
}
