1
0
Fork 0

Use atomic.Uint64 instead of mutexes

The code is simpler and easier to read.
master
Philip O'Toole 8 months ago
parent cc9e134352
commit ca5e279742

@ -3,6 +3,7 @@
- [PR #1670](https://github.com/rqlite/rqlite/pull/1670): Improve error message when query on remote node fails. - [PR #1670](https://github.com/rqlite/rqlite/pull/1670): Improve error message when query on remote node fails.
- [PR #1671](https://github.com/rqlite/rqlite/pull/1670): Minor optimizations to Unified Request processing. - [PR #1671](https://github.com/rqlite/rqlite/pull/1670): Minor optimizations to Unified Request processing.
- [PR #1674](https://github.com/rqlite/rqlite/pull/1674): Small refactor of _Stale Reads_ check. - [PR #1674](https://github.com/rqlite/rqlite/pull/1674): Small refactor of _Stale Reads_ check.
- [PR #1675](https://github.com/rqlite/rqlite/pull/1675): Use atomic.Uint64 instead of Mutexes.
## 8.19.0 (February 3rd 2024) ## 8.19.0 (February 3rd 2024)
This release allows you to set a maximum amount of a time a query will run. If the query does not complete within the set time, an error will be returned. This release allows you to set a maximum amount of a time a query will run. If the query does not complete within the set time, an error will be returned.

@ -19,6 +19,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
@ -273,12 +274,10 @@ type Store struct {
// Latest log entry index actually reflected by the FSM. Due to Raft code // Latest log entry index actually reflected by the FSM. Due to Raft code
// this value is not updated after a Snapshot-restore. // this value is not updated after a Snapshot-restore.
fsmIdx uint64 fsmIdx *atomic.Uint64
fsmIdxMu sync.RWMutex
// Latest log entry index which actually changed the database. // Latest log entry index which actually changed the database.
dbAppliedIdxMu sync.RWMutex dbAppliedIdx *atomic.Uint64
dbAppliedIdx uint64
appliedIdxUpdateDone chan struct{} appliedIdxUpdateDone chan struct{}
reqMarshaller *command.RequestMarshaler // Request marshaler for writing to log. reqMarshaller *command.RequestMarshaler // Request marshaler for writing to log.
@ -376,6 +375,8 @@ func New(ly Layer, c *Config) *Store {
notifyingNodes: make(map[string]*Server), notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout, ApplyTimeout: applyTimeout,
snapshotCAS: NewCheckAndSet(), snapshotCAS: NewCheckAndSet(),
fsmIdx: &atomic.Uint64{},
dbAppliedIdx: &atomic.Uint64{},
} }
} }
@ -706,9 +707,7 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
// DBAppliedIndex returns the index of the last Raft log that changed the // DBAppliedIndex returns the index of the last Raft log that changed the
// underlying database. If the index is unknown then 0 is returned. // underlying database. If the index is unknown then 0 is returned.
func (s *Store) DBAppliedIndex() uint64 { func (s *Store) DBAppliedIndex() uint64 {
s.dbAppliedIdxMu.RLock() return s.dbAppliedIdx.Load()
defer s.dbAppliedIdxMu.RUnlock()
return s.dbAppliedIdx
} }
// IsLeader is used to determine if the current node is cluster leader // IsLeader is used to determine if the current node is cluster leader
@ -915,16 +914,11 @@ func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, erro
defer tck.Stop() defer tck.Stop()
tmr := time.NewTimer(timeout) tmr := time.NewTimer(timeout)
defer tmr.Stop() defer tmr.Stop()
var fsmIdx uint64
for { for {
select { select {
case <-tck.C: case <-tck.C:
s.fsmIdxMu.RLock() if s.fsmIdx.Load() >= idx {
fsmIdx = s.fsmIdx return s.fsmIdx.Load(), nil
s.fsmIdxMu.RUnlock()
if fsmIdx >= idx {
return fsmIdx, nil
} }
case <-tmr.C: case <-tmr.C:
return 0, fmt.Errorf("timeout expired") return 0, fmt.Errorf("timeout expired")
@ -940,16 +934,6 @@ func (s *Store) Stats() (map[string]interface{}, error) {
}, nil }, nil
} }
fsmIdx := func() uint64 {
s.fsmIdxMu.RLock()
defer s.fsmIdxMu.RUnlock()
return s.fsmIdx
}()
dbAppliedIdx := func() uint64 {
s.dbAppliedIdxMu.Lock()
defer s.dbAppliedIdxMu.Unlock()
return s.dbAppliedIdx
}()
dbStatus, err := s.db.Stats() dbStatus, err := s.db.Stats()
if err != nil { if err != nil {
stats.Add(numDBStatsErrors, 1) stats.Add(numDBStatsErrors, 1)
@ -994,8 +978,8 @@ func (s *Store) Stats() (map[string]interface{}, error) {
"open": s.open, "open": s.open,
"node_id": s.raftID, "node_id": s.raftID,
"raft": raftStats, "raft": raftStats,
"fsm_index": fsmIdx, "fsm_index": s.fsmIdx.Load(),
"db_applied_index": dbAppliedIdx, "db_applied_index": s.dbAppliedIdx.Load(),
"last_applied_index": lAppliedIdx, "last_applied_index": lAppliedIdx,
"addr": s.Addr(), "addr": s.Addr(),
"leader": map[string]string{ "leader": map[string]string{
@ -1831,10 +1815,7 @@ type fsmGenericResponse struct {
// fsmApply applies a Raft log entry to the database. // fsmApply applies a Raft log entry to the database.
func (s *Store) fsmApply(l *raft.Log) (e interface{}) { func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
defer func() { defer func() {
s.fsmIdxMu.Lock() s.fsmIdx.Store(l.Index)
defer s.fsmIdxMu.Unlock()
s.fsmIdx = l.Index
if l.Index <= s.lastCommandIdxOnOpen { if l.Index <= s.lastCommandIdxOnOpen {
// In here means at least one command entry was in the log when the Store // In here means at least one command entry was in the log when the Store
// opened. // opened.
@ -1853,9 +1834,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
cmd, mutated, r := s.cmdProc.Process(l.Data, s.db) cmd, mutated, r := s.cmdProc.Process(l.Data, s.db)
if mutated { if mutated {
s.dbAppliedIdxMu.Lock() s.dbAppliedIdx.Store(l.Index)
s.dbAppliedIdx = l.Index
s.dbAppliedIdxMu.Unlock()
} }
if cmd.Type == proto.Command_COMMAND_TYPE_NOOP { if cmd.Type == proto.Command_COMMAND_TYPE_NOOP {
s.numNoops++ s.numNoops++
@ -2047,12 +2026,8 @@ func (s *Store) fsmRestore(rc io.ReadCloser) (retErr error) {
if err := s.boltStore.SetAppliedIndex(li); err != nil { if err := s.boltStore.SetAppliedIndex(li); err != nil {
return fmt.Errorf("failed to set applied index: %s", err) return fmt.Errorf("failed to set applied index: %s", err)
} }
s.fsmIdxMu.Lock() s.fsmIdx.Store(li)
s.fsmIdx = li s.dbAppliedIdx.Store(li)
s.fsmIdxMu.Unlock()
s.dbAppliedIdxMu.Lock()
s.dbAppliedIdx = li
s.dbAppliedIdxMu.Unlock()
stats.Add(numRestores, 1) stats.Add(numRestores, 1)
s.logger.Printf("node restored in %s", time.Since(startT)) s.logger.Printf("node restored in %s", time.Since(startT))

Loading…
Cancel
Save