
Merge pull request #1684 from rqlite/strict-tweaks

Use FSM Command Index for Stale Read checks
Philip O'Toole authored 7 months ago, committed via GitHub
commit eb86173381

@@ -1,3 +1,8 @@
## 8.20.1 (unreleased)
### Implementation changes and bug fixes
- [PR #1684](https://github.com/rqlite/rqlite/pull/1684): Improvements to strict _Stale Read_ checks
## 8.20.0 (February 12th 2024)
### New features
- [PR #1681](https://github.com/rqlite/rqlite/pull/1681): Add `freshness_strict` flag, which checks for _Stale Reads_ using Leader time. Thanks @aderouineau
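As context for the tweaks in this PR, here is a hedged usage sketch of the feature being refined: a `level=none` read with a freshness bound and the strict Stale Read check requested. The endpoint and parameter names (`/db/query`, `freshness`, `freshness_strict`) are inferred from the changelog entries above and rqlite's read-consistency options; treat the snippet as an illustration under those assumptions, not a verified API example.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Sketch only: parameter names assumed from the changelog above.
	q := url.Values{}
	q.Set("q", "SELECT COUNT(*) FROM foo")
	q.Set("level", "none")            // read served locally, without consulting the Leader
	q.Set("freshness", "1s")          // how stale the local node may be
	q.Set("freshness_strict", "true") // request the strict Stale Read check (PR #1681)

	resp, err := http.Get("http://localhost:4001/db/query?" + q.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// A read judged stale should come back as an error in the response body.
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```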

@@ -39,6 +39,11 @@ func IsStaleRead(
// Strict mode is not enabled, so no further checks are needed.
return false
}
if lastAppendedAtTime.IsZero() {
// We've yet to be told about any appended log entries, so we
// assume we're caught up.
return false
}
if fsmIndex == commitIndex {
// FSM index is the same as the commit index, so we're caught up.
return false
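To make the control flow above easier to follow outside the diff, here is a minimal self-contained sketch of a strict stale-read predicate in the spirit of this hunk. The signature, the surrounding freshness checks, and the final catch-up test are assumptions for illustration; the real `IsStaleRead` may differ in detail.

```go
package staleread // hypothetical package for this sketch

import "time"

// isStaleReadSketch reports whether a local read should be rejected as stale.
// It is an illustrative sketch, not the rqlite implementation.
func isStaleReadSketch(
	leaderLastContact time.Time, // when the Leader was last heard from
	lastFSMUpdateTime time.Time, // when the local FSM last applied an entry
	lastAppendedAtTime time.Time, // when the Leader last appended an entry
	fsmIndex uint64, // index of the last entry applied to the local FSM
	commitIndex uint64, // index of the last command entry known to this node
	freshness int64, // permitted staleness in nanoseconds; 0 disables the check
	strict bool,
) bool {
	if freshness == 0 {
		// No freshness bound requested, so the read can never be stale.
		return false
	}
	if time.Since(leaderLastContact).Nanoseconds() > freshness {
		// Lost contact with the Leader for longer than the bound allows.
		return true
	}
	if !strict {
		// Strict mode is not enabled, so no further checks are needed.
		return false
	}
	if lastAppendedAtTime.IsZero() {
		// We've yet to be told about any appended log entries, so we
		// assume we're caught up.
		return false
	}
	if fsmIndex == commitIndex {
		// FSM index is the same as the commit index, so we're caught up.
		return false
	}
	// Entries are outstanding: the read is stale if the FSM hasn't applied
	// anything since the last append and that append is older than the bound.
	return lastFSMUpdateTime.Before(lastAppendedAtTime) &&
		time.Since(lastAppendedAtTime).Nanoseconds() > freshness
}
```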

@@ -44,6 +44,13 @@ func Test_IsStaleRead(t *testing.T) {
Freshness: time.Second,
Exp: true,
},
{
Name: "freshness set and ok, strict is set, but no appended time",
LeaderLastContact: time.Now(),
Freshness: 10 * time.Second,
Strict: true,
Exp: false,
},
{
Name: "freshness set, is ok, strict is set, appended time exceeds, but applied index is up-to-date",
LeaderLastContact: time.Now(),

@@ -336,7 +336,7 @@ type Store struct {
// For whitebox testing
numAutoVacuums int
numIgnoredJoins int
numNoops int
numNoops *atomic.Uint64
numSnapshotsMu sync.Mutex
numSnapshots int
}
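The `numNoops` counter switches from a plain `int` to an `*atomic.Uint64`, presumably because the new multi-node test below polls it from the test goroutine while `fsmApply` increments it on the Raft apply path; a plain `int` would be a data race. A minimal illustration of that reader/writer split, with invented names:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	// Invented example: rqlite's Store holds a pointer, but the idea is the same.
	var numNoops atomic.Uint64

	go func() {
		// Writer side, standing in for fsmApply handling a NOOP command.
		numNoops.Add(1)
	}()

	// Reader side, standing in for a test polling until the NOOP is applied.
	for numNoops.Load() == 0 {
		time.Sleep(10 * time.Millisecond)
	}
	fmt.Println("noop applied, count:", numNoops.Load())
}
```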
@@ -384,6 +384,7 @@ func New(ly Layer, c *Config) *Store {
fsmUpdateTime: NewAtomicTime(),
appendedAtTime: NewAtomicTime(),
dbAppliedIdx: &atomic.Uint64{},
numNoops: &atomic.Uint64{},
}
}
@@ -982,14 +983,15 @@ func (s *Store) Stats() (map[string]interface{}, error) {
return nil, err
}
status := map[string]interface{}{
"open": s.open,
"node_id": s.raftID,
"raft": raftStats,
"fsm_index": s.fsmIdx.Load(),
"fsm_update_time": s.fsmUpdateTime.Load(),
"db_applied_index": s.dbAppliedIdx.Load(),
"last_applied_index": lAppliedIdx,
"addr": s.Addr(),
"open": s.open,
"node_id": s.raftID,
"raft": raftStats,
"fsm_index": s.fsmIdx.Load(),
"fsm_update_time": s.fsmUpdateTime.Load(),
"db_applied_index": s.dbAppliedIdx.Load(),
"last_applied_index": lAppliedIdx,
"command_commit_index": s.raftTn.CommandCommitIndex(),
"addr": s.Addr(),
"leader": map[string]string{
"node_id": leaderID,
"addr": leaderAddr,
@@ -1804,7 +1806,7 @@ func (s *Store) isStaleRead(freshness int64, strict bool) bool {
s.fsmUpdateTime.Load(),
s.appendedAtTime.Load(),
s.fsmIdx.Load(),
s.raft.CommitIndex(),
s.raftTn.CommandCommitIndex(),
freshness,
strict)
}
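This one-line change is the heart of the PR: the strict check now compares the FSM index against the index of the latest command entry seen by the transport, rather than Raft's overall commit index, which also advances on cluster-configuration entries that are never applied to the FSM. A toy illustration of why that distinction matters (log contents and names invented for this example):

```go
// Toy illustration, not rqlite code: configuration entries advance the
// Raft commit index but are never applied to the FSM, so comparing the
// FSM index against the overall commit index can report a false stale read.
package main

import "fmt"

type entry struct {
	index     uint64
	isCommand bool // true for LogCommand entries, false for configuration entries
}

func main() {
	log := []entry{
		{1, true},  // a command, applied to the FSM
		{2, false}, // node joins: configuration entry, never applied to the FSM
		{3, false}, // another configuration change
	}

	var commitIndex, commandCommitIndex uint64
	for _, e := range log {
		commitIndex = e.index
		if e.isCommand {
			commandCommitIndex = e.index
		}
	}

	fsmIndex := uint64(1) // the FSM has applied the only command there is

	fmt.Println(fsmIndex == commitIndex)        // false: looks stale under the old check
	fmt.Println(fsmIndex == commandCommitIndex) // true: actually fully caught up
}
```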
@@ -1855,7 +1857,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
s.dbAppliedIdx.Store(l.Index)
}
if cmd.Type == proto.Command_COMMAND_TYPE_NOOP {
s.numNoops++
s.numNoops.Add(1)
} else if cmd.Type == proto.Command_COMMAND_TYPE_LOAD {
// Swapping in a new database invalidates any existing snapshot.
err := s.snapshotStore.SetFullNeeded()

@@ -106,6 +106,90 @@ func Test_MultiNodeSimple(t *testing.T) {
testFn2(t, s1)
}
// Test_MultiNodeNode_CommitIndexes tests that the commit indexes are
// correctly updated on each node as nodes join the cluster, and as
// commands are committed through the Raft log.
func Test_MultiNodeNode_CommitIndexes(t *testing.T) {
s0, ln0 := mustNewStore(t)
defer s0.Close(true)
defer ln0.Close()
if err := s0.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
_, err := s0.WaitForLeader(10 * time.Second)
if err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
s1, ln1 := mustNewStore(t)
defer ln1.Close()
if err := s1.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s1.Close(true)
if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
t.Fatalf("failed to join single-node store: %s", err.Error())
}
if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
testPoll(t, func() bool {
// The config change command coming through the log due to s1 joining is not instant.
return s1.raft.CommitIndex() == 3
}, 50*time.Millisecond, 2*time.Second)
if exp, got := uint64(0), s1.raftTn.CommandCommitIndex(); exp != got {
t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
}
// Send an FSM command through the log, and ensure the indexes are correctly
// updated on the follower.
s0.Noop("don't care")
testPoll(t, func() bool {
return s1.numNoops.Load() == 1
}, 50*time.Millisecond, 2*time.Second)
if exp, got := uint64(4), s1.raft.CommitIndex(); exp != got {
t.Fatalf("wrong commit index, got: %d, exp %d", got, exp)
}
if exp, got := uint64(4), s1.raftTn.CommandCommitIndex(); exp != got {
t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
}
// Join another node to the cluster, which will result in Raft cluster
// config commands through the log, but no FSM commands.
s2, ln2 := mustNewStore(t)
defer ln2.Close()
if err := s2.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s2.Close(true)
if err := s0.Join(joinRequest(s2.ID(), s2.Addr(), true)); err != nil {
t.Fatalf("failed to join single-node store: %s", err.Error())
}
if _, err := s2.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
testPoll(t, func() bool {
// The config change command coming through the log due to s2 joining is not instant.
return s2.raft.CommitIndex() == 5
}, 50*time.Millisecond, 2*time.Second)
if exp, got := uint64(4), s2.raftTn.CommandCommitIndex(); exp != got {
t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
}
// First node to join should also reflect the new cluster config
// command.
testPoll(t, func() bool {
// The config change command coming through the log due to s2 joining is not instant.
return s1.raft.CommitIndex() == 5
}, 50*time.Millisecond, 2*time.Second)
if exp, got := uint64(4), s1.raftTn.CommandCommitIndex(); exp != got {
t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
}
}
// Test_MultiNodeSnapshot_ErrorMessage tests that a snapshot fails with a specific
// error message when the snapshot is attempted too soon after joining a cluster.
// Hashicorp Raft doesn't expose a typed error, so we have to check the error

@@ -2406,8 +2406,8 @@ func Test_SingleNodeNoop(t *testing.T) {
if af.Error() != nil {
t.Fatalf("expected nil apply future error")
}
if s.numNoops != 1 {
t.Fatalf("noop count is wrong, got: %d", s.numNoops)
if s.numNoops.Load() != 1 {
t.Fatalf("noop count is wrong, got: %d", s.numNoops.Load())
}
}
@@ -2884,11 +2884,11 @@ func asJSONAssociative(v interface{}) string {
return string(b)
}
func testPoll(t *testing.T, f func() bool, p time.Duration, d time.Duration) {
func testPoll(t *testing.T, f func() bool, checkPeriod time.Duration, timeout time.Duration) {
t.Helper()
tck := time.NewTicker(p)
tck := time.NewTicker(checkPeriod)
defer tck.Stop()
tmr := time.NewTimer(d)
tmr := time.NewTimer(timeout)
defer tmr.Stop()
for {

@@ -3,6 +3,7 @@ package store
import (
"io"
"net"
"sync/atomic"
"time"
"github.com/hashicorp/raft"
@@ -52,18 +53,26 @@ func (t *Transport) Addr() net.Addr {
// custom configuration of the InstallSnapshot method.
type NodeTransport struct {
*raft.NetworkTransport
done chan struct{}
closed bool
commandCommitIndex *atomic.Uint64
done chan struct{}
closed bool
}
// NewNodeTransport returns an initialized NodeTransport.
func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport {
return &NodeTransport{
NetworkTransport: transport,
done: make(chan struct{}),
NetworkTransport: transport,
commandCommitIndex: &atomic.Uint64{},
done: make(chan struct{}),
}
}
// CommandCommitIndex returns the index of the latest committed log entry
// which is applied to the FSM.
func (n *NodeTransport) CommandCommitIndex() uint64 {
return n.commandCommitIndex.Load()
}
// Close closes the transport
func (n *NodeTransport) Close() error {
if n.closed {
@@ -100,8 +109,17 @@ func (n *NodeTransport) Consumer() <-chan raft.RPC {
case <-n.done:
return
case rpc := <-srcCh:
if rpc.Reader != nil {
rpc.Reader = gzip.NewDecompressor(rpc.Reader)
switch cmd := rpc.Command.(type) {
case *raft.InstallSnapshotRequest:
if rpc.Reader != nil {
rpc.Reader = gzip.NewDecompressor(rpc.Reader)
}
case *raft.AppendEntriesRequest:
for _, e := range cmd.Entries {
if e.Type == raft.LogCommand {
n.commandCommitIndex.Store(e.Index)
}
}
}
ch <- rpc
}
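The Consumer loop above is where a follower's command commit index comes from: each incoming AppendEntriesRequest is inspected and the index of the newest LogCommand entry it carries is recorded, while configuration and other entry types are skipped. Hooking the wrapped transport means no changes to hashicorp/raft itself are needed. The scanning step in isolation, with a hypothetical helper name but the same hashicorp/raft types:

```go
package transport // hypothetical package for this sketch

import (
	"sync/atomic"

	"github.com/hashicorp/raft"
)

// recordCommandCommitIndex stores the index of the newest LogCommand entry
// in the batch, ignoring configuration and other non-command entry types.
func recordCommandCommitIndex(idx *atomic.Uint64, entries []*raft.Log) {
	for _, e := range entries {
		if e.Type == raft.LogCommand {
			idx.Store(e.Index)
		}
	}
}
```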
