1
0
Fork 0

Merge pull request #1675 from rqlite/idx-atomic

Use atomic.Uint64 instead of mutexes for Store indexes
master
Philip O'Toole 8 months ago committed by GitHub
commit 0de8c5a091
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -3,6 +3,7 @@
- [PR #1670](https://github.com/rqlite/rqlite/pull/1670): Improve error message when query on remote node fails.
- [PR #1671](https://github.com/rqlite/rqlite/pull/1670): Minor optimizations to Unified Request processing.
- [PR #1674](https://github.com/rqlite/rqlite/pull/1674): Small refactor of _Stale Reads_ check.
- [PR #1675](https://github.com/rqlite/rqlite/pull/1675): Use atomic.Uint64 instead of Mutexes for Store indexes.
## 8.19.0 (February 3rd 2024)
This release allows you to set a maximum amount of a time a query will run. If the query does not complete within the set time, an error will be returned.

@ -68,9 +68,13 @@ func Test_UploaderSingleUpload_ID(t *testing.T) {
ResetStats()
var wg sync.WaitGroup
wg.Add(1)
nCalled := 0
sc := &mockStorageClient{
currentIDFn: func(ctx context.Context) (string, error) {
defer wg.Done()
nCalled++
if nCalled == 1 {
defer wg.Done()
}
return "1234", nil
},
}

@ -19,6 +19,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-hclog"
@ -273,12 +274,10 @@ type Store struct {
// Latest log entry index actually reflected by the FSM. Due to Raft code
// this value is not updated after a Snapshot-restore.
fsmIdx uint64
fsmIdxMu sync.RWMutex
fsmIdx *atomic.Uint64
// Latest log entry index which actually changed the database.
dbAppliedIdxMu sync.RWMutex
dbAppliedIdx uint64
dbAppliedIdx *atomic.Uint64
appliedIdxUpdateDone chan struct{}
reqMarshaller *command.RequestMarshaler // Request marshaler for writing to log.
@ -376,6 +375,8 @@ func New(ly Layer, c *Config) *Store {
notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout,
snapshotCAS: NewCheckAndSet(),
fsmIdx: &atomic.Uint64{},
dbAppliedIdx: &atomic.Uint64{},
}
}
@ -706,9 +707,7 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
// DBAppliedIndex returns the index of the last Raft log that changed the
// underlying database. If the index is unknown then 0 is returned.
func (s *Store) DBAppliedIndex() uint64 {
s.dbAppliedIdxMu.RLock()
defer s.dbAppliedIdxMu.RUnlock()
return s.dbAppliedIdx
return s.dbAppliedIdx.Load()
}
// IsLeader is used to determine if the current node is cluster leader
@ -915,16 +914,11 @@ func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, erro
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
var fsmIdx uint64
for {
select {
case <-tck.C:
s.fsmIdxMu.RLock()
fsmIdx = s.fsmIdx
s.fsmIdxMu.RUnlock()
if fsmIdx >= idx {
return fsmIdx, nil
if s.fsmIdx.Load() >= idx {
return s.fsmIdx.Load(), nil
}
case <-tmr.C:
return 0, fmt.Errorf("timeout expired")
@ -940,16 +934,6 @@ func (s *Store) Stats() (map[string]interface{}, error) {
}, nil
}
fsmIdx := func() uint64 {
s.fsmIdxMu.RLock()
defer s.fsmIdxMu.RUnlock()
return s.fsmIdx
}()
dbAppliedIdx := func() uint64 {
s.dbAppliedIdxMu.Lock()
defer s.dbAppliedIdxMu.Unlock()
return s.dbAppliedIdx
}()
dbStatus, err := s.db.Stats()
if err != nil {
stats.Add(numDBStatsErrors, 1)
@ -994,8 +978,8 @@ func (s *Store) Stats() (map[string]interface{}, error) {
"open": s.open,
"node_id": s.raftID,
"raft": raftStats,
"fsm_index": fsmIdx,
"db_applied_index": dbAppliedIdx,
"fsm_index": s.fsmIdx.Load(),
"db_applied_index": s.dbAppliedIdx.Load(),
"last_applied_index": lAppliedIdx,
"addr": s.Addr(),
"leader": map[string]string{
@ -1831,10 +1815,7 @@ type fsmGenericResponse struct {
// fsmApply applies a Raft log entry to the database.
func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
defer func() {
s.fsmIdxMu.Lock()
defer s.fsmIdxMu.Unlock()
s.fsmIdx = l.Index
s.fsmIdx.Store(l.Index)
if l.Index <= s.lastCommandIdxOnOpen {
// In here means at least one command entry was in the log when the Store
// opened.
@ -1853,9 +1834,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
cmd, mutated, r := s.cmdProc.Process(l.Data, s.db)
if mutated {
s.dbAppliedIdxMu.Lock()
s.dbAppliedIdx = l.Index
s.dbAppliedIdxMu.Unlock()
s.dbAppliedIdx.Store(l.Index)
}
if cmd.Type == proto.Command_COMMAND_TYPE_NOOP {
s.numNoops++
@ -2047,12 +2026,8 @@ func (s *Store) fsmRestore(rc io.ReadCloser) (retErr error) {
if err := s.boltStore.SetAppliedIndex(li); err != nil {
return fmt.Errorf("failed to set applied index: %s", err)
}
s.fsmIdxMu.Lock()
s.fsmIdx = li
s.fsmIdxMu.Unlock()
s.dbAppliedIdxMu.Lock()
s.dbAppliedIdx = li
s.dbAppliedIdxMu.Unlock()
s.fsmIdx.Store(li)
s.dbAppliedIdx.Store(li)
stats.Add(numRestores, 1)
s.logger.Printf("node restored in %s", time.Since(startT))

Loading…
Cancel
Save