From 958b67908bb9c549d9bc74230b5a0850331e0367 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 17:55:23 -0500 Subject: [PATCH] Move to explicit checkpoint type --- db/db.go | 39 +++++++++++++++++++++++++++++++++------ db/db_test.go | 16 ++++++++-------- db/state.go | 2 +- db/state_test.go | 16 ++++++++-------- db/swappable_db.go | 4 ++-- store/state.go | 2 +- store/store.go | 4 ++-- 7 files changed, 55 insertions(+), 28 deletions(-) diff --git a/db/db.go b/db/db.go index 14b6c32f..7aaecfa6 100644 --- a/db/db.go +++ b/db/db.go @@ -43,12 +43,39 @@ const ( numETx = "execute_transactions" numQTx = "query_transactions" numRTx = "request_transactions" - - CheckpointQuery = "PRAGMA wal_checkpoint(TRUNCATE)" // rqlite WAL compaction requires truncation ) var ( + // ErrWALReplayDirectoryMismatch is returned when the WAL file(s) are not in the same + // directory as the database file. ErrWALReplayDirectoryMismatch = errors.New("WAL file(s) not in same directory as database file") + + // ErrCheckpointTimeout is returned when a checkpoint does not complete within the + // given duration. + ErrCheckpointTimeout = errors.New("checkpoint timeout") +) + +// CheckpointMode is the mode in which a checkpoint runs. +type CheckpointMode int + +const ( + // CheckpointPassive instructs the checkpoint to run in passive mode. + CheckpointPassive CheckpointMode = iota + // CheckpointFull instructs the checkpoint to run in full mode. + CheckpointFull + // CheckpointRestart instructs the checkpoint to run in restart mode. + CheckpointRestart + // CheckpointTruncate instructs the checkpoint to run in truncate mode. + CheckpointTruncate +) + +var ( + checkpointPRAGMAs = map[CheckpointMode]string{ + CheckpointPassive: "PRAGMA wal_checkpoint(PASSIVE)", + CheckpointFull: "PRAGMA wal_checkpoint(FULL)", + CheckpointRestart: "PRAGMA wal_checkpoint(RESTART)", + CheckpointTruncate: "PRAGMA wal_checkpoint(TRUNCATE)", + } ) // DBVersion is the SQLite version. @@ -302,14 +329,14 @@ func (db *DB) WALSize() (int64, error) { // Checkpoint checkpoints the WAL file. If the WAL file is not enabled, this // function is a no-op. -func (db *DB) Checkpoint() error { - return db.CheckpointWithTimeout(0) +func (db *DB) Checkpoint(mode CheckpointMode) error { + return db.CheckpointWithTimeout(mode, 0) } // CheckpointWithTimeout performs a WAL checkpoint. If the checkpoint does not // complete within the given duration, an error is returned. If the duration is 0, // the checkpoint will be attempted only once. -func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) { +func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err error) { start := time.Now() defer func() { if err != nil { @@ -325,7 +352,7 @@ func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) { var nMoved int f := func() error { - err := db.rwDB.QueryRow(CheckpointQuery).Scan(&ok, &nPages, &nMoved) + err := db.rwDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved) stats.Add(numCheckpointedPages, int64(nPages)) stats.Add(numCheckpointedMoves, int64(nMoved)) if err != nil { diff --git a/db/db_test.go b/db/db_test.go index c2396c7d..72f31243 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -132,7 +132,7 @@ func Test_TableCreation(t *testing.T) { testQ() // Confirm checkpoint works without error. - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } testQ() @@ -176,7 +176,7 @@ func Test_DBSums(t *testing.T) { t.Fatalf("WAL sum did not change after insertion") } - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } @@ -234,7 +234,7 @@ func Test_DBLastModified(t *testing.T) { // Checkpoint, and check time is later. On some platforms the time resolution isn't that // high, so we sleep so the test won't suffer a false failure. time.Sleep(1 * time.Second) - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } lm3, err := db.LastModified() @@ -669,14 +669,14 @@ func Test_WALDatabaseCreatedOK(t *testing.T) { t.Fatalf("WAL file exists but is empty") } - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } if mustFileSize(walPath) != 0 { t.Fatalf("WAL file exists but is non-empty") } // Checkpoint a second time, to ensure it's idempotent. - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } } @@ -698,7 +698,7 @@ func Test_WALDatabaseCheckpointOKNoWAL(t *testing.T) { t.Fatalf("WAL file exists when no writes have happened") } defer db.Close() - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode with non-existent WAL: %s", err.Error()) } } @@ -717,7 +717,7 @@ func Test_WALDatabaseCheckpointOKDelete(t *testing.T) { t.Fatalf("WAL mode enabled") } defer db.Close() - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) } } @@ -839,7 +839,7 @@ func test_FileCreationOnDisk(t *testing.T, db *DB) { // Confirm checkpoint works on all types of on-disk databases. Worst case, this // should be ignored. - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) } } diff --git a/db/state.go b/db/state.go index 8e1af562..ba3282fe 100644 --- a/db/state.go +++ b/db/state.go @@ -223,7 +223,7 @@ func ReplayWAL(path string, wals []string, deleteMode bool) error { if err != nil { return err } - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { return fmt.Errorf("checkpoint WAL %s: %s", wal, err.Error()) } diff --git a/db/state_test.go b/db/state_test.go index f188b4d5..21892483 100644 --- a/db/state_test.go +++ b/db/state_test.go @@ -220,7 +220,7 @@ func Test_WALReplayOK(t *testing.T) { } mustCopyFile(replayDBPath, dbPath) mustCopyFile(filepath.Join(replayDir, walFile+"_001"), walPath) - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -233,7 +233,7 @@ func Test_WALReplayOK(t *testing.T) { t.Fatalf("WAL file at %s does not exist", walPath) } mustCopyFile(filepath.Join(replayDir, walFile+"_002"), walPath) - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -321,7 +321,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { if _, err := srcDB.ExecuteStringStmt("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"); err != nil { t.Fatalf("failed to create table: %s", err.Error()) } - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } mustCopyFile(dstPath, srcPath) @@ -353,7 +353,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath := fmt.Sprintf("%s-%d", dstPath, i) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } } @@ -369,7 +369,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath := fmt.Sprintf("%s-postdelete", dstPath) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -379,7 +379,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath = fmt.Sprintf("%s-postupdate", dstPath) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -394,7 +394,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath = fmt.Sprintf("%s-create-tables", dstPath) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -407,7 +407,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath = fmt.Sprintf("%s-post-create-tables", dstPath) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } diff --git a/db/swappable_db.go b/db/swappable_db.go index e6ca7533..3e2107e9 100644 --- a/db/swappable_db.go +++ b/db/swappable_db.go @@ -124,10 +124,10 @@ func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) { } // Checkpoint calls Checkpoint on the underlying database. -func (s *SwappableDB) Checkpoint() error { +func (s *SwappableDB) Checkpoint(mode CheckpointMode) error { s.dbMu.RLock() defer s.dbMu.RUnlock() - return s.db.Checkpoint() + return s.db.Checkpoint(mode) } // Path calls Path on the underlying database. diff --git a/store/state.go b/store/state.go index dd27280c..07ca8a97 100644 --- a/store/state.go +++ b/store/state.go @@ -143,7 +143,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable // Create a new snapshot, placing the configuration in as if it was // committed at index 1. - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(sql.CheckpointTruncate); err != nil { return fmt.Errorf("failed to checkpoint database: %s", err) } tmpDBFD, err := os.Open(tmpDBPath) diff --git a/store/store.go b/store/store.go index a9f933c8..1d0f75c0 100644 --- a/store/store.go +++ b/store/store.go @@ -1903,7 +1903,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { var fsmSnapshot raft.FSMSnapshot if fullNeeded { - if err := s.db.Checkpoint(); err != nil { + if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil { stats.Add(numFullCheckpointFailed, 1) return nil, err } @@ -1942,7 +1942,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { } stats.Get(snapshotWALSize).(*expvar.Int).Set(int64(compactedBuf.Len())) stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSz) - if err := s.db.Checkpoint(); err != nil { + if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil { stats.Add(numWALCheckpointFailed, 1) // Failing to checkpoint the WAL leaves the main database in an inconsistent // state (if a WAL file was partially checkpointed, then the next WAL file will not