|
|
|
@ -6,6 +6,7 @@ package store
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"compress/gzip"
|
|
|
|
|
"encoding/binary"
|
|
|
|
|
"errors"
|
|
|
|
|
"expvar"
|
|
|
|
|
"fmt"
|
|
|
|
@ -85,6 +86,7 @@ const (
|
|
|
|
|
restoreScratchPattern = "rqlite-restore-*"
|
|
|
|
|
bootScatchPattern = "rqlite-boot-*"
|
|
|
|
|
backupScatchPattern = "rqlite-backup-*"
|
|
|
|
|
vacuumScatchPattern = "rqlite-vacuum-*"
|
|
|
|
|
raftDBPath = "raft.db" // Changing this will break backwards compatibility.
|
|
|
|
|
peersPath = "raft/peers.json"
|
|
|
|
|
peersInfoPath = "raft/peers.info"
|
|
|
|
@ -100,6 +102,8 @@ const (
|
|
|
|
|
raftLogCacheSize = 512
|
|
|
|
|
trailingScale = 1.25
|
|
|
|
|
observerChanLen = 50
|
|
|
|
|
|
|
|
|
|
lastVacuumTimeKey = "rqlite_last_vacuum"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
const (
|
|
|
|
@ -111,6 +115,8 @@ const (
|
|
|
|
|
numWALSnapshotsFailed = "num_wal_snapshots_failed"
|
|
|
|
|
numSnapshotsFull = "num_snapshots_full"
|
|
|
|
|
numSnapshotsIncremental = "num_snapshots_incremental"
|
|
|
|
|
numAutoVacuums = "num_auto_vacuums"
|
|
|
|
|
autoVacuumDuration = "auto_vacuum_duration"
|
|
|
|
|
numBoots = "num_boots"
|
|
|
|
|
numBackups = "num_backups"
|
|
|
|
|
numLoads = "num_loads"
|
|
|
|
@ -159,6 +165,8 @@ func ResetStats() {
|
|
|
|
|
stats.Add(numWALSnapshotsFailed, 0)
|
|
|
|
|
stats.Add(numSnapshotsFull, 0)
|
|
|
|
|
stats.Add(numSnapshotsIncremental, 0)
|
|
|
|
|
stats.Add(numAutoVacuums, 0)
|
|
|
|
|
stats.Add(autoVacuumDuration, 0)
|
|
|
|
|
stats.Add(numBoots, 0)
|
|
|
|
|
stats.Add(numBackups, 0)
|
|
|
|
|
stats.Add(numLoads, 0)
|
|
|
|
@ -301,6 +309,7 @@ type Store struct {
|
|
|
|
|
ApplyTimeout time.Duration
|
|
|
|
|
RaftLogLevel string
|
|
|
|
|
NoFreeListSync bool
|
|
|
|
|
AutoVacInterval time.Duration
|
|
|
|
|
|
|
|
|
|
// Node-reaping configuration
|
|
|
|
|
ReapTimeout time.Duration
|
|
|
|
@ -491,7 +500,11 @@ func (s *Store) Open() (retErr error) {
|
|
|
|
|
// Clean up any files from aborted operations. This tries to catch the case where scratch files
|
|
|
|
|
// were created in the Raft directory, not cleaned up, and then the node was restarted with an
|
|
|
|
|
// explicit SQLite path set.
|
|
|
|
|
for _, pattern := range []string{restoreScratchPattern, bootScatchPattern, backupScatchPattern} {
|
|
|
|
|
for _, pattern := range []string{
|
|
|
|
|
restoreScratchPattern,
|
|
|
|
|
bootScatchPattern,
|
|
|
|
|
backupScatchPattern,
|
|
|
|
|
vacuumScatchPattern} {
|
|
|
|
|
for _, dir := range []string{s.raftDir, s.dbDir} {
|
|
|
|
|
files, err := filepath.Glob(filepath.Join(dir, pattern))
|
|
|
|
|
if err != nil {
|
|
|
|
@ -530,6 +543,9 @@ func (s *Store) Open() (retErr error) {
|
|
|
|
|
// Periodically update the applied index for faster startup.
|
|
|
|
|
s.appliedIdxUpdateDone = s.updateAppliedIndex()
|
|
|
|
|
|
|
|
|
|
if err := s.initLastVacuumTime(); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to initialize last vacuum time: %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -957,6 +973,10 @@ func (s *Store) Stats() (map[string]interface{}, error) {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
lVac, err := s.lastVacuumTime()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
status := map[string]interface{}{
|
|
|
|
|
"open": s.open,
|
|
|
|
|
"node_id": s.raftID,
|
|
|
|
@ -990,6 +1010,8 @@ func (s *Store) Stats() (map[string]interface{}, error) {
|
|
|
|
|
"dir_size_friendly": friendlyBytes(uint64(dirSz)),
|
|
|
|
|
"sqlite3": dbStatus,
|
|
|
|
|
"db_conf": s.dbConf,
|
|
|
|
|
"last_vacuum": lVac.String(),
|
|
|
|
|
"next_vacuum": lVac.Add(s.AutoVacInterval).String(),
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Snapshot stats may be in flux if a snapshot is in progress. Only
|
|
|
|
@ -1606,6 +1628,59 @@ func (s *Store) remove(id string) error {
|
|
|
|
|
return f.Error()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Store) vacuumInto() (string, error) {
|
|
|
|
|
fd, err := os.CreateTemp(s.dbDir, vacuumScatchPattern)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
if err := fd.Close(); err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
if err := s.db.VacuumInto(fd.Name()); err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
return fd.Name(), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Store) initLastVacuumTime() error {
|
|
|
|
|
if _, err := s.boltStore.Get([]byte(lastVacuumTimeKey)); err != nil {
|
|
|
|
|
if err == rlog.ErrKeyNotFound {
|
|
|
|
|
s.logger.Println("no vacuum has been performed on this database")
|
|
|
|
|
n := time.Now().UnixNano() // First vacuum will be in the future.
|
|
|
|
|
buf := bytes.NewBuffer(make([]byte, 0, 8))
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, n); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to write last vacuum time: %s", err)
|
|
|
|
|
}
|
|
|
|
|
if err := s.boltStore.Set([]byte("rqlite_last_vacuum"), buf.Bytes()); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to set last vacuum time: %s", err)
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
return fmt.Errorf("failed to get last vacuum time: %s", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Store) setLastVacuumTime(t time.Time) error {
|
|
|
|
|
buf := bytes.NewBuffer(make([]byte, 0, 8))
|
|
|
|
|
if err := binary.Write(buf, binary.LittleEndian, t.UnixNano()); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to encode last vacuum time: %s", err)
|
|
|
|
|
}
|
|
|
|
|
if err := s.boltStore.Set([]byte(lastVacuumTimeKey), buf.Bytes()); err != nil {
|
|
|
|
|
return fmt.Errorf("failed to set last vacuum time: %s", err)
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Store) lastVacuumTime() (time.Time, error) {
|
|
|
|
|
vt, err := s.boltStore.Get([]byte(lastVacuumTimeKey))
|
|
|
|
|
if err != nil {
|
|
|
|
|
return time.Time{}, fmt.Errorf("failed to get last vacuum time: %s", err)
|
|
|
|
|
}
|
|
|
|
|
n := int64(binary.LittleEndian.Uint64(vt))
|
|
|
|
|
return time.Unix(0, n), nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// raftConfig returns a new Raft config for the store.
|
|
|
|
|
func (s *Store) raftConfig() *raft.Config {
|
|
|
|
|
config := raft.DefaultConfig()
|
|
|
|
@ -1760,6 +1835,50 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
// Automatic VACUUM needed?
|
|
|
|
|
lvt, err := s.lastVacuumTime()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if s.AutoVacInterval != 0 && time.Since(lvt) > s.AutoVacInterval {
|
|
|
|
|
vacStart := time.Now()
|
|
|
|
|
vacPath, err := s.vacuumInto()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Verify that the VACUUMed database is valid.
|
|
|
|
|
if !sql.IsValidSQLiteFile(vacPath) {
|
|
|
|
|
return nil, fmt.Errorf("invalid SQLite file post VACUUM")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Swap in new database file.
|
|
|
|
|
if err := s.db.Close(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if err := sql.RemoveFiles(s.dbPath); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if err := os.Rename(vacPath, s.dbPath); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
s.db = db
|
|
|
|
|
|
|
|
|
|
if err := s.snapshotStore.SetFullNeeded(); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
if err := s.setLastVacuumTime(time.Now()); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
s.logger.Printf("database vacuumed in %s", time.Since(vacStart))
|
|
|
|
|
stats.Get(autoVacuumDuration).(*expvar.Int).Set(time.Since(vacStart).Milliseconds())
|
|
|
|
|
stats.Add(numAutoVacuums, 1)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fullNeeded, err := s.snapshotStore.FullNeeded()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|