1
0
Fork 0

Periodically record actually applied index

This ensures the system doesn't apply uncommitted log entries to the
FSM, if it runs the in-memory startup. There have been no reports of
this in the field -- it is an edge case -- but this removes any chance
of it.
master
Philip O'Toole 1 year ago
parent f3a73efb84
commit ad73779285

@ -8,6 +8,10 @@ import (
"go.etcd.io/bbolt"
)
const (
rqliteAppliedIndex = "rqlite_applied_index"
)
// Log is an object that can return information about the Raft log.
type Log struct {
*raftboltdb.BoltStore
@ -48,12 +52,7 @@ func (l *Log) Indexes() (uint64, uint64, error) {
// LastCommandIndex returns the index of the last Command
// log entry written to the Raft log. Returns an index of
// zero if no such log exists.
func (l *Log) LastCommandIndex() (uint64, error) {
fi, li, err := l.Indexes()
if err != nil {
return 0, fmt.Errorf("failed to get indexes: %s", err)
}
func (l *Log) LastCommandIndex(fi, li uint64) (uint64, error) {
// Check for empty log.
if li == 0 {
return 0, nil
@ -71,6 +70,20 @@ func (l *Log) LastCommandIndex() (uint64, error) {
return 0, nil
}
// SetAppliedIndex sets the AppliedIndex value.
func (l *Log) SetAppliedIndex(index uint64) error {
return l.SetUint64([]byte(rqliteAppliedIndex), index)
}
// GetAppliedIndex returns the AppliedIndex value.
func (l *Log) GetAppliedIndex() (uint64, error) {
i, err := l.GetUint64([]byte(rqliteAppliedIndex))
if err != nil {
return 0, nil
}
return i, nil
}
// Stats returns stats about the BBoltDB database.
func (l *Log) Stats() bbolt.Stats {
return l.BoltStore.Stats()

@ -33,7 +33,7 @@ func Test_LogNewEmpty(t *testing.T) {
t.Fatalf("got non-zero value for last index of empty log: %d", li)
}
lci, err := l.LastCommandIndex()
lci, err := l.LastCommandIndex(fi, li)
if err != nil {
t.Fatalf("failed to get last command index: %s", err)
}
@ -83,7 +83,7 @@ func Test_LogNewExistNotEmpty(t *testing.T) {
t.Fatalf("got wrong value for last index of not empty log: %d", li)
}
lci, err := l.LastCommandIndex()
lci, err := l.LastCommandIndex(fi, li)
if err != nil {
t.Fatalf("failed to get last command index: %s", err)
}
@ -185,7 +185,7 @@ func Test_LogNewExistNotEmptyNoFreelistSync(t *testing.T) {
t.Fatalf("got wrong value for last index of not empty log: %d", li)
}
lci, err := l.LastCommandIndex()
lci, err := l.LastCommandIndex(fi, li)
if err != nil {
t.Fatalf("failed to get last command index: %s", err)
}
@ -288,7 +288,7 @@ func Test_LogLastCommandIndexNotExist(t *testing.T) {
t.Fatalf("got wrong for last index of not empty log: %d", li)
}
lci, err := l.LastCommandIndex()
lci, err := l.LastCommandIndex(fi, li)
if err != nil {
t.Fatalf("failed to get last command index: %s", err)
}
@ -317,7 +317,11 @@ func Test_LogLastCommandIndexNotExist(t *testing.T) {
t.Fatalf("failed to create new log: %s", err)
}
lci, err = l.LastCommandIndex()
fi, li, err = l.Indexes()
if err != nil {
t.Fatalf("failed to get indexes: %s", err)
}
lci, err = l.LastCommandIndex(fi, li)
if err != nil {
t.Fatalf("failed to get last command index: %s", err)
}
@ -326,6 +330,35 @@ func Test_LogLastCommandIndexNotExist(t *testing.T) {
}
}
func Test_LogAppliedIndex(t *testing.T) {
path := mustTempFile()
defer os.Remove(path)
l, err := New(path, false)
if err != nil {
t.Fatalf("failed to create new log: %s", err)
}
ai, err := l.GetAppliedIndex()
if err != nil {
t.Fatalf("failed to get applied index: %s", err)
}
if ai != 0 {
t.Fatalf("got wrong applied index for non-existent key: %d", ai)
}
if l.SetAppliedIndex(1234); err != nil {
t.Fatalf("failed to set applied index: %s", err)
}
ai, err = l.GetAppliedIndex()
if err != nil {
t.Fatalf("failed to get applied index: %s", err)
}
if ai != 1234 {
t.Fatalf("got wrong applied index: %d", ai)
}
}
func Test_LogStats(t *testing.T) {
path := mustTempFile()
defer os.Remove(path)

@ -63,20 +63,21 @@ var (
)
const (
raftDBPath = "raft.db" // Changing this will break backwards compatibility.
peersPath = "raft/peers.json"
peersInfoPath = "raft/peers.info"
retainSnapshotCount = 1
applyTimeout = 10 * time.Second
openTimeout = 120 * time.Second
sqliteFile = "db.sqlite"
leaderWaitDelay = 100 * time.Millisecond
appliedWaitDelay = 100 * time.Millisecond
connectionPoolCount = 5
connectionTimeout = 10 * time.Second
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 50
raftDBPath = "raft.db" // Changing this will break backwards compatibility.
peersPath = "raft/peers.json"
peersInfoPath = "raft/peers.info"
retainSnapshotCount = 1
applyTimeout = 10 * time.Second
openTimeout = 120 * time.Second
sqliteFile = "db.sqlite"
leaderWaitDelay = 100 * time.Millisecond
appliedWaitDelay = 100 * time.Millisecond
appliedIndexUpdateInterval = 5 * time.Second
connectionPoolCount = 5
connectionTimeout = 10 * time.Second
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 50
)
const (
@ -172,8 +173,9 @@ type Store struct {
queryTxMu sync.RWMutex
dbAppliedIndexMu sync.RWMutex
dbAppliedIndex uint64
dbAppliedIndexMu sync.RWMutex
dbAppliedIndex uint64
appliedIdxUpdateDone chan struct{}
// Channels that must be closed for the Store to be considered ready.
readyChans []<-chan struct{}
@ -202,7 +204,8 @@ type Store struct {
snapsExistOnOpen bool // Any snaps present when store opens?
firstIdxOnOpen uint64 // First index on log when Store opens.
lastIdxOnOpen uint64 // Last index on log when Store opens.
lastCommandIdxOnOpen uint64 // Last command index on log when Store opens.
lastCommandIdxOnOpen uint64 // Last command index before applied index when Store opens.
lastAppliedIdxOnOpen uint64 // Last applied index on log when Store opens.
firstLogAppliedT time.Time // Time first log is applied
appliedOnOpen uint64 // Number of logs applied at open.
openT time.Time // Timestamp when Store opens.
@ -406,8 +409,8 @@ func (s *Store) Open() (retErr error) {
if err := s.setLogInfo(); err != nil {
return fmt.Errorf("set log info: %s", err)
}
s.logger.Printf("first log index: %d, last log index: %d, last command log index: %d:",
s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastCommandIdxOnOpen)
s.logger.Printf("first log index: %d, last log index: %d, last applied index: %d, last command log index: %d:",
s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastAppliedIdxOnOpen, s.lastCommandIdxOnOpen)
// If an on-disk database has been requested, and there are no snapshots, and
// there are no commands in the log, then this is the only opportunity to
@ -449,6 +452,9 @@ func (s *Store) Open() (retErr error) {
s.raft.RegisterObserver(s.observer)
s.observerClose, s.observerDone = s.observe()
// Periodically update the applied index for faster startup.
s.appliedIdxUpdateDone = s.updateAppliedIndex()
return nil
}
@ -527,6 +533,7 @@ func (s *Store) Close(wait bool) (retErr error) {
return nil
}
close(s.appliedIdxUpdateDone)
close(s.observerClose)
<-s.observerDone
@ -879,13 +886,18 @@ func (s *Store) Stats() (map[string]interface{}, error) {
if err != nil {
return nil, err
}
lAppliedIdx, err := s.boltStore.GetAppliedIndex()
if err != nil {
return nil, err
}
status := map[string]interface{}{
"open": s.open,
"node_id": s.raftID,
"raft": raftStats,
"fsm_index": fsmIdx,
"db_applied_index": dbAppliedIdx,
"addr": s.Addr(),
"open": s.open,
"node_id": s.raftID,
"raft": raftStats,
"fsm_index": fsmIdx,
"db_applied_index": dbAppliedIdx,
"last_applied_index": lAppliedIdx,
"addr": s.Addr(),
"leader": map[string]string{
"node_id": leaderID,
"addr": leaderAddr,
@ -1387,11 +1399,15 @@ func (s *Store) setLogInfo() error {
if err != nil {
return fmt.Errorf("failed to get last index: %s", err)
}
s.lastAppliedIdxOnOpen, err = s.boltStore.GetAppliedIndex()
if err != nil {
return fmt.Errorf("failed to get last applied index: %s", err)
}
s.lastIdxOnOpen, err = s.boltStore.LastIndex()
if err != nil {
return fmt.Errorf("failed to get last index: %s", err)
}
s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex()
s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex(s.firstIdxOnOpen, s.lastAppliedIdxOnOpen)
if err != nil {
return fmt.Errorf("failed to get last command index: %s", err)
}
@ -1431,6 +1447,31 @@ func (s *Store) raftConfig() *raft.Config {
return config
}
func (s *Store) updateAppliedIndex() chan struct{} {
done := make(chan struct{})
go func() {
ticker := time.NewTicker(appliedIndexUpdateInterval)
defer ticker.Stop()
var idx uint64
for {
select {
case <-ticker.C:
newIdx := s.raft.AppliedIndex()
if newIdx == idx {
continue
}
idx = newIdx
if err := s.boltStore.SetAppliedIndex(idx); err != nil {
s.logger.Printf("failed to set applied index: %s", err.Error())
}
case <-done:
return
}
}
}()
return done
}
type fsmExecuteResponse struct {
results []*command.ExecuteResult
error error
@ -1462,7 +1503,7 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) {
// opened.
s.appliedOnOpen++
if l.Index == s.lastCommandIdxOnOpen {
s.logger.Printf("%d committed log entries applied in %s, took %s since open",
s.logger.Printf("%d confirmed committed log entries applied in %s, took %s since open",
s.appliedOnOpen, time.Since(s.firstLogAppliedT), time.Since(s.openT))
// Last command log applied. Time to switch to on-disk database?

Loading…
Cancel
Save