diff --git a/CHANGELOG.md b/CHANGELOG.md index 30607f9c..5478d897 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ - [PR #1670](https://github.com/rqlite/rqlite/pull/1670): Improve error message when query on remote node fails. - [PR #1671](https://github.com/rqlite/rqlite/pull/1670): Minor optimizations to Unified Request processing. - [PR #1674](https://github.com/rqlite/rqlite/pull/1674): Small refactor of _Stale Reads_ check. +- [PR #1675](https://github.com/rqlite/rqlite/pull/1675): Use atomic.Uint64 instead of Mutexes. ## 8.19.0 (February 3rd 2024) This release allows you to set a maximum amount of a time a query will run. If the query does not complete within the set time, an error will be returned. diff --git a/store/store.go b/store/store.go index 6b9db35c..dbbc41b2 100644 --- a/store/store.go +++ b/store/store.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/hashicorp/go-hclog" @@ -273,12 +274,10 @@ type Store struct { // Latest log entry index actually reflected by the FSM. Due to Raft code // this value is not updated after a Snapshot-restore. - fsmIdx uint64 - fsmIdxMu sync.RWMutex + fsmIdx *atomic.Uint64 // Latest log entry index which actually changed the database. - dbAppliedIdxMu sync.RWMutex - dbAppliedIdx uint64 + dbAppliedIdx *atomic.Uint64 appliedIdxUpdateDone chan struct{} reqMarshaller *command.RequestMarshaler // Request marshaler for writing to log. @@ -376,6 +375,8 @@ func New(ly Layer, c *Config) *Store { notifyingNodes: make(map[string]*Server), ApplyTimeout: applyTimeout, snapshotCAS: NewCheckAndSet(), + fsmIdx: &atomic.Uint64{}, + dbAppliedIdx: &atomic.Uint64{}, } } @@ -706,9 +707,7 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error { // DBAppliedIndex returns the index of the last Raft log that changed the // underlying database. If the index is unknown then 0 is returned. func (s *Store) DBAppliedIndex() uint64 { - s.dbAppliedIdxMu.RLock() - defer s.dbAppliedIdxMu.RUnlock() - return s.dbAppliedIdx + return s.dbAppliedIdx.Load() } // IsLeader is used to determine if the current node is cluster leader @@ -915,16 +914,11 @@ func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, erro defer tck.Stop() tmr := time.NewTimer(timeout) defer tmr.Stop() - - var fsmIdx uint64 for { select { case <-tck.C: - s.fsmIdxMu.RLock() - fsmIdx = s.fsmIdx - s.fsmIdxMu.RUnlock() - if fsmIdx >= idx { - return fsmIdx, nil + if s.fsmIdx.Load() >= idx { + return s.fsmIdx.Load(), nil } case <-tmr.C: return 0, fmt.Errorf("timeout expired") @@ -940,16 +934,6 @@ func (s *Store) Stats() (map[string]interface{}, error) { }, nil } - fsmIdx := func() uint64 { - s.fsmIdxMu.RLock() - defer s.fsmIdxMu.RUnlock() - return s.fsmIdx - }() - dbAppliedIdx := func() uint64 { - s.dbAppliedIdxMu.Lock() - defer s.dbAppliedIdxMu.Unlock() - return s.dbAppliedIdx - }() dbStatus, err := s.db.Stats() if err != nil { stats.Add(numDBStatsErrors, 1) @@ -994,8 +978,8 @@ func (s *Store) Stats() (map[string]interface{}, error) { "open": s.open, "node_id": s.raftID, "raft": raftStats, - "fsm_index": fsmIdx, - "db_applied_index": dbAppliedIdx, + "fsm_index": s.fsmIdx.Load(), + "db_applied_index": s.dbAppliedIdx.Load(), "last_applied_index": lAppliedIdx, "addr": s.Addr(), "leader": map[string]string{ @@ -1831,10 +1815,7 @@ type fsmGenericResponse struct { // fsmApply applies a Raft log entry to the database. func (s *Store) fsmApply(l *raft.Log) (e interface{}) { defer func() { - s.fsmIdxMu.Lock() - defer s.fsmIdxMu.Unlock() - s.fsmIdx = l.Index - + s.fsmIdx.Store(l.Index) if l.Index <= s.lastCommandIdxOnOpen { // In here means at least one command entry was in the log when the Store // opened. @@ -1853,9 +1834,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) { cmd, mutated, r := s.cmdProc.Process(l.Data, s.db) if mutated { - s.dbAppliedIdxMu.Lock() - s.dbAppliedIdx = l.Index - s.dbAppliedIdxMu.Unlock() + s.dbAppliedIdx.Store(l.Index) } if cmd.Type == proto.Command_COMMAND_TYPE_NOOP { s.numNoops++ @@ -2047,12 +2026,8 @@ func (s *Store) fsmRestore(rc io.ReadCloser) (retErr error) { if err := s.boltStore.SetAppliedIndex(li); err != nil { return fmt.Errorf("failed to set applied index: %s", err) } - s.fsmIdxMu.Lock() - s.fsmIdx = li - s.fsmIdxMu.Unlock() - s.dbAppliedIdxMu.Lock() - s.dbAppliedIdx = li - s.dbAppliedIdxMu.Unlock() + s.fsmIdx.Store(li) + s.dbAppliedIdx.Store(li) stats.Add(numRestores, 1) s.logger.Printf("node restored in %s", time.Since(startT))