1
0
Fork 0

First testing of full snapshotting

master
Philip O'Toole 11 months ago
parent cdb78e0486
commit bbcb8f7c9b

@ -1,7 +1,7 @@
package snapshot2
import (
"errors"
"fmt"
"os"
"path/filepath"
@ -9,14 +9,6 @@ import (
"github.com/rqlite/rqlite/db"
)
var (
// ErrInvalidSnapshot is returned when a snapshot is invalid.
ErrInvalidSnapshot = errors.New("invalid snapshot")
// ErrInvalidStore is returned when a store is in an invalid state.
ErrInvalidStore = errors.New("invalid store")
)
// Sink is a sink for writing snapshot data to a Snapshot store.
type Sink struct {
str *Store
@ -123,7 +115,7 @@ func (s *Sink) processSnapshotData() (retErr error) {
return err
}
} else {
return ErrInvalidSnapshot
return fmt.Errorf("invalid snapshot data file: %s", s.dataFD.Name())
}
// Indicate snapshot data has been successfully persisted to disk.
@ -144,7 +136,7 @@ func (s *Sink) processSnapshotData() (retErr error) {
// double-check that it's valid.
snapDB := filepath.Join(s.str.Dir(), snapshots[0]+".db")
if !db.IsValidSQLiteFile(snapDB) {
return ErrInvalidStore
return fmt.Errorf("data for first snapshot is not a SQLite file: %s", snapDB)
}
return nil
} else if len(snapshots) >= 2 {
@ -157,7 +149,7 @@ func (s *Sink) processSnapshotData() (retErr error) {
if db.IsValidSQLiteWALFile(snapNewWAL) {
// Double-check that the previous snapshot is a valid SQLite file.
if !db.IsValidSQLiteFile(snapPrevDB) {
return ErrInvalidStore
return fmt.Errorf("pre-existing data is not a SQLite file: %s", snapPrevDB)
}
// It is, so rename it and replay the WAL into it.
if err := os.Rename(snapPrevDB, snapNewDB); err != nil {
@ -169,10 +161,11 @@ func (s *Sink) processSnapshotData() (retErr error) {
} else if !db.IsValidSQLiteFile(snapNewDB) {
// There is no valid WAL file for the latest snapshot, and no valid
// SQLite file for the latest snapshot. This is an invalid state.
return ErrInvalidStore
return fmt.Errorf("no valid SQLite file or WAL file for latest snapshot")
}
} else {
return ErrInvalidStore
return fmt.Errorf("unexpected number of snapshots: %d", len(snapshots))
}
s.str.Reap()

@ -1,6 +1,9 @@
package snapshot2
import (
"bytes"
"io"
"os"
"testing"
"github.com/hashicorp/raft"
@ -58,6 +61,93 @@ func Test_NewSinkOpenCloseFail(t *testing.T) {
}
}
// Test_SinkFullSnapshot writes a complete SQLite file through a Sink and
// verifies that the resulting snapshot is listed by the Store, that its
// metadata round-trips intact, and that the snapshot data matches the
// original file byte-for-byte.
func Test_SinkFullSnapshot(t *testing.T) {
	str := mustStore(t)
	s := NewSink(str, makeRaftMeta("snap-1234", 3, 2, 1))
	if s == nil {
		t.Fatalf("Failed to create new sink")
	}
	if err := s.Open(); err != nil {
		t.Fatalf("Failed to open sink: %v", err)
	}

	srcPath := "testdata/db-and-wals/backup.db"
	src := mustOpenFile(t, srcPath)
	defer src.Close()
	written, err := io.Copy(s, src)
	if err != nil {
		t.Fatalf("Failed to copy SQLite file: %v", err)
	}
	if written != mustGetFileSize(t, srcPath) {
		t.Fatalf("Unexpected number of bytes copied: %d", written)
	}
	if err := s.Close(); err != nil {
		t.Fatalf("Failed to close sink: %v", err)
	}

	// Check snapshot is available and correct.
	expMeta := makeRaftMeta("snap-1234", 3, 2, 1)
	metas, err := str.List()
	if err != nil {
		t.Fatalf("Failed to list snapshots: %v", err)
	}
	if len(metas) != 1 {
		t.Fatalf("Expected 1 snapshot, got %d", len(metas))
	}
	compareMetas(t, expMeta, metas[0])

	// Check snapshot data is available and correct.
	meta, rc, err := str.Open("snap-1234")
	if err != nil {
		t.Fatalf("Failed to open snapshot: %v", err)
	}
	defer rc.Close()
	compareMetas(t, expMeta, meta)
	if !compareReaderToFile(t, rc, srcPath) {
		t.Fatalf("Snapshot data does not match")
	}

	// TODO: write a second full snapshot; it should be installed without issue.
}
// compareMetas fails the test if any checked field of the two snapshot
// metadata records differs. m1 is treated as the expected value and m2 as
// the actual value in failure messages, so mismatches show both sides
// (the original messages printed only m1, hiding what actually differed).
func compareMetas(t *testing.T, m1, m2 *raft.SnapshotMeta) {
	t.Helper()
	if m1.ID != m2.ID {
		t.Fatalf("Unexpected snapshot ID: exp %s, got %s", m1.ID, m2.ID)
	}
	if m1.Index != m2.Index {
		t.Fatalf("Unexpected snapshot index: exp %d, got %d", m1.Index, m2.Index)
	}
	if m1.Term != m2.Term {
		t.Fatalf("Unexpected snapshot term: exp %d, got %d", m1.Term, m2.Term)
	}
	if m1.ConfigurationIndex != m2.ConfigurationIndex {
		t.Fatalf("Unexpected snapshot configuration index: exp %d, got %d", m1.ConfigurationIndex, m2.ConfigurationIndex)
	}
	if m1.Version != m2.Version {
		t.Fatalf("Unexpected snapshot version: exp %d, got %d", m1.Version, m2.Version)
	}
}
// compareReaderToFile reports whether the bytes remaining in r are
// identical to the contents of the file at path. The file is closed
// before returning; any open error fails the test via mustOpenFile.
func compareReaderToFile(t *testing.T, r io.Reader, path string) bool {
	t.Helper()
	f := mustOpenFile(t, path)
	defer f.Close()
	return compareReaderToReader(t, r, f)
}
func compareReaderToReader(t *testing.T, r1, r2 io.Reader) bool {
t.Helper()
buf1, err := io.ReadAll(r1)
if err != nil {
t.Fatalf("Failed to read from reader 1: %v", err)
}
buf2, err := io.ReadAll(r2)
if err != nil {
t.Fatalf("Failed to read from reader 2: %v", err)
}
return bytes.Equal(buf1, buf2)
}
func mustStore(t *testing.T) *Store {
t.Helper()
str, err := NewStore(t.TempDir())
@ -77,3 +167,20 @@ func makeRaftMeta(id string, index, term, cfgIndex uint64) *raft.SnapshotMeta {
Version: 1,
}
}
func mustOpenFile(t *testing.T, path string) *os.File {
t.Helper()
fd, err := os.Open(path)
if err != nil {
t.Fatalf("Failed to open file: %v", err)
}
return fd
}
func mustGetFileSize(t *testing.T, path string) int64 {
stat, err := os.Stat(path)
if err != nil {
t.Fatalf("Failed to stat file: %v", err)
}
return stat.Size()
}

@ -5,6 +5,7 @@ import (
"expvar"
"fmt"
"io"
"log"
"os"
"path/filepath"
"runtime"
@ -40,12 +41,18 @@ func ResetStats() {
// Store stores Snapshots.
//
// NOTE(review): the diff render left both the old and new `dir` field lines
// in place, producing a duplicate field that cannot compile; this is the
// reconstructed post-commit declaration.
type Store struct {
	dir    string      // directory under which all snapshots are stored
	logger *log.Logger // destination for store-level log output
}
// NewStore returns a new Snapshot Store rooted at dir, logging to stderr.
//
// NOTE(review): the diff render left the pre-image line
// `return &Store{dir: dir}, nil` in place, which made the new body
// unreachable dead code; this is the reconstructed post-commit function.
func NewStore(dir string) (*Store, error) {
	str := &Store{
		dir:    dir,
		logger: log.New(os.Stderr, "[snapshot-store] ", log.LstdFlags),
	}
	str.logger.Printf("store initialized in %s", dir)
	return str, nil
}
// Create creates a new Sink object, ready for writing a snapshot. Sinks make certain assumptions about
@ -142,7 +149,7 @@ func (s *Store) getSnapshots() ([]string, error) {
}
var snapshots []string
for _, d := range directories {
if !isTmpName(d.Name()) {
if !isTmpName(d.Name()) && d.IsDir() {
snapshots = append(snapshots, d.Name())
}
}

Binary file not shown.

@ -0,0 +1,34 @@
import sqlite3
import shutil
import os  # NOTE(review): unused in the visible script — confirm before removing

# Generate test fixtures: a checkpointed SQLite database copy ("backup.db")
# plus a series of standalone WAL files ("wal-00" .. "wal-03"), each holding
# one committed row, for exercising snapshot/WAL-replay code.

# Database file
db_file = 'mydatabase.db'

# Open a connection to SQLite database. The try/finally guarantees the
# connection is closed even if a statement or file copy fails partway
# through (the original leaked the handle on error).
conn = sqlite3.connect(db_file)
try:
    cursor = conn.cursor()

    # Enable WAL mode and disable automatic checkpointing so every commit
    # stays in the -wal file until we checkpoint explicitly.
    cursor.execute("PRAGMA journal_mode=WAL;")
    cursor.execute("PRAGMA wal_autocheckpoint=0;")
    cursor.execute("CREATE TABLE foo (id INTEGER PRIMARY KEY, value TEXT);")
    conn.commit()

    # Checkpoint the WAL file so we've got just a SQLite file
    conn.execute("PRAGMA wal_checkpoint(TRUNCATE);")
    shutil.copy(db_file, 'backup.db')

    for i in range(4):
        # Write a new row
        cursor.execute(f"INSERT INTO foo (value) VALUES ('Row {i}');")
        conn.commit()

        # Copy the newly-created WAL
        shutil.copy(db_file + '-wal', f'wal-{i:02}')

        # Checkpoint the WAL file
        conn.execute("PRAGMA wal_checkpoint(TRUNCATE);")
        conn.commit()
finally:
    conn.close()

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -0,0 +1 @@
{"Version":1,"ID":"2-18-1686659761026","Index":18,"Term":2,"Peers":"ka5sb2NhbGhvc3Q6NDAwMg==","Configuration":{"Servers":[{"Suffrage":0,"ID":"node1","Address":"localhost:4002"}]},"ConfigurationIndex":1,"Size":293,"CRC":"QhU1tAtfgpo="}

@ -0,0 +1 @@
{"Version":1,"ID":"2-8-1686659756627","Index":8,"Term":2,"Peers":"ka5sb2NhbGhvc3Q6NDAwMg==","Configuration":{"Servers":[{"Suffrage":0,"ID":"node1","Address":"localhost:4002"}]},"ConfigurationIndex":1,"Size":250,"CRC":"oInhBEZSzOo="}
Loading…
Cancel
Save