1
0
Fork 0

Merge remote-tracking branch 'origin' into queries-only-do-local

master
Philip O'Toole 8 months ago
commit 039e907e3f

@ -1,6 +1,8 @@
## 8.18.2 (unreleased)
### Implementation changes and bug fixes
- [PR #1635](https://github.com/rqlite/rqlite/pull/1635): Always execute ExecuteRequests locally if possible.
- [PR #1636](https://github.com/rqlite/rqlite/pull/1636): Move to explicit choice of SQLite Checkpointing mode.
- [PR #1637](https://github.com/rqlite/rqlite/pull/1637): Remove unneeded SetFullNeeded post WAL checkpoint failure.
## 8.18.1 (January 26th 2024)
### Implementation changes and bug fixes

@ -43,14 +43,31 @@ const (
numETx = "execute_transactions"
numQTx = "query_transactions"
numRTx = "request_transactions"
CheckpointQuery = "PRAGMA wal_checkpoint(TRUNCATE)" // rqlite WAL compaction requires truncation
)
var (
	// ErrWALReplayDirectoryMismatch is returned when the WAL file(s) are not in the same
	// directory as the database file. WAL replay presumably requires that the WAL files
	// and the target database file live in the same directory — confirm against ReplayWAL.
	ErrWALReplayDirectoryMismatch = errors.New("WAL file(s) not in same directory as database file")
)
// CheckpointMode is the mode in which a checkpoint runs. Each mode maps to a
// distinct SQLite wal_checkpoint PRAGMA (see checkpointPRAGMAs).
type CheckpointMode int

const (
	// CheckpointRestart instructs the checkpoint to run in restart mode.
	CheckpointRestart CheckpointMode = iota
	// CheckpointTruncate instructs the checkpoint to run in truncate mode,
	// which truncates the WAL file after checkpointing its pages.
	CheckpointTruncate
)
var (
	// checkpointPRAGMAs maps each CheckpointMode to the SQLite PRAGMA
	// statement that performs a WAL checkpoint in that mode.
	checkpointPRAGMAs = map[CheckpointMode]string{
		CheckpointRestart:  "PRAGMA wal_checkpoint(RESTART)",
		CheckpointTruncate: "PRAGMA wal_checkpoint(TRUNCATE)",
	}
)
// DBVersion is the SQLite version.
var DBVersion string
@ -130,6 +147,8 @@ func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) {
logger.Printf("database file is %s, SQLite may take longer to open it", humanize.Bytes(uint64(sz)))
}
/////////////////////////////////////////////////////////////////////////
// Main RW connection
rwDSN := MakeDSN(dbPath, ModeReadWrite, fkEnabled, wal)
rwDB, err := sql.Open("sqlite3", rwDSN)
if err != nil {
@ -141,6 +160,8 @@ func Open(dbPath string, fkEnabled, wal bool) (retDB *DB, retErr error) {
return nil, fmt.Errorf("disable autocheckpointing: %s", err.Error())
}
/////////////////////////////////////////////////////////////////////////
// Read-only connection
roDSN := MakeDSN(dbPath, ModeReadOnly, fkEnabled, wal)
roDB, err := sql.Open("sqlite3", roDSN)
if err != nil {
@ -300,16 +321,33 @@ func (db *DB) WALSize() (int64, error) {
return 0, err
}
// SetBusyTimeout sets the busy timeout for the database, in milliseconds.
// The setting is applied via the read-write connection.
func (db *DB) SetBusyTimeout(ms int) error {
	pragma := fmt.Sprintf("PRAGMA busy_timeout=%d", ms)
	_, err := db.rwDB.Exec(pragma)
	return err
}
// BusyTimeout returns the current busy timeout value, in milliseconds,
// as reported by the read-write connection.
func (db *DB) BusyTimeout() (int, error) {
	var ms int
	if err := db.rwDB.QueryRow("PRAGMA busy_timeout").Scan(&ms); err != nil {
		return 0, err
	}
	// Return a literal nil rather than the err variable, which is known
	// to be nil here — clearer and avoids accidental non-nil returns if
	// the code above changes.
	return ms, nil
}
// Checkpoint checkpoints the WAL file. If the WAL file is not enabled, this
// function is a no-op.
func (db *DB) Checkpoint() error {
return db.CheckpointWithTimeout(0)
func (db *DB) Checkpoint(mode CheckpointMode) error {
return db.CheckpointWithTimeout(mode, 0)
}
// CheckpointWithTimeout performs a WAL checkpoint. If the checkpoint does not
// complete within the given duration, an error is returned. If the duration is 0,
// the checkpoint will be attempted only once.
func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) {
// run to completion within the given duration, an error is returned. If the
// duration is 0, the busy timeout is not modified before executing the
// checkpoint.
func (db *DB) CheckpointWithTimeout(mode CheckpointMode, dur time.Duration) (err error) {
start := time.Now()
defer func() {
if err != nil {
@ -320,47 +358,35 @@ func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) {
}
}()
var ok int
var nPages int
var nMoved int
f := func() error {
err := db.rwDB.QueryRow(CheckpointQuery).Scan(&ok, &nPages, &nMoved)
stats.Add(numCheckpointedPages, int64(nPages))
stats.Add(numCheckpointedMoves, int64(nMoved))
if dur > 0 {
bt, err := db.BusyTimeout()
if err != nil {
return fmt.Errorf("error checkpointing WAL: %s", err.Error())
return fmt.Errorf("failed to get busy_timeout on checkpointing connection: %s", err.Error())
}
if ok != 0 {
return fmt.Errorf("failed to completely checkpoint WAL (%d ok, %d pages, %d moved)",
ok, nPages, nMoved)
if err := db.SetBusyTimeout(int(dur.Milliseconds())); err != nil {
return fmt.Errorf("failed to set busy_timeout on checkpointing connection: %s", err.Error())
}
return nil
defer func() {
// Reset back to default
if _, err := db.rwDB.Exec(fmt.Sprintf("PRAGMA busy_timeout=%d", bt)); err != nil {
db.logger.Printf("failed to reset busy_timeout on checkpointing connection: %s", err.Error())
}
}()
}
// Try fast path
err = f()
if err == nil {
return nil
}
if dur == 0 {
return err
var ok int
var nPages int
var nMoved int
if err := db.rwDB.QueryRow(checkpointPRAGMAs[mode]).Scan(&ok, &nPages, &nMoved); err != nil {
return fmt.Errorf("error checkpointing WAL: %s", err.Error())
}
var lastError error
t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()
for {
select {
case <-t.C:
if err := f(); err == nil {
return nil
}
lastError = err
case <-time.After(dur):
return fmt.Errorf("checkpoint timeout: %v", lastError)
}
stats.Add(numCheckpointedPages, int64(nPages))
stats.Add(numCheckpointedMoves, int64(nMoved))
if ok != 0 {
return fmt.Errorf("failed to completely checkpoint WAL (%d ok, %d pages, %d moved)",
ok, nPages, nMoved)
}
return nil
}
// DisableCheckpointing disables the automatic checkpointing that occurs when

@ -0,0 +1,165 @@
package db
import (
"bytes"
"os"
"testing"
"time"
)
// Test_WALDatabaseCheckpointOKNoWAL tests that a checkpoint succeeds
// even when no WAL file exists.
func Test_WALDatabaseCheckpointOKNoWAL(t *testing.T) {
	path := mustTempFile()
	defer os.Remove(path)

	db, err := Open(path, false, true)
	if err != nil {
		t.Fatalf("failed to open database in WAL mode: %s", err.Error())
	}

	// Sanity-check the starting state: WAL mode on, but no WAL file yet.
	if !db.WALEnabled() {
		t.Fatalf("WAL mode not enabled")
	}
	if fileExists(db.WALPath()) {
		t.Fatalf("WAL file exists when no writes have happened")
	}
	defer db.Close()

	// Checkpointing with no WAL present must not return an error.
	err = db.Checkpoint(CheckpointTruncate)
	if err != nil {
		t.Fatalf("failed to checkpoint database in WAL mode with non-existent WAL: %s", err.Error())
	}
}
// Test_WALDatabaseCheckpointOKDelete tests that a checkpoint returns no error
// even when the database is opened in DELETE mode.
func Test_WALDatabaseCheckpointOKDelete(t *testing.T) {
	path := mustTempFile()
	defer os.Remove(path)

	db, err := Open(path, false, false)
	if err != nil {
		t.Fatalf("failed to open database in DELETE mode: %s", err.Error())
	}

	// The database was opened without WAL, so WAL mode must be off.
	if db.WALEnabled() {
		t.Fatalf("WAL mode enabled")
	}
	defer db.Close()

	// Checkpointing a DELETE-mode database must not return an error.
	err = db.Checkpoint(CheckpointTruncate)
	if err != nil {
		t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error())
	}
}
// Test_WALDatabaseCheckpoint_RestartTruncate tests that a checkpoint in
// RESTART mode returns no error and leaves the WAL file byte-for-byte
// unmodified, even though all the WAL pages are copied to the database
// file. A checkpoint in TRUNCATE mode is then performed, which truncates
// the WAL file to zero length.
func Test_WALDatabaseCheckpoint_RestartTruncate(t *testing.T) {
	path := mustTempFile()
	defer os.Remove(path)
	db, err := Open(path, false, true)
	if err != nil {
		t.Fatalf("failed to open database in WAL mode: %s", err.Error())
	}
	defer db.Close()

	_, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`)
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}
	// Write enough rows that there is real WAL content to checkpoint.
	for i := 0; i < 50; i++ {
		_, err := db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("fiona")`)
		if err != nil {
			t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
		}
	}

	// Capture the WAL contents before the RESTART checkpoint so they can
	// be compared afterwards.
	walPreBytes, err := os.ReadFile(db.WALPath())
	if err != nil {
		t.Fatalf("failed to read wal file: %s", err.Error())
	}
	if err := db.Checkpoint(CheckpointRestart); err != nil {
		t.Fatalf("failed to checkpoint database: %s", err.Error())
	}
	walPostBytes, err := os.ReadFile(db.WALPath())
	if err != nil {
		t.Fatalf("failed to read wal file: %s", err.Error())
	}
	if !bytes.Equal(walPreBytes, walPostBytes) {
		t.Fatalf("wal files should be identical after checkpoint restart")
	}

	// query the data to make sure all is well.
	rows, err := db.QueryStringStmt(`SELECT COUNT(*) FROM foo`)
	if err != nil {
		t.Fatalf("failed to execute query on single node: %s", err.Error())
	}
	if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[50]]}]`, asJSON(rows); exp != got {
		t.Fatalf("expected %s, got %s", exp, got)
	}

	if err := db.Checkpoint(CheckpointTruncate); err != nil {
		t.Fatalf("failed to checkpoint database: %s", err.Error())
	}
	sz, err := fileSize(db.WALPath())
	if err != nil {
		// A stat failure here is unexpected: TRUNCATE leaves a zero-length
		// WAL file in place, it does not delete it. Include the underlying
		// error rather than dropping it.
		t.Fatalf("failed to determine wal file size after checkpoint truncate: %s", err.Error())
	}
	if sz != 0 {
		t.Fatalf("wal file should be zero length after checkpoint truncate")
	}

	// query the data to make sure all is still well.
	rows, err = db.QueryStringStmt(`SELECT COUNT(*) FROM foo`)
	if err != nil {
		t.Fatalf("failed to execute query on single node: %s", err.Error())
	}
	if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[50]]}]`, asJSON(rows); exp != got {
		t.Fatalf("expected %s, got %s", exp, got)
	}
}
// Test_WALDatabaseCheckpoint_RestartTimeout tests that a checkpoint with a
// timeout fails while a second connection appears to hold a read transaction
// open, and then succeeds once that blocking connection is closed.
func Test_WALDatabaseCheckpoint_RestartTimeout(t *testing.T) {
	path := mustTempFile()
	defer os.Remove(path)
	db, err := Open(path, false, true)
	if err != nil {
		t.Fatalf("failed to open database in WAL mode: %s", err.Error())
	}
	defer db.Close()
	_, err = db.ExecuteStringStmt(`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`)
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}
	// Write enough rows that there is real WAL content to checkpoint.
	for i := 0; i < 50; i++ {
		_, err := db.ExecuteStringStmt(`INSERT INTO foo(name) VALUES("fiona")`)
		if err != nil {
			t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
		}
	}
	// Open a second connection to the same file and start a transaction on
	// it, followed by a read. The test below expects this to prevent the
	// checkpoint from running to completion within its timeout.
	blockingDB, err := Open(path, false, true)
	if err != nil {
		t.Fatalf("failed to open blocking database in WAL mode: %s", err.Error())
	}
	defer blockingDB.Close()
	_, err = blockingDB.QueryStringStmt(`BEGIN TRANSACTION`)
	if err != nil {
		t.Fatalf("failed to execute query on single node: %s", err.Error())
	}
	rows, err := blockingDB.QueryStringStmt(`SELECT COUNT(*) FROM foo`)
	if err != nil {
		t.Fatalf("failed to execute query on single node: %s", err.Error())
	}
	if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[50]]}]`, asJSON(rows); exp != got {
		t.Fatalf("expected %s, got %s", exp, got)
	}
	// With the blocking connection open, the checkpoint must time out.
	if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err == nil {
		t.Fatal("expected error due to failure to checkpoint")
	}
	// Once the blocking connection is closed, the checkpoint must succeed.
	// (Close is also deferred above; the double Close is assumed harmless.)
	blockingDB.Close()
	if err := db.CheckpointWithTimeout(CheckpointTruncate, 250*time.Millisecond); err != nil {
		t.Fatalf("failed to checkpoint database: %s", err.Error())
	}
}

@ -2,14 +2,44 @@ package db
import (
"errors"
"fmt"
"os"
"strings"
"testing"
command "github.com/rqlite/rqlite/v8/command/proto"
"github.com/rqlite/rqlite/v8/random"
"github.com/rqlite/rqlite/v8/testdata/chinook"
)
// testBusyTimeout confirms that a busy_timeout set either via a raw PRAGMA
// statement or via SetBusyTimeout is reported back by BusyTimeout.
func testBusyTimeout(t *testing.T, db *DB) {
	// First, set the timeout with a raw PRAGMA statement.
	want := random.Intn(10000)
	if _, err := db.ExecuteStringStmt(fmt.Sprintf("PRAGMA busy_timeout=%d", want)); err != nil {
		t.Fatalf("failed to set busy_timeout: %s", err.Error())
	}
	got, err := db.BusyTimeout()
	if err != nil {
		t.Fatalf("failed to get busy_timeout: %s", err.Error())
	}
	if want != got {
		t.Fatalf("expected busy_timeout %d, got %d", want, got)
	}

	// Second, set it through the SetBusyTimeout method.
	want = random.Intn(10000)
	if err := db.SetBusyTimeout(want); err != nil {
		t.Fatalf("failed to set busy_timeout: %s", err.Error())
	}
	got, err = db.BusyTimeout()
	if err != nil {
		t.Fatalf("failed to get busy_timeout: %s", err.Error())
	}
	if want != got {
		t.Fatalf("expected busy_timeout %d, got %d", want, got)
	}
}
func testCompileOptions(t *testing.T, db *DB) {
_, err := db.CompileOptions()
if err != nil {
@ -1529,6 +1559,7 @@ func Test_DatabaseCommonOperations(t *testing.T) {
name string
testFunc func(*testing.T, *DB)
}{
{"BusyTimeout", testBusyTimeout},
{"SetSynchronousMode", testSetSynchronousMode},
{"CompileOptions", testCompileOptions},
{"TableNotExist", testTableNotExist},

@ -132,7 +132,7 @@ func Test_TableCreation(t *testing.T) {
testQ()
// Confirm checkpoint works without error.
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
testQ()
@ -176,7 +176,7 @@ func Test_DBSums(t *testing.T) {
t.Fatalf("WAL sum did not change after insertion")
}
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
@ -234,7 +234,7 @@ func Test_DBLastModified(t *testing.T) {
// Checkpoint, and check time is later. On some platforms the time resolution isn't that
// high, so we sleep so the test won't suffer a false failure.
time.Sleep(1 * time.Second)
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database: %s", err.Error())
}
lm3, err := db.LastModified()
@ -669,59 +669,18 @@ func Test_WALDatabaseCreatedOK(t *testing.T) {
t.Fatalf("WAL file exists but is empty")
}
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
if mustFileSize(walPath) != 0 {
t.Fatalf("WAL file exists but is non-empty")
}
// Checkpoint a second time, to ensure it's idempotent.
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
}
// Test_WALDatabaseCheckpointOKNoWAL tests that a checkpoint succeeds
// even when no WAL file exists.
func Test_WALDatabaseCheckpointOKNoWAL(t *testing.T) {
	path := mustTempFile()
	defer os.Remove(path)
	db, err := Open(path, false, true)
	if err != nil {
		t.Fatalf("failed to open database in WAL mode: %s", err.Error())
	}
	if !db.WALEnabled() {
		t.Fatalf("WAL mode not enabled")
	}
	if fileExists(db.WALPath()) {
		t.Fatalf("WAL file exists when no writes have happened")
	}
	defer db.Close()
	// Checkpoint takes an explicit mode; TRUNCATE matches the behavior of
	// the previous no-argument Checkpoint call used here.
	if err := db.Checkpoint(CheckpointTruncate); err != nil {
		t.Fatalf("failed to checkpoint database in WAL mode with non-existent WAL: %s", err.Error())
	}
}
// Test_WALDatabaseCheckpointOKDelete tests that a checkpoint returns no error
// even when the database is opened in DELETE mode.
func Test_WALDatabaseCheckpointOKDelete(t *testing.T) {
	path := mustTempFile()
	defer os.Remove(path)
	db, err := Open(path, false, false)
	if err != nil {
		t.Fatalf("failed to open database in DELETE mode: %s", err.Error())
	}
	if db.WALEnabled() {
		t.Fatalf("WAL mode enabled")
	}
	defer db.Close()
	// Checkpoint takes an explicit mode; TRUNCATE matches the behavior of
	// the previous no-argument Checkpoint call used here.
	if err := db.Checkpoint(CheckpointTruncate); err != nil {
		t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error())
	}
}
// Test_WALDatabaseCreatedOKFromDELETE tests that a WAL database is created properly,
// even when supplied with a DELETE-mode database.
func Test_WALDatabaseCreatedOKFromDELETE(t *testing.T) {
@ -839,7 +798,7 @@ func test_FileCreationOnDisk(t *testing.T, db *DB) {
// Confirm checkpoint works on all types of on-disk databases. Worst case, this
// should be ignored.
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointRestart); err != nil {
t.Fatalf("failed to checkpoint database in DELETE mode: %s", err.Error())
}
}

@ -223,7 +223,7 @@ func ReplayWAL(path string, wals []string, deleteMode bool) error {
if err != nil {
return err
}
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
return fmt.Errorf("checkpoint WAL %s: %s", wal, err.Error())
}

@ -220,7 +220,7 @@ func Test_WALReplayOK(t *testing.T) {
}
mustCopyFile(replayDBPath, dbPath)
mustCopyFile(filepath.Join(replayDir, walFile+"_001"), walPath)
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -233,7 +233,7 @@ func Test_WALReplayOK(t *testing.T) {
t.Fatalf("WAL file at %s does not exist", walPath)
}
mustCopyFile(filepath.Join(replayDir, walFile+"_002"), walPath)
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -321,7 +321,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
if _, err := srcDB.ExecuteStringStmt("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"); err != nil {
t.Fatalf("failed to create table: %s", err.Error())
}
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
mustCopyFile(dstPath, srcPath)
@ -353,7 +353,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath := fmt.Sprintf("%s-%d", dstPath, i)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
}
@ -369,7 +369,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath := fmt.Sprintf("%s-postdelete", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -379,7 +379,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath = fmt.Sprintf("%s-postupdate", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -394,7 +394,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath = fmt.Sprintf("%s-create-tables", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}
@ -407,7 +407,7 @@ func Test_WALReplayOK_Complex(t *testing.T) {
dstWALPath = fmt.Sprintf("%s-post-create-tables", dstPath)
mustCopyFile(dstWALPath, srcWALPath)
dstWALs = append(dstWALs, dstWALPath)
if err := srcDB.Checkpoint(); err != nil {
if err := srcDB.Checkpoint(CheckpointTruncate); err != nil {
t.Fatalf("failed to checkpoint database in WAL mode: %s", err.Error())
}

@ -124,10 +124,10 @@ func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) {
}
// Checkpoint calls Checkpoint on the underlying database.
func (s *SwappableDB) Checkpoint() error {
func (s *SwappableDB) Checkpoint(mode CheckpointMode) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Checkpoint()
return s.db.Checkpoint(mode)
}
// Path calls Path on the underlying database.

@ -143,7 +143,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
// Create a new snapshot, placing the configuration in as if it was
// committed at index 1.
if err := db.Checkpoint(); err != nil {
if err := db.Checkpoint(sql.CheckpointTruncate); err != nil {
return fmt.Errorf("failed to checkpoint database: %s", err)
}
tmpDBFD, err := os.Open(tmpDBPath)

@ -108,46 +108,47 @@ const (
)
// Names of the expvar statistics maintained by the Store. The flattened diff
// previously left both the old and new constant sets in place, redeclaring
// most identifiers; this is the current set, which splits the old
// num_wal_checkpoint_failed counter into RESTART and TRUNCATE variants.
const (
	numSnapshots                   = "num_snapshots"
	numSnapshotsFailed             = "num_snapshots_failed"
	numUserSnapshots               = "num_user_snapshots"
	numUserSnapshotsFailed         = "num_user_snapshots_failed"
	numWALSnapshots                = "num_wal_snapshots"
	numWALSnapshotsFailed          = "num_wal_snapshots_failed"
	numSnapshotsFull               = "num_snapshots_full"
	numSnapshotsIncremental        = "num_snapshots_incremental"
	numFullCheckpointFailed        = "num_full_checkpoint_failed"
	numWALCheckpointRestartFailed  = "num_wal_checkpoint_restart_failed"
	numWALCheckpointTruncateFailed = "num_wal_checkpoint_truncate_failed"
	numAutoVacuums                 = "num_auto_vacuums"
	numAutoVacuumsFailed           = "num_auto_vacuums_failed"
	autoVacuumDuration             = "auto_vacuum_duration"
	numBoots                       = "num_boots"
	numBackups                     = "num_backups"
	numLoads                       = "num_loads"
	numRestores                    = "num_restores"
	numRestoresFailed              = "num_restores_failed"
	numAutoRestores                = "num_auto_restores"
	numAutoRestoresSkipped         = "num_auto_restores_skipped"
	numAutoRestoresFailed          = "num_auto_restores_failed"
	numRecoveries                  = "num_recoveries"
	numProviderChecks              = "num_provider_checks"
	numProviderProvides            = "num_provider_provides"
	numProviderProvidesFail        = "num_provider_provides_fail"
	numUncompressedCommands        = "num_uncompressed_commands"
	numCompressedCommands          = "num_compressed_commands"
	numJoins                       = "num_joins"
	numIgnoredJoins                = "num_ignored_joins"
	numRemovedBeforeJoins          = "num_removed_before_joins"
	numDBStatsErrors               = "num_db_stats_errors"
	snapshotCreateDuration         = "snapshot_create_duration"
	snapshotPersistDuration        = "snapshot_persist_duration"
	snapshotPrecompactWALSize      = "snapshot_precompact_wal_size"
	snapshotWALSize                = "snapshot_wal_size"
	leaderChangesObserved          = "leader_changes_observed"
	leaderChangesDropped           = "leader_changes_dropped"
	failedHeartbeatObserved        = "failed_heartbeat_observed"
	nodesReapedOK                  = "nodes_reaped_ok"
	nodesReapedFailed              = "nodes_reaped_failed"
)
// stats captures stats for the Store.
@ -170,7 +171,8 @@ func ResetStats() {
stats.Add(numSnapshotsFull, 0)
stats.Add(numSnapshotsIncremental, 0)
stats.Add(numFullCheckpointFailed, 0)
stats.Add(numWALCheckpointFailed, 0)
stats.Add(numWALCheckpointRestartFailed, 0)
stats.Add(numWALCheckpointTruncateFailed, 0)
stats.Add(numAutoVacuums, 0)
stats.Add(numAutoVacuumsFailed, 0)
stats.Add(autoVacuumDuration, 0)
@ -1906,7 +1908,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
var fsmSnapshot raft.FSMSnapshot
if fullNeeded {
if err := s.db.Checkpoint(); err != nil {
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
stats.Add(numFullCheckpointFailed, 1)
return nil, err
}
@ -1920,12 +1922,23 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
compactedBuf := bytes.NewBuffer(nil)
var err error
if pathExistsWithData(s.walPath) {
// Attempt to checkpoint everything into the main database file. Only
// if this works should we bother to compact-scan the WAL. It's possible
// it fails if some query is in progress. If it fails, return an error
// and Raft will retry later. But if it succeeds it means that all readers
// are reading from the main database file.
if err := s.db.Checkpoint(sql.CheckpointRestart); err != nil {
stats.Add(numWALCheckpointRestartFailed, 1)
return nil, err
}
// Read a compacted version of the WAL into memory, and write it
// to the Snapshot store.
walFD, err := os.Open(s.walPath)
if err != nil {
return nil, err
}
defer walFD.Close() // Make sure it closes.
scanner, err := wal.NewFastCompactingScanner(walFD)
if err != nil {
return nil, err
@ -1939,25 +1952,20 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
}
walFD.Close() // We need it closed for the next step.
// Clean-up by truncating the WAL. This should be fast because all the pages
// have been checkpointed into the main database file, and writes are
// blocked during this process by Raft. In otherwords, the WAL file should
// be unchanged.
walSz, err := fileSize(s.walPath)
if err != nil {
return nil, err
}
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
stats.Add(numWALCheckpointTruncateFailed, 1)
return nil, fmt.Errorf("failed to truncate WAL: %s", err.Error())
}
stats.Get(snapshotWALSize).(*expvar.Int).Set(int64(compactedBuf.Len()))
stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSz)
if err := s.db.Checkpoint(); err != nil {
stats.Add(numWALCheckpointFailed, 1)
// Failing to checkpoint the WAL leaves the main database in an inconsistent
// state (if a WAL file was partially checkpointed, then the next WAL file will not
// be in sequence with what is in the Snapshot store), so attempt a Full snapshot next
// time.
if err := s.snapshotStore.SetFullNeeded(); err != nil {
// Give up!
s.logger.Fatalf("failed to set full snapshot needed after failed WAL checkpoint: %s",
err.Error())
}
return nil, err
}
}
fsmSnapshot = snapshot.NewSnapshot(io.NopCloser(compactedBuf))
if err != nil {

@ -1813,76 +1813,6 @@ func Test_SingleNode_WALTriggeredSnapshot(t *testing.T) {
}
}
// Test_OpenStoreSingleNode_WALCheckpointFail tests that a WAL checkpoint
// failure will trigger a full snapshot.
func Test_OpenStoreSingleNode_WALCheckpointFail(t *testing.T) {
	s, ln := mustNewStore(t)
	defer s.Close(true)
	defer ln.Close()
	if err := s.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	// NOTE(review): s.Close(true) is deferred twice (here and above) —
	// harmless only if Close is idempotent; confirm.
	defer s.Close(true)
	if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}
	er := executeRequestFromStrings([]string{
		`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
	}, false, false)
	_, err := s.Execute(er)
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}
	// Write enough data that the snapshot below has WAL content to checkpoint.
	for i := 0; i < 100; i++ {
		_, err := s.Execute(executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false))
		if err != nil {
			t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
		}
	}
	if err := s.Snapshot(0); err != nil {
		t.Fatalf("failed to snapshot store: %s", err.Error())
	}
	// After a successful snapshot, a full snapshot must not be marked needed.
	if fn, err := s.snapshotStore.FullNeeded(); err != nil {
		t.Fatalf("failed to determine full snapshot needed: %s", err.Error())
	} else if fn {
		t.Fatalf("full snapshot marked as needed")
	}
	for i := 0; i < 100; i++ {
		_, err := s.Execute(executeRequestFromString(`INSERT INTO foo(name) VALUES("fiona")`, false, false))
		if err != nil {
			t.Fatalf("failed to execute INSERT on single node: %s", err.Error())
		}
	}
	// Do another snapshot, which should trigger a WAL checkpoint.
	// However, open the SQLite file and start a transaction, causing
	// the checkpoint to fail.
	db, err := db.Open(s.dbPath, false, true)
	if err != nil {
		t.Fatalf("failed to open SQLite database: %s", err.Error())
	}
	defer db.Close()
	_, err = db.ExecuteStringStmt("BEGIN TRANSACTION; SELECT * FROM foo")
	if err != nil {
		t.Fatalf("failed to begin transaction: %s", err.Error())
	}
	if err := s.Snapshot(0); err == nil {
		t.Fatalf("expected error snapshotting store")
	}
	// The failed checkpoint must leave a full snapshot marked as needed.
	if fn, err := s.snapshotStore.FullNeeded(); err != nil {
		t.Fatalf("failed to determine full snapshot needed: %s", err.Error())
	} else if !fn {
		t.Fatalf("full snapshot should be marked as needed")
	}
}
func Test_OpenStoreSingleNode_VacuumTimes(t *testing.T) {
s0, ln0 := mustNewStore(t)
defer s0.Close(true)

Loading…
Cancel
Save