diff --git a/log/log.go b/log/log.go index 7b13db94..bfe73e54 100644 --- a/log/log.go +++ b/log/log.go @@ -8,6 +8,10 @@ import ( "go.etcd.io/bbolt" ) +const ( + rqliteAppliedIndex = "rqlite_applied_index" +) + // Log is an object that can return information about the Raft log. type Log struct { *raftboltdb.BoltStore @@ -48,12 +52,7 @@ func (l *Log) Indexes() (uint64, uint64, error) { // LastCommandIndex returns the index of the last Command // log entry written to the Raft log. Returns an index of // zero if no such log exists. -func (l *Log) LastCommandIndex() (uint64, error) { - fi, li, err := l.Indexes() - if err != nil { - return 0, fmt.Errorf("failed to get indexes: %s", err) - } - +func (l *Log) LastCommandIndex(fi, li uint64) (uint64, error) { // Check for empty log. if li == 0 { return 0, nil @@ -71,6 +70,20 @@ func (l *Log) LastCommandIndex() (uint64, error) { return 0, nil } +// SetAppliedIndex sets the AppliedIndex value. +func (l *Log) SetAppliedIndex(index uint64) error { + return l.SetUint64([]byte(rqliteAppliedIndex), index) +} + +// GetAppliedIndex returns the AppliedIndex value. +func (l *Log) GetAppliedIndex() (uint64, error) { + i, err := l.GetUint64([]byte(rqliteAppliedIndex)) + if err != nil { + return 0, nil + } + return i, nil +} + // Stats returns stats about the BBoltDB database. func (l *Log) Stats() bbolt.Stats { return l.BoltStore.Stats() diff --git a/log/log_test.go b/log/log_test.go index f573a7e3..226fe831 100644 --- a/log/log_test.go +++ b/log/log_test.go @@ -33,7 +33,7 @@ func Test_LogNewEmpty(t *testing.T) { t.Fatalf("got non-zero value for last index of empty log: %d", li) } - lci, err := l.LastCommandIndex() + lci, err := l.LastCommandIndex(fi, li) if err != nil { t.Fatalf("failed to get last command index: %s", err) } @@ -83,7 +83,7 @@ func Test_LogNewExistNotEmpty(t *testing.T) { t.Fatalf("got wrong value for last index of not empty log: %d", li) } - lci, err := l.LastCommandIndex() + lci, err := l.LastCommandIndex(fi, li) if err != nil { t.Fatalf("failed to get last command index: %s", err) } @@ -185,7 +185,7 @@ func Test_LogNewExistNotEmptyNoFreelistSync(t *testing.T) { t.Fatalf("got wrong value for last index of not empty log: %d", li) } - lci, err := l.LastCommandIndex() + lci, err := l.LastCommandIndex(fi, li) if err != nil { t.Fatalf("failed to get last command index: %s", err) } @@ -288,7 +288,7 @@ func Test_LogLastCommandIndexNotExist(t *testing.T) { t.Fatalf("got wrong for last index of not empty log: %d", li) } - lci, err := l.LastCommandIndex() + lci, err := l.LastCommandIndex(fi, li) if err != nil { t.Fatalf("failed to get last command index: %s", err) } @@ -317,7 +317,11 @@ func Test_LogLastCommandIndexNotExist(t *testing.T) { t.Fatalf("failed to create new log: %s", err) } - lci, err = l.LastCommandIndex() + fi, li, err = l.Indexes() + if err != nil { + t.Fatalf("failed to get indexes: %s", err) + } + lci, err = l.LastCommandIndex(fi, li) if err != nil { t.Fatalf("failed to get last command index: %s", err) } @@ -326,6 +330,35 @@ func Test_LogLastCommandIndexNotExist(t *testing.T) { } } +func Test_LogAppliedIndex(t *testing.T) { + path := mustTempFile() + defer os.Remove(path) + + l, err := New(path, false) + if err != nil { + t.Fatalf("failed to create new log: %s", err) + } + + ai, err := l.GetAppliedIndex() + if err != nil { + t.Fatalf("failed to get applied index: %s", err) + } + if ai != 0 { + t.Fatalf("got wrong applied index for non-existent key: %d", ai) + } + + if l.SetAppliedIndex(1234); err != nil { + t.Fatalf("failed to set applied index: %s", err) + } + ai, err = l.GetAppliedIndex() + if err != nil { + t.Fatalf("failed to get applied index: %s", err) + } + if ai != 1234 { + t.Fatalf("got wrong applied index: %d", ai) + } +} + func Test_LogStats(t *testing.T) { path := mustTempFile() defer os.Remove(path) diff --git a/store/store.go b/store/store.go index 00134ebe..357dc627 100644 --- a/store/store.go +++ b/store/store.go @@ -63,20 +63,21 @@ var ( ) const ( - raftDBPath = "raft.db" // Changing this will break backwards compatibility. - peersPath = "raft/peers.json" - peersInfoPath = "raft/peers.info" - retainSnapshotCount = 1 - applyTimeout = 10 * time.Second - openTimeout = 120 * time.Second - sqliteFile = "db.sqlite" - leaderWaitDelay = 100 * time.Millisecond - appliedWaitDelay = 100 * time.Millisecond - connectionPoolCount = 5 - connectionTimeout = 10 * time.Second - raftLogCacheSize = 512 - trailingScale = 1.25 - observerChanLen = 50 + raftDBPath = "raft.db" // Changing this will break backwards compatibility. + peersPath = "raft/peers.json" + peersInfoPath = "raft/peers.info" + retainSnapshotCount = 1 + applyTimeout = 10 * time.Second + openTimeout = 120 * time.Second + sqliteFile = "db.sqlite" + leaderWaitDelay = 100 * time.Millisecond + appliedWaitDelay = 100 * time.Millisecond + appliedIndexUpdateInterval = 5 * time.Second + connectionPoolCount = 5 + connectionTimeout = 10 * time.Second + raftLogCacheSize = 512 + trailingScale = 1.25 + observerChanLen = 50 ) const ( @@ -172,8 +173,9 @@ type Store struct { queryTxMu sync.RWMutex - dbAppliedIndexMu sync.RWMutex - dbAppliedIndex uint64 + dbAppliedIndexMu sync.RWMutex + dbAppliedIndex uint64 + appliedIdxUpdateDone chan struct{} // Channels that must be closed for the Store to be considered ready. readyChans []<-chan struct{} @@ -202,7 +204,8 @@ type Store struct { snapsExistOnOpen bool // Any snaps present when store opens? firstIdxOnOpen uint64 // First index on log when Store opens. lastIdxOnOpen uint64 // Last index on log when Store opens. - lastCommandIdxOnOpen uint64 // Last command index on log when Store opens. + lastCommandIdxOnOpen uint64 // Last command index before applied index when Store opens. + lastAppliedIdxOnOpen uint64 // Last applied index on log when Store opens. firstLogAppliedT time.Time // Time first log is applied appliedOnOpen uint64 // Number of logs applied at open. openT time.Time // Timestamp when Store opens. @@ -406,8 +409,8 @@ func (s *Store) Open() (retErr error) { if err := s.setLogInfo(); err != nil { return fmt.Errorf("set log info: %s", err) } - s.logger.Printf("first log index: %d, last log index: %d, last command log index: %d:", - s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastCommandIdxOnOpen) + s.logger.Printf("first log index: %d, last log index: %d, last applied index: %d, last command log index: %d:", + s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastAppliedIdxOnOpen, s.lastCommandIdxOnOpen) // If an on-disk database has been requested, and there are no snapshots, and // there are no commands in the log, then this is the only opportunity to @@ -449,6 +452,9 @@ func (s *Store) Open() (retErr error) { s.raft.RegisterObserver(s.observer) s.observerClose, s.observerDone = s.observe() + // Periodically update the applied index for faster startup. + s.appliedIdxUpdateDone = s.updateAppliedIndex() + return nil } @@ -527,6 +533,7 @@ func (s *Store) Close(wait bool) (retErr error) { return nil } + close(s.appliedIdxUpdateDone) close(s.observerClose) <-s.observerDone @@ -879,13 +886,18 @@ func (s *Store) Stats() (map[string]interface{}, error) { if err != nil { return nil, err } + lAppliedIdx, err := s.boltStore.GetAppliedIndex() + if err != nil { + return nil, err + } status := map[string]interface{}{ - "open": s.open, - "node_id": s.raftID, - "raft": raftStats, - "fsm_index": fsmIdx, - "db_applied_index": dbAppliedIdx, - "addr": s.Addr(), + "open": s.open, + "node_id": s.raftID, + "raft": raftStats, + "fsm_index": fsmIdx, + "db_applied_index": dbAppliedIdx, + "last_applied_index": lAppliedIdx, + "addr": s.Addr(), "leader": map[string]string{ "node_id": leaderID, "addr": leaderAddr, @@ -1387,11 +1399,15 @@ func (s *Store) setLogInfo() error { if err != nil { return fmt.Errorf("failed to get last index: %s", err) } + s.lastAppliedIdxOnOpen, err = s.boltStore.GetAppliedIndex() + if err != nil { + return fmt.Errorf("failed to get last applied index: %s", err) + } s.lastIdxOnOpen, err = s.boltStore.LastIndex() if err != nil { return fmt.Errorf("failed to get last index: %s", err) } - s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex() + s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex(s.firstIdxOnOpen, s.lastAppliedIdxOnOpen) if err != nil { return fmt.Errorf("failed to get last command index: %s", err) } @@ -1431,6 +1447,31 @@ func (s *Store) raftConfig() *raft.Config { return config } +func (s *Store) updateAppliedIndex() chan struct{} { + done := make(chan struct{}) + go func() { + ticker := time.NewTicker(appliedIndexUpdateInterval) + defer ticker.Stop() + var idx uint64 + for { + select { + case <-ticker.C: + newIdx := s.raft.AppliedIndex() + if newIdx == idx { + continue + } + idx = newIdx + if err := s.boltStore.SetAppliedIndex(idx); err != nil { + s.logger.Printf("failed to set applied index: %s", err.Error()) + } + case <-done: + return + } + } + }() + return done +} + type fsmExecuteResponse struct { results []*command.ExecuteResult error error @@ -1462,7 +1503,7 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) { // opened. s.appliedOnOpen++ if l.Index == s.lastCommandIdxOnOpen { - s.logger.Printf("%d committed log entries applied in %s, took %s since open", + s.logger.Printf("%d confirmed committed log entries applied in %s, took %s since open", s.appliedOnOpen, time.Since(s.firstLogAppliedT), time.Since(s.openT)) // Last command log applied. Time to switch to on-disk database?