From 9d2c698910d7fdff2c605c9314b7a20e5e1f5fd0 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Fri, 19 Jan 2024 21:58:09 -0500 Subject: [PATCH] Initial implementation of automatic VACUUM --- cmd/rqlited/flags.go | 4 ++ cmd/rqlited/main.go | 1 + log/log.go | 15 ++++++ store/store.go | 121 ++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 140 insertions(+), 1 deletion(-) diff --git a/cmd/rqlited/flags.go b/cmd/rqlited/flags.go index 26874ab2..f055d78a 100644 --- a/cmd/rqlited/flags.go +++ b/cmd/rqlited/flags.go @@ -135,6 +135,9 @@ type Config struct { // FKConstraints enables SQLite foreign key constraints. FKConstraints bool + // AutoVacInterval sets the automatic VACUUM interval. Use 0s to disable. + AutoVacInterval time.Duration + // RaftLogLevel sets the minimum logging level for the Raft subsystem. RaftLogLevel string @@ -460,6 +463,7 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) { flag.StringVar(&config.OnDiskPath, "on-disk-path", "", "Path for SQLite on-disk database file. If not set, use a file in data directory") flag.BoolVar(&config.FKConstraints, "fk", false, "Enable SQLite foreign key constraints") flag.BoolVar(&showVersion, "version", false, "Show version information and exit") + flag.DurationVar(&config.AutoVacInterval, "auto-vacuum-int", 0, "Automatic VACUUM interval. Automatic VACUUM disabled if not set") flag.BoolVar(&config.RaftNonVoter, "raft-non-voter", false, "Configure as non-voting node") flag.DurationVar(&config.RaftHeartbeatTimeout, "raft-timeout", time.Second, "Raft heartbeat timeout") flag.DurationVar(&config.RaftElectionTimeout, "raft-election-timeout", time.Second, "Raft election timeout") diff --git a/cmd/rqlited/main.go b/cmd/rqlited/main.go index 7e18fff4..0ab85f66 100644 --- a/cmd/rqlited/main.go +++ b/cmd/rqlited/main.go @@ -282,6 +282,7 @@ func createStore(cfg *Config, ln *tcp.Layer) (*store.Store, error) { str.BootstrapExpect = cfg.BootstrapExpect str.ReapTimeout = cfg.RaftReapNodeTimeout str.ReapReadOnlyTimeout = cfg.RaftReapReadOnlyNodeTimeout + str.AutoVacInterval = cfg.AutoVacInterval if store.IsNewNode(cfg.DataPath) { log.Printf("no preexisting node state detected in %s, node may be bootstrapping", cfg.DataPath) diff --git a/log/log.go b/log/log.go index 5e8d7781..189b2585 100644 --- a/log/log.go +++ b/log/log.go @@ -12,6 +12,10 @@ const ( rqliteAppliedIndex = "rqlite_applied_index" ) +var ( + ErrKeyNotFound = raftboltdb.ErrKeyNotFound +) + // Log is an object that can return information about the Raft log. type Log struct { *raftboltdb.BoltStore @@ -101,6 +105,17 @@ func (l *Log) GetAppliedIndex() (uint64, error) { return i, nil } +// Get returns the value for the given key. +func (l *Log) Get(key []byte) (val []byte, err error) { + defer func() { + if err != raftboltdb.ErrKeyNotFound { + err = ErrKeyNotFound + } + }() + val, err = l.BoltStore.Get(key) + return +} + // Stats returns stats about the BBoltDB database. func (l *Log) Stats() bbolt.Stats { return l.BoltStore.Stats() diff --git a/store/store.go b/store/store.go index 0a33d7f7..775bf5a7 100644 --- a/store/store.go +++ b/store/store.go @@ -6,6 +6,7 @@ package store import ( "bytes" "compress/gzip" + "encoding/binary" "errors" "expvar" "fmt" @@ -85,6 +86,7 @@ const ( restoreScratchPattern = "rqlite-restore-*" bootScatchPattern = "rqlite-boot-*" backupScatchPattern = "rqlite-backup-*" + vacuumScatchPattern = "rqlite-vacuum-*" raftDBPath = "raft.db" // Changing this will break backwards compatibility. peersPath = "raft/peers.json" peersInfoPath = "raft/peers.info" @@ -100,6 +102,8 @@ const ( raftLogCacheSize = 512 trailingScale = 1.25 observerChanLen = 50 + + lastVacuumTimeKey = "rqlite_last_vacuum" ) const ( @@ -111,6 +115,8 @@ const ( numWALSnapshotsFailed = "num_wal_snapshots_failed" numSnapshotsFull = "num_snapshots_full" numSnapshotsIncremental = "num_snapshots_incremental" + numAutoVacuums = "num_auto_vacuums" + autoVacuumDuration = "auto_vacuum_duration" numBoots = "num_boots" numBackups = "num_backups" numLoads = "num_loads" @@ -159,6 +165,8 @@ func ResetStats() { stats.Add(numWALSnapshotsFailed, 0) stats.Add(numSnapshotsFull, 0) stats.Add(numSnapshotsIncremental, 0) + stats.Add(numAutoVacuums, 0) + stats.Add(autoVacuumDuration, 0) stats.Add(numBoots, 0) stats.Add(numBackups, 0) stats.Add(numLoads, 0) @@ -301,6 +309,7 @@ type Store struct { ApplyTimeout time.Duration RaftLogLevel string NoFreeListSync bool + AutoVacInterval time.Duration // Node-reaping configuration ReapTimeout time.Duration @@ -491,7 +500,11 @@ func (s *Store) Open() (retErr error) { // Clean up any files from aborted operations. This tries to catch the case where scratch files // were created in the Raft directory, not cleaned up, and then the node was restarted with an // explicit SQLite path set. - for _, pattern := range []string{restoreScratchPattern, bootScatchPattern, backupScatchPattern} { + for _, pattern := range []string{ + restoreScratchPattern, + bootScatchPattern, + backupScatchPattern, + vacuumScatchPattern} { for _, dir := range []string{s.raftDir, s.dbDir} { files, err := filepath.Glob(filepath.Join(dir, pattern)) if err != nil { @@ -530,6 +543,9 @@ func (s *Store) Open() (retErr error) { // Periodically update the applied index for faster startup. s.appliedIdxUpdateDone = s.updateAppliedIndex() + if err := s.initLastVacuumTime(); err != nil { + return fmt.Errorf("failed to initialize last vacuum time: %s", err.Error()) + } return nil } @@ -957,6 +973,10 @@ func (s *Store) Stats() (map[string]interface{}, error) { if err != nil { return nil, err } + lVac, err := s.lastVacuumTime() + if err != nil { + return nil, err + } status := map[string]interface{}{ "open": s.open, "node_id": s.raftID, @@ -990,6 +1010,8 @@ func (s *Store) Stats() (map[string]interface{}, error) { "dir_size_friendly": friendlyBytes(uint64(dirSz)), "sqlite3": dbStatus, "db_conf": s.dbConf, + "last_vacuum": lVac.String(), + "next_vacuum": lVac.Add(s.AutoVacInterval).String(), } // Snapshot stats may be in flux if a snapshot is in progress. Only @@ -1606,6 +1628,59 @@ func (s *Store) remove(id string) error { return f.Error() } +func (s *Store) vacuumInto() (string, error) { + fd, err := os.CreateTemp(s.dbDir, vacuumScatchPattern) + if err != nil { + return "", err + } + if err := fd.Close(); err != nil { + return "", err + } + if err := s.db.VacuumInto(fd.Name()); err != nil { + return "", err + } + return fd.Name(), nil +} + +func (s *Store) initLastVacuumTime() error { + if _, err := s.boltStore.Get([]byte(lastVacuumTimeKey)); err != nil { + if err == rlog.ErrKeyNotFound { + s.logger.Println("no vacuum has been performed on this database") + n := time.Now().UnixNano() // First vacuum will be in the future. + buf := bytes.NewBuffer(make([]byte, 0, 8)) + if err := binary.Write(buf, binary.LittleEndian, n); err != nil { + return fmt.Errorf("failed to write last vacuum time: %s", err) + } + if err := s.boltStore.Set([]byte("rqlite_last_vacuum"), buf.Bytes()); err != nil { + return fmt.Errorf("failed to set last vacuum time: %s", err) + } + } else { + return fmt.Errorf("failed to get last vacuum time: %s", err) + } + } + return nil +} + +func (s *Store) setLastVacuumTime(t time.Time) error { + buf := bytes.NewBuffer(make([]byte, 0, 8)) + if err := binary.Write(buf, binary.LittleEndian, t.UnixNano()); err != nil { + return fmt.Errorf("failed to encode last vacuum time: %s", err) + } + if err := s.boltStore.Set([]byte(lastVacuumTimeKey), buf.Bytes()); err != nil { + return fmt.Errorf("failed to set last vacuum time: %s", err) + } + return nil +} + +func (s *Store) lastVacuumTime() (time.Time, error) { + vt, err := s.boltStore.Get([]byte(lastVacuumTimeKey)) + if err != nil { + return time.Time{}, fmt.Errorf("failed to get last vacuum time: %s", err) + } + n := int64(binary.LittleEndian.Uint64(vt)) + return time.Unix(0, n), nil +} + // raftConfig returns a new Raft config for the store. func (s *Store) raftConfig() *raft.Config { config := raft.DefaultConfig() @@ -1760,6 +1835,50 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { } }() + // Automatic VACUUM needed? + lvt, err := s.lastVacuumTime() + if err != nil { + return nil, err + } + if s.AutoVacInterval != 0 && time.Since(lvt) > s.AutoVacInterval { + vacStart := time.Now() + vacPath, err := s.vacuumInto() + if err != nil { + return nil, err + } + + // Verify that the VACUUMed database is valid. + if !sql.IsValidSQLiteFile(vacPath) { + return nil, fmt.Errorf("invalid SQLite file post VACUUM") + } + + // Swap in new database file. + if err := s.db.Close(); err != nil { + return nil, err + } + if err := sql.RemoveFiles(s.dbPath); err != nil { + return nil, err + } + if err := os.Rename(vacPath, s.dbPath); err != nil { + return nil, err + } + db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true) + if err != nil { + return nil, err + } + s.db = db + + if err := s.snapshotStore.SetFullNeeded(); err != nil { + return nil, err + } + if err := s.setLastVacuumTime(time.Now()); err != nil { + return nil, err + } + s.logger.Printf("database vacuumed in %s", time.Since(vacStart)) + stats.Get(autoVacuumDuration).(*expvar.Int).Set(time.Since(vacStart).Milliseconds()) + stats.Add(numAutoVacuums, 1) + } + fullNeeded, err := s.snapshotStore.FullNeeded() if err != nil { return nil, err