From 958b67908bb9c549d9bc74230b5a0850331e0367 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 17:55:23 -0500 Subject: [PATCH 01/17] Move to explicit checkpoint type --- db/db.go | 39 +++++++++++++++++++++++++++++++++------ db/db_test.go | 16 ++++++++-------- db/state.go | 2 +- db/state_test.go | 16 ++++++++-------- db/swappable_db.go | 4 ++-- store/state.go | 2 +- store/store.go | 4 ++-- 7 files changed, 55 insertions(+), 28 deletions(-) diff --git a/db/db.go b/db/db.go index 14b6c32f..7aaecfa6 100644 --- a/db/db.go +++ b/db/db.go @@ -43,12 +43,39 @@ const ( numETx = "execute_transactions" numQTx = "query_transactions" numRTx = "request_transactions" - - CheckpointQuery = "PRAGMA wal_checkpoint(TRUNCATE)" // rqlite WAL compaction requires truncation ) var ( + // ErrWALReplayDirectoryMismatch is returned when the WAL file(s) are not in the same + // directory as the database file. ErrWALReplayDirectoryMismatch = errors.New("WAL file(s) not in same directory as database file") + + // ErrCheckpointTimeout is returned when a checkpoint does not complete within the + // given duration. + ErrCheckpointTimeout = errors.New("checkpoint timeout") +) + +// CheckpointMode is the mode in which a checkpoint runs. +type CheckpointMode int + +const ( + // CheckpointPassive instructs the checkpoint to run in passive mode. + CheckpointPassive CheckpointMode = iota + // CheckpointFull instructs the checkpoint to run in full mode. + CheckpointFull + // CheckpointRestart instructs the checkpoint to run in restart mode. + CheckpointRestart + // CheckpointTruncate instructs the checkpoint to run in truncate mode. + CheckpointTruncate +) + +var ( + checkpointPRAGMAs = map[CheckpointMode]string{ + CheckpointPassive: "PRAGMA wal_checkpoint(PASSIVE)", + CheckpointFull: "PRAGMA wal_checkpoint(FULL)", + CheckpointRestart: "PRAGMA wal_checkpoint(RESTART)", + CheckpointTruncate: "PRAGMA wal_checkpoint(TRUNCATE)", + } ) // DBVersion is the SQLite version. @@ -302,14 +329,14 @@ func (db *DB) WALSize() (int64, error) { // Checkpoint checkpoints the WAL file. If the WAL file is not enabled, this // function is a no-op. -func (db *DB) Checkpoint() error { - return db.CheckpointWithTimeout(0) +func (db *DB) Checkpoint(mode CheckpointMode) error { + return db.CheckpointWithTimeout(mode, 0) } // CheckpointWithTimeout performs a WAL checkpoint. If the checkpoint does not // complete within the given duration, an error is returned. If the duration is 0, // the checkpoint will be attempted only once. -func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) { +func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err error) { start := time.Now() defer func() { if err != nil { @@ -325,7 +352,7 @@ func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) { var nMoved int f := func() error { - err := db.rwDB.QueryRow(CheckpointQuery).Scan(&ok, &nPages, &nMoved) + err := db.rwDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved) stats.Add(numCheckpointedPages, int64(nPages)) stats.Add(numCheckpointedMoves, int64(nMoved)) if err != nil { diff --git a/db/db_test.go b/db/db_test.go index c2396c7d..72f31243 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -132,7 +132,7 @@ func Test_TableCreation(t *testing.T) { testQ() // Confirm checkpoint works without error. - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } testQ() @@ -176,7 +176,7 @@ func Test_DBSums(t *testing.T) { t.Fatalf("WAL sum did not change after insertion") } - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } @@ -234,7 +234,7 @@ func Test_DBLastModified(t *testing.T) { // Checkpoint, and check time is later. On some platforms the time resolution isn't that // high, so we sleep so the test won't suffer a false failure. time.Sleep(1 * time.Second) - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database: %s", err.Error()) } lm3, err := db.LastModified() @@ -669,14 +669,14 @@ func Test_WALDatabaseCreatedOK(t *testing.T) { t.Fatalf("WAL file exists but is empty") } - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } if mustFileSize(walPath) != 0 { t.Fatalf("WAL file exists but is non-empty") } // Checkpoint a second time, to ensure it's idempotent. - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } } @@ -698,7 +698,7 @@ func Test_WALDatabaseCheckpointOKNoWAL(t *testing.T) { t.Fatalf("WAL file exists when no writes have happened") } defer db.Close() - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode with non-existent WAL: %s", err.Error()) } } @@ -717,7 +717,7 @@ func Test_WALDatabaseCheckpointOKDelete(t *testing.T) { t.Fatalf("WAL mode enabled") } defer db.Close() - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) } } @@ -839,7 +839,7 @@ func test_FileCreationOnDisk(t *testing.T, db *DB) { // Confirm checkpoint works on all types of on-disk databases. Worst case, this // should be ignored. - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointRestart); err != nil { t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) } } diff --git a/db/state.go b/db/state.go index 8e1af562..ba3282fe 100644 --- a/db/state.go +++ b/db/state.go @@ -223,7 +223,7 @@ func ReplayWAL(path string, wals []string, deleteMode bool) error { if err != nil { return err } - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { return fmt.Errorf("checkpoint WAL %s: %s", wal, err.Error()) } diff --git a/db/state_test.go b/db/state_test.go index f188b4d5..21892483 100644 --- a/db/state_test.go +++ b/db/state_test.go @@ -220,7 +220,7 @@ func Test_WALReplayOK(t *testing.T) { } mustCopyFile(replayDBPath, dbPath) mustCopyFile(filepath.Join(replayDir, walFile+"_001"), walPath) - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -233,7 +233,7 @@ func Test_WALReplayOK(t *testing.T) { t.Fatalf("WAL file at %s does not exist", walPath) } mustCopyFile(filepath.Join(replayDir, walFile+"_002"), walPath) - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -321,7 +321,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { if _, err := srcDB.ExecuteStringStmt("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"); err != nil { t.Fatalf("failed to create table: %s", err.Error()) } - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } mustCopyFile(dstPath, srcPath) @@ -353,7 +353,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath := fmt.Sprintf("%s-%d", dstPath, i) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } } @@ -369,7 +369,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath := fmt.Sprintf("%s-postdelete", dstPath) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -379,7 +379,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath = fmt.Sprintf("%s-postupdate", dstPath) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -394,7 +394,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath = fmt.Sprintf("%s-create-tables", dstPath) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } @@ -407,7 +407,7 @@ func Test_WALReplayOK_Complex(t *testing.T) { dstWALPath = fmt.Sprintf("%s-post-create-tables", dstPath) mustCopyFile(dstWALPath, srcWALPath) dstWALs = append(dstWALs, dstWALPath) - if err := srcDB.Checkpoint(); err != nil { + if err := srcDB.Checkpoint(CheckpointTruncate); err != nil { t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error()) } diff --git a/db/swappable_db.go b/db/swappable_db.go index e6ca7533..3e2107e9 100644 --- a/db/swappable_db.go +++ b/db/swappable_db.go @@ -124,10 +124,10 @@ func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) { } // Checkpoint calls Checkpoint on the underlying database. -func (s *SwappableDB) Checkpoint() error { +func (s *SwappableDB) Checkpoint(mode CheckpointMode) error { s.dbMu.RLock() defer s.dbMu.RUnlock() - return s.db.Checkpoint() + return s.db.Checkpoint(mode) } // Path calls Path on the underlying database. diff --git a/store/state.go b/store/state.go index dd27280c..07ca8a97 100644 --- a/store/state.go +++ b/store/state.go @@ -143,7 +143,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable // Create a new snapshot, placing the configuration in as if it was // committed at index 1. - if err := db.Checkpoint(); err != nil { + if err := db.Checkpoint(sql.CheckpointTruncate); err != nil { return fmt.Errorf("failed to checkpoint database: %s", err) } tmpDBFD, err := os.Open(tmpDBPath) diff --git a/store/store.go b/store/store.go index a9f933c8..1d0f75c0 100644 --- a/store/store.go +++ b/store/store.go @@ -1903,7 +1903,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { var fsmSnapshot raft.FSMSnapshot if fullNeeded { - if err := s.db.Checkpoint(); err != nil { + if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil { stats.Add(numFullCheckpointFailed, 1) return nil, err } @@ -1942,7 +1942,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { } stats.Get(snapshotWALSize).(*expvar.Int).Set(int64(compactedBuf.Len())) stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSz) - if err := s.db.Checkpoint(); err != nil { + if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil { stats.Add(numWALCheckpointFailed, 1) // Failing to checkpoint the WAL leaves the main database in an inconsistent // state (if a WAL file was partially checkpointed, then the next WAL file will not From cf49cb1fc46fb20d817e99fe54ef815a77fe4a35 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 17:56:35 -0500 Subject: [PATCH 02/17] Only offer support checkpointing modes --- db/db.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/db/db.go b/db/db.go index 7aaecfa6..588bb5de 100644 --- a/db/db.go +++ b/db/db.go @@ -59,20 +59,14 @@ var ( type CheckpointMode int const ( - // CheckpointPassive instructs the checkpoint to run in passive mode. - CheckpointPassive CheckpointMode = iota - // CheckpointFull instructs the checkpoint to run in full mode. - CheckpointFull // CheckpointRestart instructs the checkpoint to run in restart mode. - CheckpointRestart + CheckpointRestart CheckpointMode = iota // CheckpointTruncate instructs the checkpoint to run in truncate mode. CheckpointTruncate ) var ( checkpointPRAGMAs = map[CheckpointMode]string{ - CheckpointPassive: "PRAGMA wal_checkpoint(PASSIVE)", - CheckpointFull: "PRAGMA wal_checkpoint(FULL)", CheckpointRestart: "PRAGMA wal_checkpoint(RESTART)", CheckpointTruncate: "PRAGMA wal_checkpoint(TRUNCATE)", } From 4bfd41411ea0d37c26e95b349fc8f2c6733e4c1a Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 17:57:39 -0500 Subject: [PATCH 03/17] CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a346054e..7926c381 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 8.18.2 (unreleased) +### Implementation changes and bug fixes +- [PR #1636](https://github.com/rqlite/rqlite/pull/1636): Move to explicit choice of SQLite Checkpointing mode. + ## 8.18.1 (January 26th 2024) ### Implementation changes and bug fixes - [PR #1633](https://github.com/rqlite/rqlite/pull/1633): Improve error messages for internode communication failures. From a987ec2e289f566212f5b5be41b9673d9c48325a Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 18:08:41 -0500 Subject: [PATCH 04/17] Remove unneeded SetFullNeeded post checkpoint fail --- store/store.go | 9 ------ store/store_test.go | 70 --------------------------------------------- 2 files changed, 79 deletions(-) diff --git a/store/store.go b/store/store.go index a9f933c8..2bad5aa3 100644 --- a/store/store.go +++ b/store/store.go @@ -1944,15 +1944,6 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSz) if err := s.db.Checkpoint(); err != nil { stats.Add(numWALCheckpointFailed, 1) - // Failing to checkpoint the WAL leaves the main database in an inconsistent - // state (if a WAL file was partially checkpointed, then the next WAL file will not - // be in sequence with what is in the Snapshot store), so attempt a Full snapshot next - // time. - if err := s.snapshotStore.SetFullNeeded(); err != nil { - // Give up! - s.logger.Fatalf("failed to set full snapshot needed after failed WAL checkpoint: %s", - err.Error()) - } return nil, err } } diff --git a/store/store_test.go b/store/store_test.go index 2ea6a7a1..4a6ffcd2 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -1813,76 +1813,6 @@ func Test_SingleNode_WALTriggeredSnapshot(t *testing.T) { } } -// Test_OpenStoreSingleNode_WALCheckpointFail tests that a WAL checkpoint -// failure will trigger a full snapshot. -func Test_OpenStoreSingleNode_WALCheckpointFail(t *testing.T) { - s, ln := mustNewStore(t) - defer s.Close(true) - defer ln.Close() - if err := s.Open(); err != nil { - t.Fatalf("failed to open single-node store: %s", err.Error()) - } - defer s.Close(true) - if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil { - t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) - } - if _, err := s.WaitForLeader(10 * time.Second); err != nil { - t.Fatalf("Error waiting for leader: %s", err) - } - - er := executeRequestFromStrings([]string{ - `CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`, - }, false, false) - _, err := s.Execute(er) - if err != nil { - t.Fatalf("failed to execute on single node: %s", err.Error()) - } - for i := 0; i < 100; i++ { - _, err := s.Execute(executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false)) - if err != nil { - t.Fatalf("failed to execute INSERT on single node: %s", err.Error()) - } - } - - if err := s.Snapshot(0); err != nil { - t.Fatalf("failed to snapshot store: %s", err.Error()) - } - if fn, err := s.snapshotStore.FullNeeded(); err != nil { - t.Fatalf("failed to determine full snapshot needed: %s", err.Error()) - } else if fn { - t.Fatalf("full snapshot marked as needed") - } - - for i := 0; i < 100; i++ { - _, err := s.Execute(executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false)) - if err != nil { - t.Fatalf("failed to execute INSERT on single node: %s", err.Error()) - } - } - - // Do another snapshot, which should trigger a WAL checkpoint. - // However, open the SQLite file and start a transaction, causing - // the checkpoint to fail. - db, err := db.Open(s.dbPath, false, true) - if err != nil { - t.Fatalf("failed to open SQLite database: %s", err.Error()) - } - defer db.Close() - _, err = db.ExecuteStringStmt("BEGIN TRANSACTION; SELECT * FROM foo") - if err != nil { - t.Fatalf("failed to begin transaction: %s", err.Error()) - } - - if err := s.Snapshot(0); err == nil { - t.Fatalf("expected error snapshotting store") - } - if fn, err := s.snapshotStore.FullNeeded(); err != nil { - t.Fatalf("failed to determine full snapshot needed: %s", err.Error()) - } else if !fn { - t.Fatalf("full snapshot should be marked as needed") - } -} - func Test_OpenStoreSingleNode_VacuumTimes(t *testing.T) { s0, ln0 := mustNewStore(t) defer s0.Close(true) From fd1b4bc5e2f9546b2ef7221584a67f750dbe177f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 18:10:08 -0500 Subject: [PATCH 05/17] CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7926c381..421e6901 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 8.18.2 (unreleased) ### Implementation changes and bug fixes - [PR #1636](https://github.com/rqlite/rqlite/pull/1636): Move to explicit choice of SQLite Checkpointing mode. +- [PR #1637](https://github.com/rqlite/rqlite/pull/1637): Remove unneeded SetFullNeeded post WAL checkpoint failure. ## 8.18.1 (January 26th 2024) ### Implementation changes and bug fixes From 1c5f4a8f0d4841d431d59c87230e90bdcebec020 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 18:25:35 -0500 Subject: [PATCH 06/17] Unit test DB showing RESTART doesn't alter WAL --- db/db_checkpoint_test.go | 117 +++++++++++++++++++++++++++++++++++++++ db/db_test.go | 41 -------------- 2 files changed, 117 insertions(+), 41 deletions(-) create mode 100644 db/db_checkpoint_test.go diff --git a/db/db_checkpoint_test.go b/db/db_checkpoint_test.go new file mode 100644 index 00000000..e7b60dda --- /dev/null +++ b/db/db_checkpoint_test.go @@ -0,0 +1,117 @@ +package db + +import ( + "bytes" + "os" + "testing" +) + +// Test_WALDatabaseCheckpointOKNoWAL tests that a checkpoint succeeds +// even when no WAL file exists. +func Test_WALDatabaseCheckpointOKNoWAL(t *testing.T) { + path := mustTempFile() + defer os.Remove(path) + + db, err := Open(path, false, true) + if err != nil { + t.Fatalf("failed to open database in WAL mode: %s", err.Error()) + } + if !db.WALEnabled() { + t.Fatalf("WAL mode not enabled") + } + if fileExists(db.WALPath()) { + t.Fatalf("WAL file exists when no writes have happened") + } + defer db.Close() + if err := db.Checkpoint(CheckpointTruncate); err != nil { + t.Fatalf("failed to checkpoint database in WAL mode with non-existent WAL: %s", err.Error()) + } +} + +// Test_WALDatabaseCheckpointOKDelete tests that a checkpoint returns no error +// even when the database is opened in DELETE mode. +func Test_WALDatabaseCheckpointOKDelete(t *testing.T) { + path := mustTempFile() + defer os.Remove(path) + + db, err := Open(path, false, false) + if err != nil { + t.Fatalf("failed to open database in DELETE mode: %s", err.Error()) + } + if db.WALEnabled() { + t.Fatalf("WAL mode enabled") + } + defer db.Close() + if err := db.Checkpoint(CheckpointTruncate); err != nil { + t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) + } +} + +// Test_WALDatabaseCheckpoint_Restart tests that a checkpoint restart +// returns no error and that the WAL file is not modified even though +// all the WAL pages are copied to the database file. Then Truncate +// is called and the WAL file is deleted. +func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) { + path := mustTempFile() + defer os.Remove(path) + db, err := Open(path, false, true) + if err != nil { + t.Fatalf("failed to open database in DELETE mode: %s", err.Error()) + } + + _, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`) + if err != nil { + t.Fatalf("failed to execute on single node: %s", err.Error()) + } + for i := 0; i < 100; i++ { + _, err := db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("fiona")`) + if err != nil { + t.Fatalf("failed to execute INSERT on single node: %s", err.Error()) + } + } + + walPreBytes, err := os.ReadFile(db.WALPath()) + if err != nil { + t.Fatalf("failed to read wal file: %s", err.Error()) + } + if err := db.Checkpoint(CheckpointRestart); err != nil { + t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) + } + walPostBytes, err := os.ReadFile(db.WALPath()) + if err != nil { + t.Fatalf("failed to read wal file: %s", err.Error()) + } + if !bytes.Equal(walPreBytes, walPostBytes) { + t.Fatalf("wal files should be identical after checkpoint restart") + } + + // query the data to make sure all is well. + rows, err := db.QueryStringStmt(`SELECT COUNT(*) FROM foo`) + if err != nil { + t.Fatalf("failed to execute query on single node: %s", err.Error()) + } + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[100]]}]`, asJSON(rows); exp != got { + t.Fatalf("expected %s, got %s", exp, got) + } + + if err := db.Checkpoint(CheckpointTruncate); err != nil { + t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) + } + sz, err := fileSize(db.WALPath()) + if err != nil { + t.Fatalf("wal file should be deleted after checkpoint truncate") + } + if sz != 0 { + t.Fatalf("wal file should be zero length after checkpoint truncate") + } + + // query the data to make sure all is still well. + rows, err = db.QueryStringStmt(`SELECT COUNT(*) FROM foo`) + if err != nil { + t.Fatalf("failed to execute query on single node: %s", err.Error()) + } + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[100]]}]`, asJSON(rows); exp != got { + t.Fatalf("expected %s, got %s", exp, got) + } + +} diff --git a/db/db_test.go b/db/db_test.go index 72f31243..70c6dd9b 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -681,47 +681,6 @@ func Test_WALDatabaseCreatedOK(t *testing.T) { } } -// Test_WALDatabaseCheckpointOKNoWAL tests that a checkpoint succeeds -// even when no WAL file exists. -func Test_WALDatabaseCheckpointOKNoWAL(t *testing.T) { - path := mustTempFile() - defer os.Remove(path) - - db, err := Open(path, false, true) - if err != nil { - t.Fatalf("failed to open database in WAL mode: %s", err.Error()) - } - if !db.WALEnabled() { - t.Fatalf("WAL mode not enabled") - } - if fileExists(db.WALPath()) { - t.Fatalf("WAL file exists when no writes have happened") - } - defer db.Close() - if err := db.Checkpoint(CheckpointTruncate); err != nil { - t.Fatalf("failed to checkpoint database in WAL mode with non-existent WAL: %s", err.Error()) - } -} - -// Test_WALDatabaseCheckpointOKDelete tests that a checkpoint returns no error -// even when the database is opened in DELETE mode. -func Test_WALDatabaseCheckpointOKDelete(t *testing.T) { - path := mustTempFile() - defer os.Remove(path) - - db, err := Open(path, false, false) - if err != nil { - t.Fatalf("failed to open database in DELETE mode: %s", err.Error()) - } - if db.WALEnabled() { - t.Fatalf("WAL mode enabled") - } - defer db.Close() - if err := db.Checkpoint(CheckpointTruncate); err != nil { - t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) - } -} - // Test_WALDatabaseCreatedOKFromDELETE tests that a WAL database is created properly, // even when supplied with a DELETE-mode database. func Test_WALDatabaseCreatedOKFromDELETE(t *testing.T) { From 8093b818a86935571a750470bdac6bc7f3f8eb0a Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 18:29:21 -0500 Subject: [PATCH 07/17] Set Checkpoint Retry interval to 20ms --- db/db.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/db/db.go b/db/db.go index 588bb5de..e9de59b6 100644 --- a/db/db.go +++ b/db/db.go @@ -22,10 +22,11 @@ import ( ) const ( - SQLiteHeaderSize = 32 - bkDelay = 250 - sizeAtOpenWarn = 1024 * 1024 * 1024 - durToOpenLog = 2 * time.Second + SQLiteHeaderSize = 32 + bkDelay = 250 + sizeAtOpenWarn = 1024 * 1024 * 1024 + durToOpenLog = 2 * time.Second + checkpointRetryDelay = 20 * time.Millisecond ) const ( @@ -328,8 +329,8 @@ func (db *DB) Checkpoint(mode CheckpointMode) error { } // CheckpointWithTimeout performs a WAL checkpoint. If the checkpoint does not -// complete within the given duration, an error is returned. If the duration is 0, -// the checkpoint will be attempted only once. +// run to completion within the given duration, an error is returned. If the +// duration is 0, the checkpoint will be attempted only once. func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err error) { start := time.Now() defer func() { @@ -368,8 +369,7 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err return err } - var lastError error - t := time.NewTicker(100 * time.Millisecond) + t := time.NewTicker(checkpointRetryDelay) defer t.Stop() for { select { @@ -377,9 +377,8 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err if err := f(); err == nil { return nil } - lastError = err case <-time.After(dur): - return fmt.Errorf("checkpoint timeout: %v", lastError) + return ErrCheckpointTimeout } } } From 5694556ed2eff2d4c116a017d93550a50fa03e6a Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 18:49:41 -0500 Subject: [PATCH 08/17] Test Restart Checkpoint timeouts --- db/db.go | 15 +++++++--- db/db_checkpoint_test.go | 60 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 65 insertions(+), 10 deletions(-) diff --git a/db/db.go b/db/db.go index e9de59b6..3fcb0b27 100644 --- a/db/db.go +++ b/db/db.go @@ -163,6 +163,11 @@ func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) { return nil, fmt.Errorf("disable autocheckpointing: %s", err.Error()) } + // Unset any busy_timeout + if _, err := rwDB.Exec("PRAGMA busy_timeout=0"); err != nil { + return nil, fmt.Errorf("disable busy_timeout: %s", err.Error()) + } + roDSN := MakeDSN(dbPath, ModeReadOnly, fkEnabled, wal) roDB, err := sql.Open("sqlite3", roDSN) if err != nil { @@ -369,15 +374,17 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err return err } - t := time.NewTicker(checkpointRetryDelay) - defer t.Stop() + ticker := time.NewTicker(checkpointRetryDelay) + timer := time.NewTimer(dur) + defer ticker.Stop() + defer timer.Stop() for { select { - case <-t.C: + case <-ticker.C: if err := f(); err == nil { return nil } - case <-time.After(dur): + case <-timer.C: return ErrCheckpointTimeout } } diff --git a/db/db_checkpoint_test.go b/db/db_checkpoint_test.go index e7b60dda..111ea716 100644 --- a/db/db_checkpoint_test.go +++ b/db/db_checkpoint_test.go @@ -4,6 +4,7 @@ import ( "bytes" "os" "testing" + "time" ) // Test_WALDatabaseCheckpointOKNoWAL tests that a checkpoint succeeds @@ -56,14 +57,15 @@ func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) { defer os.Remove(path) db, err := Open(path, false, true) if err != nil { - t.Fatalf("failed to open database in DELETE mode: %s", err.Error()) + t.Fatalf("failed to open database in WAL mode: %s", err.Error()) } + defer db.Close() _, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`) if err != nil { t.Fatalf("failed to execute on single node: %s", err.Error()) } - for i := 0; i < 100; i++ { + for i := 0; i < 50; i++ { _, err := db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("fiona")`) if err != nil { t.Fatalf("failed to execute INSERT on single node: %s", err.Error()) @@ -75,7 +77,7 @@ func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) { t.Fatalf("failed to read wal file: %s", err.Error()) } if err := db.Checkpoint(CheckpointRestart); err != nil { - t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) + t.Fatalf("failed to checkpoint database: %s", err.Error()) } walPostBytes, err := os.ReadFile(db.WALPath()) if err != nil { @@ -90,12 +92,12 @@ func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) { if err != nil { t.Fatalf("failed to execute query on single node: %s", err.Error()) } - if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[100]]}]`, asJSON(rows); exp != got { + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[50]]}]`, asJSON(rows); exp != got { t.Fatalf("expected %s, got %s", exp, got) } if err := db.Checkpoint(CheckpointTruncate); err != nil { - t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error()) + t.Fatalf("failed to checkpoint database: %s", err.Error()) } sz, err := fileSize(db.WALPath()) if err != nil { @@ -110,8 +112,54 @@ func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) { if err != nil { t.Fatalf("failed to execute query on single node: %s", err.Error()) } - if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[100]]}]`, asJSON(rows); exp != got { + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[50]]}]`, asJSON(rows); exp != got { + t.Fatalf("expected %s, got %s", exp, got) + } +} + +func Test_WALDatabaseCheckpoint_RestartTimeout(t *testing.T) { + path := mustTempFile() + defer os.Remove(path) + db, err := Open(path, false, true) + if err != nil { + t.Fatalf("failed to open database in WAL mode: %s", err.Error()) + } + defer db.Close() + + _, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`) + if err != nil { + t.Fatalf("failed to execute on single node: %s", err.Error()) + } + for i := 0; i < 50; i++ { + _, err := db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("fiona")`) + if err != nil { + t.Fatalf("failed to execute INSERT on single node: %s", err.Error()) + } + } + + blockingDB, err := Open(path, false, true) + if err != nil { + t.Fatalf("failed to open blocking database in WAL mode: %s", err.Error()) + } + defer blockingDB.Close() + _, err = blockingDB.QueryStringStmt(`BEGIN TRANSACTION`) + if err != nil { + t.Fatalf("failed to execute query on single node: %s", err.Error()) + } + rows, err := blockingDB.QueryStringStmt(`SELECT COUNT(*) FROM foo`) + if err != nil { + t.Fatalf("failed to execute query on single node: %s", err.Error()) + } + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[50]]}]`, asJSON(rows); exp != got { t.Fatalf("expected %s, got %s", exp, got) } + if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != ErrCheckpointTimeout { + t.Fatal("expected timeout error") + } + + blockingDB.Close() + if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != nil { + t.Fatalf("failed to checkpoint database: %s", err.Error()) + } } From 19276e6f1ec72a13cf9934b3e0ce1633b1d82b18 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 18:58:33 -0500 Subject: [PATCH 09/17] Use dedicated connection for checkpointing --- db/db.go | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/db/db.go b/db/db.go index 3fcb0b27..6d26248c 100644 --- a/db/db.go +++ b/db/db.go @@ -111,8 +111,9 @@ type DB struct { fkEnabled bool // Foreign key constraints enabled wal bool - rwDB *sql.DB // Database connection for database reads and writes. - roDB *sql.DB // Database connection database reads. + rwDB *sql.DB // Database connection for database reads and writes. + roDB *sql.DB // Database connection database reads. + chkDB *sql.DB // Database connection for checkpointing. rwDSN string // DSN used for read-write connection roDSN string // DSN used for read-only connections @@ -152,22 +153,29 @@ func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) { logger.Printf("database file is %s, SQLite may take longer to open it", humanize.Bytes(uint64(sz))) } + ///////////////////////////////////////////////////////////////////////// + // Main RW connection rwDSN := MakeDSN(dbPath, ModeReadWrite, fkEnabled, wal) rwDB, err := sql.Open("sqlite3", rwDSN) if err != nil { return nil, fmt.Errorf("open: %s", err.Error()) } - - // Critical that rqlite has full control over the checkpointing process. if _, err := rwDB.Exec("PRAGMA wal_autocheckpoint=0"); err != nil { return nil, fmt.Errorf("disable autocheckpointing: %s", err.Error()) } - // Unset any busy_timeout - if _, err := rwDB.Exec("PRAGMA busy_timeout=0"); err != nil { - return nil, fmt.Errorf("disable busy_timeout: %s", err.Error()) + ///////////////////////////////////////////////////////////////////////// + // Checkpointing connection + chkDB, err := sql.Open("sqlite3", rwDSN) + if err != nil { + return nil, fmt.Errorf("open checkpointing database: %s", err.Error()) + } + if _, err := chkDB.Exec("PRAGMA busy_timeout=0"); err != nil { + return nil, fmt.Errorf("disable busy_timeout on checkpointing connection: %s", err.Error()) } + ///////////////////////////////////////////////////////////////////////// + // Read-only connection roDSN := MakeDSN(dbPath, ModeReadOnly, fkEnabled, wal) roDB, err := sql.Open("sqlite3", roDSN) if err != nil { @@ -192,6 +200,7 @@ func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) { wal: wal, rwDB: rwDB, roDB: roDB, + chkDB: chkDB, rwDSN: rwDSN, roDSN: roDSN, logger: logger, @@ -352,7 +361,7 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err var nMoved int f := func() error { - err := db.rwDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved) + err := db.chkDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved) stats.Add(numCheckpointedPages, int64(nPages)) stats.Add(numCheckpointedMoves, int64(nMoved)) if err != nil { @@ -1104,8 +1113,9 @@ func (db *DB) StmtReadOnlyWithConn(sql string, conn *sql.Conn) (bool, error) { func (db *DB) pragmas() (map[string]interface{}, error) { conns := map[string]*sql.DB{ - "rw": db.rwDB, - "ro": db.roDB, + "rw": db.rwDB, + "ro": db.roDB, + "chk": db.chkDB, } connsMap := make(map[string]interface{}) From 3cfb4411b3ac612bee32e893d868813554956ca3 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 19:34:38 -0500 Subject: [PATCH 10/17] PRAGMA busy timeout for Checkpoints --- db/db.go | 43 +++++++++++-------------------------------- 1 file changed, 11 insertions(+), 32 deletions(-) diff --git a/db/db.go b/db/db.go index 6d26248c..d9c96e2b 100644 --- a/db/db.go +++ b/db/db.go @@ -360,43 +360,22 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err var nPages int var nMoved int - f := func() error { - err := db.chkDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved) - stats.Add(numCheckpointedPages, int64(nPages)) - stats.Add(numCheckpointedMoves, int64(nMoved)) - if err != nil { - return fmt.Errorf("error checkpointing WAL: %s", err.Error()) - } - if ok != 0 { - return fmt.Errorf("failed to completely checkpoint WAL (%d ok, %d pages, %d moved)", - ok, nPages, nMoved) + if dur > 0 { + if _, err := db.chkDB.Exec(fmt.Sprintf("PRAGMA busy_timeout=%d", dur.Milliseconds())); err != nil { + return fmt.Errorf("failed to set busy_timeout on checkpointing connection: %s", err.Error()) } - return nil } - // Try fast path - err = f() - if err == nil { - return nil - } - if dur == 0 { - return err + if err := db.chkDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved); err != nil { + return fmt.Errorf("error checkpointing WAL: %s", err.Error()) } - - ticker := time.NewTicker(checkpointRetryDelay) - timer := time.NewTimer(dur) - defer ticker.Stop() - defer timer.Stop() - for { - select { - case <-ticker.C: - if err := f(); err == nil { - return nil - } - case <-timer.C: - return ErrCheckpointTimeout - } + stats.Add(numCheckpointedPages, int64(nPages)) + stats.Add(numCheckpointedMoves, int64(nMoved)) + if ok != 0 { + return fmt.Errorf("failed to completely checkpoint WAL (%d ok, %d pages, %d moved)", + ok, nPages, nMoved) } + return nil } // DisableCheckpointing disables the automatic checkpointing that occurs when From cf1a525a245204b1ea7736372966caf26cea71db Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 19:35:11 -0500 Subject: [PATCH 11/17] Move code around --- db/db.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/db/db.go b/db/db.go index d9c96e2b..f92ca5d6 100644 --- a/db/db.go +++ b/db/db.go @@ -356,16 +356,15 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err } }() - var ok int - var nPages int - var nMoved int - if dur > 0 { if _, err := db.chkDB.Exec(fmt.Sprintf("PRAGMA busy_timeout=%d", dur.Milliseconds())); err != nil { return fmt.Errorf("failed to set busy_timeout on checkpointing connection: %s", err.Error()) } } + var ok int + var nPages int + var nMoved int if err := db.chkDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved); err != nil { return fmt.Errorf("error checkpointing WAL: %s", err.Error()) } From 9aab1169bdbe91f058e85df95da96d6e5c2fa529 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 19:39:59 -0500 Subject: [PATCH 12/17] Remove dedicated checkpointing DB connection --- db/db.go | 33 +++++++++++++-------------------- 1 file changed, 13 insertions(+), 20 deletions(-) diff --git a/db/db.go b/db/db.go index f92ca5d6..98cfdd96 100644 --- a/db/db.go +++ b/db/db.go @@ -111,9 +111,8 @@ type DB struct { fkEnabled bool // Foreign key constraints enabled wal bool - rwDB *sql.DB // Database connection for database reads and writes. - roDB *sql.DB // Database connection database reads. - chkDB *sql.DB // Database connection for checkpointing. + rwDB *sql.DB // Database connection for database reads and writes. + roDB *sql.DB // Database connection database reads. rwDSN string // DSN used for read-write connection roDSN string // DSN used for read-only connections @@ -164,16 +163,6 @@ func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) { return nil, fmt.Errorf("disable autocheckpointing: %s", err.Error()) } - ///////////////////////////////////////////////////////////////////////// - // Checkpointing connection - chkDB, err := sql.Open("sqlite3", rwDSN) - if err != nil { - return nil, fmt.Errorf("open checkpointing database: %s", err.Error()) - } - if _, err := chkDB.Exec("PRAGMA busy_timeout=0"); err != nil { - return nil, fmt.Errorf("disable busy_timeout on checkpointing connection: %s", err.Error()) - } - ///////////////////////////////////////////////////////////////////////// // Read-only connection roDSN := MakeDSN(dbPath, ModeReadOnly, fkEnabled, wal) @@ -200,7 +189,6 @@ func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) { wal: wal, rwDB: rwDB, roDB: roDB, - chkDB: chkDB, rwDSN: rwDSN, roDSN: roDSN, logger: logger, @@ -344,7 +332,8 @@ func (db *DB) Checkpoint(mode CheckpointMode) error { // CheckpointWithTimeout performs a WAL checkpoint. If the checkpoint does not // run to completion within the given duration, an error is returned. If the -// duration is 0, the checkpoint will be attempted only once. +// duration is 0, the busy timeout is not modified before executing the +// checkpoint. func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err error) { start := time.Now() defer func() { @@ -357,15 +346,20 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err }() if dur > 0 { - if _, err := db.chkDB.Exec(fmt.Sprintf("PRAGMA busy_timeout=%d", dur.Milliseconds())); err != nil { + if _, err := db.rwDB.Exec(fmt.Sprintf("PRAGMA busy_timeout=%d", dur.Milliseconds())); err != nil { return fmt.Errorf("failed to set busy_timeout on checkpointing connection: %s", err.Error()) } + defer func() { + if _, err := db.rwDB.Exec("PRAGMA busy_timeout=5000"); err != nil { + db.logger.Printf("failed to reset busy_timeout on checkpointing connection: %s", err.Error()) + } + }() } var ok int var nPages int var nMoved int - if err := db.chkDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved); err != nil { + if err := db.rwDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved); err != nil { return fmt.Errorf("error checkpointing WAL: %s", err.Error()) } stats.Add(numCheckpointedPages, int64(nPages)) @@ -1091,9 +1085,8 @@ func (db *DB) StmtReadOnlyWithConn(sql string, conn *sql.Conn) (bool, error) { func (db *DB) pragmas() (map[string]interface{}, error) { conns := map[string]*sql.DB{ - "rw": db.rwDB, - "ro": db.roDB, - "chk": db.chkDB, + "rw": db.rwDB, + "ro": db.roDB, } connsMap := make(map[string]interface{}) From ec91f1b7c573bb7af9ed6ae9d3564faeb22c3be4 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 20:07:23 -0500 Subject: [PATCH 13/17] Fix checkpoint test --- db/db.go | 4 ---- db/db_checkpoint_test.go | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/db/db.go b/db/db.go index 98cfdd96..ccd062b5 100644 --- a/db/db.go +++ b/db/db.go @@ -50,10 +50,6 @@ var ( // ErrWALReplayDirectoryMismatch is returned when the WAL file(s) are not in the same // directory as the database file. ErrWALReplayDirectoryMismatch = errors.New("WAL file(s) not in same directory as database file") - - // ErrCheckpointTimeout is returned when a checkpoint does not complete within the - // given duration. - ErrCheckpointTimeout = errors.New("checkpoint timeout") ) // CheckpointMode is the mode in which a checkpoint runs. diff --git a/db/db_checkpoint_test.go b/db/db_checkpoint_test.go index 111ea716..862af48a 100644 --- a/db/db_checkpoint_test.go +++ b/db/db_checkpoint_test.go @@ -154,8 +154,8 @@ func Test_WALDatabaseCheckpoint_RestartTimeout(t *testing.T) { t.Fatalf("expected %s, got %s", exp, got) } - if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != ErrCheckpointTimeout { - t.Fatal("expected timeout error") + if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err == nil { + t.Fatal("expected error due to failure to checkpoint") } blockingDB.Close() From d39c0c83674dfca94d86f537f4d9b0787b1ae13f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 20:18:46 -0500 Subject: [PATCH 14/17] Restart, then Truncate, the WAL --- store/store.go | 109 ++++++++++++++++++++++++++++--------------------- 1 file changed, 63 insertions(+), 46 deletions(-) diff --git a/store/store.go b/store/store.go index 8733b2c3..3e9cc7f7 100644 --- a/store/store.go +++ b/store/store.go @@ -108,46 +108,47 @@ const ( ) const ( - numSnapshots = "num_snapshots" - numSnapshotsFailed = "num_snapshots_failed" - numUserSnapshots = "num_user_snapshots" - numUserSnapshotsFailed = "num_user_snapshots_failed" - numWALSnapshots = "num_wal_snapshots" - numWALSnapshotsFailed = "num_wal_snapshots_failed" - numSnapshotsFull = "num_snapshots_full" - numSnapshotsIncremental = "num_snapshots_incremental" - numFullCheckpointFailed = "num_full_checkpoint_failed" - numWALCheckpointFailed = "num_wal_checkpoint_failed" - numAutoVacuums = "num_auto_vacuums" - numAutoVacuumsFailed = "num_auto_vacuums_failed" - autoVacuumDuration = "auto_vacuum_duration" - numBoots = "num_boots" - numBackups = "num_backups" - numLoads = "num_loads" - numRestores = "num_restores" - numRestoresFailed = "num_restores_failed" - numAutoRestores = "num_auto_restores" - numAutoRestoresSkipped = "num_auto_restores_skipped" - numAutoRestoresFailed = "num_auto_restores_failed" - numRecoveries = "num_recoveries" - numProviderChecks = "num_provider_checks" - numProviderProvides = "num_provider_provides" - numProviderProvidesFail = "num_provider_provides_fail" - numUncompressedCommands = "num_uncompressed_commands" - numCompressedCommands = "num_compressed_commands" - numJoins = "num_joins" - numIgnoredJoins = "num_ignored_joins" - numRemovedBeforeJoins = "num_removed_before_joins" - numDBStatsErrors = "num_db_stats_errors" - snapshotCreateDuration = "snapshot_create_duration" - snapshotPersistDuration = "snapshot_persist_duration" - snapshotPrecompactWALSize = "snapshot_precompact_wal_size" - snapshotWALSize = "snapshot_wal_size" - leaderChangesObserved = "leader_changes_observed" - leaderChangesDropped = "leader_changes_dropped" - failedHeartbeatObserved = "failed_heartbeat_observed" - nodesReapedOK = "nodes_reaped_ok" - nodesReapedFailed = "nodes_reaped_failed" + numSnapshots = "num_snapshots" + numSnapshotsFailed = "num_snapshots_failed" + numUserSnapshots = "num_user_snapshots" + numUserSnapshotsFailed = "num_user_snapshots_failed" + numWALSnapshots = "num_wal_snapshots" + numWALSnapshotsFailed = "num_wal_snapshots_failed" + numSnapshotsFull = "num_snapshots_full" + numSnapshotsIncremental = "num_snapshots_incremental" + numFullCheckpointFailed = "num_full_checkpoint_failed" + numWALCheckpointRestartFailed = "num_wal_checkpoint_restart_failed" + numWALCheckpointTruncateFailed = "num_wal_checkpoint_truncate_failed" + numAutoVacuums = "num_auto_vacuums" + numAutoVacuumsFailed = "num_auto_vacuums_failed" + autoVacuumDuration = "auto_vacuum_duration" + numBoots = "num_boots" + numBackups = "num_backups" + numLoads = "num_loads" + numRestores = "num_restores" + numRestoresFailed = "num_restores_failed" + numAutoRestores = "num_auto_restores" + numAutoRestoresSkipped = "num_auto_restores_skipped" + numAutoRestoresFailed = "num_auto_restores_failed" + numRecoveries = "num_recoveries" + numProviderChecks = "num_provider_checks" + numProviderProvides = "num_provider_provides" + numProviderProvidesFail = "num_provider_provides_fail" + numUncompressedCommands = "num_uncompressed_commands" + numCompressedCommands = "num_compressed_commands" + numJoins = "num_joins" + numIgnoredJoins = "num_ignored_joins" + numRemovedBeforeJoins = "num_removed_before_joins" + numDBStatsErrors = "num_db_stats_errors" + snapshotCreateDuration = "snapshot_create_duration" + snapshotPersistDuration = "snapshot_persist_duration" + snapshotPrecompactWALSize = "snapshot_precompact_wal_size" + snapshotWALSize = "snapshot_wal_size" + leaderChangesObserved = "leader_changes_observed" + leaderChangesDropped = "leader_changes_dropped" + failedHeartbeatObserved = "failed_heartbeat_observed" + nodesReapedOK = "nodes_reaped_ok" + nodesReapedFailed = "nodes_reaped_failed" ) // stats captures stats for the Store. @@ -170,7 +171,8 @@ func ResetStats() { stats.Add(numSnapshotsFull, 0) stats.Add(numSnapshotsIncremental, 0) stats.Add(numFullCheckpointFailed, 0) - stats.Add(numWALCheckpointFailed, 0) + stats.Add(numWALCheckpointRestartFailed, 0) + stats.Add(numWALCheckpointTruncateFailed, 0) stats.Add(numAutoVacuums, 0) stats.Add(numAutoVacuumsFailed, 0) stats.Add(autoVacuumDuration, 0) @@ -1917,12 +1919,23 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { compactedBuf := bytes.NewBuffer(nil) var err error if pathExistsWithData(s.walPath) { + // Attempt to checkpoint everything into the main database file. Only + // if this works should we bother to compact-scan the WAL. It's possible + // it fails if some query is in progress. If it fails, return an error + // and Raft will retry later. But if it succeeds it means that all readers + // are reading from the main database file. + if err := s.db.Checkpoint(sql.CheckpointRestart); err != nil { + stats.Add(numWALCheckpointRestartFailed, 1) + return nil, err + } + + // Read a compacted version of the WAL into memory, and write it + // to the Snapshot store. walFD, err := os.Open(s.walPath) if err != nil { return nil, err } defer walFD.Close() // Make sure it closes. - scanner, err := wal.NewFastCompactingScanner(walFD) if err != nil { return nil, err @@ -1936,16 +1949,20 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { } walFD.Close() // We need it closed for the next step. + // Clean-up by truncating the WAL. This should be fast because all the pages + // have been checkpointed into the main database file, and writes are + // blocked during this process by Raft. In otherwords, the WAL file should + // be unchanged. walSz, err := fileSize(s.walPath) if err != nil { return nil, err } - stats.Get(snapshotWALSize).(*expvar.Int).Set(int64(compactedBuf.Len())) - stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSz) if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil { - stats.Add(numWALCheckpointFailed, 1) - return nil, err + stats.Add(numWALCheckpointTruncateFailed, 1) + return nil, fmt.Errorf("failed to truncate WAL: %s", err.Error()) } + stats.Get(snapshotWALSize).(*expvar.Int).Set(int64(compactedBuf.Len())) + stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSz) } fsmSnapshot = snapshot.NewSnapshot(io.NopCloser(compactedBuf)) if err != nil { From 240fb86e1397dc99c17898d89613406b52e0449c Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 20:32:40 -0500 Subject: [PATCH 15/17] Clearer control for busy timeout --- db/db.go | 25 +++++++++++++++++++++++-- db/db_common_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+), 2 deletions(-) diff --git a/db/db.go b/db/db.go index ccd062b5..f358e4ce 100644 --- a/db/db.go +++ b/db/db.go @@ -320,6 +320,22 @@ func (db *DB) WALSize() (int64, error) { return 0, err } +// SetBusyTimeout sets the busy timeout for the database. +func (db *DB) SetBusyTimeout(ms int) error { + _, err := db.rwDB.Exec(fmt.Sprintf("PRAGMA busy_timeout=%d", ms)) + return err +} + +// BusyTimeout returns the current busy timeout value. +func (db *DB) BusyTimeout() (int, error) { + var rwN int + err := db.rwDB.QueryRow("PRAGMA busy_timeout").Scan(&rwN) + if err != nil { + return 0, err + } + return rwN, err +} + // Checkpoint checkpoints the WAL file. If the WAL file is not enabled, this // function is a no-op. func (db *DB) Checkpoint(mode CheckpointMode) error { @@ -342,11 +358,16 @@ func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err }() if dur > 0 { - if _, err := db.rwDB.Exec(fmt.Sprintf("PRAGMA busy_timeout=%d", dur.Milliseconds())); err != nil { + bt, err := db.BusyTimeout() + if err != nil { + return fmt.Errorf("failed to get busy_timeout on checkpointing connection: %s", err.Error()) + } + if err := db.SetBusyTimeout(int(dur.Milliseconds())); err != nil { return fmt.Errorf("failed to set busy_timeout on checkpointing connection: %s", err.Error()) } defer func() { - if _, err := db.rwDB.Exec("PRAGMA busy_timeout=5000"); err != nil { + // Reset back to default + if _, err := db.rwDB.Exec(fmt.Sprintf("PRAGMA busy_timeout=%d", bt)); err != nil { db.logger.Printf("failed to reset busy_timeout on checkpointing connection: %s", err.Error()) } }() diff --git a/db/db_common_test.go b/db/db_common_test.go index bc4f15f0..2204ed14 100644 --- a/db/db_common_test.go +++ b/db/db_common_test.go @@ -2,14 +2,44 @@ package db import ( "errors" + "fmt" "os" "strings" "testing" command "github.com/rqlite/rqlite/v8/command/proto" + "github.com/rqlite/rqlite/v8/random" "github.com/rqlite/rqlite/v8/testdata/chinook" ) +func testBusyTimeout(t *testing.T, db *DB) { + rbt := random.Intn(10000) + _, err := db.ExecuteStringStmt(fmt.Sprintf("PRAGMA busy_timeout=%d", rbt)) + if err != nil { + t.Fatalf("failed to set busy_timeout: %s", err.Error()) + } + + bt, err := db.BusyTimeout() + if err != nil { + t.Fatalf("failed to get busy_timeout: %s", err.Error()) + } + if exp, got := rbt, bt; exp != got { + t.Fatalf("expected busy_timeout %d, got %d", exp, got) + } + + rbt2 := random.Intn(10000) + if err := db.SetBusyTimeout(rbt2); err != nil { + t.Fatalf("failed to set busy_timeout: %s", err.Error()) + } + bt, err = db.BusyTimeout() + if err != nil { + t.Fatalf("failed to get busy_timeout: %s", err.Error()) + } + if exp, got := rbt2, bt; exp != got { + t.Fatalf("expected busy_timeout %d, got %d", exp, got) + } +} + func testCompileOptions(t *testing.T, db *DB) { _, err := db.CompileOptions() if err != nil { @@ -1529,6 +1559,7 @@ func Test_DatabaseCommonOperations(t *testing.T) { name string testFunc func(*testing.T, *DB) }{ + {"BusyTimeout", testBusyTimeout}, {"SetSynchronousMode", testSetSynchronousMode}, {"CompileOptions", testCompileOptions}, {"TableNotExist", testTableNotExist}, From 96ab00574507044b79abd2b3b924a8cd973ee7a4 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 20:34:10 -0500 Subject: [PATCH 16/17] Remove unneeded constant --- db/db.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/db/db.go b/db/db.go index f358e4ce..75a85fba 100644 --- a/db/db.go +++ b/db/db.go @@ -22,11 +22,10 @@ import ( ) const ( - SQLiteHeaderSize = 32 - bkDelay = 250 - sizeAtOpenWarn = 1024 * 1024 * 1024 - durToOpenLog = 2 * time.Second - checkpointRetryDelay = 20 * time.Millisecond + SQLiteHeaderSize = 32 + bkDelay = 250 + sizeAtOpenWarn = 1024 * 1024 * 1024 + durToOpenLog = 2 * time.Second ) const ( From 3167d13348d4b4019052a62ed5567be2423fa9e9 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 27 Jan 2024 20:35:25 -0500 Subject: [PATCH 17/17] Restore comment --- db/db.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/db/db.go b/db/db.go index 75a85fba..1436b410 100644 --- a/db/db.go +++ b/db/db.go @@ -154,6 +154,8 @@ func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) { if err != nil { return nil, fmt.Errorf("open: %s", err.Error()) } + + // Critical that rqlite has full control over the checkpointing process. if _, err := rwDB.Exec("PRAGMA wal_autocheckpoint=0"); err != nil { return nil, fmt.Errorf("disable autocheckpointing: %s", err.Error()) }