1
0
Fork 0

Move to explicit checkpoint type

master
Philip O'Toole 8 months ago
parent 355c4baa06
commit 958b67908b

@ -43,12 +43,39 @@ const (
numETx = "execute_transactions"
numQTx = "query_transactions"
numRTx = "request_transactions"
CheckpointQuery = "PRAGMA wal_checkpoint(TRUNCATE)" // rqlite WAL compaction requires truncation
)
var (
// ErrWALReplayDirectoryMismatch is returned when the WAL file(s) are not in the same
// directory as the database file.
ErrWALReplayDirectoryMismatch = errors.New("WAL file(s) not in same directory as database file")
// ErrCheckpointTimeout is returned when a checkpoint does not complete within the
// given duration.
ErrCheckpointTimeout = errors.New("checkpoint timeout")
)
// CheckpointMode is the mode in which a checkpoint runs.
type CheckpointMode int
const (
// CheckpointPassive instructs the checkpoint to run in passive mode.
CheckpointPassive CheckpointMode = iota
// CheckpointFull instructs the checkpoint to run in full mode.
CheckpointFull
// CheckpointRestart instructs the checkpoint to run in restart mode.
CheckpointRestart
// CheckpointTruncate instructs the checkpoint to run in truncate mode.
CheckpointTruncate
)
var (
checkpointPRAGMAs = map[CheckpointMode]string{
CheckpointPassive: "PRAGMA wal_checkpoint(PASSIVE)",
CheckpointFull: "PRAGMA wal_checkpoint(FULL)",
CheckpointRestart: "PRAGMA wal_checkpoint(RESTART)",
CheckpointTruncate: "PRAGMA wal_checkpoint(TRUNCATE)",
}
)
// DBVersion is the SQLite version.
@ -302,14 +329,14 @@ func (db *DB) WALSize() (int64, error) {
// Checkpoint checkpoints the WAL file. If the WAL file is not enabled, this
// function is a no-op.
func (db *DB) Checkpoint() error {
return db.CheckpointWithTimeout(0)
func (db *DB) Checkpoint(mode CheckpointMode) error {
return db.CheckpointWithTimeout(mode, 0)
}
// CheckpointWithTimeout performs a WAL checkpoint. If the checkpoint does not
// complete within the given duration, an error is returned. If the duration is 0,
// the checkpoint will be attempted only once.
func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) {
func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err error) {
start := time.Now()
defer func() {
if err != nil {
@ -325,7 +352,7 @@ func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) {
var nMoved int
f := func() error {
err := db.rwDB.QueryRow(CheckpointQuery).Scan(&ok, &nPages, &nMoved)
err := db.rwDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved)
stats.Add(numCheckpointedPages, int64(nPages))
stats.Add(numCheckpointedMoves, int64(nMoved))
if err != nil {

@ -132,7 +132,7 @@ func Test_TableCreation(t *testing.T) {
testQ()
// Confirm checkpoint works without error.
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
testQ()
@ -176,7 +176,7 @@ func Test_DBSums(t *testing.T) {
t.Fatalf("WAL sum did not change after insertion")
}
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
@ -234,7 +234,7 @@ func Test_DBLastModified(t *testing.T) {
// Checkpoint, and check time is later. On some platforms the time resolution isn't that
// high, so we sleep so the test won't suffer a false failure.
time.Sleep(1 * time.Second)
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
lm3, err := db.LastModified()
@ -669,14 +669,14 @@ func Test_WALDatabaseCreatedOK(t *testing.T) {
t.Fatalf("WAL file exists but is empty")
}
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
if mustFileSize(walPath) != 0 {
t.Fatalf("WAL file exists but is non-empty")
}
// Checkpoint a second time, to ensure it's idempotent.
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
}
@ -698,7 +698,7 @@ func Test_WALDatabaseCheckpointOKNoWAL(t *testing.T) {
t.Fatalf("WAL file exists when no writes have happened")
}
defer db.Close()
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode with non-existent WAL: %s", err.Error())
}
}
@ -717,7 +717,7 @@ func Test_WALDatabaseCheckpointOKDelete(t *testing.T) {
t.Fatalf("WAL mode enabled")
}
defer db.Close()
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error())
}
}
@ -839,7 +839,7 @@ func test_FileCreationOnDisk(t *testing.T, db *DB) {
// Confirm checkpoint works on all types of on-disk databases. Worst case, this
// should be ignored.
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error())
}
}

@ -223,7 +223,7 @@ func ReplayWAL(path string, wals []string, deleteMode bool) error {
if err != nil {
return err
}
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
return fmt.Errorf("checkpoint WAL %s: %s", wal, err.Error())
}

@ -220,7 +220,7 @@ func Test_WALReplayOK(t *testing.T) {
}
mustCopyFile(replayDBPath, dbPath)
mustCopyFile(filepath.Join(replayDir, walFile+"_001"), walPath)
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -233,7 +233,7 @@ func Test_WALReplayOK(t *testing.T) {
t.Fatalf("WAL file at %s does not exist", walPath)
}
mustCopyFile(filepath.Join(replayDir, walFile+"_002"), walPath)
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -321,7 +321,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
if _, err := srcDB.ExecuteStringStmt("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"); err != nil {
t.Fatalf("failed to create table: %s", err.Error())
}
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
mustCopyFile(dstPath, srcPath)
@ -353,7 +353,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath := fmt.Sprintf("%s-%d", dstPath, i)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
}
@ -369,7 +369,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath := fmt.Sprintf("%s-postdelete", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -379,7 +379,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath = fmt.Sprintf("%s-postupdate", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -394,7 +394,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath = fmt.Sprintf("%s-create-tables", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -407,7 +407,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath = fmt.Sprintf("%s-post-create-tables", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}

@ -124,10 +124,10 @@ func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) {
}
// Checkpoint calls Checkpoint on the underlying database.
func (s *SwappableDB) Checkpoint() error {
func (s *SwappableDB) Checkpoint(mode CheckpointMode) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Checkpoint()
return s.db.Checkpoint(mode)
}
// Path calls Path on the underlying database.

@ -143,7 +143,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
// Create a new snapshot, placing the configuration in as if it was
// committed at index 1.
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(sql.CheckpointTruncate); err != nil {
return fmt.Errorf("failed to checkpoint database: %s", err)
}
tmpDBFD, err := os.Open(tmpDBPath)

@ -1903,7 +1903,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
var fsmSnapshot raft.FSMSnapshot
if fullNeeded {
if err := s.db.Checkpoint(); err != nil {
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
stats.Add(numFullCheckpointFailed, 1)
return nil, err
}
@ -1942,7 +1942,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
}
stats.Get(snapshotWALSize).(*expvar.Int).Set(int64(compactedBuf.Len()))
stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSz)
if err := s.db.Checkpoint(); err != nil {
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
stats.Add(numWALCheckpointFailed, 1)
// Failing to checkpoint the WAL leaves the main database in an inconsistent
// state (if a WAL file was partially checkpointed, then the next WAL file will not

Loading…
Cancel
Save