1
0
Fork 0

Merge pull request #1619 from rqlite/automatic-vacuum

Support automatic VACUUM
master
Philip O'Toole 8 months ago committed by GitHub
commit 0d09f973b0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,8 +1,12 @@
## 8.17.0 (unreleased)
### New features
- [PR #1619](https://github.com/rqlite/rqlite/pull/1619): Support automatic `VACUUM` of the SQLite database. Fixes [#1609](https://github.com/rqlite/rqlite/issues/1609).
## 8.16.8 (January 20th 2024)
### Implementation changes and bug fixes
- [PR #1615](https://github.com/rqlite/rqlite/pull/1615): Add extensive WAL checkpoint test at the database level.
- [PR #1616](https://github.com/rqlite/rqlite/pull/1616): Add time and checksum based change-detection functions to the database level.
- [PR #1617](https://github.com/rqlite/rqlite/pull/1617): Add VacuumInto to database layer.
- [PR #1617](https://github.com/rqlite/rqlite/pull/1617): Add `VacuumInto` to database layer.
- [PR #1621](https://github.com/rqlite/rqlite/pull/1621): Fix a panic in the rqlite shell by not stomping on an "outer" error.
## 8.16.7 (January 18th 2024)

@ -14,7 +14,7 @@ handle_ctrl_c() {
exit 1
}
COUNT=10000
COUNT=1000000
trap 'handle_ctrl_c' SIGINT
@ -29,4 +29,6 @@ $RQBENCH -p "/db/query" -n 150000000 -m 1000 "SELECT COUNT(*) FROM foo" &
$RQBENCH -p "/db/query" -n 150000000 -m 1000 "SELECT COUNT(*) FROM bar" &
$RQBENCH -p "/db/query" -n 150000000 -m 1000 "SELECT * FROM qux LIMIT 10" &
$RQBENCH -p "/db/execute" -m 100 -n $COUNT -a $EXECUTE_HOST 'DELETE FROM qux WHERE id IN (SELECT id FROM qux ORDER BY RANDOM() LIMIT 10)'
wait

@ -135,6 +135,9 @@ type Config struct {
// FKConstraints enables SQLite foreign key constraints.
FKConstraints bool
// AutoVacInterval sets the automatic VACUUM interval. Use 0s to disable.
AutoVacInterval time.Duration
// RaftLogLevel sets the minimum logging level for the Raft subsystem.
RaftLogLevel string
@ -460,6 +463,7 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
flag.StringVar(&config.OnDiskPath, "on-disk-path", "", "Path for SQLite on-disk database file. If not set, use a file in data directory")
flag.BoolVar(&config.FKConstraints, "fk", false, "Enable SQLite foreign key constraints")
flag.BoolVar(&showVersion, "version", false, "Show version information and exit")
flag.DurationVar(&config.AutoVacInterval, "auto-vacuum-int", 0, "Period between automatic VACUUMs. It not set, not enabled")
flag.BoolVar(&config.RaftNonVoter, "raft-non-voter", false, "Configure as non-voting node")
flag.DurationVar(&config.RaftHeartbeatTimeout, "raft-timeout", time.Second, "Raft heartbeat timeout")
flag.DurationVar(&config.RaftElectionTimeout, "raft-election-timeout", time.Second, "Raft election timeout")

@ -282,6 +282,7 @@ func createStore(cfg *Config, ln *tcp.Layer) (*store.Store, error) {
str.BootstrapExpect = cfg.BootstrapExpect
str.ReapTimeout = cfg.RaftReapNodeTimeout
str.ReapReadOnlyTimeout = cfg.RaftReapReadOnlyNodeTimeout
str.AutoVacInterval = cfg.AutoVacInterval
if store.IsNewNode(cfg.DataPath) {
log.Printf("no preexisting node state detected in %s, node may be bootstrapping", cfg.DataPath)

@ -0,0 +1,151 @@
package db
import (
"fmt"
"io"
"os"
"sync"
command "github.com/rqlite/rqlite/v8/command/proto"
)
// SwappableDB is a wrapper around DB that allows the underlying database to be swapped out
// in a thread-safe manner.
type SwappableDB struct {
db *DB
dbMu sync.RWMutex
}
// OpenSwappable returns a new SwappableDB instance, which opens the database at the given path.
func OpenSwappable(dbPath string, fkEnabled, wal bool) (*SwappableDB, error) {
db, err := Open(dbPath, fkEnabled, wal)
if err != nil {
return nil, err
}
return &SwappableDB{db: db}, nil
}
// Swap swaps the underlying database with that at the given path.
func (s *SwappableDB) Swap(path string, fkConstraints, walEnabled bool) error {
if !IsValidSQLiteFile(path) {
return fmt.Errorf("invalid SQLite data")
}
s.dbMu.Lock()
defer s.dbMu.Unlock()
if err := s.db.Close(); err != nil {
return fmt.Errorf("failed to close: %s", err)
}
if err := RemoveFiles(s.db.Path()); err != nil {
return fmt.Errorf("failed to remove files: %s", err)
}
if err := os.Rename(path, s.db.Path()); err != nil {
return fmt.Errorf("failed to rename database: %s", err)
}
var db *DB
db, err := Open(s.db.Path(), fkConstraints, walEnabled)
if err != nil {
return fmt.Errorf("open SQLite file failed: %s", err)
}
s.db = db
return nil
}
// Close closes the underlying database.
func (s *SwappableDB) Close() error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Close()
}
// Stats returns the underlying database's stats.
func (s *SwappableDB) Stats() (map[string]interface{}, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Stats()
}
// Request calls Request on the underlying database.
func (s *SwappableDB) Request(req *command.Request, xTime bool) ([]*command.ExecuteQueryResponse, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Request(req, xTime)
}
// Execute calls Execute on the underlying database.
func (s *SwappableDB) Execute(ex *command.Request, xTime bool) ([]*command.ExecuteResult, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Execute(ex, xTime)
}
// Query calls Query on the underlying database.
func (s *SwappableDB) Query(q *command.Request, xTime bool) ([]*command.QueryRows, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Query(q, xTime)
}
// VacuumInto calls VacuumInto on the underlying database.
func (s *SwappableDB) VacuumInto(path string) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.VacuumInto(path)
}
// Backup calls Backup on the underlying database.
func (s *SwappableDB) Backup(path string, vacuum bool) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Backup(path, vacuum)
}
// Serialize calls Serialize on the underlying database.
func (s *SwappableDB) Serialize() ([]byte, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Serialize()
}
// StmtReadOnly calls StmtReadOnly on the underlying database.
func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.StmtReadOnly(sql)
}
// Checkpoint calls Checkpoint on the underlying database.
func (s *SwappableDB) Checkpoint() error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Checkpoint()
}
// Path calls Path on the underlying database.
func (s *SwappableDB) Path() string {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Path()
}
// Dump calls Dump on the underlying database.
func (s *SwappableDB) Dump(w io.Writer) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Dump(w)
}
// FKEnabled calls FKEnabled on the underlying database.
func (s *SwappableDB) FKEnabled() bool {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.FKEnabled()
}
// WALEnabled calls WALEnabled on the underlying database.
func (s *SwappableDB) WALEnabled() bool {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.WALEnabled()
}

@ -4,6 +4,7 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/rqlite/rqlite/v8/command"
"github.com/rqlite/rqlite/v8/command/chunking"
@ -41,8 +42,7 @@ func NewCommandProcessor(logger *log.Logger, dm *chunking.DechunkerManager) *Com
}
// Process processes the given command against the given database.
func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, bool, interface{}) {
db := *pDB
func (c *CommandProcessor) Process(data []byte, db *sql.SwappableDB) (*proto.Command, bool, interface{}) {
cmd := &proto.Command{}
if err := command.Unmarshal(data, cmd); err != nil {
panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error()))
@ -76,20 +76,23 @@ func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, b
panic(fmt.Sprintf("failed to unmarshal load subcommand: %s", err.Error()))
}
// Swap the underlying database to the new one.
if err := db.Close(); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
}
if err := sql.RemoveFiles(db.Path()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)}
// create a scratch file in the same directory as s.db.Path()
fd, err := os.CreateTemp(filepath.Dir(db.Path()), "rqlilte-load-")
if err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to create temporary database file: %s", err)}
}
newDB, err := createOnDisk(lr.Data, db.Path(), db.FKEnabled(), db.WALEnabled())
defer os.Remove(fd.Name())
defer fd.Close()
_, err = fd.Write(lr.Data)
if err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)}
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to write to temporary database file: %s", err)}
}
fd.Close()
*pDB = newDB
// Swap the underlying database to the new one.
if err := db.Swap(fd.Name(), db.FKEnabled(), db.WALEnabled()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("error swapping databases: %s", err)}
}
return cmd, true, &fsmGenericResponse{}
case proto.Command_COMMAND_TYPE_LOAD_CHUNK:
var lcr proto.LoadChunkRequest
@ -129,25 +132,9 @@ func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, b
c.logger.Printf("invalid chunked database file - ignoring")
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("invalid chunked database file - ignoring")}
}
// Close the underlying database before we overwrite it.
if err := db.Close(); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
if err := db.Swap(path, db.FKEnabled(), db.WALEnabled()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("error swapping databases: %s", err)}
}
if err := sql.RemoveFiles(db.Path()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)}
}
if err := os.Rename(path, db.Path()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to rename temporary database file: %s", err)}
}
newDB, err := sql.Open(db.Path(), db.FKEnabled(), db.WALEnabled())
if err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to open new on-disk database: %s", err)}
}
// Swap the underlying database to the new one.
*pDB = newDB
}
}
return cmd, true, &fsmGenericResponse{}

@ -104,7 +104,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
}
// Now, open the database so we can replay any outstanding Raft log entries.
db, err := sql.Open(tmpDBPath, false, true)
db, err := sql.OpenSwappable(tmpDBPath, false, true)
if err != nil {
return fmt.Errorf("failed to open temporary database: %s", err)
}
@ -135,7 +135,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
return fmt.Errorf("failed to get log at index %d: %v", index, err)
}
if entry.Type == raft.LogCommand {
cmdProc.Process(entry.Data, &db)
cmdProc.Process(entry.Data, db)
}
lastIndex = entry.Index
lastTerm = entry.Term

@ -6,6 +6,7 @@ package store
import (
"bytes"
"compress/gzip"
"encoding/binary"
"errors"
"expvar"
"fmt"
@ -85,6 +86,7 @@ const (
restoreScratchPattern = "rqlite-restore-*"
bootScatchPattern = "rqlite-boot-*"
backupScatchPattern = "rqlite-backup-*"
vacuumScatchPattern = "rqlite-vacuum-*"
raftDBPath = "raft.db" // Changing this will break backwards compatibility.
peersPath = "raft/peers.json"
peersInfoPath = "raft/peers.info"
@ -100,6 +102,8 @@ const (
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 50
lastVacuumTimeKey = "rqlite_last_vacuum"
)
const (
@ -111,6 +115,9 @@ const (
numWALSnapshotsFailed = "num_wal_snapshots_failed"
numSnapshotsFull = "num_snapshots_full"
numSnapshotsIncremental = "num_snapshots_incremental"
numAutoVacuums = "num_auto_vacuums"
numAutoVacuumsFailed = "num_auto_vacuums_failed"
autoVacuumDuration = "auto_vacuum_duration"
numBoots = "num_boots"
numBackups = "num_backups"
numLoads = "num_loads"
@ -159,6 +166,9 @@ func ResetStats() {
stats.Add(numWALSnapshotsFailed, 0)
stats.Add(numSnapshotsFull, 0)
stats.Add(numSnapshotsIncremental, 0)
stats.Add(numAutoVacuums, 0)
stats.Add(numAutoVacuumsFailed, 0)
stats.Add(autoVacuumDuration, 0)
stats.Add(numBoots, 0)
stats.Add(numBackups, 0)
stats.Add(numLoads, 0)
@ -228,12 +238,12 @@ type Store struct {
raft *raft.Raft // The consensus mechanism.
ly Layer
raftTn *NodeTransport
raftID string // Node ID.
dbConf *DBConfig // SQLite database config.
dbPath string // Path to underlying SQLite file.
walPath string // Path to WAL file.
dbDir string // Path to directory containing SQLite file.
db *sql.DB // The underlying SQLite store.
raftID string // Node ID.
dbConf *DBConfig // SQLite database config.
dbPath string // Path to underlying SQLite file.
walPath string // Path to WAL file.
dbDir string // Path to directory containing SQLite file.
db *sql.SwappableDB // The underlying SQLite store.
dechunkManager *chunking.DechunkerManager
cmdProc *CommandProcessor
@ -301,6 +311,7 @@ type Store struct {
ApplyTimeout time.Duration
RaftLogLevel string
NoFreeListSync bool
AutoVacInterval time.Duration
// Node-reaping configuration
ReapTimeout time.Duration
@ -309,6 +320,7 @@ type Store struct {
numTrailingLogs uint64
// For whitebox testing
numAutoVacuums int
numIgnoredJoins int
numNoops int
numSnapshotsMu sync.Mutex
@ -482,16 +494,19 @@ func (s *Store) Open() (retErr error) {
s.logger.Printf("first log index: %d, last log index: %d, last applied index: %d, last command log index: %d:",
s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastAppliedIdxOnOpen, s.lastCommandIdxOnOpen)
s.db, err = createOnDisk(nil, s.dbPath, s.dbConf.FKConstraints, true)
s.db, err = createOnDisk(s.dbPath, s.dbConf.FKConstraints, true)
if err != nil {
return fmt.Errorf("failed to create on-disk database: %s", err)
}
s.logger.Printf("created on-disk database at open")
// Clean up any files from aborted operations. This tries to catch the case where scratch files
// were created in the Raft directory, not cleaned up, and then the node was restarted with an
// explicit SQLite path set.
for _, pattern := range []string{restoreScratchPattern, bootScatchPattern, backupScatchPattern} {
for _, pattern := range []string{
restoreScratchPattern,
bootScatchPattern,
backupScatchPattern,
vacuumScatchPattern} {
for _, dir := range []string{s.raftDir, s.dbDir} {
files, err := filepath.Glob(filepath.Join(dir, pattern))
if err != nil {
@ -530,6 +545,9 @@ func (s *Store) Open() (retErr error) {
// Periodically update the applied index for faster startup.
s.appliedIdxUpdateDone = s.updateAppliedIndex()
if err := s.initLastVacuumTime(); err != nil {
return fmt.Errorf("failed to initialize last vacuum time: %s", err.Error())
}
return nil
}
@ -726,6 +744,16 @@ func (s *Store) State() ClusterState {
}
}
// LastVacuumTime returns the time of the last automatic VACUUM.
func (s *Store) LastVacuumTime() (time.Time, error) {
vt, err := s.boltStore.Get([]byte(lastVacuumTimeKey))
if err != nil {
return time.Time{}, fmt.Errorf("failed to get last vacuum time: %s", err)
}
n := int64(binary.LittleEndian.Uint64(vt))
return time.Unix(0, n), nil
}
// Path returns the path to the store's storage directory.
func (s *Store) Path() string {
return s.raftDir
@ -991,6 +1019,9 @@ func (s *Store) Stats() (map[string]interface{}, error) {
"sqlite3": dbStatus,
"db_conf": s.dbConf,
}
if lVac, err := s.LastVacuumTime(); err == nil {
status["last_vacuum"] = lVac.String()
}
// Snapshot stats may be in flux if a snapshot is in progress. Only
// report them if they are available.
@ -1321,7 +1352,7 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) {
return 0, ErrNotSingleNode
}
// Write the data to a temporary file..
// Write the data to a temporary file.
f, err := os.CreateTemp(s.dbDir, bootScatchPattern)
if err != nil {
return 0, err
@ -1356,20 +1387,9 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) {
}
// Swap in new database file.
if err := s.db.Close(); err != nil {
return n, err
}
if err := sql.RemoveFiles(s.dbPath); err != nil {
return n, err
if err := s.db.Swap(f.Name(), s.dbConf.FKConstraints, true); err != nil {
return n, fmt.Errorf("error swapping database file: %v", err)
}
if err := os.Rename(f.Name(), s.dbPath); err != nil {
return n, err
}
db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true)
if err != nil {
return n, err
}
s.db = db
// Snapshot, so we load the new database into the Raft system.
if err := s.snapshotStore.SetFullNeeded(); err != nil {
@ -1382,6 +1402,55 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) {
return n, nil
}
// Vacuum performs a VACUUM operation on the underlying database. It is up
// to the caller to ensure that no writes are taking place during this call.
func (s *Store) Vacuum() error {
fd, err := os.CreateTemp(s.dbDir, vacuumScatchPattern)
if err != nil {
return err
}
if err := fd.Close(); err != nil {
return err
}
defer os.Remove(fd.Name())
if err := s.db.VacuumInto(fd.Name()); err != nil {
return err
}
// Verify that the VACUUMed database is valid.
if !sql.IsValidSQLiteFile(fd.Name()) {
return fmt.Errorf("invalid SQLite file post VACUUM")
}
// Swap in new database file.
if err := s.db.Swap(fd.Name(), s.dbConf.FKConstraints, true); err != nil {
return fmt.Errorf("error swapping database file: %v", err)
}
if err := s.snapshotStore.SetFullNeeded(); err != nil {
return err
}
if err := s.setLastVacuumTime(time.Now()); err != nil {
return err
}
return nil
}
// Database returns a copy of the underlying database. The caller MUST
// ensure that no transaction is taking place during this call, or an error may
// be returned. If leader is true, this operation is performed with a read
// consistency level equivalent to "weak". Otherwise, no guarantees are made
// about the read consistency level.
//
// http://sqlite.org/howtocorrupt.html states it is safe to do this
// as long as the database is not written to during the call.
func (s *Store) Database(leader bool) ([]byte, error) {
if leader && s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
return s.db.Serialize()
}
// Notify notifies this Store that a node is ready for bootstrapping at the
// given address. Once the number of known nodes reaches the expected level
// bootstrapping will be attempted using this Store. "Expected level" includes
@ -1606,6 +1675,24 @@ func (s *Store) remove(id string) error {
return f.Error()
}
func (s *Store) initLastVacuumTime() error {
if _, err := s.LastVacuumTime(); err != nil {
return s.setLastVacuumTime(time.Now())
}
return nil
}
func (s *Store) setLastVacuumTime(t time.Time) error {
buf := bytes.NewBuffer(make([]byte, 0, 8))
if err := binary.Write(buf, binary.LittleEndian, t.UnixNano()); err != nil {
return fmt.Errorf("failed to encode last vacuum time: %s", err)
}
if err := s.boltStore.Set([]byte(lastVacuumTimeKey), buf.Bytes()); err != nil {
return fmt.Errorf("failed to set last vacuum time: %s", err)
}
return nil
}
// raftConfig returns a new Raft config for the store.
func (s *Store) raftConfig() *raft.Config {
config := raft.DefaultConfig()
@ -1702,7 +1789,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
s.logger.Printf("first log applied since node start, log at index %d", l.Index)
}
cmd, mutated, r := s.cmdProc.Process(l.Data, &s.db)
cmd, mutated, r := s.cmdProc.Process(l.Data, s.db)
if mutated {
s.dbAppliedIdxMu.Lock()
s.dbAppliedIdx = l.Index
@ -1722,21 +1809,6 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
return r
}
// Database returns a copy of the underlying database. The caller MUST
// ensure that no transaction is taking place during this call, or an error may
// be returned. If leader is true, this operation is performed with a read
// consistency level equivalent to "weak". Otherwise, no guarantees are made
// about the read consistency level.
//
// http://sqlite.org/howtocorrupt.html states it is safe to do this
// as long as the database is not written to during the call.
func (s *Store) Database(leader bool) ([]byte, error) {
if leader && s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
return s.db.Serialize()
}
// fsmSnapshot returns a snapshot of the database.
//
// The system must ensure that no transaction is taking place during this call.
@ -1748,6 +1820,9 @@ func (s *Store) Database(leader bool) ([]byte, error) {
// http://sqlite.org/howtocorrupt.html states it is safe to copy or serialize the
// database as long as no writes to the database are in progress.
func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
s.queryTxMu.Lock()
defer s.queryTxMu.Unlock()
if err := s.snapshotCAS.Begin(); err != nil {
return nil, err
}
@ -1760,6 +1835,24 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
}
}()
// Automatic VACUUM needed?
if avn, err := s.autoVacNeeded(time.Now()); err != nil {
return nil, err
} else if avn {
vacStart := time.Now()
if err := s.Vacuum(); err != nil {
stats.Add(numAutoVacuumsFailed, 1)
return nil, err
}
s.logger.Printf("database vacuumed in %s", time.Since(vacStart))
stats.Get(autoVacuumDuration).(*expvar.Int).Set(time.Since(vacStart).Milliseconds())
stats.Add(numAutoVacuums, 1)
s.numAutoVacuums++
if err := s.setLastVacuumTime(time.Now()); err != nil {
return nil, err
}
}
fullNeeded, err := s.snapshotStore.FullNeeded()
if err != nil {
return nil, err
@ -1771,9 +1864,6 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
s.numSnapshots++
}()
s.queryTxMu.Lock()
defer s.queryTxMu.Unlock()
var fsmSnapshot raft.FSMSnapshot
if fullNeeded {
if err := s.db.Checkpoint(); err != nil {
@ -1867,27 +1957,9 @@ func (s *Store) fsmRestore(rc io.ReadCloser) (retErr error) {
return fmt.Errorf("error creating temporary file for restore operation: %v", err)
}
// Must wipe out all pre-existing state if being asked to do a restore, and put
// the new database in place.
if !sql.IsValidSQLiteFile(tmpFile.Name()) {
return fmt.Errorf("invalid SQLite data")
}
if err := s.db.Close(); err != nil {
return fmt.Errorf("failed to close pre-restore database: %s", err)
}
if err := sql.RemoveFiles(s.db.Path()); err != nil {
return fmt.Errorf("failed to remove pre-restore database files: %s", err)
}
if err := os.Rename(tmpFile.Name(), s.db.Path()); err != nil {
return fmt.Errorf("failed to rename restored database: %s", err)
}
var db *sql.DB
db, err = sql.Open(s.dbPath, s.dbConf.FKConstraints, true)
if err != nil {
return fmt.Errorf("open SQLite file during restore: %s", err)
if err := s.db.Swap(tmpFile.Name(), s.dbConf.FKConstraints, true); err != nil {
return fmt.Errorf("error swapping database file: %v", err)
}
s.db = db
s.logger.Printf("successfully opened database at %s due to restore", s.db.Path())
// Take conservative approach and assume that everything has changed, so update
@ -2142,19 +2214,25 @@ func (s *Store) tryCompress(rq command.Requester) ([]byte, bool, error) {
return b, compressed, nil
}
// createOnDisk opens an on-disk database file at the configured path. If b is
// non-nil, any preexisting file will first be overwritten with those contents.
// Otherwise, any preexisting file will be removed before the database is opened.
func createOnDisk(b []byte, path string, fkConstraints, wal bool) (*sql.DB, error) {
// autoVacNeeded returns true if an automatic VACUUM is needed.
func (s *Store) autoVacNeeded(t time.Time) (bool, error) {
if s.AutoVacInterval == 0 {
return false, nil
}
lvt, err := s.LastVacuumTime()
if err != nil {
return false, err
}
return t.Sub(lvt) > s.AutoVacInterval, nil
}
// createOnDisk opens an on-disk database file at the configured path. Any
// preexisting file will be removed before the database is opened.
func createOnDisk(path string, fkConstraints, wal bool) (*sql.SwappableDB, error) {
if err := sql.RemoveFiles(path); err != nil {
return nil, err
}
if b != nil {
if err := os.WriteFile(path, b, 0660); err != nil {
return nil, err
}
}
return sql.Open(path, fkConstraints, wal)
return sql.OpenSwappable(path, fkConstraints, wal)
}
func copyFromReaderToFile(path string, r io.Reader) (int64, error) {

@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@ -51,6 +52,68 @@ func Test_OpenStoreSingleNode(t *testing.T) {
}
}
func Test_OpenStoreSingleNode_LastVacuum(t *testing.T) {
s, ln := mustNewStore(t)
defer s.Close(true)
defer ln.Close()
now := time.Now()
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
lv, err := s.LastVacuumTime()
if err != nil {
t.Fatalf("failed to retrieve last vacuum time: %s", err.Error())
}
if lv.Before(now) {
t.Fatalf("last vacuum time is before now, lv: %s, now: %s", lv, now)
}
}
// Test_OpenStoreSingleNode_VacuumFullNeeded tests that running a VACUUM
// means that a full snapshot is needed.
func Test_OpenStoreSingleNode_VacuumFullNeeded(t *testing.T) {
s, ln := mustNewStore(t)
defer s.Close(true)
defer ln.Close()
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
er := executeRequestFromStrings([]string{
`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
}, false, false)
_, err := s.Execute(er)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
if err := s.Snapshot(0); err != nil {
t.Fatalf("failed to snapshot store: %s", err.Error())
}
if fn, err := s.snapshotStore.FullNeeded(); err != nil {
t.Fatalf("failed to determine full snapshot needed: %s", err.Error())
} else if fn {
t.Fatalf("full snapshot marked as needed")
}
if err := s.Vacuum(); err != nil {
t.Fatalf("failed to vacuum database: %s", err.Error())
}
if fn, err := s.snapshotStore.FullNeeded(); err != nil {
t.Fatalf("failed to determine full snapshot needed: %s", err.Error())
} else if !fn {
t.Fatalf("full snapshot not marked as needed")
}
}
// Test_SingleNodeSQLitePath ensures that basic functionality works when the SQLite database path
// is explicitly specificed.
func Test_SingleNodeOnDiskSQLitePath(t *testing.T) {
@ -1810,6 +1873,197 @@ func Test_SingleNode_WALTriggeredSnapshot(t *testing.T) {
t.Fatalf("wrong snapshot store file: %s", f.Name())
}
}
}
// Test_SingleNode_SnapshotWithAutoVac tests that a Store correctly operates
// when performing both Snapshots and Auto-Vacuums.
func Test_SingleNode_SnapshotWithAutoVac(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
s.SnapshotThreshold = 8192 // Ensures Snapshot not triggered during testing.
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
// Create a table, and insert some data.
er := executeRequestFromString(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
false, false)
_, err := s.Execute(er)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
for i := 0; i < 100; i++ {
_, err := s.Execute(executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false))
if err != nil {
t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
}
}
// Force an initial snapshot, shouldn't need a full snapshot afterwards
if err := s.Snapshot(0); err != nil {
t.Fatalf("failed to snapshot single-node store: %s", err.Error())
}
if fn, err := s.snapshotStore.FullNeeded(); err != nil {
t.Fatalf("failed to check if snapshot store needs a full snapshot: %s", err.Error())
} else if fn {
t.Fatalf("expected snapshot store to not need a full snapshot")
}
// Enable auto-vacuuming.
s.AutoVacInterval = 1 * time.Nanosecond
if n, err := s.autoVacNeeded(time.Now()); err != nil {
t.Fatalf("failed to check if auto-vacuum is needed: %s", err.Error())
} else if !n {
t.Fatalf("expected auto-vacuum to be needed")
}
// Force another snapshot, this time a VACUUM should be triggered.
if err := s.Snapshot(0); err != nil {
t.Fatalf("failed to snapshot single-node store: %s", err.Error())
}
if exp, got := 1, s.numAutoVacuums; exp != got {
t.Fatalf("expected %d auto-vacuums, got %d", exp, got)
}
s.AutoVacInterval = 1 * time.Hour // Effectively disable auto-vacuuming.
if n, err := s.autoVacNeeded(time.Now()); err != nil {
t.Fatalf("failed to check if auto-vacuum is needed: %s", err.Error())
} else if n {
t.Fatalf("auto-vacuum should not be needed")
}
// Query the data, make sure it looks good after all this.
qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, true)
r, err := s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[100]]}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Make sure the node works across a restart
preAVT, err := s.LastVacuumTime()
if err != nil {
t.Fatalf("failed to get last vacuum time: %s", err.Error())
}
if err := s.Close(true); err != nil {
t.Fatalf("failed to close store: %s", err.Error())
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open store: %s", err.Error())
}
defer s.Close(true)
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
postAVT, err := s.LastVacuumTime()
if err != nil {
t.Fatalf("failed to get last vacuum time: %s", err.Error())
}
if preAVT != postAVT {
t.Fatalf("expected last vacuum time to be the same across restarts")
}
if n, err := s.autoVacNeeded(time.Now()); err != nil {
t.Fatalf("failed to check if auto-vacuum is needed: %s", err.Error())
} else if n {
t.Fatalf("auto-vacuum should not be needed")
}
// Query the data again, make sure it still looks good after all this.
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[100]]}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_SingleNode_SnapshotWithAutoVac_Stress(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
s.SnapshotThreshold = 50
s.SnapshotInterval = 100 * time.Millisecond
s.AutoVacInterval = 500 * time.Millisecond
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
// Create a table
er := executeRequestFromString(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
false, false)
_, err := s.Execute(er)
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
// Insert a bunch of data concurrently, putting some load on the Store.
var wg sync.WaitGroup
wg.Add(5)
insertFn := func() {
defer wg.Done()
for i := 0; i < 500; i++ {
_, err := s.Execute(executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false))
if err != nil {
t.Errorf("failed to execute INSERT on single node: %s", err.Error())
}
}
}
for i := 0; i < 5; i++ {
go insertFn()
}
wg.Wait()
if s.WaitForAllApplied(5*time.Second) != nil {
t.Fatalf("failed to wait for all data to be applied")
}
// Query the data, make sure it looks good after all this.
qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, true)
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err := s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[2500]]}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Restart the Store, make sure it still works.
if err := s.Close(true); err != nil {
t.Fatalf("failed to close store: %s", err.Error())
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open store: %s", err.Error())
}
defer s.Close(true)
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[2500]]}]`, asJSON(r); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}

Loading…
Cancel
Save