diff --git a/CHANGELOG.md b/CHANGELOG.md index 10bb8aec..d7526aed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,12 @@ +## 8.17.0 (unreleased) +### New features +- [PR #1619](https://github.com/rqlite/rqlite/pull/1619): Support automatic `VACUUM` of the SQLite database. Fixes [#1609](https://github.com/rqlite/rqlite/issues/1609). + ## 8.16.8 (January 20th 2024) ### Implementation changes and bug fixes - [PR #1615](https://github.com/rqlite/rqlite/pull/1615): Add extensive WAL checkpoint test at the database level. - [PR #1616](https://github.com/rqlite/rqlite/pull/1616): Add time and checksum based change-detection functions to the database level. -- [PR #1617](https://github.com/rqlite/rqlite/pull/1617): Add VacuumInto to database layer. +- [PR #1617](https://github.com/rqlite/rqlite/pull/1617): Add `VacuumInto` to database layer. - [PR #1621](https://github.com/rqlite/rqlite/pull/1621): Fix a panic in the rqlite shell by not stomping on an "outer" error. ## 8.16.7 (January 18th 2024) diff --git a/cmd/rqbench/simple_load.sh b/cmd/rqbench/simple_load.sh index c6ec87ed..50a94c4f 100755 --- a/cmd/rqbench/simple_load.sh +++ b/cmd/rqbench/simple_load.sh @@ -14,7 +14,7 @@ handle_ctrl_c() { exit 1 } -COUNT=10000 +COUNT=1000000 trap 'handle_ctrl_c' SIGINT @@ -29,4 +29,6 @@ $RQBENCH -p "/db/query" -n 150000000 -m 1000 "SELECT COUNT(*) FROM foo" & $RQBENCH -p "/db/query" -n 150000000 -m 1000 "SELECT COUNT(*) FROM bar" & $RQBENCH -p "/db/query" -n 150000000 -m 1000 "SELECT * FROM qux LIMIT 10" & +$RQBENCH -p "/db/execute" -m 100 -n $COUNT -a $EXECUTE_HOST 'DELETE FROM qux WHERE id IN (SELECT id FROM qux ORDER BY RANDOM() LIMIT 10)' + wait diff --git a/cmd/rqlited/flags.go b/cmd/rqlited/flags.go index 26874ab2..1ddc06ce 100644 --- a/cmd/rqlited/flags.go +++ b/cmd/rqlited/flags.go @@ -135,6 +135,9 @@ type Config struct { // FKConstraints enables SQLite foreign key constraints. FKConstraints bool + // AutoVacInterval sets the automatic VACUUM interval. Use 0s to disable. + AutoVacInterval time.Duration + // RaftLogLevel sets the minimum logging level for the Raft subsystem. RaftLogLevel string @@ -460,6 +463,7 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) { flag.StringVar(&config.OnDiskPath, "on-disk-path", "", "Path for SQLite on-disk database file. If not set, use a file in data directory") flag.BoolVar(&config.FKConstraints, "fk", false, "Enable SQLite foreign key constraints") flag.BoolVar(&showVersion, "version", false, "Show version information and exit") + flag.DurationVar(&config.AutoVacInterval, "auto-vacuum-int", 0, "Period between automatic VACUUMs. It not set, not enabled") flag.BoolVar(&config.RaftNonVoter, "raft-non-voter", false, "Configure as non-voting node") flag.DurationVar(&config.RaftHeartbeatTimeout, "raft-timeout", time.Second, "Raft heartbeat timeout") flag.DurationVar(&config.RaftElectionTimeout, "raft-election-timeout", time.Second, "Raft election timeout") diff --git a/cmd/rqlited/main.go b/cmd/rqlited/main.go index 7e18fff4..0ab85f66 100644 --- a/cmd/rqlited/main.go +++ b/cmd/rqlited/main.go @@ -282,6 +282,7 @@ func createStore(cfg *Config, ln *tcp.Layer) (*store.Store, error) { str.BootstrapExpect = cfg.BootstrapExpect str.ReapTimeout = cfg.RaftReapNodeTimeout str.ReapReadOnlyTimeout = cfg.RaftReapReadOnlyNodeTimeout + str.AutoVacInterval = cfg.AutoVacInterval if store.IsNewNode(cfg.DataPath) { log.Printf("no preexisting node state detected in %s, node may be bootstrapping", cfg.DataPath) diff --git a/db/swappable_db.go b/db/swappable_db.go new file mode 100644 index 00000000..bdedbe6c --- /dev/null +++ b/db/swappable_db.go @@ -0,0 +1,151 @@ +package db + +import ( + "fmt" + "io" + "os" + "sync" + + command "github.com/rqlite/rqlite/v8/command/proto" +) + +// SwappableDB is a wrapper around DB that allows the underlying database to be swapped out +// in a thread-safe manner. +type SwappableDB struct { + db *DB + dbMu sync.RWMutex +} + +// OpenSwappable returns a new SwappableDB instance, which opens the database at the given path. +func OpenSwappable(dbPath string, fkEnabled, wal bool) (*SwappableDB, error) { + db, err := Open(dbPath, fkEnabled, wal) + if err != nil { + return nil, err + } + return &SwappableDB{db: db}, nil +} + +// Swap swaps the underlying database with that at the given path. +func (s *SwappableDB) Swap(path string, fkConstraints, walEnabled bool) error { + if !IsValidSQLiteFile(path) { + return fmt.Errorf("invalid SQLite data") + } + + s.dbMu.Lock() + defer s.dbMu.Unlock() + if err := s.db.Close(); err != nil { + return fmt.Errorf("failed to close: %s", err) + } + if err := RemoveFiles(s.db.Path()); err != nil { + return fmt.Errorf("failed to remove files: %s", err) + } + if err := os.Rename(path, s.db.Path()); err != nil { + return fmt.Errorf("failed to rename database: %s", err) + } + + var db *DB + db, err := Open(s.db.Path(), fkConstraints, walEnabled) + if err != nil { + return fmt.Errorf("open SQLite file failed: %s", err) + } + s.db = db + return nil +} + +// Close closes the underlying database. +func (s *SwappableDB) Close() error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Close() +} + +// Stats returns the underlying database's stats. +func (s *SwappableDB) Stats() (map[string]interface{}, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Stats() +} + +// Request calls Request on the underlying database. +func (s *SwappableDB) Request(req *command.Request, xTime bool) ([]*command.ExecuteQueryResponse, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Request(req, xTime) +} + +// Execute calls Execute on the underlying database. +func (s *SwappableDB) Execute(ex *command.Request, xTime bool) ([]*command.ExecuteResult, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Execute(ex, xTime) +} + +// Query calls Query on the underlying database. +func (s *SwappableDB) Query(q *command.Request, xTime bool) ([]*command.QueryRows, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Query(q, xTime) +} + +// VacuumInto calls VacuumInto on the underlying database. +func (s *SwappableDB) VacuumInto(path string) error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.VacuumInto(path) +} + +// Backup calls Backup on the underlying database. +func (s *SwappableDB) Backup(path string, vacuum bool) error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Backup(path, vacuum) +} + +// Serialize calls Serialize on the underlying database. +func (s *SwappableDB) Serialize() ([]byte, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Serialize() +} + +// StmtReadOnly calls StmtReadOnly on the underlying database. +func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.StmtReadOnly(sql) +} + +// Checkpoint calls Checkpoint on the underlying database. +func (s *SwappableDB) Checkpoint() error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Checkpoint() +} + +// Path calls Path on the underlying database. +func (s *SwappableDB) Path() string { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Path() +} + +// Dump calls Dump on the underlying database. +func (s *SwappableDB) Dump(w io.Writer) error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Dump(w) +} + +// FKEnabled calls FKEnabled on the underlying database. +func (s *SwappableDB) FKEnabled() bool { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.FKEnabled() +} + +// WALEnabled calls WALEnabled on the underlying database. +func (s *SwappableDB) WALEnabled() bool { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.WALEnabled() +} diff --git a/store/command_processor.go b/store/command_processor.go index c8c51043..0e79f818 100644 --- a/store/command_processor.go +++ b/store/command_processor.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "os" + "path/filepath" "github.com/rqlite/rqlite/v8/command" "github.com/rqlite/rqlite/v8/command/chunking" @@ -41,8 +42,7 @@ func NewCommandProcessor(logger *log.Logger, dm *chunking.DechunkerManager) *Com } // Process processes the given command against the given database. -func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, bool, interface{}) { - db := *pDB +func (c *CommandProcessor) Process(data []byte, db *sql.SwappableDB) (*proto.Command, bool, interface{}) { cmd := &proto.Command{} if err := command.Unmarshal(data, cmd); err != nil { panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error())) @@ -76,20 +76,23 @@ func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, b panic(fmt.Sprintf("failed to unmarshal load subcommand: %s", err.Error())) } - // Swap the underlying database to the new one. - if err := db.Close(); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)} - } - if err := sql.RemoveFiles(db.Path()); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)} + // create a scratch file in the same directory as s.db.Path() + fd, err := os.CreateTemp(filepath.Dir(db.Path()), "rqlilte-load-") + if err != nil { + return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to create temporary database file: %s", err)} } - - newDB, err := createOnDisk(lr.Data, db.Path(), db.FKEnabled(), db.WALEnabled()) + defer os.Remove(fd.Name()) + defer fd.Close() + _, err = fd.Write(lr.Data) if err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)} + return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to write to temporary database file: %s", err)} } + fd.Close() - *pDB = newDB + // Swap the underlying database to the new one. + if err := db.Swap(fd.Name(), db.FKEnabled(), db.WALEnabled()); err != nil { + return cmd, false, &fsmGenericResponse{error: fmt.Errorf("error swapping databases: %s", err)} + } return cmd, true, &fsmGenericResponse{} case proto.Command_COMMAND_TYPE_LOAD_CHUNK: var lcr proto.LoadChunkRequest @@ -129,25 +132,9 @@ func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, b c.logger.Printf("invalid chunked database file - ignoring") return cmd, false, &fsmGenericResponse{error: fmt.Errorf("invalid chunked database file - ignoring")} } - - // Close the underlying database before we overwrite it. - if err := db.Close(); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)} + if err := db.Swap(path, db.FKEnabled(), db.WALEnabled()); err != nil { + return cmd, false, &fsmGenericResponse{error: fmt.Errorf("error swapping databases: %s", err)} } - if err := sql.RemoveFiles(db.Path()); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)} - } - - if err := os.Rename(path, db.Path()); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to rename temporary database file: %s", err)} - } - newDB, err := sql.Open(db.Path(), db.FKEnabled(), db.WALEnabled()) - if err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to open new on-disk database: %s", err)} - } - - // Swap the underlying database to the new one. - *pDB = newDB } } return cmd, true, &fsmGenericResponse{} diff --git a/store/state.go b/store/state.go index c1164699..dd27280c 100644 --- a/store/state.go +++ b/store/state.go @@ -104,7 +104,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable } // Now, open the database so we can replay any outstanding Raft log entries. - db, err := sql.Open(tmpDBPath, false, true) + db, err := sql.OpenSwappable(tmpDBPath, false, true) if err != nil { return fmt.Errorf("failed to open temporary database: %s", err) } @@ -135,7 +135,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable return fmt.Errorf("failed to get log at index %d: %v", index, err) } if entry.Type == raft.LogCommand { - cmdProc.Process(entry.Data, &db) + cmdProc.Process(entry.Data, db) } lastIndex = entry.Index lastTerm = entry.Term diff --git a/store/store.go b/store/store.go index 0a33d7f7..e886e69b 100644 --- a/store/store.go +++ b/store/store.go @@ -6,6 +6,7 @@ package store import ( "bytes" "compress/gzip" + "encoding/binary" "errors" "expvar" "fmt" @@ -85,6 +86,7 @@ const ( restoreScratchPattern = "rqlite-restore-*" bootScatchPattern = "rqlite-boot-*" backupScatchPattern = "rqlite-backup-*" + vacuumScatchPattern = "rqlite-vacuum-*" raftDBPath = "raft.db" // Changing this will break backwards compatibility. peersPath = "raft/peers.json" peersInfoPath = "raft/peers.info" @@ -100,6 +102,8 @@ const ( raftLogCacheSize = 512 trailingScale = 1.25 observerChanLen = 50 + + lastVacuumTimeKey = "rqlite_last_vacuum" ) const ( @@ -111,6 +115,9 @@ const ( numWALSnapshotsFailed = "num_wal_snapshots_failed" numSnapshotsFull = "num_snapshots_full" numSnapshotsIncremental = "num_snapshots_incremental" + numAutoVacuums = "num_auto_vacuums" + numAutoVacuumsFailed = "num_auto_vacuums_failed" + autoVacuumDuration = "auto_vacuum_duration" numBoots = "num_boots" numBackups = "num_backups" numLoads = "num_loads" @@ -159,6 +166,9 @@ func ResetStats() { stats.Add(numWALSnapshotsFailed, 0) stats.Add(numSnapshotsFull, 0) stats.Add(numSnapshotsIncremental, 0) + stats.Add(numAutoVacuums, 0) + stats.Add(numAutoVacuumsFailed, 0) + stats.Add(autoVacuumDuration, 0) stats.Add(numBoots, 0) stats.Add(numBackups, 0) stats.Add(numLoads, 0) @@ -228,12 +238,12 @@ type Store struct { raft *raft.Raft // The consensus mechanism. ly Layer raftTn *NodeTransport - raftID string // Node ID. - dbConf *DBConfig // SQLite database config. - dbPath string // Path to underlying SQLite file. - walPath string // Path to WAL file. - dbDir string // Path to directory containing SQLite file. - db *sql.DB // The underlying SQLite store. + raftID string // Node ID. + dbConf *DBConfig // SQLite database config. + dbPath string // Path to underlying SQLite file. + walPath string // Path to WAL file. + dbDir string // Path to directory containing SQLite file. + db *sql.SwappableDB // The underlying SQLite store. dechunkManager *chunking.DechunkerManager cmdProc *CommandProcessor @@ -301,6 +311,7 @@ type Store struct { ApplyTimeout time.Duration RaftLogLevel string NoFreeListSync bool + AutoVacInterval time.Duration // Node-reaping configuration ReapTimeout time.Duration @@ -309,6 +320,7 @@ type Store struct { numTrailingLogs uint64 // For whitebox testing + numAutoVacuums int numIgnoredJoins int numNoops int numSnapshotsMu sync.Mutex @@ -482,16 +494,19 @@ func (s *Store) Open() (retErr error) { s.logger.Printf("first log index: %d, last log index: %d, last applied index: %d, last command log index: %d:", s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastAppliedIdxOnOpen, s.lastCommandIdxOnOpen) - s.db, err = createOnDisk(nil, s.dbPath, s.dbConf.FKConstraints, true) + s.db, err = createOnDisk(s.dbPath, s.dbConf.FKConstraints, true) if err != nil { return fmt.Errorf("failed to create on-disk database: %s", err) } - s.logger.Printf("created on-disk database at open") // Clean up any files from aborted operations. This tries to catch the case where scratch files // were created in the Raft directory, not cleaned up, and then the node was restarted with an // explicit SQLite path set. - for _, pattern := range []string{restoreScratchPattern, bootScatchPattern, backupScatchPattern} { + for _, pattern := range []string{ + restoreScratchPattern, + bootScatchPattern, + backupScatchPattern, + vacuumScatchPattern} { for _, dir := range []string{s.raftDir, s.dbDir} { files, err := filepath.Glob(filepath.Join(dir, pattern)) if err != nil { @@ -530,6 +545,9 @@ func (s *Store) Open() (retErr error) { // Periodically update the applied index for faster startup. s.appliedIdxUpdateDone = s.updateAppliedIndex() + if err := s.initLastVacuumTime(); err != nil { + return fmt.Errorf("failed to initialize last vacuum time: %s", err.Error()) + } return nil } @@ -726,6 +744,16 @@ func (s *Store) State() ClusterState { } } +// LastVacuumTime returns the time of the last automatic VACUUM. +func (s *Store) LastVacuumTime() (time.Time, error) { + vt, err := s.boltStore.Get([]byte(lastVacuumTimeKey)) + if err != nil { + return time.Time{}, fmt.Errorf("failed to get last vacuum time: %s", err) + } + n := int64(binary.LittleEndian.Uint64(vt)) + return time.Unix(0, n), nil +} + // Path returns the path to the store's storage directory. func (s *Store) Path() string { return s.raftDir @@ -991,6 +1019,9 @@ func (s *Store) Stats() (map[string]interface{}, error) { "sqlite3": dbStatus, "db_conf": s.dbConf, } + if lVac, err := s.LastVacuumTime(); err == nil { + status["last_vacuum"] = lVac.String() + } // Snapshot stats may be in flux if a snapshot is in progress. Only // report them if they are available. @@ -1321,7 +1352,7 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) { return 0, ErrNotSingleNode } - // Write the data to a temporary file.. + // Write the data to a temporary file. f, err := os.CreateTemp(s.dbDir, bootScatchPattern) if err != nil { return 0, err @@ -1356,20 +1387,9 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) { } // Swap in new database file. - if err := s.db.Close(); err != nil { - return n, err - } - if err := sql.RemoveFiles(s.dbPath); err != nil { - return n, err + if err := s.db.Swap(f.Name(), s.dbConf.FKConstraints, true); err != nil { + return n, fmt.Errorf("error swapping database file: %v", err) } - if err := os.Rename(f.Name(), s.dbPath); err != nil { - return n, err - } - db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true) - if err != nil { - return n, err - } - s.db = db // Snapshot, so we load the new database into the Raft system. if err := s.snapshotStore.SetFullNeeded(); err != nil { @@ -1382,6 +1402,55 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) { return n, nil } +// Vacuum performs a VACUUM operation on the underlying database. It is up +// to the caller to ensure that no writes are taking place during this call. +func (s *Store) Vacuum() error { + fd, err := os.CreateTemp(s.dbDir, vacuumScatchPattern) + if err != nil { + return err + } + if err := fd.Close(); err != nil { + return err + } + defer os.Remove(fd.Name()) + if err := s.db.VacuumInto(fd.Name()); err != nil { + return err + } + + // Verify that the VACUUMed database is valid. + if !sql.IsValidSQLiteFile(fd.Name()) { + return fmt.Errorf("invalid SQLite file post VACUUM") + } + + // Swap in new database file. + if err := s.db.Swap(fd.Name(), s.dbConf.FKConstraints, true); err != nil { + return fmt.Errorf("error swapping database file: %v", err) + } + + if err := s.snapshotStore.SetFullNeeded(); err != nil { + return err + } + if err := s.setLastVacuumTime(time.Now()); err != nil { + return err + } + return nil +} + +// Database returns a copy of the underlying database. The caller MUST +// ensure that no transaction is taking place during this call, or an error may +// be returned. If leader is true, this operation is performed with a read +// consistency level equivalent to "weak". Otherwise, no guarantees are made +// about the read consistency level. +// +// http://sqlite.org/howtocorrupt.html states it is safe to do this +// as long as the database is not written to during the call. +func (s *Store) Database(leader bool) ([]byte, error) { + if leader && s.raft.State() != raft.Leader { + return nil, ErrNotLeader + } + return s.db.Serialize() +} + // Notify notifies this Store that a node is ready for bootstrapping at the // given address. Once the number of known nodes reaches the expected level // bootstrapping will be attempted using this Store. "Expected level" includes @@ -1606,6 +1675,24 @@ func (s *Store) remove(id string) error { return f.Error() } +func (s *Store) initLastVacuumTime() error { + if _, err := s.LastVacuumTime(); err != nil { + return s.setLastVacuumTime(time.Now()) + } + return nil +} + +func (s *Store) setLastVacuumTime(t time.Time) error { + buf := bytes.NewBuffer(make([]byte, 0, 8)) + if err := binary.Write(buf, binary.LittleEndian, t.UnixNano()); err != nil { + return fmt.Errorf("failed to encode last vacuum time: %s", err) + } + if err := s.boltStore.Set([]byte(lastVacuumTimeKey), buf.Bytes()); err != nil { + return fmt.Errorf("failed to set last vacuum time: %s", err) + } + return nil +} + // raftConfig returns a new Raft config for the store. func (s *Store) raftConfig() *raft.Config { config := raft.DefaultConfig() @@ -1702,7 +1789,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) { s.logger.Printf("first log applied since node start, log at index %d", l.Index) } - cmd, mutated, r := s.cmdProc.Process(l.Data, &s.db) + cmd, mutated, r := s.cmdProc.Process(l.Data, s.db) if mutated { s.dbAppliedIdxMu.Lock() s.dbAppliedIdx = l.Index @@ -1722,21 +1809,6 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) { return r } -// Database returns a copy of the underlying database. The caller MUST -// ensure that no transaction is taking place during this call, or an error may -// be returned. If leader is true, this operation is performed with a read -// consistency level equivalent to "weak". Otherwise, no guarantees are made -// about the read consistency level. -// -// http://sqlite.org/howtocorrupt.html states it is safe to do this -// as long as the database is not written to during the call. -func (s *Store) Database(leader bool) ([]byte, error) { - if leader && s.raft.State() != raft.Leader { - return nil, ErrNotLeader - } - return s.db.Serialize() -} - // fsmSnapshot returns a snapshot of the database. // // The system must ensure that no transaction is taking place during this call. @@ -1748,6 +1820,9 @@ func (s *Store) Database(leader bool) ([]byte, error) { // http://sqlite.org/howtocorrupt.html states it is safe to copy or serialize the // database as long as no writes to the database are in progress. func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { + s.queryTxMu.Lock() + defer s.queryTxMu.Unlock() + if err := s.snapshotCAS.Begin(); err != nil { return nil, err } @@ -1760,6 +1835,24 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { } }() + // Automatic VACUUM needed? + if avn, err := s.autoVacNeeded(time.Now()); err != nil { + return nil, err + } else if avn { + vacStart := time.Now() + if err := s.Vacuum(); err != nil { + stats.Add(numAutoVacuumsFailed, 1) + return nil, err + } + s.logger.Printf("database vacuumed in %s", time.Since(vacStart)) + stats.Get(autoVacuumDuration).(*expvar.Int).Set(time.Since(vacStart).Milliseconds()) + stats.Add(numAutoVacuums, 1) + s.numAutoVacuums++ + if err := s.setLastVacuumTime(time.Now()); err != nil { + return nil, err + } + } + fullNeeded, err := s.snapshotStore.FullNeeded() if err != nil { return nil, err @@ -1771,9 +1864,6 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { s.numSnapshots++ }() - s.queryTxMu.Lock() - defer s.queryTxMu.Unlock() - var fsmSnapshot raft.FSMSnapshot if fullNeeded { if err := s.db.Checkpoint(); err != nil { @@ -1867,27 +1957,9 @@ func (s *Store) fsmRestore(rc io.ReadCloser) (retErr error) { return fmt.Errorf("error creating temporary file for restore operation: %v", err) } - // Must wipe out all pre-existing state if being asked to do a restore, and put - // the new database in place. - if !sql.IsValidSQLiteFile(tmpFile.Name()) { - return fmt.Errorf("invalid SQLite data") - } - if err := s.db.Close(); err != nil { - return fmt.Errorf("failed to close pre-restore database: %s", err) - } - if err := sql.RemoveFiles(s.db.Path()); err != nil { - return fmt.Errorf("failed to remove pre-restore database files: %s", err) - } - if err := os.Rename(tmpFile.Name(), s.db.Path()); err != nil { - return fmt.Errorf("failed to rename restored database: %s", err) - } - - var db *sql.DB - db, err = sql.Open(s.dbPath, s.dbConf.FKConstraints, true) - if err != nil { - return fmt.Errorf("open SQLite file during restore: %s", err) + if err := s.db.Swap(tmpFile.Name(), s.dbConf.FKConstraints, true); err != nil { + return fmt.Errorf("error swapping database file: %v", err) } - s.db = db s.logger.Printf("successfully opened database at %s due to restore", s.db.Path()) // Take conservative approach and assume that everything has changed, so update @@ -2142,19 +2214,25 @@ func (s *Store) tryCompress(rq command.Requester) ([]byte, bool, error) { return b, compressed, nil } -// createOnDisk opens an on-disk database file at the configured path. If b is -// non-nil, any preexisting file will first be overwritten with those contents. -// Otherwise, any preexisting file will be removed before the database is opened. -func createOnDisk(b []byte, path string, fkConstraints, wal bool) (*sql.DB, error) { +// autoVacNeeded returns true if an automatic VACUUM is needed. +func (s *Store) autoVacNeeded(t time.Time) (bool, error) { + if s.AutoVacInterval == 0 { + return false, nil + } + lvt, err := s.LastVacuumTime() + if err != nil { + return false, err + } + return t.Sub(lvt) > s.AutoVacInterval, nil +} + +// createOnDisk opens an on-disk database file at the configured path. Any +// preexisting file will be removed before the database is opened. +func createOnDisk(path string, fkConstraints, wal bool) (*sql.SwappableDB, error) { if err := sql.RemoveFiles(path); err != nil { return nil, err } - if b != nil { - if err := os.WriteFile(path, b, 0660); err != nil { - return nil, err - } - } - return sql.Open(path, fkConstraints, wal) + return sql.OpenSwappable(path, fkConstraints, wal) } func copyFromReaderToFile(path string, r io.Reader) (int64, error) { diff --git a/store/store_test.go b/store/store_test.go index 0579689b..633aa163 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" "time" @@ -51,6 +52,68 @@ func Test_OpenStoreSingleNode(t *testing.T) { } } +func Test_OpenStoreSingleNode_LastVacuum(t *testing.T) { + s, ln := mustNewStore(t) + defer s.Close(true) + defer ln.Close() + + now := time.Now() + if err := s.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + lv, err := s.LastVacuumTime() + if err != nil { + t.Fatalf("failed to retrieve last vacuum time: %s", err.Error()) + } + if lv.Before(now) { + t.Fatalf("last vacuum time is before now, lv: %s, now: %s", lv, now) + } +} + +// Test_OpenStoreSingleNode_VacuumFullNeeded tests that running a VACUUM +// means that a full snapshot is needed. +func Test_OpenStoreSingleNode_VacuumFullNeeded(t *testing.T) { + s, ln := mustNewStore(t) + defer s.Close(true) + defer ln.Close() + + if err := s.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s.Close(true) + if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + if _, err := s.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + er := executeRequestFromStrings([]string{ + `CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`, + }, false, false) + _, err := s.Execute(er) + if err != nil { + t.Fatalf("failed to execute on single node: %s", err.Error()) + } + + if err := s.Snapshot(0); err != nil { + t.Fatalf("failed to snapshot store: %s", err.Error()) + } + if fn, err := s.snapshotStore.FullNeeded(); err != nil { + t.Fatalf("failed to determine full snapshot needed: %s", err.Error()) + } else if fn { + t.Fatalf("full snapshot marked as needed") + } + if err := s.Vacuum(); err != nil { + t.Fatalf("failed to vacuum database: %s", err.Error()) + } + if fn, err := s.snapshotStore.FullNeeded(); err != nil { + t.Fatalf("failed to determine full snapshot needed: %s", err.Error()) + } else if !fn { + t.Fatalf("full snapshot not marked as needed") + } +} + // Test_SingleNodeSQLitePath ensures that basic functionality works when the SQLite database path // is explicitly specificed. func Test_SingleNodeOnDiskSQLitePath(t *testing.T) { @@ -1810,6 +1873,197 @@ func Test_SingleNode_WALTriggeredSnapshot(t *testing.T) { t.Fatalf("wrong snapshot store file: %s", f.Name()) } } +} + +// Test_SingleNode_SnapshotWithAutoVac tests that a Store correctly operates +// when performing both Snapshots and Auto-Vacuums. +func Test_SingleNode_SnapshotWithAutoVac(t *testing.T) { + s, ln := mustNewStore(t) + defer ln.Close() + s.SnapshotThreshold = 8192 // Ensures Snapshot not triggered during testing. + if err := s.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s.Close(true) + if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + if _, err := s.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + // Create a table, and insert some data. + er := executeRequestFromString(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`, + false, false) + _, err := s.Execute(er) + if err != nil { + t.Fatalf("failed to execute on single node: %s", err.Error()) + } + for i := 0; i < 100; i++ { + _, err := s.Execute(executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false)) + if err != nil { + t.Fatalf("failed to execute INSERT on single node: %s", err.Error()) + } + } + + // Force an initial snapshot, shouldn't need a full snapshot afterwards + if err := s.Snapshot(0); err != nil { + t.Fatalf("failed to snapshot single-node store: %s", err.Error()) + } + if fn, err := s.snapshotStore.FullNeeded(); err != nil { + t.Fatalf("failed to check if snapshot store needs a full snapshot: %s", err.Error()) + } else if fn { + t.Fatalf("expected snapshot store to not need a full snapshot") + } + + // Enable auto-vacuuming. + s.AutoVacInterval = 1 * time.Nanosecond + if n, err := s.autoVacNeeded(time.Now()); err != nil { + t.Fatalf("failed to check if auto-vacuum is needed: %s", err.Error()) + } else if !n { + t.Fatalf("expected auto-vacuum to be needed") + } + + // Force another snapshot, this time a VACUUM should be triggered. + if err := s.Snapshot(0); err != nil { + t.Fatalf("failed to snapshot single-node store: %s", err.Error()) + } + if exp, got := 1, s.numAutoVacuums; exp != got { + t.Fatalf("expected %d auto-vacuums, got %d", exp, got) + } + + s.AutoVacInterval = 1 * time.Hour // Effectively disable auto-vacuuming. + if n, err := s.autoVacNeeded(time.Now()); err != nil { + t.Fatalf("failed to check if auto-vacuum is needed: %s", err.Error()) + } else if n { + t.Fatalf("auto-vacuum should not be needed") + } + + // Query the data, make sure it looks good after all this. + qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, true) + r, err := s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[100]]}]`, asJSON(r); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + + // Make sure the node works across a restart + preAVT, err := s.LastVacuumTime() + if err != nil { + t.Fatalf("failed to get last vacuum time: %s", err.Error()) + } + + if err := s.Close(true); err != nil { + t.Fatalf("failed to close store: %s", err.Error()) + } + if err := s.Open(); err != nil { + t.Fatalf("failed to open store: %s", err.Error()) + } + defer s.Close(true) + if _, err := s.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + postAVT, err := s.LastVacuumTime() + if err != nil { + t.Fatalf("failed to get last vacuum time: %s", err.Error()) + } + if preAVT != postAVT { + t.Fatalf("expected last vacuum time to be the same across restarts") + } + if n, err := s.autoVacNeeded(time.Now()); err != nil { + t.Fatalf("failed to check if auto-vacuum is needed: %s", err.Error()) + } else if n { + t.Fatalf("auto-vacuum should not be needed") + } + + // Query the data again, make sure it still looks good after all this. + r, err = s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[100]]}]`, asJSON(r); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } +} + +func Test_SingleNode_SnapshotWithAutoVac_Stress(t *testing.T) { + s, ln := mustNewStore(t) + defer ln.Close() + s.SnapshotThreshold = 50 + s.SnapshotInterval = 100 * time.Millisecond + s.AutoVacInterval = 500 * time.Millisecond + + if err := s.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s.Close(true) + if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + if _, err := s.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + // Create a table + er := executeRequestFromString(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`, + false, false) + _, err := s.Execute(er) + if err != nil { + t.Fatalf("failed to execute on single node: %s", err.Error()) + } + + // Insert a bunch of data concurrently, putting some load on the Store. + var wg sync.WaitGroup + wg.Add(5) + insertFn := func() { + defer wg.Done() + for i := 0; i < 500; i++ { + _, err := s.Execute(executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false)) + if err != nil { + t.Errorf("failed to execute INSERT on single node: %s", err.Error()) + } + } + } + for i := 0; i < 5; i++ { + go insertFn() + } + wg.Wait() + if s.WaitForAllApplied(5*time.Second) != nil { + t.Fatalf("failed to wait for all data to be applied") + } + + // Query the data, make sure it looks good after all this. + qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, true) + qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err := s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[2500]]}]`, asJSON(r); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + + // Restart the Store, make sure it still works. + if err := s.Close(true); err != nil { + t.Fatalf("failed to close store: %s", err.Error()) + } + if err := s.Open(); err != nil { + t.Fatalf("failed to open store: %s", err.Error()) + } + defer s.Close(true) + if _, err := s.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + r, err = s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[2500]]}]`, asJSON(r); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } }