|
|
|
@ -72,6 +72,7 @@ const (
|
|
|
|
|
sqliteFile = "db.sqlite"
|
|
|
|
|
leaderWaitDelay = 100 * time.Millisecond
|
|
|
|
|
appliedWaitDelay = 100 * time.Millisecond
|
|
|
|
|
appliedIndexUpdateInterval = 5 * time.Second
|
|
|
|
|
connectionPoolCount = 5
|
|
|
|
|
connectionTimeout = 10 * time.Second
|
|
|
|
|
raftLogCacheSize = 512
|
|
|
|
@ -174,6 +175,7 @@ type Store struct {
|
|
|
|
|
|
|
|
|
|
dbAppliedIndexMu sync.RWMutex
|
|
|
|
|
dbAppliedIndex uint64
|
|
|
|
|
appliedIdxUpdateDone chan struct{}
|
|
|
|
|
|
|
|
|
|
// Channels that must be closed for the Store to be considered ready.
|
|
|
|
|
readyChans []<-chan struct{}
|
|
|
|
@ -202,7 +204,8 @@ type Store struct {
|
|
|
|
|
snapsExistOnOpen bool // Any snaps present when store opens?
|
|
|
|
|
firstIdxOnOpen uint64 // First index on log when Store opens.
|
|
|
|
|
lastIdxOnOpen uint64 // Last index on log when Store opens.
|
|
|
|
|
lastCommandIdxOnOpen uint64 // Last command index on log when Store opens.
|
|
|
|
|
lastCommandIdxOnOpen uint64 // Last command index before applied index when Store opens.
|
|
|
|
|
lastAppliedIdxOnOpen uint64 // Last applied index on log when Store opens.
|
|
|
|
|
firstLogAppliedT time.Time // Time first log is applied
|
|
|
|
|
appliedOnOpen uint64 // Number of logs applied at open.
|
|
|
|
|
openT time.Time // Timestamp when Store opens.
|
|
|
|
@ -392,7 +395,7 @@ func (s *Store) Open() (retErr error) {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to read peers file: %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
if err = RecoverNode(s.raftDir, s.logger, s.raftLog, s.raftStable, snapshots, s.raftTn, config); err != nil {
|
|
|
|
|
if err = RecoverNode(s.raftDir, s.logger, s.raftLog, s.boltStore, snapshots, s.raftTn, config); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to recover node: %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
if err := os.Rename(s.peersPath, s.peersInfoPath); err != nil {
|
|
|
|
@ -406,8 +409,8 @@ func (s *Store) Open() (retErr error) {
|
|
|
|
|
if err := s.setLogInfo(); err != nil {
|
|
|
|
|
return fmt.Errorf("set log info: %s", err)
|
|
|
|
|
}
|
|
|
|
|
s.logger.Printf("first log index: %d, last log index: %d, last command log index: %d:",
|
|
|
|
|
s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastCommandIdxOnOpen)
|
|
|
|
|
s.logger.Printf("first log index: %d, last log index: %d, last applied index: %d, last command log index: %d:",
|
|
|
|
|
s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastAppliedIdxOnOpen, s.lastCommandIdxOnOpen)
|
|
|
|
|
|
|
|
|
|
// If an on-disk database has been requested, and there are no snapshots, and
|
|
|
|
|
// there are no commands in the log, then this is the only opportunity to
|
|
|
|
@ -449,6 +452,9 @@ func (s *Store) Open() (retErr error) {
|
|
|
|
|
s.raft.RegisterObserver(s.observer)
|
|
|
|
|
s.observerClose, s.observerDone = s.observe()
|
|
|
|
|
|
|
|
|
|
// Periodically update the applied index for faster startup.
|
|
|
|
|
s.appliedIdxUpdateDone = s.updateAppliedIndex()
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -527,6 +533,7 @@ func (s *Store) Close(wait bool) (retErr error) {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
close(s.appliedIdxUpdateDone)
|
|
|
|
|
close(s.observerClose)
|
|
|
|
|
<-s.observerDone
|
|
|
|
|
|
|
|
|
@ -879,12 +886,17 @@ func (s *Store) Stats() (map[string]interface{}, error) {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
lAppliedIdx, err := s.boltStore.GetAppliedIndex()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
status := map[string]interface{}{
|
|
|
|
|
"open": s.open,
|
|
|
|
|
"node_id": s.raftID,
|
|
|
|
|
"raft": raftStats,
|
|
|
|
|
"fsm_index": fsmIdx,
|
|
|
|
|
"db_applied_index": dbAppliedIdx,
|
|
|
|
|
"last_applied_index": lAppliedIdx,
|
|
|
|
|
"addr": s.Addr(),
|
|
|
|
|
"leader": map[string]string{
|
|
|
|
|
"node_id": leaderID,
|
|
|
|
@ -1387,11 +1399,15 @@ func (s *Store) setLogInfo() error {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to get last index: %s", err)
|
|
|
|
|
}
|
|
|
|
|
s.lastAppliedIdxOnOpen, err = s.boltStore.GetAppliedIndex()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to get last applied index: %s", err)
|
|
|
|
|
}
|
|
|
|
|
s.lastIdxOnOpen, err = s.boltStore.LastIndex()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to get last index: %s", err)
|
|
|
|
|
}
|
|
|
|
|
s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex()
|
|
|
|
|
s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex(s.firstIdxOnOpen, s.lastAppliedIdxOnOpen)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return fmt.Errorf("failed to get last command index: %s", err)
|
|
|
|
|
}
|
|
|
|
@ -1431,6 +1447,31 @@ func (s *Store) raftConfig() *raft.Config {
|
|
|
|
|
return config
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Store) updateAppliedIndex() chan struct{} {
|
|
|
|
|
done := make(chan struct{})
|
|
|
|
|
go func() {
|
|
|
|
|
ticker := time.NewTicker(appliedIndexUpdateInterval)
|
|
|
|
|
defer ticker.Stop()
|
|
|
|
|
var idx uint64
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
newIdx := s.raft.AppliedIndex()
|
|
|
|
|
if newIdx == idx {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
idx = newIdx
|
|
|
|
|
if err := s.boltStore.SetAppliedIndex(idx); err != nil {
|
|
|
|
|
s.logger.Printf("failed to set applied index: %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
case <-done:
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
return done
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type fsmExecuteResponse struct {
|
|
|
|
|
results []*command.ExecuteResult
|
|
|
|
|
error error
|
|
|
|
@ -1462,7 +1503,7 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) {
|
|
|
|
|
// opened.
|
|
|
|
|
s.appliedOnOpen++
|
|
|
|
|
if l.Index == s.lastCommandIdxOnOpen {
|
|
|
|
|
s.logger.Printf("%d committed log entries applied in %s, took %s since open",
|
|
|
|
|
s.logger.Printf("%d confirmed committed log entries applied in %s, took %s since open",
|
|
|
|
|
s.appliedOnOpen, time.Since(s.firstLogAppliedT), time.Since(s.openT))
|
|
|
|
|
|
|
|
|
|
// Last command log applied. Time to switch to on-disk database?
|
|
|
|
@ -1759,7 +1800,7 @@ func (s *Store) tryCompress(rq command.Requester) ([]byte, bool, error) {
|
|
|
|
|
// RecoverNode is used to manually force a new configuration, in the event that
|
|
|
|
|
// quorum cannot be restored. This borrows heavily from RecoverCluster functionality
|
|
|
|
|
// of the Hashicorp Raft library, but has been customized for rqlite use.
|
|
|
|
|
func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable raft.StableStore,
|
|
|
|
|
func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable *rlog.Log,
|
|
|
|
|
snaps raft.SnapshotStore, tn raft.Transport, conf raft.Configuration) error {
|
|
|
|
|
logPrefix := logger.Prefix()
|
|
|
|
|
logger.SetPrefix(fmt.Sprintf("%s[recovery] ", logPrefix))
|
|
|
|
@ -1869,6 +1910,11 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
|
|
|
|
|
return fmt.Errorf("log compaction failed: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Erase record of previous updating of Applied Index too.
|
|
|
|
|
if err := stable.SetAppliedIndex(0); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to zero applied index: %v", err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|