|
|
|
@ -221,6 +221,7 @@ type Store struct {
|
|
|
|
|
raftID string // Node ID.
|
|
|
|
|
dbConf *DBConfig // SQLite database config.
|
|
|
|
|
dbPath string // Path to underlying SQLite file.
|
|
|
|
|
walPath string // Path to WAL file.
|
|
|
|
|
dbDir string // Path to directory containing SQLite file.
|
|
|
|
|
db *sql.DB // The underlying SQLite store.
|
|
|
|
|
|
|
|
|
@ -338,6 +339,7 @@ func New(ly Layer, c *Config) *Store {
|
|
|
|
|
raftID: c.ID,
|
|
|
|
|
dbConf: c.DBConf,
|
|
|
|
|
dbPath: dbPath,
|
|
|
|
|
walPath: sql.WALPath(dbPath),
|
|
|
|
|
dbDir: filepath.Dir(dbPath),
|
|
|
|
|
leaderObservers: make([]chan<- struct{}, 0),
|
|
|
|
|
reqMarshaller: command.NewRequestMarshaler(),
|
|
|
|
@ -952,11 +954,6 @@ func (s *Store) Stats() (map[string]interface{}, error) {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
snapsStats, err := s.snapshotStore.Stats()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lAppliedIdx, err := s.boltStore.GetAppliedIndex()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
@ -981,7 +978,6 @@ func (s *Store) Stats() (map[string]interface{}, error) {
|
|
|
|
|
"apply_timeout": s.ApplyTimeout.String(),
|
|
|
|
|
"heartbeat_timeout": s.HeartbeatTimeout.String(),
|
|
|
|
|
"election_timeout": s.ElectionTimeout.String(),
|
|
|
|
|
"snapshot_store": snapsStats,
|
|
|
|
|
"snapshot_threshold": s.SnapshotThreshold,
|
|
|
|
|
"snapshot_interval": s.SnapshotInterval.String(),
|
|
|
|
|
"reap_timeout": s.ReapTimeout.String(),
|
|
|
|
@ -995,6 +991,13 @@ func (s *Store) Stats() (map[string]interface{}, error) {
|
|
|
|
|
"sqlite3": dbStatus,
|
|
|
|
|
"db_conf": s.dbConf,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Snapshot stats may be in flux if a snapshot is in progress. Only
|
|
|
|
|
// report them if they are available.
|
|
|
|
|
snapsStats, err := s.snapshotStore.Stats()
|
|
|
|
|
if err == nil {
|
|
|
|
|
status["snapshot_store"] = snapsStats
|
|
|
|
|
}
|
|
|
|
|
return status, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1738,8 +1741,8 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
|
|
|
|
|
} else {
|
|
|
|
|
compactedBuf := bytes.NewBuffer(nil)
|
|
|
|
|
var err error
|
|
|
|
|
if pathExistsWithData(s.db.WALPath()) {
|
|
|
|
|
walFD, err := os.Open(s.db.WALPath())
|
|
|
|
|
if pathExistsWithData(s.walPath) {
|
|
|
|
|
walFD, err := os.Open(s.walPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
@ -1758,7 +1761,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
|
|
|
|
|
}
|
|
|
|
|
walFD.Close() // We need it closed for the next step.
|
|
|
|
|
|
|
|
|
|
walSz, err := fileSize(s.db.WALPath())
|
|
|
|
|
walSz, err := fileSize(s.walPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
@ -1975,7 +1978,7 @@ func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ticker.C:
|
|
|
|
|
sz, err := s.db.WALSize()
|
|
|
|
|
sz, err := fileSize(s.walPath)
|
|
|
|
|
if err != nil {
|
|
|
|
|
s.logger.Printf("failed to get WAL size: %s", err.Error())
|
|
|
|
|
continue
|
|
|
|
|