
Use FSM Command Commit Index for Stale Reads check

master
Philip O'Toole 7 months ago
parent 7f1977176d
commit 8255d199a2

@@ -336,7 +336,7 @@ type Store struct {
     // For whitebox testing
     numAutoVacuums  int
     numIgnoredJoins int
-    numNoops        int
+    numNoops        *atomic.Uint64
     numSnapshotsMu  sync.Mutex
    numSnapshots    int
 }
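The counter moves from a plain int to *atomic.Uint64 because fsmApply runs on the Raft library's apply goroutine, while the new multi-node test below polls the counter from the test goroutine; a bare int increment would be a data race under go test -race. A minimal sketch of the pattern (illustrative only, not rqlite's code):

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

func main() {
    counter := &atomic.Uint64{} // shared between one writer and one reader
    go func() {
        counter.Add(1) // writer: stands in for the FSM apply goroutine
    }()
    for counter.Load() == 0 { // reader: stands in for a polling test goroutine
        time.Sleep(time.Millisecond)
    }
    fmt.Println("count:", counter.Load())
}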
@@ -384,6 +384,7 @@ func New(ly Layer, c *Config) *Store {
         fsmUpdateTime:   NewAtomicTime(),
         appendedAtTime:  NewAtomicTime(),
         dbAppliedIdx:    &atomic.Uint64{},
+        numNoops:        &atomic.Uint64{},
     }
 }
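Note that a *atomic.Uint64 field is nil until it is explicitly initialized, and calling Add or Load through a nil pointer panics, which is why this constructor change is required alongside the type change above.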
@@ -982,14 +983,15 @@ func (s *Store) Stats() (map[string]interface{}, error) {
         return nil, err
     }
     status := map[string]interface{}{
-        "open":               s.open,
-        "node_id":            s.raftID,
-        "raft":               raftStats,
-        "fsm_index":          s.fsmIdx.Load(),
-        "fsm_update_time":    s.fsmUpdateTime.Load(),
-        "db_applied_index":   s.dbAppliedIdx.Load(),
-        "last_applied_index": lAppliedIdx,
-        "addr":               s.Addr(),
+        "open":                 s.open,
+        "node_id":              s.raftID,
+        "raft":                 raftStats,
+        "fsm_index":            s.fsmIdx.Load(),
+        "fsm_update_time":      s.fsmUpdateTime.Load(),
+        "db_applied_index":     s.dbAppliedIdx.Load(),
+        "last_applied_index":   lAppliedIdx,
+        "command_commit_index": s.raftTn.CommandCommitIndex(),
+        "addr":                 s.Addr(),
         "leader": map[string]string{
             "node_id": leaderID,
             "addr":    leaderAddr,
@@ -1804,7 +1806,7 @@ func (s *Store) isStaleRead(freshness int64, strict bool) bool {
         s.fsmUpdateTime.Load(),
         s.appendedAtTime.Load(),
         s.fsmIdx.Load(),
-        s.raft.CommitIndex(),
+        s.raftTn.CommandCommitIndex(),
         freshness,
         strict)
 }
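This is the heart of the commit. raft.CommitIndex() advances on every committed log entry, including cluster-membership changes and other entries that never reach the FSM, so comparing it against fsmIdx could brand a fully caught-up node as stale after, say, a node join. CommandCommitIndex() counts only LogCommand entries — the ones the FSM will actually apply. A simplified sketch of the comparison, assuming a strict freshness rule (isStale is a hypothetical helper; the real IsStaleRead takes more parameters than this hunk shows):

package store

import "time"

// isStale reports whether a local read would be stale: either committed
// commands exist that this node has not applied yet, or the last FSM
// update is older than the freshness window allows.
func isStale(fsmIndex, commandCommitIndex uint64, fsmUpdateTime time.Time, freshness time.Duration) bool {
    if fsmIndex < commandCommitIndex {
        return true
    }
    return freshness > 0 && time.Since(fsmUpdateTime) > freshness
}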
@@ -1855,7 +1857,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
         s.dbAppliedIdx.Store(l.Index)
     }
     if cmd.Type == proto.Command_COMMAND_TYPE_NOOP {
-        s.numNoops++
+        s.numNoops.Add(1)
     } else if cmd.Type == proto.Command_COMMAND_TYPE_LOAD {
         // Swapping in a new database invalidates any existing snapshot.
         err := s.snapshotStore.SetFullNeeded()

@@ -106,6 +106,57 @@ func Test_MultiNodeSimple(t *testing.T) {
     testFn2(t, s1)
 }
 
+func Test_MultiNodeNode_CommitIndexes(t *testing.T) {
+    s0, ln0 := mustNewStore(t)
+    defer s0.Close(true)
+    defer ln0.Close()
+    if err := s0.Open(); err != nil {
+        t.Fatalf("failed to open single-node store: %s", err.Error())
+    }
+    if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
+        t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
+    }
+    _, err := s0.WaitForLeader(10 * time.Second)
+    if err != nil {
+        t.Fatalf("Error waiting for leader: %s", err)
+    }
+
+    s1, ln1 := mustNewStore(t)
+    defer ln1.Close()
+    if err := s1.Open(); err != nil {
+        t.Fatalf("failed to open single-node store: %s", err.Error())
+    }
+    defer s1.Close(true)
+    if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
+        t.Fatalf("failed to join single-node store: %s", err.Error())
+    }
+    if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
+        t.Fatalf("Error waiting for leader: %s", err)
+    }
+    if exp, got := uint64(3), s1.raft.CommitIndex(); exp != got {
+        t.Fatalf("wrong commit index, got: %d, exp %d", got, exp)
+    }
+    if exp, got := uint64(0), s1.raftTn.CommandCommitIndex(); exp != got {
+        t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
+    }
+
+    // Send an FSM command through the log, and ensure the indexes are
+    // correctly updated on the follower.
+    s0.Noop("don't care")
+    testPoll(t, func() bool {
+        return s1.numNoops.Load() == 1
+    }, 50*time.Millisecond, 2*time.Second)
+    if exp, got := uint64(4), s1.raft.CommitIndex(); exp != got {
+        t.Fatalf("wrong commit index, got: %d, exp %d", got, exp)
+    }
+    if exp, got := uint64(4), s1.raftTn.CommandCommitIndex(); exp != got {
+        t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
+    }
+}
+
 // Test_MultiNodeSnapshot_ErrorMessage tests that a snapshot fails with a specific
 // error message when the snapshot is attempted too soon after joining a cluster.
 // Hashicorp Raft doesn't expose a typed error, so we have to check the error
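About the expected values in the new test: with Hashicorp Raft, the follower's commit index of 3 plausibly breaks down as the bootstrap configuration entry, the no-op entry the library appends when s0 first wins leadership, and the configuration change that adds s1. None of these are LogCommand entries, so CommandCommitIndex stays at 0 until the explicit Noop command — a real FSM command in rqlite, unlike Raft's internal no-op — lands at index 4 and moves both indexes together.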

@@ -2406,8 +2406,8 @@ func Test_SingleNodeNoop(t *testing.T) {
     if af.Error() != nil {
         t.Fatalf("expected nil apply future error")
     }
-    if s.numNoops != 1 {
-        t.Fatalf("noop count is wrong, got: %d", s.numNoops)
+    if s.numNoops.Load() != 1 {
+        t.Fatalf("noop count is wrong, got: %d", s.numNoops.Load())
     }
 }
@@ -2884,11 +2884,11 @@ func asJSONAssociative(v interface{}) string {
     return string(b)
 }
 
-func testPoll(t *testing.T, f func() bool, p time.Duration, d time.Duration) {
+func testPoll(t *testing.T, f func() bool, checkPeriod time.Duration, timeout time.Duration) {
     t.Helper()
-    tck := time.NewTicker(p)
+    tck := time.NewTicker(checkPeriod)
     defer tck.Stop()
-    tmr := time.NewTimer(d)
+    tmr := time.NewTimer(timeout)
     defer tmr.Stop()
     for {

@@ -114,7 +114,7 @@ func (n *NodeTransport) Consumer() <-chan raft.RPC {
             if rpc.Reader != nil {
                 rpc.Reader = gzip.NewDecompressor(rpc.Reader)
             }
-        case raft.AppendEntriesRequest:
+        case *raft.AppendEntriesRequest:
             for _, e := range cmd.Entries {
                 if e.Type == raft.LogCommand {
                     n.commandCommitIndex.Store(e.Index)
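The one-character change in this type switch is the actual bug fix: Hashicorp Raft delivers RPC commands as pointers (the transport decodes into a *raft.AppendEntriesRequest), so the old value case never matched and commandCommitIndex was never updated on followers — exactly the failure the new multi-node test would catch. A minimal sketch of the interception pattern, assuming a wrapper around the transport's RPC channel (the method name intercept is invented for illustration; only commandCommitIndex comes from the diff):

// intercept records the index of the latest LogCommand entry seen in
// AppendEntries RPCs, then forwards the RPC to the consumer unchanged.
func (n *NodeTransport) intercept(src <-chan raft.RPC, dst chan<- raft.RPC) {
    for rpc := range src {
        if req, ok := rpc.Command.(*raft.AppendEntriesRequest); ok { // pointer, not value
            for _, e := range req.Entries {
                if e.Type == raft.LogCommand {
                    n.commandCommitIndex.Store(e.Index)
                }
            }
        }
        dst <- rpc
    }
}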
