From 091c078b02bb9c32e8d05f01d2301e731ae3f369 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 13 Feb 2024 08:29:13 -0500 Subject: [PATCH 1/6] Make a init case cleare for IsStaleRead --- store/state.go | 5 +++++ store/state_test.go | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/store/state.go b/store/state.go index 74191a9f..87094022 100644 --- a/store/state.go +++ b/store/state.go @@ -39,6 +39,11 @@ func IsStaleRead( // Strict mode is not enabled, so no further checks are needed. return false } + if lastAppendedAtTime.IsZero() { + // We've yet to be told about any appended log entries, so we + // assume we're caught up. + return false + } if fsmIndex == commitIndex { // FSM index is the same as the commit index, so we're caught up. return false diff --git a/store/state_test.go b/store/state_test.go index 98f6ffca..c7dd99ad 100644 --- a/store/state_test.go +++ b/store/state_test.go @@ -44,6 +44,13 @@ func Test_IsStaleRead(t *testing.T) { Freshness: time.Second, Exp: true, }, + { + Name: "freshness set and ok, strict is set, but no appended time", + LeaderLastContact: time.Now(), + Freshness: 10 * time.Second, + Strict: true, + Exp: false, + }, { Name: "freshness set, is ok, strict is set, appended time exceeds, but applied index is up-to-date", LeaderLastContact: time.Now(), From 542d1f6086589bdabaa3e170808de685daa4dc9d Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 13 Feb 2024 08:32:07 -0500 Subject: [PATCH 2/6] CHANGELOG --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fca52d45..9711c480 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 8.20.1 (unreleased) + +### Implementation changes and bug fixes +- [PR #1684](https://github.com/rqlite/rqlite/pull/1684): Improvements to strict _Stale Read_ checks + ## 8.20.0 (February 12th 2024) ### New features - [PR #1681](https://github.com/rqlite/rqlite/pull/1681): Add `freshness_strict` flag, which checks for _Stale 
Reads_ using Leader time. Thanks @aderouineau From 7f1977176d44aeacef069fff47c495351c35b0fe Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 13 Feb 2024 08:43:15 -0500 Subject: [PATCH 3/6] Track latest committed FSM command Not all Raft log entries represent changes to the FSM. Some are cluster config changes, which are never passed to rqlite. This means that CommitIndex can be ahead of FSM index, even though the FSM is fully caught up. --- store/transport.go | 30 ++++++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/store/transport.go b/store/transport.go index 53fe8ef2..9472c1b1 100644 --- a/store/transport.go +++ b/store/transport.go @@ -3,6 +3,7 @@ package store import ( "io" "net" + "sync/atomic" "time" "github.com/hashicorp/raft" @@ -52,18 +53,26 @@ func (t *Transport) Addr() net.Addr { // custom configuration of the InstallSnapshot method. type NodeTransport struct { *raft.NetworkTransport - done chan struct{} - closed bool + commandCommitIndex *atomic.Uint64 + done chan struct{} + closed bool } // NewNodeTransport returns an initialized NodeTransport. func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport { return &NodeTransport{ - NetworkTransport: transport, - done: make(chan struct{}), + NetworkTransport: transport, + commandCommitIndex: &atomic.Uint64{}, + done: make(chan struct{}), } } +// CommandCommitIndex returns the index of the latest committed log entry +// which is applied to the FSM. 
+func (n *NodeTransport) CommandCommitIndex() uint64 { + return n.commandCommitIndex.Load() +} + // Close closes the transport func (n *NodeTransport) Close() error { if n.closed { @@ -100,8 +109,17 @@ func (n *NodeTransport) Consumer() <-chan raft.RPC { case <-n.done: return case rpc := <-srcCh: - if rpc.Reader != nil { - rpc.Reader = gzip.NewDecompressor(rpc.Reader) + switch cmd := rpc.Command.(type) { + case *raft.InstallSnapshotRequest: + if rpc.Reader != nil { + rpc.Reader = gzip.NewDecompressor(rpc.Reader) + } + case raft.AppendEntriesRequest: + for _, e := range cmd.Entries { + if e.Type == raft.LogCommand { + n.commandCommitIndex.Store(e.Index) + } + } } ch <- rpc } From 8255d199a2fec2ba7bdc20703cb966e692c262a7 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 13 Feb 2024 09:11:03 -0500 Subject: [PATCH 4/6] Use FSM Command Commit Index for Stale Reads check --- store/store.go | 24 +++++++++--------- store/store_multi_test.go | 51 +++++++++++++++++++++++++++++++++++++++ store/store_test.go | 10 ++++---- store/transport.go | 2 +- 4 files changed, 70 insertions(+), 17 deletions(-) diff --git a/store/store.go b/store/store.go index 430fc9e3..a89d31fa 100644 --- a/store/store.go +++ b/store/store.go @@ -336,7 +336,7 @@ type Store struct { // For whitebox testing numAutoVacuums int numIgnoredJoins int - numNoops int + numNoops *atomic.Uint64 numSnapshotsMu sync.Mutex numSnapshots int } @@ -384,6 +384,7 @@ func New(ly Layer, c *Config) *Store { fsmUpdateTime: NewAtomicTime(), appendedAtTime: NewAtomicTime(), dbAppliedIdx: &atomic.Uint64{}, + numNoops: &atomic.Uint64{}, } } @@ -982,14 +983,15 @@ func (s *Store) Stats() (map[string]interface{}, error) { return nil, err } status := map[string]interface{}{ - "open": s.open, - "node_id": s.raftID, - "raft": raftStats, - "fsm_index": s.fsmIdx.Load(), - "fsm_update_time": s.fsmUpdateTime.Load(), - "db_applied_index": s.dbAppliedIdx.Load(), - "last_applied_index": lAppliedIdx, - "addr": s.Addr(), + "open": s.open, 
+ "node_id": s.raftID, + "raft": raftStats, + "fsm_index": s.fsmIdx.Load(), + "fsm_update_time": s.fsmUpdateTime.Load(), + "db_applied_index": s.dbAppliedIdx.Load(), + "last_applied_index": lAppliedIdx, + "command_commit_index": s.raftTn.CommandCommitIndex(), + "addr": s.Addr(), "leader": map[string]string{ "node_id": leaderID, "addr": leaderAddr, @@ -1804,7 +1806,7 @@ func (s *Store) isStaleRead(freshness int64, strict bool) bool { s.fsmUpdateTime.Load(), s.appendedAtTime.Load(), s.fsmIdx.Load(), - s.raft.CommitIndex(), + s.raftTn.CommandCommitIndex(), freshness, strict) } @@ -1855,7 +1857,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) { s.dbAppliedIdx.Store(l.Index) } if cmd.Type == proto.Command_COMMAND_TYPE_NOOP { - s.numNoops++ + s.numNoops.Add(1) } else if cmd.Type == proto.Command_COMMAND_TYPE_LOAD { // Swapping in a new database invalidates any existing snapshot. err := s.snapshotStore.SetFullNeeded() diff --git a/store/store_multi_test.go b/store/store_multi_test.go index 0264d8af..724552cc 100644 --- a/store/store_multi_test.go +++ b/store/store_multi_test.go @@ -106,6 +106,57 @@ func Test_MultiNodeSimple(t *testing.T) { testFn2(t, s1) } +func Test_MultiNodeNode_CommitIndexes(t *testing.T) { + s0, ln0 := mustNewStore(t) + defer s0.Close(true) + defer ln0.Close() + if err := s0.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + _, err := s0.WaitForLeader(10 * time.Second) + if err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + s1, ln1 := mustNewStore(t) + defer ln1.Close() + + if err := s1.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s1.Close(true) + + if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil { + t.Fatalf("failed to join single-node store: %s", err.Error()) + } + 
if _, err := s1.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + if exp, got := uint64(3), s1.raft.CommitIndex(); exp != got { + t.Fatalf("wrong commit index, got: %d, exp %d", got, exp) + } + if exp, got := uint64(0), s1.raftTn.CommandCommitIndex(); exp != got { + t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp) + } + + // Send an FSM command through the log, ensure the indexes are correctly updated + // on the follower. + s0.Noop("don't care") + testPoll(t, func() bool { + return s1.numNoops.Load() == 1 + }, 50*time.Millisecond, 2*time.Second) + if exp, got := uint64(4), s1.raft.CommitIndex(); exp != got { + t.Fatalf("wrong commit index, got: %d, exp %d", got, exp) + } + if exp, got := uint64(4), s1.raftTn.CommandCommitIndex(); exp != got { + t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp) + } +} + // Test_MultiNodeSnapshot_ErrorMessage tests that a snapshot fails with a specific // error message when the snapshot is attempted too soon after joining a cluster.
// Hashicorp Raft doesn't expose a typed error, so we have to check the error diff --git a/store/store_test.go b/store/store_test.go index 040fb238..55f7a162 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2406,8 +2406,8 @@ func Test_SingleNodeNoop(t *testing.T) { if af.Error() != nil { t.Fatalf("expected nil apply future error") } - if s.numNoops != 1 { - t.Fatalf("noop count is wrong, got: %d", s.numNoops) + if s.numNoops.Load() != 1 { + t.Fatalf("noop count is wrong, got: %d", s.numNoops.Load()) } } @@ -2884,11 +2884,11 @@ func asJSONAssociative(v interface{}) string { return string(b) } -func testPoll(t *testing.T, f func() bool, p time.Duration, d time.Duration) { +func testPoll(t *testing.T, f func() bool, checkPeriod time.Duration, timeout time.Duration) { t.Helper() - tck := time.NewTicker(p) + tck := time.NewTicker(checkPeriod) defer tck.Stop() - tmr := time.NewTimer(d) + tmr := time.NewTimer(timeout) defer tmr.Stop() for { diff --git a/store/transport.go b/store/transport.go index 9472c1b1..d391a64c 100644 --- a/store/transport.go +++ b/store/transport.go @@ -114,7 +114,7 @@ func (n *NodeTransport) Consumer() <-chan raft.RPC { if rpc.Reader != nil { rpc.Reader = gzip.NewDecompressor(rpc.Reader) } - case raft.AppendEntriesRequest: + case *raft.AppendEntriesRequest: for _, e := range cmd.Entries { if e.Type == raft.LogCommand { n.commandCommitIndex.Store(e.Index) From cb5ea123162f7b7ce53bff689a0c471e20b0b073 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 13 Feb 2024 09:40:37 -0500 Subject: [PATCH 5/6] Cluster config commands not instantly in log --- store/store_multi_test.go | 44 +++++++++++++++++++++++++++++++++------ 1 file changed, 38 insertions(+), 6 deletions(-) diff --git a/store/store_multi_test.go b/store/store_multi_test.go index 724552cc..4cd06efc 100644 --- a/store/store_multi_test.go +++ b/store/store_multi_test.go @@ -106,6 +106,9 @@ func Test_MultiNodeSimple(t *testing.T) { testFn2(t, s1) } +// 
Test_MultiNodeNode_CommitIndexes tests that the commit indexes are +// correctly updated as nodes join the cluster, and as +// commands are committed through the Raft log. func Test_MultiNodeNode_CommitIndexes(t *testing.T) { s0, ln0 := mustNewStore(t) defer s0.Close(true) @@ -123,22 +126,20 @@ func Test_MultiNodeNode_CommitIndexes(t *testing.T) { s1, ln1 := mustNewStore(t) defer ln1.Close() - if err := s1.Open(); err != nil { t.Fatalf("failed to open single-node store: %s", err.Error()) } defer s1.Close(true) - if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil { t.Fatalf("failed to join single-node store: %s", err.Error()) } if _, err := s1.WaitForLeader(10 * time.Second); err != nil { t.Fatalf("Error waiting for leader: %s", err) } - - if exp, got := uint64(3), s1.raft.CommitIndex(); exp != got { - t.Fatalf("wrong commit index, got: %d, exp %d", got, exp) - } + testPoll(t, func() bool { + // The config change command coming through the log due to s1 joining is not instant. + return s1.raft.CommitIndex() == 3 + }, 50*time.Millisecond, 2*time.Second) if exp, got := uint64(0), s1.raftTn.CommandCommitIndex(); exp != got { t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp) } @@ -155,6 +156,37 @@ func Test_MultiNodeNode_CommitIndexes(t *testing.T) { if exp, got := uint64(4), s1.raftTn.CommandCommitIndex(); exp != got { t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp) } + + // Join another node to the cluster, which will result in Raft cluster + // config commands through the log, but no FSM commands.
+ s2, ln2 := mustNewStore(t) + defer ln2.Close() + if err := s2.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s2.Close(true) + if err := s0.Join(joinRequest(s2.ID(), s2.Addr(), true)); err != nil { + t.Fatalf("failed to join single-node store: %s", err.Error()) + } + if _, err := s2.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + testPoll(t, func() bool { + // The config change command coming through the log due to s2 joining is not instant. + return s2.raft.CommitIndex() == 5 + }, 50*time.Millisecond, 2*time.Second) + if exp, got := uint64(4), s2.raftTn.CommandCommitIndex(); exp != got { + t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp) + } + + // First node to join should also reflect the new cluster config + // command. + if exp, got := uint64(5), s1.raft.CommitIndex(); exp != got { + t.Fatalf("wrong commit index, got: %d, exp %d", got, exp) + } + if exp, got := uint64(4), s1.raftTn.CommandCommitIndex(); exp != got { + t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp) + } } // Test_MultiNodeSnapshot_ErrorMessage tests that a snapshot fails with a specific From 3783f176a4c56c73856f2599b39fbf164d6ef971 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 13 Feb 2024 09:58:32 -0500 Subject: [PATCH 6/6] Deflake test --- store/store_multi_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/store/store_multi_test.go b/store/store_multi_test.go index 4cd06efc..6be265bd 100644 --- a/store/store_multi_test.go +++ b/store/store_multi_test.go @@ -181,9 +181,10 @@ func Test_MultiNodeNode_CommitIndexes(t *testing.T) { // First node to join should also reflect the new cluster config // command.
- if exp, got := uint64(5), s1.raft.CommitIndex(); exp != got { - t.Fatalf("wrong commit index, got: %d, exp %d", got, exp) - } + testPoll(t, func() bool { + // The config change command coming through the log due to s2 joining is not instant. + return s1.raft.CommitIndex() == 5 + }, 50*time.Millisecond, 2*time.Second) if exp, got := uint64(4), s1.raftTn.CommandCommitIndex(); exp != got { t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp) }