Remove snapshot 1

master
Philip O'Toole 11 months ago
parent 6a60108f0c
commit a69aa7838d

@@ -1,223 +0,0 @@
package snapshot
import (
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
)
// Sink is a sink for writing snapshot data to a Snapshot store.
type Sink struct {
str *Store
workDir string
curGenDir string
nextGenDir string
meta *Meta
nWritten int64
dataFD *os.File
logger *log.Logger
closed bool
}
// NewSink creates a new Sink object.
func NewSink(str *Store, workDir, currGenDir, nextGenDir string, meta *Meta) *Sink {
return &Sink{
str: str,
workDir: workDir,
curGenDir: currGenDir,
nextGenDir: nextGenDir,
meta: meta,
logger: log.New(os.Stderr, "[snapshot-sink] ", log.LstdFlags),
}
}
// Open opens the sink for writing.
func (s *Sink) Open() error {
dataPath := filepath.Join(s.workDir, "snapshot-data.tmp")
dataFD, err := os.Create(dataPath)
if err != nil {
return err
}
s.dataFD = dataFD
return nil
}
// Write writes snapshot data to the sink. The snapshot is not in place
// until Close is called.
func (s *Sink) Write(p []byte) (n int, err error) {
n, err = s.dataFD.Write(p)
s.nWritten += int64(n)
return
}
// ID returns the ID of the snapshot being written.
func (s *Sink) ID() string {
return s.meta.ID
}
// Cancel cancels the snapshot. Cancel must be called if the snapshot is not
// going to be closed.
func (s *Sink) Cancel() error {
s.closed = true
s.cleanup() // Best effort, ignore errors.
return nil
}
// Close closes the sink, and finalizes creation of the snapshot. It is critical
// that Close is called, or the snapshot will not be in place.
func (s *Sink) Close() error {
if s.closed {
return nil
}
s.closed = true
defer s.cleanup()
if err := s.processSnapshotData(); err != nil {
return err
}
if !s.str.noAutoreap {
return s.str.Reap()
}
return nil
}
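// Illustrative usage sketch (not part of the original file): the intended
// Sink lifecycle. The str, workDir, currGenDir, nextGenDir, meta, and
// stream values are assumed to be set up by the caller.
//
//	sink := NewSink(str, workDir, currGenDir, nextGenDir, meta)
//	if err := sink.Open(); err != nil {
//		return err
//	}
//	if _, err := io.Copy(sink, stream); err != nil {
//		sink.Cancel() // snapshot will not be finalized
//		return err
//	}
//	return sink.Close() // moves the snapshot into place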
func (s *Sink) processSnapshotData() error {
if s.nWritten == 0 {
return nil
}
if _, err := s.dataFD.Seek(0, 0); err != nil {
return err
}
strHdr, _, err := NewStreamHeaderFromReader(s.dataFD)
if err != nil {
return fmt.Errorf("error reading stream header: %v", err)
}
if strHdr.GetVersion() != streamVersion {
return fmt.Errorf("unsupported snapshot version %d", strHdr.GetVersion())
}
if incSnap := strHdr.GetIncrementalSnapshot(); incSnap != nil {
return s.processIncrementalSnapshot(incSnap)
}
fullSnap := strHdr.GetFullSnapshot()
if fullSnap == nil {
return fmt.Errorf("got nil FullSnapshot")
}
return s.processFullSnapshot(fullSnap)
}
func (s *Sink) processIncrementalSnapshot(incSnap *IncrementalSnapshot) error {
s.logger.Printf("processing incremental snapshot")
incSnapDir := tmpName(filepath.Join(s.curGenDir, s.meta.ID))
if err := os.Mkdir(incSnapDir, 0755); err != nil {
return fmt.Errorf("error creating incremental snapshot directory: %v", err)
}
walPath := filepath.Join(incSnapDir, snapWALFile)
if err := os.WriteFile(walPath, incSnap.Data, 0644); err != nil {
return fmt.Errorf("error writing WAL data: %v", err)
}
if err := s.writeMeta(incSnapDir, false); err != nil {
return err
}
// We're done! Move the directory into place.
dstDir, err := moveFromTmpSync(incSnapDir)
if err != nil {
s.logger.Printf("failed to move incremental snapshot directory into place: %s", err)
return err
}
s.logger.Printf("incremental snapshot (ID %s) written to %s", s.meta.ID, dstDir)
return nil
}
func (s *Sink) processFullSnapshot(fullSnap *FullSnapshot) error {
s.logger.Printf("processing full snapshot")
// We need a new generational directory, and need to create the first
// snapshot in that directory.
nextGenDir := tmpName(s.nextGenDir)
if err := os.MkdirAll(nextGenDir, 0755); err != nil {
return fmt.Errorf("error creating full snapshot directory: %v", err)
}
// Rebuild the SQLite database from the snapshot data.
sqliteBasePath := filepath.Join(nextGenDir, baseSqliteFile)
if err := ReplayDB(fullSnap, s.dataFD, sqliteBasePath); err != nil {
return fmt.Errorf("error replaying DB: %v", err)
}
// Now create the first snapshot directory in the new generation.
snapDir := filepath.Join(nextGenDir, s.meta.ID)
if err := os.MkdirAll(snapDir, 0755); err != nil {
return fmt.Errorf("error creating full snapshot directory: %v", err)
}
if err := s.writeMeta(snapDir, true); err != nil {
return err
}
// We're done! Move the generational directory into place.
dstDir, err := moveFromTmpSync(nextGenDir)
if err != nil {
s.logger.Printf("failed to move full snapshot directory into place: %s", err)
return err
}
// XXXX need to clear out any snapshot directories older than the one
// we just created. Maybe this should be done at startup? It's an edge case.
// Yeah, this is why empty snap directories need the "full" flag.
// Any snapshot directories older than a full snapshot directory can be
// removed.
s.logger.Printf("full snapshot (ID %s) written to %s", s.meta.ID, dstDir)
return nil
}
func (s *Sink) writeMeta(dir string, full bool) error {
s.meta.Full = full
return writeMeta(dir, s.meta)
}
func (s *Sink) cleanup() error {
if s.dataFD != nil {
if err := s.dataFD.Close(); err != nil {
return err
}
if err := os.Remove(s.dataFD.Name()); err != nil {
return err
}
}
if err := os.RemoveAll(tmpName(s.nextGenDir)); err != nil {
return err
}
if err := os.RemoveAll(tmpName(s.curGenDir)); err != nil {
return err
}
return nil
}
func writeMeta(dir string, meta *Meta) error {
fh, err := os.Create(filepath.Join(dir, metaFileName))
if err != nil {
return fmt.Errorf("error creating meta file: %v", err)
}
defer fh.Close()
// Write out as JSON
enc := json.NewEncoder(fh)
if err = enc.Encode(meta); err != nil {
return fmt.Errorf("failed to encode meta: %v", err)
}
if err := fh.Sync(); err != nil {
return err
}
return fh.Close()
}

@@ -1,250 +0,0 @@
package snapshot
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"testing"
"github.com/hashicorp/raft"
"github.com/rqlite/rqlite/command/encoding"
"github.com/rqlite/rqlite/db"
)
func Test_NewSinkOpenCloseOK(t *testing.T) {
tmpDir := t.TempDir()
workDir := filepath.Join(tmpDir, "work")
mustCreateDir(workDir)
currGenDir := filepath.Join(tmpDir, "curr")
nextGenDir := filepath.Join(tmpDir, "next")
str := mustNewStoreForSinkTest(t)
s := NewSink(str, workDir, currGenDir, nextGenDir, &Meta{})
if err := s.Open(); err != nil {
t.Fatal(err)
}
if err := s.Close(); err != nil {
t.Fatal(err)
}
}
func Test_SinkFullSnapshot(t *testing.T) {
tmpDir := t.TempDir()
workDir := filepath.Join(tmpDir, "work")
mustCreateDir(workDir)
currGenDir := filepath.Join(tmpDir, "curr")
nextGenDir := filepath.Join(tmpDir, "next")
str := mustNewStoreForSinkTest(t)
s := NewSink(str, workDir, currGenDir, nextGenDir, makeMeta("snap-1234", 3, 2, 1))
if err := s.Open(); err != nil {
t.Fatal(err)
}
sqliteFile := "testdata/db-and-wals/backup.db"
wal0 := "testdata/db-and-wals/wal-00"
wal1 := "testdata/db-and-wals/wal-01"
wal2 := "testdata/db-and-wals/wal-02"
wal3 := "testdata/db-and-wals/wal-03"
stream, err := NewFullStream(sqliteFile, wal0, wal1, wal2, wal3)
if err != nil {
t.Fatal(err)
}
defer stream.Close()
if _, err := io.Copy(s, stream); err != nil {
t.Fatal(err)
}
if err := s.Close(); err != nil {
t.Fatal(err)
}
// Next generation directory should exist and contain a snapshot.
if !dirExists(nextGenDir) {
t.Fatalf("next generation directory %s does not exist", nextGenDir)
}
if !dirExists(filepath.Join(nextGenDir, "snap-1234")) {
t.Fatalf("next generation directory %s does not contain snapshot directory", nextGenDir)
}
if !fileExists(filepath.Join(nextGenDir, baseSqliteFile)) {
t.Fatalf("next generation directory %s does not contain base SQLite file", nextGenDir)
}
expMetaPath := filepath.Join(nextGenDir, "snap-1234", metaFileName)
if !fileExists(expMetaPath) {
t.Fatalf("meta file does not exist at %s", expMetaPath)
}
// Check SQLite database has been created correctly.
db, err := db.Open(filepath.Join(nextGenDir, baseSqliteFile), false, false)
if err != nil {
t.Fatal(err)
}
defer db.Close()
rows, err := db.QueryStringStmt("SELECT COUNT(*) FROM foo")
if err != nil {
t.Fatal(err)
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[4]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
}
}
func Test_SinkIncrementalSnapshot(t *testing.T) {
tmpDir := t.TempDir()
workDir := filepath.Join(tmpDir, "work")
mustCreateDir(workDir)
currGenDir := filepath.Join(tmpDir, "curr")
mustCreateDir(currGenDir)
nextGenDir := filepath.Join(tmpDir, "next")
str := mustNewStoreForSinkTest(t)
s := NewSink(str, workDir, currGenDir, nextGenDir, makeMeta("snap-1234", 3, 2, 1))
if err := s.Open(); err != nil {
t.Fatal(err)
}
walData := mustReadFile("testdata/db-and-wals/wal-00")
stream, err := NewIncrementalStream(walData)
if err != nil {
t.Fatal(err)
}
defer stream.Close()
if _, err := io.Copy(s, stream); err != nil {
t.Fatal(err)
}
if err := s.Close(); err != nil {
t.Fatal(err)
}
if dirExists(nextGenDir) {
t.Fatalf("next generation directory %s exists", nextGenDir)
}
if !dirExists(filepath.Join(currGenDir, "snap-1234")) {
t.Fatalf("current generation directory %s does not contain snapshot directory", currGenDir)
}
expWALPath := filepath.Join(currGenDir, "snap-1234", snapWALFile)
if !fileExists(expWALPath) {
t.Fatalf("WAL file does not exist at %s", expWALPath)
}
if !bytes.Equal(walData, mustReadFile(expWALPath)) {
t.Fatalf("WAL file data does not match")
}
expMetaPath := filepath.Join(currGenDir, "snap-1234", metaFileName)
if !fileExists(expMetaPath) {
t.Fatalf("meta file does not exist at %s", expMetaPath)
}
}
func Test_SinkIncrementalSnapshot_NoWALData(t *testing.T) {
tmpDir := t.TempDir()
workDir := filepath.Join(tmpDir, "work")
mustCreateDir(workDir)
currGenDir := filepath.Join(tmpDir, "curr")
mustCreateDir(currGenDir)
nextGenDir := filepath.Join(tmpDir, "next")
str := mustNewStoreForSinkTest(t)
s := NewSink(str, workDir, currGenDir, nextGenDir, makeMeta("snap-1234", 3, 2, 1))
if err := s.Open(); err != nil {
t.Fatal(err)
}
stream, err := NewIncrementalStream(nil)
if err != nil {
t.Fatal(err)
}
defer stream.Close()
if _, err := io.Copy(s, stream); err != nil {
t.Fatal(err)
}
if err := s.Close(); err != nil {
t.Fatal(err)
}
if dirExists(nextGenDir) {
t.Fatalf("next generation directory %s exists", nextGenDir)
}
if !dirExists(filepath.Join(currGenDir, "snap-1234")) {
t.Fatalf("current generation directory %s does not contain snapshot directory", currGenDir)
}
expWALPath := filepath.Join(currGenDir, "snap-1234", snapWALFile)
if !emptyFileExists(expWALPath) {
t.Fatalf("expected empty WAL file at %s", expWALPath)
}
expMetaPath := filepath.Join(currGenDir, "snap-1234", metaFileName)
if !fileExists(expMetaPath) {
t.Fatalf("meta file does not exist at %s", expMetaPath)
}
}
func mustNewStoreForSinkTest(t *testing.T) *Store {
tmpDir := t.TempDir()
str, err := NewStore(tmpDir)
if err != nil {
t.Fatal(err)
}
return str
}
func mustCreateDir(path string) {
if err := os.MkdirAll(path, 0755); err != nil {
panic(err)
}
}
func mustReadFile(path string) []byte {
b, err := os.ReadFile(path)
if err != nil {
panic(err)
}
return b
}
func emptyFileExists(path string) bool {
info, err := os.Stat(path)
if err != nil {
return false
}
return info.Size() == 0
}
func makeTestConfiguration(i, a string) raft.Configuration {
return raft.Configuration{
Servers: []raft.Server{
{
ID: raft.ServerID(i),
Address: raft.ServerAddress(a),
},
},
}
}
func makeMeta(id string, index, term, cfgIndex uint64) *Meta {
return &Meta{
SnapshotMeta: raft.SnapshotMeta{
ID: id,
Index: index,
Term: term,
Configuration: makeTestConfiguration("1", "localhost:1"),
ConfigurationIndex: cfgIndex,
Version: 1,
},
}
}
func asJSON(v interface{}) string {
enc := encoding.Encoder{}
b, err := enc.JSONMarshal(v)
if err != nil {
panic(fmt.Sprintf("failed to JSON marshal value: %s", err.Error()))
}
return string(b)
}

@@ -1,120 +0,0 @@
package snapshot
import (
"expvar"
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/hashicorp/raft"
"github.com/rqlite/rqlite/db"
)
// Snapshot represents a snapshot of the database state.
type Snapshot struct {
walData []byte
files []string
}
// NewWALSnapshot creates a new snapshot from a WAL.
func NewWALSnapshot(b []byte) *Snapshot {
return &Snapshot{
walData: b,
}
}
// NewFullSnapshot creates a new snapshot from a SQLite file and WALs.
func NewFullSnapshot(files ...string) *Snapshot {
return &Snapshot{
files: files,
}
}
// Persist writes the snapshot to the given sink.
func (s *Snapshot) Persist(sink raft.SnapshotSink) error {
startT := time.Now()
stream, err := s.OpenStream()
if err != nil {
return err
}
defer stream.Close()
n, err := io.Copy(sink, stream)
if err != nil {
return err
}
dur := time.Since(startT)
stats.Get(persistSize).(*expvar.Int).Set(n)
stats.Get(persistDuration).(*expvar.Int).Set(dur.Milliseconds())
return err
}
// Release is a no-op.
func (s *Snapshot) Release() {}
// OpenStream returns a stream for reading the snapshot.
func (s *Snapshot) OpenStream() (*Stream, error) {
if len(s.files) > 0 {
return NewFullStream(s.files...)
}
return NewIncrementalStream(s.walData)
}
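// Illustrative sketch (not part of the original file): choosing which
// snapshot type to persist. store, sink, dbFile, and walData are
// hypothetical values supplied by the caller.
//
//	var snap *Snapshot
//	if store.FullNeeded() {
//		snap = NewFullSnapshot(dbFile) // base SQLite file, plus any WALs
//	} else {
//		snap = NewWALSnapshot(walData) // just the latest WAL bytes
//	}
//	if err := snap.Persist(sink); err != nil {
//		sink.Cancel() // snapshot will not be finalized
//	}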
// ReplayDB reconstructs the database from the given reader, and writes it to
// the given path.
func ReplayDB(fullSnap *FullSnapshot, r io.Reader, path string) error {
dbInfo := fullSnap.GetDb()
if dbInfo == nil {
return fmt.Errorf("got nil DB info")
}
sqliteBaseFD, err := os.Create(path)
if err != nil {
return fmt.Errorf("error creating SQLite file: %v", err)
}
defer sqliteBaseFD.Close()
if _, err := io.CopyN(sqliteBaseFD, r, dbInfo.Size); err != nil {
return fmt.Errorf("error writing SQLite file data: %v", err)
}
if err := sqliteBaseFD.Sync(); err != nil {
return fmt.Errorf("error syncing SQLite file: %v", err)
}
if err := sqliteBaseFD.Close(); err != nil {
return fmt.Errorf("error closing SQLite file: %v", err)
}
// Write out any WALs.
var walFiles []string
for i, wal := range fullSnap.GetWals() {
if err := func() error {
if wal == nil {
return fmt.Errorf("got nil WAL")
}
walName := filepath.Join(filepath.Dir(path), baseSqliteWALFile+fmt.Sprintf("%d", i))
walFD, err := os.Create(walName)
if err != nil {
return fmt.Errorf("error creating WAL file: %v", err)
}
defer walFD.Close()
if _, err := io.CopyN(walFD, r, wal.Size); err != nil {
return fmt.Errorf("error writing WAL file data: %v", err)
}
if err := walFD.Sync(); err != nil {
return fmt.Errorf("error syncing WAL file: %v", err)
}
walFiles = append(walFiles, walName)
return nil
}(); err != nil {
return err
}
}
// Checkpoint the WAL files into the base SQLite file
if err := db.ReplayWAL(path, walFiles, false); err != nil {
return fmt.Errorf("error checkpointing WAL: %v", err)
}
return nil
}

@@ -1,882 +0,0 @@
package snapshot
import (
"encoding/json"
"errors"
"expvar"
"fmt"
"io"
"log"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/raft"
"github.com/rqlite/rqlite/db"
)
func init() {
stats = expvar.NewMap("snapshot")
ResetStats()
}
const (
minSnapshotRetain = 2
generationsDir = "generations"
firstGeneration = "0000000001"
baseSqliteFile = "base.sqlite"
baseSqliteWALFile = "base.sqlite-wal"
snapWALFile = "wal"
metaFileName = "meta.json"
tmpSuffix = ".tmp"
)
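// Illustrative on-disk layout implied by the constants above; the
// generation number and snapshot ID shown are hypothetical:
//
//	<rootDir>/generations/0000000001/base.sqlite             (base SQLite file)
//	<rootDir>/generations/0000000001/2-45-1234567/meta.json  (snapshot metadata)
//	<rootDir>/generations/0000000001/2-45-1234567/wal        (incremental snapshots only)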
const (
persistSize = "latest_persist_size"
persistDuration = "latest_persist_duration"
reapSnapshotsDuration = "reap_snapshots_duration"
numSnapshotsReaped = "num_snapshots_reaped"
numGenerationsReaped = "num_generations_reaped"
)
var (
// ErrRetainCountTooLow is returned when the retain count is too low.
ErrRetainCountTooLow = errors.New("retain count must be >= 2")
// ErrSnapshotNotFound is returned when a snapshot is not found.
ErrSnapshotNotFound = errors.New("snapshot not found")
// ErrSnapshotBaseMissing is returned when a snapshot base SQLite file is missing.
ErrSnapshotBaseMissing = errors.New("snapshot base SQLite file missing")
)
// stats captures stats for the Store.
var stats *expvar.Map
// ResetStats resets the expvar stats for this module. Mostly for test purposes.
func ResetStats() {
stats.Init()
stats.Add(persistSize, 0)
stats.Add(persistDuration, 0)
stats.Add(reapSnapshotsDuration, 0)
stats.Add(numSnapshotsReaped, 0)
stats.Add(numGenerationsReaped, 0)
}
// Meta represents the metadata for a snapshot.
type Meta struct {
raft.SnapshotMeta
Full bool
}
// LockingSink is a wrapper around a SnapshotSink that ensures that the
// Store has handed out only 1 sink at a time.
type LockingSink struct {
raft.SnapshotSink
str *Store
}
// Close closes the sink, unlocking the Store for creation of a new sink.
func (s *LockingSink) Close() error {
s.str.sinkMu.Unlock()
return s.SnapshotSink.Close()
}
// Cancel cancels the sink, unlocking the Store for creation of a new sink.
func (s *LockingSink) Cancel() error {
s.str.sinkMu.Unlock()
return s.SnapshotSink.Cancel()
}
// Store is a store for snapshots.
type Store struct {
rootDir string
workDir string
generationsDir string
sinkMu sync.Mutex
noAutoreap bool
logger *log.Logger
}
// NewStore creates a new Store object.
func NewStore(dir string) (*Store, error) {
genDir := filepath.Join(dir, generationsDir)
if err := os.MkdirAll(genDir, 0755); err != nil {
return nil, err
}
s := &Store{
rootDir: dir,
workDir: filepath.Join(dir, "scratchpad"),
generationsDir: genDir,
logger: log.New(os.Stderr, "[snapshot-store] ", log.LstdFlags),
}
if err := s.check(); err != nil {
return nil, fmt.Errorf("check failed: %s", err)
}
return s, nil
}
// Create creates a new Sink object, ready for writing a snapshot. Sinks make certain assumptions about
// the state of the store, and if those assumptions were changed by another Sink writing to the store
// it could cause failures. Therefore we only allow 1 Sink to be in existence at a time. This shouldn't
// be a problem, since snapshots are taken infrequently and one at a time.
func (s *Store) Create(version raft.SnapshotVersion, index, term uint64, configuration raft.Configuration,
configurationIndex uint64, trans raft.Transport) (retSink raft.SnapshotSink, retErr error) {
s.sinkMu.Lock()
defer func() {
if retErr != nil {
s.sinkMu.Unlock()
}
}()
currGenDir, ok, err := s.GetCurrentGenerationDir()
if err != nil {
return nil, err
}
nextGenDir, err := s.GetNextGenerationDir()
if err != nil {
return nil, err
}
if !ok {
// With an empty store, the snapshot will be written to the same directory
// regardless of whether it's a full or incremental snapshot.
currGenDir = nextGenDir
}
meta := &Meta{
SnapshotMeta: raft.SnapshotMeta{
ID: snapshotName(term, index),
Index: index,
Term: term,
Configuration: configuration,
ConfigurationIndex: configurationIndex,
Version: version,
},
}
sink := NewSink(s, s.workDir, currGenDir, nextGenDir, meta)
if err := sink.Open(); err != nil {
sink.Cancel()
return nil, fmt.Errorf("failed to open Sink: %v", err)
}
return &LockingSink{sink, s}, nil
}
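// Illustrative note (not part of the original file): because Create returns
// a LockingSink holding sinkMu, a second call to Create blocks until the
// first sink is closed or cancelled. The arguments shown are hypothetical.
//
//	sink1, _ := store.Create(1, 10, 2, config, 1, nil) // acquires the lock
//	// A concurrent store.Create(...) now blocks...
//	sink1.Close() // ...until sink1 is closed or cancelled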
// List returns a list of all the snapshots in the Store.
func (s *Store) List() ([]*raft.SnapshotMeta, error) {
gen, ok, err := s.GetCurrentGenerationDir()
if err != nil {
return nil, err
}
if !ok {
return nil, nil
}
snapshots, err := s.getSnapshots(gen)
if err != nil {
return nil, err
}
// Convert to the type Raft expects and make only 1 available.
var snaps = []*raft.SnapshotMeta{}
if len(snapshots) > 0 {
snaps = append(snaps, &snapshots[0].SnapshotMeta)
}
return snaps, nil
}
// Open opens the snapshot with the given ID.
func (s *Store) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) {
generations, err := s.GetGenerations()
if err != nil {
return nil, nil, err
}
var meta *raft.SnapshotMeta
for i := len(generations) - 1; i >= 0; i-- {
genDir := filepath.Join(s.generationsDir, generations[i])
snapshots, err := s.getSnapshots(genDir)
if err != nil {
return nil, nil, err
}
if len(snapshots) == 0 {
continue
}
sort.Sort(metaSlice(snapshots))
if !metaSlice(snapshots).Contains(id) {
// Try the previous generation.
continue
}
// Always include the base SQLite file. A snapshot directory may no longer
// exist if its WAL has been checkpointed into the base file during snapshot-reaping.
baseSqliteFilePath := filepath.Join(genDir, baseSqliteFile)
if !fileExists(baseSqliteFilePath) {
return nil, nil, ErrSnapshotBaseMissing
}
files := []string{baseSqliteFilePath}
for _, snap := range snapshots {
if !snap.Full {
// Only include WAL files for incremental snapshots, since the base SQLite
// database is always included.
snapWALFilePath := filepath.Join(genDir, snap.ID, snapWALFile)
if !fileExists(snapWALFilePath) {
return nil, nil, fmt.Errorf("WAL file %s does not exist", snapWALFilePath)
}
files = append(files, snapWALFilePath)
}
if snap.ID == id {
// Stop after we've reached the requested snapshot
meta = &raft.SnapshotMeta{
ID: snap.ID,
Index: snap.Index,
Term: snap.Term,
Configuration: snap.Configuration,
ConfigurationIndex: snap.ConfigurationIndex,
Version: snap.Version,
}
break
}
}
str, err := NewFullStream(files...)
if err != nil {
return nil, nil, err
}
meta.Size = str.Size()
s.logger.Printf("opened snapshot %s successfully (size=%d)", id, meta.Size)
return meta, str, nil
}
return nil, nil, ErrSnapshotNotFound
}
// Dir returns the directory where the snapshots are stored.
func (s *Store) Dir() string {
return s.rootDir
}
// Restore restores the snapshot with the given ID to the given path.
func (s *Store) Restore(id string, dir string) (string, error) {
_, rc, err := s.Open(id)
if err != nil {
return "", err
}
defer rc.Close()
return s.RestoreFromReader(rc, dir)
}
// RestoreFromReader restores the snapshot from the given reader to the given path.
func (s *Store) RestoreFromReader(r io.Reader, dir string) (string, error) {
// Create the destination directory and SQLite file path
if err := os.MkdirAll(dir, 0755); err != nil {
return "", err
}
sqliteFD, err := os.CreateTemp(dir, "restored-*.sqlite")
if err != nil {
return "", err
}
if err := sqliteFD.Close(); err != nil {
return "", err
}
strHdr, _, err := NewStreamHeaderFromReader(r)
if err != nil {
return "", fmt.Errorf("error reading stream header: %v", err)
}
fullSnap := strHdr.GetFullSnapshot()
if fullSnap == nil {
return "", fmt.Errorf("got nil FullSnapshot")
}
if err := ReplayDB(fullSnap, r, sqliteFD.Name()); err != nil {
return "", fmt.Errorf("error replaying DB: %v", err)
}
return sqliteFD.Name(), nil
}
// Stats returns stats about the Snapshot Store.
func (s *Store) Stats() (map[string]interface{}, error) {
ng, err := s.GetNextGeneration()
if err != nil {
return nil, err
}
dirSize, err := dirSize(s.rootDir)
if err != nil {
return nil, err
}
stats := map[string]interface{}{
"root_dir": s.rootDir,
"size": dirSize,
"full_needed": s.FullNeeded(),
"next_generation": ng,
"auto_reap": !s.noAutoreap,
}
snaps, err := s.List()
if err != nil {
return nil, err
}
if len(snaps) > 0 {
var snapsAvailable []string
for i := range snaps {
snapsAvailable = append(snapsAvailable, snaps[i].ID)
}
stats["available"] = snapsAvailable
}
generations, err := s.GetGenerations()
if err != nil {
return nil, err
}
if len(generations) > 0 {
stats["generations"] = generations
}
return stats, nil
}
// FullNeeded returns true if the next type of snapshot needed
// by the Store is a full snapshot.
func (s *Store) FullNeeded() bool {
currGenDir, ok, err := s.GetCurrentGenerationDir()
if err != nil {
return false
}
return !ok || !fileExists(filepath.Join(currGenDir, baseSqliteFile))
}
// GetNextGeneration returns the name of the next generation.
func (s *Store) GetNextGeneration() (string, error) {
generations, err := s.GetGenerations()
if err != nil {
return "", err
}
nextGen := 1
if len(generations) > 0 {
i, err := strconv.Atoi(generations[len(generations)-1])
if err != nil {
return "", err
}
nextGen = i + 1
}
return fmt.Sprintf("%010d", nextGen), nil
}
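// For example (illustrative): with existing generations ["0000000001",
// "0000000002"], GetNextGeneration returns "0000000003"; with an empty
// store it returns firstGeneration, "0000000001".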
// GetNextGenerationDir returns the directory path of the next generation.
// It is not guaranteed to exist.
func (s *Store) GetNextGenerationDir() (string, error) {
nextGen, err := s.GetNextGeneration()
if err != nil {
return "", err
}
return filepath.Join(s.generationsDir, nextGen), nil
}
// GetGenerations returns a list of all existing generations, sorted
// from oldest to newest.
func (s *Store) GetGenerations() ([]string, error) {
entries, err := os.ReadDir(s.generationsDir)
if err != nil {
return nil, err
}
var generations []string
for _, entry := range entries {
if !entry.IsDir() || isTmpName(entry.Name()) {
continue
}
if _, err := strconv.Atoi(entry.Name()); err != nil {
continue
}
generations = append(generations, entry.Name())
}
return generations, nil
}
// GetCurrentGenerationDir returns the directory path of the current generation.
// If there are no generations, the function returns false, but no error.
func (s *Store) GetCurrentGenerationDir() (string, bool, error) {
generations, err := s.GetGenerations()
if err != nil {
return "", false, err
}
if len(generations) == 0 {
return "", false, nil
}
return filepath.Join(s.generationsDir, generations[len(generations)-1]), true, nil
}
// Reap reaps old generations, and reaps snapshots within the remaining generation.
func (s *Store) Reap() error {
if _, err := s.ReapGenerations(); err != nil {
return fmt.Errorf("failed to reap generations during reap: %s", err)
}
currDir, ok, err := s.GetCurrentGenerationDir()
if err != nil {
return fmt.Errorf("failed to get current generation directory during reap: %s", err)
}
if ok {
_, err = s.ReapSnapshots(currDir, 2)
if err != nil {
return fmt.Errorf("failed to reap snapshots during reap: %s", err)
}
}
return nil
}
// ReapGenerations removes old generations. It returns the number of generations
// removed, or an error.
func (s *Store) ReapGenerations() (int, error) {
generations, err := s.GetGenerations()
if err != nil {
return 0, err
}
if len(generations) == 0 {
return 0, nil
}
n := 0
for i := 0; i < len(generations)-1; i++ {
genDir := filepath.Join(s.generationsDir, generations[i])
if err := os.RemoveAll(genDir); err != nil {
return n, err
}
s.logger.Printf("reaped generation %s successfully", generations[i])
n++
}
stats.Add(numGenerationsReaped, int64(n))
return n, nil
}
// ReapSnapshots removes snapshots that are no longer needed. It does this by
// checkpointing WAL-based snapshots into the base SQLite file. The function
// returns the number of snapshots removed, or an error. The retain parameter
// specifies the number of snapshots to retain.
func (s *Store) ReapSnapshots(dir string, retain int) (n int, err error) {
startT := time.Now()
defer func() {
stats.Add(numSnapshotsReaped, int64(n))
if err == nil {
dur := time.Since(startT)
stats.Get(reapSnapshotsDuration).(*expvar.Int).Set(dur.Milliseconds())
}
}()
if retain < minSnapshotRetain {
return 0, ErrRetainCountTooLow
}
snapshots, err := s.getSnapshots(dir)
if err != nil {
s.logger.Printf("failed to get snapshots in directory %s: %s", dir, err)
return 0, err
}
// Keeping multiple snapshots makes it much easier to reason about fixing
// up the Snapshot store if we crash in the middle of snapshotting or reaping.
if len(snapshots) <= retain {
return 0, nil
}
// We need to checkpoint the WAL files starting with the oldest snapshot. We'll
// do this by opening the base SQLite file and then replaying the WAL files into it.
// We'll then delete each snapshot once we've checkpointed it.
sort.Sort(metaSlice(snapshots))
baseSqliteFilePath := filepath.Join(dir, baseSqliteFile)
n = 0
for _, snap := range snapshots[0 : len(snapshots)-retain] {
snapDirPath := filepath.Join(dir, snap.ID) // Path to the snapshot directory
walFileInSnapshot := filepath.Join(snapDirPath, snapWALFile) // Path to the WAL file in the snapshot
walToCheckpointFilePath := filepath.Join(dir, baseSqliteWALFile) // Path to the WAL file to checkpoint
// If the snapshot directory doesn't contain a WAL file, then the base SQLite
// file is the snapshot state, and there is no checkpointing to do.
if fileExists(walFileInSnapshot) {
// Copy the WAL file from the snapshot to a temporary location beside the base SQLite file.
// We do this so that we only delete the snapshot directory once we can be sure that
// we've copied it out fully. Renaming is not atomic on every OS, so let's be sure. We
// also use a temporary file name, so we know where the WAL came from if we exit here
// and need to clean up on a restart.
if err := copyWALFromSnapshot(walFileInSnapshot, walToCheckpointFilePath); err != nil {
s.logger.Printf("failed to copy WAL file from snapshot %s: %s", walFileInSnapshot, err)
return n, err
}
// Checkpoint the WAL file into the base SQLite file
if err := db.ReplayWAL(baseSqliteFilePath, []string{walToCheckpointFilePath}, false); err != nil {
s.logger.Printf("failed to checkpoint WAL file %s: %s", walToCheckpointFilePath, err)
return n, err
}
} else {
if err := removeDirSync(snapDirPath); err != nil {
s.logger.Printf("failed to remove full snapshot directory %s: %s", snapDirPath, err)
return n, err
}
}
n++
s.logger.Printf("reaped snapshot %s successfully", snap.ID)
}
return n, nil
}
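// Illustrative usage (not part of the original file): reap all but the two
// most recent snapshots in the current generation, where currDir is assumed
// to come from GetCurrentGenerationDir.
//
//	if n, err := store.ReapSnapshots(currDir, 2); err != nil {
//		log.Printf("reaped %d snapshots before failing: %s", n, err)
//	}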
// getSnapshots returns a list of all the snapshots in the given directory, sorted from
// most recently created to oldest created.
func (s *Store) getSnapshots(dir string) ([]*Meta, error) {
var snapMeta []*Meta
entries, err := os.ReadDir(dir)
if err != nil {
// If the directory doesn't exist, that's fine, just return an empty list
if os.IsNotExist(err) {
return snapMeta, nil
}
return nil, err
}
// Populate the metadata
for _, entry := range entries {
// Ignore any files or temporary snapshots
if !entry.IsDir() || isTmpName(entry.Name()) {
continue
}
// Try to read the meta data
meta, err := s.readMeta(filepath.Join(dir, entry.Name()))
if err != nil {
return nil, fmt.Errorf("failed to read meta for snapshot %s: %s", entry.Name(), err)
}
snapMeta = append(snapMeta, meta)
}
// Sort the snapshots in reverse order, so we get new -> old
sort.Sort(sort.Reverse(metaSlice(snapMeta)))
return snapMeta, nil
}
// readMeta is used to read the meta data in a given snapshot directory.
func (s *Store) readMeta(dir string) (*Meta, error) {
// Open the meta file
metaPath := filepath.Join(dir, metaFileName)
fh, err := os.Open(metaPath)
if err != nil {
return nil, err
}
defer fh.Close()
// Read in the JSON
meta := &Meta{}
dec := json.NewDecoder(fh)
if err := dec.Decode(meta); err != nil {
return nil, err
}
return meta, nil
}
func (s *Store) check() (retError error) {
defer s.logger.Printf("check complete")
s.logger.Printf("checking snapshot store at %s", s.rootDir)
var n int
if err := s.resetWorkDir(); err != nil {
return fmt.Errorf("failed to reset work directory: %s", err)
}
// Simplify logic by reaping generations first.
n, err := s.ReapGenerations()
if err != nil {
return fmt.Errorf("failed to reap generations: %s", err)
}
s.logger.Printf("reaped %d generations", n)
// Remove any temporary generational directories. They represent operations
// that were interrupted.
n = 0
entries, err := os.ReadDir(s.generationsDir)
if err != nil {
return err
}
for _, entry := range entries {
if !isTmpName(entry.Name()) {
continue
}
if err := os.RemoveAll(filepath.Join(s.generationsDir, entry.Name())); err != nil {
return fmt.Errorf("failed to remove temporary generation directory %s: %s", entry.Name(), err)
}
n++
}
s.logger.Printf("removed %d temporary generation directories", n)
// Remove any temporary files in the current generation.
currGenDir, ok, err := s.GetCurrentGenerationDir()
if err != nil {
return err
}
if !ok {
return nil
}
entries, err = os.ReadDir(currGenDir)
if err != nil {
return err
}
n = 0
for _, entry := range entries {
if isTmpName(entry.Name()) {
if err := os.RemoveAll(filepath.Join(currGenDir, entry.Name())); err != nil {
return fmt.Errorf("failed to remove temporary file %s: %s", entry.Name(), err)
}
n++
}
}
s.logger.Printf("removed %d temporary files from current generation", n)
baseSqliteFilePath := filepath.Join(currGenDir, baseSqliteFile)
baseSqliteWALFilePath := filepath.Join(currGenDir, baseSqliteWALFile)
// Any snapshots in the current generation?
snapshots, err := s.getSnapshots(currGenDir)
if err != nil {
return fmt.Errorf("failed to get snapshots: %s", err)
}
if len(snapshots) == 0 {
// An empty current generation is useless. This could happen if the very first
// snapshot was interrupted after writing the base SQLite file, but before
// moving its snapshot directory into place.
if err := os.RemoveAll(currGenDir); err != nil {
return fmt.Errorf("failed to remove empty current generation directory %s: %s", currGenDir, err)
}
s.logger.Printf("removed an empty current generation directory")
return nil
}
// If we have no base file, we shouldn't have any snapshot directories. If we
// do it's an inconsistent state which we cannot repair, and needs to be flagged.
if !fileExists(baseSqliteFilePath) {
return ErrSnapshotBaseMissing
}
s.logger.Printf("found base SQLite file at %s", baseSqliteFilePath)
// If we have a WAL file in the current generation which ends with the same ID as
// the oldest snapshot, then the copy of the WAL from the snapshot and subsequent
// checkpointing was interrupted. We need to redo the move-from-snapshot operation.
sort.Sort(metaSlice(snapshots))
walSnapshotCopyPath := walSnapCopyName(currGenDir, snapshots[0].ID)
snapDirPath := filepath.Join(currGenDir, snapshots[0].ID)
if fileExists(walSnapshotCopyPath) {
s.logger.Printf("found uncheckpointed copy of WAL file from snapshot %s", snapshots[0].ID)
if err := os.Remove(walSnapshotCopyPath); err != nil {
return fmt.Errorf("failed to remove copy of WAL file %s: %s", walSnapshotCopyPath, err)
}
if err := copyWALFromSnapshot(filepath.Join(snapDirPath, snapWALFile), baseSqliteWALFilePath); err != nil {
s.logger.Printf("failed to copy WAL file from snapshot %s: %s", snapshots[0].ID, err)
return err
}
// Now we can remove the snapshot directory.
if err := removeDirSync(snapDirPath); err != nil {
return fmt.Errorf("failed to remove snapshot directory %s: %s", snapDirPath, err)
}
s.logger.Printf("completed copy of WAL file from snapshot %s", snapshots[0].ID)
}
// If we have a base SQLite file, and a WAL file sitting beside it, this implies
// that we were interrupted before completing a checkpoint operation, as part of
// reaping snapshots. Complete the checkpoint operation now.
if fileExists(baseSqliteFilePath) && fileExists(baseSqliteWALFilePath) {
if err := db.ReplayWAL(baseSqliteFilePath, []string{baseSqliteWALFilePath},
false); err != nil {
return fmt.Errorf("failed to replay WALs: %s", err)
}
if err := os.Remove(baseSqliteWALFilePath); err != nil {
return fmt.Errorf("failed to remove WAL file %s: %s", baseSqliteWALFilePath, err)
}
s.logger.Printf("completed checkpoint of WAL file %s", baseSqliteWALFilePath)
}
return nil
}
func (s *Store) resetWorkDir() error {
if err := os.RemoveAll(s.workDir); err != nil {
return fmt.Errorf("failed to remove work directory %s: %s", s.workDir, err)
}
if err := os.MkdirAll(s.workDir, 0755); err != nil {
return fmt.Errorf("failed to create work directory %s: %s", s.workDir, err)
}
return nil
}
// copyWALFromSnapshot copies the WAL file from the snapshot at the given path
// to the file at the given path. It does this in stages, so that we can be sure
// that the copy is complete before deleting the snapshot directory.
func copyWALFromSnapshot(srcWALPath string, dstWALPath string) error {
snapDirPath := filepath.Dir(srcWALPath)
snapName := filepath.Base(snapDirPath)
dstWALDir := filepath.Dir(dstWALPath)
walFileInSnapshotCopy := walSnapCopyName(dstWALDir, snapName)
if err := copyFileSync(srcWALPath, walFileInSnapshotCopy); err != nil {
return fmt.Errorf("failed to copy WAL file %s from snapshot: %s", srcWALPath, err)
}
// Delete the snapshot directory, since we have what we need now.
if err := removeDirSync(snapDirPath); err != nil {
return fmt.Errorf("failed to remove incremental snapshot directory %s: %s", snapDirPath, err)
}
// NOT HANDLING CRASHING HERE. XXXX FIX IN CHECK
// Move the WAL file to the correct name for checkpointing.
if err := os.Rename(walFileInSnapshotCopy, dstWALPath); err != nil {
return fmt.Errorf("failed to move WAL file %s: %s", walFileInSnapshotCopy, err)
}
return nil
}
// walSnapCopyName returns the path of the file used for the intermediate copy of
// the WAL file, for a given source snapshot. dstDir is the directory where the
// copy will be placed, and snapName is the name of the source snapshot.
func walSnapCopyName(dstDir, snapName string) string {
return filepath.Join(dstDir, baseSqliteWALFile+"."+snapName)
}
func isTmpName(name string) bool {
return filepath.Ext(name) == tmpSuffix
}
func fileExists(path string) bool {
_, err := os.Stat(path)
return !os.IsNotExist(err)
}
func dirExists(path string) bool {
stat, err := os.Stat(path)
return err == nil && stat.IsDir()
}
func copyFileSync(src, dst string) error {
srcFd, err := os.Open(src)
if err != nil {
return err
}
defer srcFd.Close()
dstFd, err := os.Create(dst)
if err != nil {
return err
}
defer dstFd.Close()
if _, err = io.Copy(dstFd, srcFd); err != nil {
return err
}
return dstFd.Sync()
}
func parentDir(dir string) string {
return filepath.Dir(dir)
}
func tmpName(path string) string {
return path + tmpSuffix
}
func nonTmpName(path string) string {
return strings.TrimSuffix(path, tmpSuffix)
}
func moveFromTmpSync(src string) (string, error) {
dst := nonTmpName(src)
if err := os.Rename(src, dst); err != nil {
return "", err
}
return dst, syncDirParentMaybe(dst)
}
func removeDirSync(dir string) error {
if err := os.RemoveAll(dir); err != nil {
return err
}
return syncDirParentMaybe(dir)
}
func syncDir(dir string) error {
fh, err := os.Open(dir)
if err != nil {
return err
}
defer fh.Close()
return fh.Sync()
}
// syncDirParentMaybe syncs the parent directory of the given
// directory, but only on non-Windows platforms.
func syncDirParentMaybe(dir string) error {
if runtime.GOOS == "windows" {
return nil
}
return syncDir(parentDir(dir))
}
// snapshotName generates a name for the snapshot.
func snapshotName(term, index uint64) string {
now := time.Now()
msec := now.UnixNano() / int64(time.Millisecond)
return fmt.Sprintf("%d-%d-%d", term, index, msec)
}
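// For example (illustrative): snapshotName(3, 42) called at Unix millisecond
// time 1700000000000 returns "3-42-1700000000000".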
// metaSlice is a sortable slice of Meta, which are sorted
// by term, index, and then ID. Snapshots are sorted from oldest to newest.
type metaSlice []*Meta
func (s metaSlice) Len() int {
return len(s)
}
func (s metaSlice) Less(i, j int) bool {
if s[i].Term != s[j].Term {
return s[i].Term < s[j].Term
}
if s[i].Index != s[j].Index {
return s[i].Index < s[j].Index
}
return s[i].ID < s[j].ID
}
func (s metaSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
func (s metaSlice) Contains(id string) bool {
for _, snap := range s {
if snap.ID == id {
return true
}
}
return false
}
// dirSize returns the total size of all files in the given directory
func dirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
// If the file doesn't exist, we can ignore it. Snapshot files might
// disappear during walking.
if os.IsNotExist(err) {
return nil
}
return err
}
if !info.IsDir() {
size += info.Size()
}
return err
})
return size, err
}

@@ -1,683 +0,0 @@
package snapshot
import (
"bytes"
"io"
"strings"
"testing"
"github.com/hashicorp/raft"
"github.com/rqlite/rqlite/db"
)
func Test_NewStore(t *testing.T) {
tmpDir := t.TempDir()
s, err := NewStore(tmpDir)
if err != nil {
t.Fatal(err)
}
if s == nil {
t.Fatal("expected non-nil store")
}
generations, err := s.GetGenerations()
if err != nil {
t.Fatalf("failed to get generations: %s", err.Error())
}
if len(generations) != 0 {
t.Fatalf("expected 0 generations, got %d", len(generations))
}
_, ok, err := s.GetCurrentGenerationDir()
if err != nil {
t.Fatalf("failed to get current generation dir: %s", err.Error())
}
if ok {
t.Fatalf("expected current generation dir not to exist")
}
nextGenDir, err := s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
if !strings.HasSuffix(nextGenDir, firstGeneration) {
t.Fatalf("expected next generation dir to end with %s, got %s", firstGeneration, nextGenDir)
}
}
func Test_NewStore_ListOpenEmpty(t *testing.T) {
dir := t.TempDir()
s, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
if !s.FullNeeded() {
t.Fatalf("expected full snapshots to be needed")
}
if snaps, err := s.List(); err != nil {
t.Fatalf("failed to list snapshots: %s", err)
} else if len(snaps) != 0 {
t.Fatalf("expected 0 snapshots, got %d", len(snaps))
}
if _, _, err := s.Open("non-existent"); err != ErrSnapshotNotFound {
t.Fatalf("expected ErrSnapshotNotFound, got %s", err)
}
}
// Test_Store_CreateFullThenIncremental performs detailed testing of the
// snapshot creation process. It is critical that snapshots are created
// correctly, so this test is thorough.
func Test_Store_CreateFullThenIncremental(t *testing.T) {
checkSnapshotCount := func(s *Store, exp int) *raft.SnapshotMeta {
snaps, err := s.List()
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := exp, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
if len(snaps) == 0 {
return nil
}
return snaps[0]
}
dir := t.TempDir()
str, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
if !str.FullNeeded() {
t.Fatalf("expected full snapshots to be needed")
}
testConfig1 := makeTestConfiguration("1", "2")
sink, err := str.Create(1, 22, 33, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 1st snapshot sink: %s", err)
}
//////////////////////////////////////////////////////////////////////////
// Create a full snapshot and write it to the sink.
fullSnap := NewFullSnapshot("testdata/db-and-wals/backup.db")
if err := fullSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist full snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
if str.FullNeeded() {
t.Fatalf("full snapshot still needed")
}
meta := checkSnapshotCount(str, 1)
if meta.Index != 22 || meta.Term != 33 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
// Open the latest snapshot and check that it's correct.
raftMeta, rc, err := str.Open(meta.ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", meta.ID, err)
}
crc := &countingReadCloser{rc: rc}
streamHdr, _, err := NewStreamHeaderFromReader(crc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap := streamHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
dbInfo := streamSnap.GetDb()
if dbInfo == nil {
t.Fatal("got nil DB info")
}
if !compareReaderToFile(crc, "testdata/db-and-wals/backup.db") {
t.Fatalf("database file does not match what is in snapshot")
}
// should be no more data
if _, err := crc.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("expected EOF, got %v", err)
}
if err := crc.Close(); err != nil {
t.Fatalf("failed to close snapshot reader: %s", err)
}
if exp, got := raftMeta.Size, int64(crc.n); exp != got {
t.Fatalf("expected snapshot size to be %d, got %d", exp, got)
}
crc.Close()
//////////////////////////////////////////////////////////////////////////
// Incremental snapshot next
sink, err = str.Create(2, 55, 66, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 2nd snapshot sink: %s", err)
}
walData := mustReadFile("testdata/db-and-wals/wal-00")
incSnap := NewWALSnapshot(walData)
if err := incSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist incremental snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
meta = checkSnapshotCount(str, 1)
if meta.Index != 55 || meta.Term != 66 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
// Open the latest snapshot again, and recreate the database so we
// can check its contents.
raftMeta, rc, err = str.Open(meta.ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", meta.ID, err)
}
crc = &countingReadCloser{rc: rc}
streamHdr, _, err = NewStreamHeaderFromReader(crc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap = streamHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
tmpFile := t.TempDir() + "/db"
if err := ReplayDB(streamSnap, crc, tmpFile); err != nil {
t.Fatalf("failed to replay database: %s", err)
}
checkDB, err := db.Open(tmpFile, false, true)
if err != nil {
t.Fatalf("failed to open database: %s", err)
}
defer checkDB.Close()
// Database should now have 1 row after replaying the WAL.
rows, err := checkDB.QueryStringStmt("SELECT * FROM foo")
if err != nil {
t.Fatalf("failed to query database: %s", err)
}
if exp, got := `[{"columns":["id","value"],"types":["integer","text"],"values":[[1,"Row 0"]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query, exp %s, got %s", exp, got)
}
// should be no more data
if _, err := crc.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("expected EOF, got %v", err)
}
if exp, got := raftMeta.Size, int64(crc.n); exp != got {
t.Fatalf("expected snapshot size to be %d, got %d", exp, got)
}
crc.Close()
//////////////////////////////////////////////////////////////////////////
// Do it again!
sink, err = str.Create(2, 77, 88, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 3rd snapshot sink: %s", err)
}
walData = mustReadFile("testdata/db-and-wals/wal-01")
incSnap = NewWALSnapshot(walData)
if err := incSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist incremental snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
meta = checkSnapshotCount(str, 1)
if meta.Index != 77 || meta.Term != 88 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
// Open the latest snapshot again, and recreate the database so we
// can check its contents.
raftMeta, rc, err = str.Open(meta.ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", meta.ID, err)
}
crc = &countingReadCloser{rc: rc}
streamHdr, _, err = NewStreamHeaderFromReader(crc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap = streamHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
tmpFile = t.TempDir() + "/db"
if err := ReplayDB(streamSnap, crc, tmpFile); err != nil {
t.Fatalf("failed to replay database: %s", err)
}
checkDB, err = db.Open(tmpFile, false, true)
if err != nil {
t.Fatalf("failed to open database: %s", err)
}
defer checkDB.Close()
rows, err = checkDB.QueryStringStmt("SELECT * FROM foo")
if err != nil {
t.Fatalf("failed to query database: %s", err)
}
if exp, got := `[{"columns":["id","value"],"types":["integer","text"],"values":[[1,"Row 0"],[2,"Row 1"]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query, exp %s, got %s", exp, got)
}
// should be no more data
if _, err := crc.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("expected EOF, got %v", err)
}
if exp, got := raftMeta.Size, int64(crc.n); exp != got {
t.Fatalf("expected snapshot size to be %d, got %d", exp, got)
}
crc.Close()
//////////////////////////////////////////////////////////////////////////
// One last time, after a reaping took place in the middle.
sink, err = str.Create(2, 100, 200, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 4th snapshot sink: %s", err)
}
walData = mustReadFile("testdata/db-and-wals/wal-02")
incSnap = NewWALSnapshot(walData)
if err := incSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist incremental snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
meta = checkSnapshotCount(str, 1)
if meta.Index != 100 || meta.Term != 200 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
// Open the latest snapshot again, and recreate the database so we
// can check its contents.
raftMeta, rc, err = str.Open(meta.ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", meta.ID, err)
}
crc = &countingReadCloser{rc: rc}
streamHdr, _, err = NewStreamHeaderFromReader(crc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap = streamHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
tmpFile = t.TempDir() + "/db"
if err := ReplayDB(streamSnap, crc, tmpFile); err != nil {
t.Fatalf("failed to replay database: %s", err)
}
checkDB, err = db.Open(tmpFile, false, true)
if err != nil {
t.Fatalf("failed to open database: %s", err)
}
defer checkDB.Close()
rows, err = checkDB.QueryStringStmt("SELECT * FROM foo")
if err != nil {
t.Fatalf("failed to query database: %s", err)
}
if exp, got := `[{"columns":["id","value"],"types":["integer","text"],"values":[[1,"Row 0"],[2,"Row 1"],[3,"Row 2"]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query, exp %s, got %s", exp, got)
}
// should be no more data
if _, err := crc.Read(make([]byte, 1)); err != io.EOF {
t.Fatalf("expected EOF, got %v", err)
}
if exp, got := raftMeta.Size, int64(crc.n); exp != got {
t.Fatalf("expected snapshot size to be %d, got %d", exp, got)
}
crc.Close()
// Finally, test via Restore() function.
tmpDir := t.TempDir()
rPath, err := str.Restore(meta.ID, tmpDir)
if err != nil {
t.Fatalf("failed to restore snapshot: %s", err)
}
restoredDB, err := db.Open(rPath, false, true)
if err != nil {
t.Fatalf("failed to open database: %s", err)
}
defer restoredDB.Close()
rows, err = restoredDB.QueryStringStmt("SELECT * FROM foo")
if err != nil {
t.Fatalf("failed to query database: %s", err)
}
if exp, got := `[{"columns":["id","value"],"types":["integer","text"],"values":[[1,"Row 0"],[2,"Row 1"],[3,"Row 2"]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query, exp %s, got %s", exp, got)
}
}
// Test_Store_CreateFullThenFull ensures two full snapshots
// can be created and persisted back-to-back.
func Test_Store_CreateFullThenFull(t *testing.T) {
checkSnapshotCount := func(s *Store, exp int) *raft.SnapshotMeta {
snaps, err := s.List()
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := exp, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
if len(snaps) == 0 {
return nil
}
return snaps[0]
}
dir := t.TempDir()
str, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
if !str.FullNeeded() {
t.Fatalf("expected full snapshots to be needed")
}
testConfig1 := makeTestConfiguration("1", "2")
//////////////////////////////////////////////////////////////////////////
// Create a full snapshot and write it to the sink.
sink, err := str.Create(1, 22, 33, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 1st snapshot sink: %s", err)
}
fullSnap := NewFullSnapshot("testdata/db-and-wals/backup.db")
if err := fullSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist full snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
if str.FullNeeded() {
t.Fatalf("full snapshot still needed")
}
meta := checkSnapshotCount(str, 1)
if meta.Index != 22 || meta.Term != 33 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
//////////////////////////////////////////////////////////////////////////
// Create a second full snapshot and write it to the sink.
sink, err = str.Create(1, 44, 55, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 2nd snapshot sink: %s", err)
}
fullSnap = NewFullSnapshot("testdata/db-and-wals/backup.db")
if err := fullSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist full snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
if str.FullNeeded() {
t.Fatalf("full snapshot still needed")
}
meta = checkSnapshotCount(str, 1)
if meta.Index != 44 || meta.Term != 55 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
}
func Test_Store_ReapGenerations(t *testing.T) {
dir := t.TempDir()
s, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
testCurrGenDirIs := func(exp string) string {
curGenDir, ok, err := s.GetCurrentGenerationDir()
if err != nil {
t.Fatalf("failed to get current generation dir: %s", err.Error())
}
if !ok {
t.Fatalf("expected current generation dir to exist")
}
if curGenDir != exp {
t.Fatalf("expected current generation dir to be %s, got %s", exp, curGenDir)
}
return curGenDir
}
testGenCountIs := func(exp int) {
generations, err := s.GetGenerations()
if err != nil {
t.Fatalf("failed to get generations: %s", err.Error())
}
if exp, got := exp, len(generations); exp != got {
t.Fatalf("expected %d generations, got %d", exp, got)
}
}
testReapsOK := func(expN int) {
n, err := s.ReapGenerations()
if err != nil {
t.Fatalf("reaping failed: %s", err.Error())
}
if n != expN {
t.Fatalf("expected %d generations to be reaped, got %d", expN, n)
}
}
var nextGenDir string
nextGenDir, err = s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
mustCreateDir(nextGenDir)
testCurrGenDirIs(nextGenDir)
testReapsOK(0)
// Create another generation and then tell the Store to reap.
nextGenDir, err = s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
mustCreateDir(nextGenDir)
testGenCountIs(2)
testReapsOK(1)
testCurrGenDirIs(nextGenDir)
// Finally, test reaping lots of generations.
for i := 0; i < 10; i++ {
nextGenDir, err = s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
mustCreateDir(nextGenDir)
}
testGenCountIs(11)
testReapsOK(10)
testGenCountIs(1)
testCurrGenDirIs(nextGenDir)
}
func compareReaderToFile(r io.Reader, path string) bool {
b := mustReadFile(path)
rb := mustReadAll(r)
return bytes.Equal(b, rb)
}
func mustReadAll(r io.Reader) []byte {
b, err := io.ReadAll(r)
if err != nil {
panic(err)
}
return b
}
type countingReadCloser struct {
rc io.ReadCloser
n int
}
func (c *countingReadCloser) Read(p []byte) (int, error) {
n, err := c.rc.Read(p)
c.n += n
return n, err
}
func (c *countingReadCloser) Close() error {
return c.rc.Close()
}
func Test_StoreReaping(t *testing.T) {
dir := t.TempDir()
str, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
str.noAutoreap = true
testConfig := makeTestConfiguration("1", "2")
// Create a full snapshot.
snapshot := NewFullSnapshot("testdata/db-and-wals/backup.db")
sink, err := str.Create(1, 1, 1, testConfig, 4, nil)
if err != nil {
t.Fatalf("failed to create snapshot sink: %s", err)
}
stream, err := snapshot.OpenStream()
if err != nil {
t.Fatalf("failed to open snapshot stream: %s", err)
}
_, err = io.Copy(sink, stream)
if err != nil {
t.Fatalf("failed to write snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close snapshot sink: %s", err)
}
createIncSnapshot := func(index, term uint64, file string) {
snapshot := NewWALSnapshot(mustReadFile(file))
sink, err := str.Create(1, index, term, testConfig, 4, nil)
if err != nil {
t.Fatalf("failed to create snapshot sink: %s", err)
}
stream, err := snapshot.OpenStream()
if err != nil {
t.Fatalf("failed to open snapshot stream: %s", err)
}
_, err = io.Copy(sink, stream)
if err != nil {
t.Fatalf("failed to write snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close snapshot sink: %s", err)
}
}
createIncSnapshot(3, 2, "testdata/db-and-wals/wal-00")
createIncSnapshot(5, 3, "testdata/db-and-wals/wal-01")
createIncSnapshot(7, 4, "testdata/db-and-wals/wal-02")
createIncSnapshot(9, 5, "testdata/db-and-wals/wal-03")
// There should be 5 snapshot directories in the current generation.
generationsDir, ok, err := str.GetCurrentGenerationDir()
if err != nil {
t.Fatalf("failed to get generations dir: %s", err)
}
if !ok {
t.Fatalf("expected generations dir to exist")
}
snaps, err := str.getSnapshots(generationsDir)
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := 5, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
for _, snap := range snaps[0:4] {
if snap.Full {
t.Fatalf("snapshot %s is full", snap.ID)
}
}
if !snaps[4].Full {
t.Fatalf("snapshot %s is incremental", snaps[4].ID)
}
// Reap just the first snapshot, which is full.
n, err := str.ReapSnapshots(generationsDir, 4)
if err != nil {
t.Fatalf("failed to reap full snapshot: %s", err)
}
if exp, got := 1, n; exp != got {
t.Fatalf("expected %d snapshots to be reaped, got %d", exp, got)
}
snaps, err = str.getSnapshots(generationsDir)
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := 4, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
// Reap all but the last two snapshots. The remaining snapshots
// should all be incremental.
n, err = str.ReapSnapshots(generationsDir, 2)
if err != nil {
t.Fatalf("failed to reap snapshots: %s", err)
}
if exp, got := 2, n; exp != got {
t.Fatalf("expected %d snapshots to be reaped, got %d", exp, got)
}
snaps, err = str.getSnapshots(generationsDir)
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := 2, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
for _, snap := range snaps {
if snap.Full {
t.Fatalf("snapshot %s is full", snap.ID)
}
}
if snaps[0].Index != 9 || snaps[0].Term != 5 {
t.Fatal("snap 0 is wrong, got:", snaps[0].Index, snaps[0].Term)
}
if snaps[1].Index != 7 || snaps[1].Term != 4 {
t.Fatal("snap 1 is wrong, got:", snaps[1].Index, snaps[1].Term)
}
// Open the latest snapshot, write it to disk, and check its contents.
_, rc, err := str.Open(snaps[0].ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", snaps[0].ID, err)
}
defer rc.Close()
strHdr, _, err := NewStreamHeaderFromReader(rc)
if err != nil {
t.Fatalf("error reading stream header: %v", err)
}
streamSnap := strHdr.GetFullSnapshot()
if streamSnap == nil {
t.Fatal("got nil FullSnapshot")
}
tmpFile := t.TempDir() + "/db"
if err := ReplayDB(streamSnap, rc, tmpFile); err != nil {
t.Fatalf("failed to replay database: %s", err)
}
// Check the database.
db, err := db.Open(tmpFile, false, true)
if err != nil {
t.Fatalf("failed to open database: %s", err)
}
defer db.Close()
rows, err := db.QueryStringStmt("SELECT COUNT(*) FROM foo")
if err != nil {
t.Fatalf("failed to query database: %s", err)
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[4]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query exp: %s got: %s", exp, got)
}
}

@ -1,246 +0,0 @@
package snapshot
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"google.golang.org/protobuf/proto"
)
const (
strHeaderLenSize = 8
streamVersion = 1
)
// NewStreamHeader creates a new StreamHeader.
func NewStreamHeader() *StreamHeader {
return &StreamHeader{
Version: streamVersion,
}
}
// NewStreamHeaderFromReader reads a StreamHeader from the given reader.
func NewStreamHeaderFromReader(r io.Reader) (*StreamHeader, int64, error) {
var totalSizeRead int64
b := make([]byte, strHeaderLenSize)
n, err := io.ReadFull(r, b)
if err != nil {
return nil, 0, fmt.Errorf("error reading snapshot header length: %v", err)
}
totalSizeRead += int64(n)
strHdrLen := binary.LittleEndian.Uint64(b)
b = make([]byte, strHdrLen)
n, err = io.ReadFull(r, b)
if err != nil {
return nil, 0, fmt.Errorf("error reading snapshot header %v", err)
}
totalSizeRead += int64(n)
strHdr := &StreamHeader{}
err = proto.Unmarshal(b, strHdr)
if err != nil {
return nil, 0, fmt.Errorf("error unmarshaling FSM snapshot: %v", err)
}
if strHdr.GetVersion() != streamVersion {
return nil, 0, fmt.Errorf("unsupported snapshot version %d", strHdr.GetVersion())
}
return strHdr, totalSizeRead, nil
}
// FileSize returns the total size of the files in the snapshot.
func (s *StreamHeader) FileSize() int64 {
if fs := s.GetFullSnapshot(); fs != nil {
var size int64
for _, di := range fs.Wals {
size += di.Size
}
size += fs.Db.Size
return size
}
return 0
}
// Stream is a stream of data that can be read from a snapshot.
type Stream struct {
headerLen int64
readClosers []io.ReadCloser
readClosersIdx int
totalFileSize int64
}
// NewIncrementalStream creates a new stream from a byte slice, presumably
// representing WAL data.
func NewIncrementalStream(data []byte) (*Stream, error) {
strHdr := NewStreamHeader()
strHdr.Payload = &StreamHeader_IncrementalSnapshot{
IncrementalSnapshot: &IncrementalSnapshot{
Data: data,
},
}
strHdrPb, err := proto.Marshal(strHdr)
if err != nil {
return nil, err
}
buf := make([]byte, strHeaderLenSize)
binary.LittleEndian.PutUint64(buf, uint64(len(strHdrPb)))
buf = append(buf, strHdrPb...)
return &Stream{
headerLen: int64(len(strHdrPb)),
readClosers: []io.ReadCloser{newRCBuffer(buf)},
}, nil
}
// NewFullStream creates a new stream from a SQLite file and 0 or more
// WAL files.
func NewFullStream(files ...string) (*Stream, error) {
if len(files) == 0 {
return nil, errors.New("no files provided")
}
var totalFileSize int64
// First file must be the SQLite database file.
fi, err := os.Stat(files[0])
if err != nil {
return nil, err
}
dbDataInfo := &FullSnapshot_DataInfo{
Size: fi.Size(),
}
totalFileSize += fi.Size()
// Rest, if any, are WAL files.
walDataInfos := make([]*FullSnapshot_DataInfo, len(files)-1)
for i := 1; i < len(files); i++ {
fi, err := os.Stat(files[i])
if err != nil {
return nil, err
}
walDataInfos[i-1] = &FullSnapshot_DataInfo{
Size: fi.Size(),
}
totalFileSize += fi.Size()
}
strHdr := NewStreamHeader()
strHdr.Payload = &StreamHeader_FullSnapshot{
FullSnapshot: &FullSnapshot{
Db: dbDataInfo,
Wals: walDataInfos,
},
}
strHdrPb, err := proto.Marshal(strHdr)
if err != nil {
return nil, err
}
buf := make([]byte, strHeaderLenSize)
binary.LittleEndian.PutUint64(buf, uint64(len(strHdrPb)))
buf = append(buf, strHdrPb...)
var readClosers []io.ReadCloser
readClosers = append(readClosers, newRCBuffer(buf))
for _, file := range files {
fd, err := os.Open(file)
if err != nil {
for _, rc := range readClosers {
rc.Close() // Ignore the error during cleanup
}
return nil, err
}
readClosers = append(readClosers, fd)
}
return &Stream{
headerLen: int64(len(strHdrPb)),
readClosers: readClosers,
totalFileSize: strHdr.FileSize(),
}, nil
}
// Size returns the total number of bytes that will be read from the stream,
// if the stream is fully read.
func (s *Stream) Size() int64 {
return strHeaderLenSize + s.headerLen + s.totalFileSize
}
// Read reads from the stream.
func (s *Stream) Read(p []byte) (n int, err error) {
if s.readClosersIdx >= len(s.readClosers) {
return 0, io.EOF
}
n, err = s.readClosers[s.readClosersIdx].Read(p)
if err != nil {
if err == io.EOF {
s.readClosersIdx++
if s.readClosersIdx < len(s.readClosers) {
err = nil
}
}
}
return n, err
}
// Close closes the stream.
func (s *Stream) Close() error {
for _, r := range s.readClosers {
if err := r.Close(); err != nil {
return err
}
}
return nil
}
// FilesFromStream reads a stream and returns the files contained within it.
// The first file is the SQLite database file, and the rest are WAL files.
// The function will return an error if the stream does not contain a
// FullSnapshot.
func FilesFromStream(r io.Reader) (string, []string, error) {
strHdr, _, err := NewStreamHeaderFromReader(r)
if err != nil {
return "", nil, fmt.Errorf("error reading stream header: %v", err)
}
fullSnap := strHdr.GetFullSnapshot()
if fullSnap == nil {
return "", nil, fmt.Errorf("got nil FullSnapshot")
}
dbInfo := fullSnap.GetDb()
if dbInfo == nil {
return "", nil, fmt.Errorf("got nil DB info")
}
sqliteFd, err := os.CreateTemp("", "stream-db.sqlite3")
if err != nil {
return "", nil, fmt.Errorf("error creating SQLite file: %v", err)
}
if _, err := io.CopyN(sqliteFd, r, dbInfo.Size); err != nil {
return "", nil, fmt.Errorf("error writing SQLite file data: %v", err)
}
if err := sqliteFd.Close(); err != nil {
return "", nil, fmt.Errorf("error closing SQLite data file: %v", err)
}
var walFiles []string
for i, di := range fullSnap.Wals {
tmpFd, err := os.CreateTemp("", fmt.Sprintf("stream-wal-%d.wal", i))
if err != nil {
return "", nil, fmt.Errorf("error creating WAL file: %v", err)
}
if _, err := io.CopyN(tmpFd, r, di.Size); err != nil {
return "", nil, fmt.Errorf("error writing WAL file data: %v", err)
}
if err := tmpFd.Close(); err != nil {
return "", nil, fmt.Errorf("error closing WAL file: %v", err)
}
walFiles = append(walFiles, tmpFd.Name())
}
return sqliteFd.Name(), walFiles, nil
}
func newRCBuffer(b []byte) io.ReadCloser {
return io.NopCloser(bytes.NewBuffer(b))
}
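For reference, a minimal sketch of how this framing round-trips: an 8-byte little-endian length prefix, the marshaled StreamHeader, then any raw file data. It assumes the exported identifiers in the file above and the github.com/rqlite/rqlite/snapshot import path from the proto definition; it is an illustration, not part of the removed code.
package main
import (
	"fmt"
	"io"
	"github.com/rqlite/rqlite/snapshot"
)
func main() {
	// Frame some WAL bytes as an incremental snapshot stream.
	stream, err := snapshot.NewIncrementalStream([]byte("wal bytes"))
	if err != nil {
		panic(err)
	}
	defer stream.Close()
	// Decode the length-prefixed header back off the stream.
	hdr, n, err := snapshot.NewStreamHeaderFromReader(stream)
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d header bytes, WAL payload is %d bytes\n", n, len(hdr.GetIncrementalSnapshot().Data))
	// Any remaining bytes are file data; an incremental snapshot carries none.
	if _, err := io.Copy(io.Discard, stream); err != nil {
		panic(err)
	}
}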

@ -1,404 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.31.0
// protoc v3.6.1
// source: snapshot/stream_header.pb
package snapshot
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type IncrementalSnapshot struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *IncrementalSnapshot) Reset() {
*x = IncrementalSnapshot{}
if protoimpl.UnsafeEnabled {
mi := &file_snapshot_stream_header_pb_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *IncrementalSnapshot) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*IncrementalSnapshot) ProtoMessage() {}
func (x *IncrementalSnapshot) ProtoReflect() protoreflect.Message {
mi := &file_snapshot_stream_header_pb_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use IncrementalSnapshot.ProtoReflect.Descriptor instead.
func (*IncrementalSnapshot) Descriptor() ([]byte, []int) {
return file_snapshot_stream_header_pb_rawDescGZIP(), []int{0}
}
func (x *IncrementalSnapshot) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
type FullSnapshot struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Db *FullSnapshot_DataInfo `protobuf:"bytes,3,opt,name=db,proto3" json:"db,omitempty"`
Wals []*FullSnapshot_DataInfo `protobuf:"bytes,4,rep,name=wals,proto3" json:"wals,omitempty"`
}
func (x *FullSnapshot) Reset() {
*x = FullSnapshot{}
if protoimpl.UnsafeEnabled {
mi := &file_snapshot_stream_header_pb_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FullSnapshot) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FullSnapshot) ProtoMessage() {}
func (x *FullSnapshot) ProtoReflect() protoreflect.Message {
mi := &file_snapshot_stream_header_pb_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FullSnapshot.ProtoReflect.Descriptor instead.
func (*FullSnapshot) Descriptor() ([]byte, []int) {
return file_snapshot_stream_header_pb_rawDescGZIP(), []int{1}
}
func (x *FullSnapshot) GetDb() *FullSnapshot_DataInfo {
if x != nil {
return x.Db
}
return nil
}
func (x *FullSnapshot) GetWals() []*FullSnapshot_DataInfo {
if x != nil {
return x.Wals
}
return nil
}
type StreamHeader struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Version int32 `protobuf:"varint,1,opt,name=version,proto3" json:"version,omitempty"`
// Types that are assignable to Payload:
//
// *StreamHeader_IncrementalSnapshot
// *StreamHeader_FullSnapshot
Payload isStreamHeader_Payload `protobuf_oneof:"payload"`
}
func (x *StreamHeader) Reset() {
*x = StreamHeader{}
if protoimpl.UnsafeEnabled {
mi := &file_snapshot_stream_header_pb_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamHeader) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamHeader) ProtoMessage() {}
func (x *StreamHeader) ProtoReflect() protoreflect.Message {
mi := &file_snapshot_stream_header_pb_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamHeader.ProtoReflect.Descriptor instead.
func (*StreamHeader) Descriptor() ([]byte, []int) {
return file_snapshot_stream_header_pb_rawDescGZIP(), []int{2}
}
func (x *StreamHeader) GetVersion() int32 {
if x != nil {
return x.Version
}
return 0
}
func (m *StreamHeader) GetPayload() isStreamHeader_Payload {
if m != nil {
return m.Payload
}
return nil
}
func (x *StreamHeader) GetIncrementalSnapshot() *IncrementalSnapshot {
if x, ok := x.GetPayload().(*StreamHeader_IncrementalSnapshot); ok {
return x.IncrementalSnapshot
}
return nil
}
func (x *StreamHeader) GetFullSnapshot() *FullSnapshot {
if x, ok := x.GetPayload().(*StreamHeader_FullSnapshot); ok {
return x.FullSnapshot
}
return nil
}
type isStreamHeader_Payload interface {
isStreamHeader_Payload()
}
type StreamHeader_IncrementalSnapshot struct {
IncrementalSnapshot *IncrementalSnapshot `protobuf:"bytes,2,opt,name=incremental_snapshot,json=incrementalSnapshot,proto3,oneof"`
}
type StreamHeader_FullSnapshot struct {
FullSnapshot *FullSnapshot `protobuf:"bytes,3,opt,name=full_snapshot,json=fullSnapshot,proto3,oneof"`
}
func (*StreamHeader_IncrementalSnapshot) isStreamHeader_Payload() {}
func (*StreamHeader_FullSnapshot) isStreamHeader_Payload() {}
type FullSnapshot_DataInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Size int64 `protobuf:"varint,1,opt,name=size,proto3" json:"size,omitempty"`
}
func (x *FullSnapshot_DataInfo) Reset() {
*x = FullSnapshot_DataInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_snapshot_stream_header_pb_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FullSnapshot_DataInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FullSnapshot_DataInfo) ProtoMessage() {}
func (x *FullSnapshot_DataInfo) ProtoReflect() protoreflect.Message {
mi := &file_snapshot_stream_header_pb_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FullSnapshot_DataInfo.ProtoReflect.Descriptor instead.
func (*FullSnapshot_DataInfo) Descriptor() ([]byte, []int) {
return file_snapshot_stream_header_pb_rawDescGZIP(), []int{1, 0}
}
func (x *FullSnapshot_DataInfo) GetSize() int64 {
if x != nil {
return x.Size
}
return 0
}
var File_snapshot_stream_header_pb protoreflect.FileDescriptor
var file_snapshot_stream_header_pb_rawDesc = []byte{
0x0a, 0x19, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x5f, 0x68, 0x65, 0x61, 0x64, 0x65, 0x72, 0x2e, 0x70, 0x62, 0x12, 0x08, 0x73, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x65, 0x72, 0x22, 0x29, 0x0a, 0x13, 0x49, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65,
0x6e, 0x74, 0x61, 0x6c, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x12, 0x0a, 0x04,
0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61,
0x22, 0x94, 0x01, 0x0a, 0x0c, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f,
0x74, 0x12, 0x2f, 0x0a, 0x02, 0x64, 0x62, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x6e, 0x61,
0x70, 0x73, 0x68, 0x6f, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x02,
0x64, 0x62, 0x12, 0x33, 0x0a, 0x04, 0x77, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x1f, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x2e, 0x46, 0x75, 0x6c, 0x6c,
0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x49, 0x6e, 0x66,
0x6f, 0x52, 0x04, 0x77, 0x61, 0x6c, 0x73, 0x1a, 0x1e, 0x0a, 0x08, 0x44, 0x61, 0x74, 0x61, 0x49,
0x6e, 0x66, 0x6f, 0x12, 0x12, 0x0a, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28,
0x03, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65, 0x22, 0xc6, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x48, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x18, 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73,
0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x05, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69,
0x6f, 0x6e, 0x12, 0x52, 0x0a, 0x14, 0x69, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61,
0x6c, 0x5f, 0x73, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
0x32, 0x1d, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x2e, 0x49, 0x6e, 0x63, 0x72,
0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x48,
0x00, 0x52, 0x13, 0x69, 0x6e, 0x63, 0x72, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x61, 0x6c, 0x53, 0x6e,
0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x3d, 0x0a, 0x0d, 0x66, 0x75, 0x6c, 0x6c, 0x5f, 0x73,
0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x2e, 0x46, 0x75, 0x6c, 0x6c, 0x53, 0x6e, 0x61,
0x70, 0x73, 0x68, 0x6f, 0x74, 0x48, 0x00, 0x52, 0x0c, 0x66, 0x75, 0x6c, 0x6c, 0x53, 0x6e, 0x61,
0x70, 0x73, 0x68, 0x6f, 0x74, 0x42, 0x09, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72,
0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x73, 0x6e, 0x61,
0x70, 0x73, 0x68, 0x6f, 0x74, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_snapshot_stream_header_pb_rawDescOnce sync.Once
file_snapshot_stream_header_pb_rawDescData = file_snapshot_stream_header_pb_rawDesc
)
func file_snapshot_stream_header_pb_rawDescGZIP() []byte {
file_snapshot_stream_header_pb_rawDescOnce.Do(func() {
file_snapshot_stream_header_pb_rawDescData = protoimpl.X.CompressGZIP(file_snapshot_stream_header_pb_rawDescData)
})
return file_snapshot_stream_header_pb_rawDescData
}
var file_snapshot_stream_header_pb_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_snapshot_stream_header_pb_goTypes = []interface{}{
(*IncrementalSnapshot)(nil), // 0: streamer.IncrementalSnapshot
(*FullSnapshot)(nil), // 1: streamer.FullSnapshot
(*StreamHeader)(nil), // 2: streamer.StreamHeader
(*FullSnapshot_DataInfo)(nil), // 3: streamer.FullSnapshot.DataInfo
}
var file_snapshot_stream_header_pb_depIdxs = []int32{
3, // 0: streamer.FullSnapshot.db:type_name -> streamer.FullSnapshot.DataInfo
3, // 1: streamer.FullSnapshot.wals:type_name -> streamer.FullSnapshot.DataInfo
0, // 2: streamer.StreamHeader.incremental_snapshot:type_name -> streamer.IncrementalSnapshot
1, // 3: streamer.StreamHeader.full_snapshot:type_name -> streamer.FullSnapshot
4, // [4:4] is the sub-list for method output_type
4, // [4:4] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_snapshot_stream_header_pb_init() }
func file_snapshot_stream_header_pb_init() {
if File_snapshot_stream_header_pb != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_snapshot_stream_header_pb_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*IncrementalSnapshot); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_snapshot_stream_header_pb_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FullSnapshot); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_snapshot_stream_header_pb_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamHeader); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_snapshot_stream_header_pb_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FullSnapshot_DataInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_snapshot_stream_header_pb_msgTypes[2].OneofWrappers = []interface{}{
(*StreamHeader_IncrementalSnapshot)(nil),
(*StreamHeader_FullSnapshot)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_snapshot_stream_header_pb_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_snapshot_stream_header_pb_goTypes,
DependencyIndexes: file_snapshot_stream_header_pb_depIdxs,
MessageInfos: file_snapshot_stream_header_pb_msgTypes,
}.Build()
File_snapshot_stream_header_pb = out.File
file_snapshot_stream_header_pb_rawDesc = nil
file_snapshot_stream_header_pb_goTypes = nil
file_snapshot_stream_header_pb_depIdxs = nil
}

@ -1,25 +0,0 @@
syntax = "proto3";
package streamer;
option go_package = "github.com/rqlite/rqlite/snapshot";
message IncrementalSnapshot {
bytes data = 1;
}
message FullSnapshot {
message DataInfo {
int64 size = 1;
}
DataInfo db = 3;
repeated DataInfo wals = 4;
}
message StreamHeader {
int32 version = 1;
oneof payload {
IncrementalSnapshot incremental_snapshot = 2;
FullSnapshot full_snapshot = 3;
}
}
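A minimal sketch of how the oneof payload behaves in the generated Go code above, assuming the github.com/rqlite/rqlite/snapshot import path; exactly one payload accessor returns non-nil after a round-trip through proto.Marshal.
package main
import (
	"fmt"
	"google.golang.org/protobuf/proto"
	"github.com/rqlite/rqlite/snapshot"
)
func main() {
	hdr := &snapshot.StreamHeader{
		Version: 1,
		Payload: &snapshot.StreamHeader_FullSnapshot{
			FullSnapshot: &snapshot.FullSnapshot{
				Db:   &snapshot.FullSnapshot_DataInfo{Size: 4096},
				Wals: []*snapshot.FullSnapshot_DataInfo{{Size: 1024}},
			},
		},
	}
	b, err := proto.Marshal(hdr)
	if err != nil {
		panic(err)
	}
	var got snapshot.StreamHeader
	if err := proto.Unmarshal(b, &got); err != nil {
		panic(err)
	}
	// Only the payload that was set returns non-nil.
	fmt.Println(got.GetFullSnapshot() != nil)        // true
	fmt.Println(got.GetIncrementalSnapshot() == nil) // true
}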

@ -1,209 +0,0 @@
package snapshot
import (
"bytes"
"io"
"os"
"testing"
)
func Test_NewStreamHeader(t *testing.T) {
strHdr := NewStreamHeader()
if strHdr == nil {
t.Fatal("StreamHeader is nil")
}
if strHdr.Version != streamVersion {
t.Errorf("StreamHeader version is incorrect, got: %d, want: %d", strHdr.Version, streamVersion)
}
if strHdr.Payload != nil {
t.Error("StreamHeader payload should be nil")
}
if strHdr.FileSize() != 0 {
t.Errorf("Expected file size to be 0, got: %d", strHdr.FileSize())
}
}
func Test_StreamHeaderFileSize(t *testing.T) {
strHdr := NewStreamHeader()
if strHdr == nil {
t.Fatal("StreamHeader is nil")
}
// Test with no full snapshot
if size := strHdr.FileSize(); size != 0 {
t.Errorf("Expected file size to be 0 for no full snapshot, got: %d", size)
}
// Test with a full snapshot
dbSize := int64(100)
walSizes := []int64{200, 300}
strHdr.Payload = &StreamHeader_FullSnapshot{
FullSnapshot: &FullSnapshot{
Db: &FullSnapshot_DataInfo{
Size: dbSize,
},
Wals: []*FullSnapshot_DataInfo{
{Size: walSizes[0]},
{Size: walSizes[1]},
},
},
}
expectedSize := dbSize + walSizes[0] + walSizes[1]
if size := strHdr.FileSize(); size != expectedSize {
t.Errorf("Expected file size to be %d, got: %d", expectedSize, size)
}
}
func Test_NewIncrementalStream(t *testing.T) {
data := []byte("test data")
stream, err := NewIncrementalStream(data)
if err != nil {
t.Fatalf("Failed to create new incremental stream: %v", err)
}
if stream == nil {
t.Fatal("Expected non-nil stream, got nil")
}
// Get the header
strHdr, n, err := NewStreamHeaderFromReader(stream)
if err != nil {
t.Fatalf("Failed to read from stream: %v", err)
}
if n != stream.Size() {
t.Errorf("Expected to read %d bytes, got: %d", stream.Size(), n)
}
if strHdr.FileSize() != 0 {
t.Errorf("Expected file size to be 0, got: %d", strHdr.FileSize())
}
// Check the data
if strHdr.GetIncrementalSnapshot() == nil {
t.Error("StreamHeader payload should not be nil")
}
if !bytes.Equal(strHdr.GetIncrementalSnapshot().Data, data) {
t.Errorf("Expected data to be %s, got: %s", data, strHdr.GetIncrementalSnapshot().Data)
}
// Should be no more data
buf := make([]byte, 1)
if _, err := stream.Read(buf); err != io.EOF {
t.Fatalf("Expected EOF, got: %v", err)
}
if err := stream.Close(); err != nil {
t.Fatalf("unexpected error closing IncrementalStream: %v", err)
}
}
func Test_NewFullStream(t *testing.T) {
contents := [][]byte{
[]byte("test1.db contents"),
[]byte("test1.db-wal0 contents"),
[]byte("test1.db-wal1 contents"),
}
contentsSz := int64(0)
files := make([]string, len(contents))
for i, c := range contents {
files[i] = mustWriteToTemp(c)
contentsSz += int64(len(c))
}
defer func() {
for _, f := range files {
os.Remove(f)
}
}()
str, err := NewFullStream(files...)
if err != nil {
t.Fatalf("unexpected error creating FullStream: %v", err)
}
totalSizeRead := int64(0)
// Get the header
strHdr, sz, err := NewStreamHeaderFromReader(str)
if err != nil {
t.Fatalf("Failed to read from stream: %v", err)
}
if strHdr.FileSize() != contentsSz {
t.Errorf("Expected file size to be %d, got: %d", contentsSz, strHdr.FileSize())
}
totalSizeRead += sz
// Read the database contents and compare to the first file.
fullSnapshot := strHdr.GetFullSnapshot()
if fullSnapshot == nil {
t.Fatalf("got nil FullSnapshot")
}
dbData := fullSnapshot.GetDb()
if dbData == nil {
t.Fatalf("got nil Db")
}
if dbData.Size != int64(len(contents[0])) {
t.Errorf("unexpected Db size, got: %d, want: %d", dbData.Size, len(contents[0]))
}
buf := make([]byte, dbData.Size)
n, err := io.ReadFull(str, buf)
if err != nil {
t.Fatalf("unexpected error reading from FullEncoder: %v", err)
}
totalSizeRead += int64(n)
if string(buf) != string(contents[0]) {
t.Errorf("unexpected database contents, got: %s, want: %s", buf, contents[0])
}
// Check the "WALs"
if len(fullSnapshot.GetWals()) != 2 {
t.Fatalf("unexpected number of WALs, got: %d, want: %d", len(fullSnapshot.GetWals()), 2)
}
for i := 0; i < len(fullSnapshot.GetWals()); i++ {
walData := fullSnapshot.GetWals()[i]
if walData == nil {
t.Fatalf("got nil WAL")
}
if walData.Size != int64(len(contents[i+1])) {
t.Errorf("unexpected WAL size, got: %d, want: %d", walData.Size, len(contents[i+1]))
}
buf = make([]byte, walData.Size)
n, err = io.ReadFull(str, buf)
if err != nil {
t.Fatalf("unexpected error reading from FullEncoder: %v", err)
}
totalSizeRead += int64(n)
if string(buf) != string(contents[i+1]) {
t.Errorf("unexpected WAL contents, got: %s, want: %s", buf, contents[i+1])
}
}
// Should be no more data to read
buf = make([]byte, 1)
n, err = str.Read(buf)
if err != io.EOF {
t.Fatalf("expected EOF, got: %v", err)
}
totalSizeRead += int64(n)
// Verify that the total number of bytes read from the FullStream
// matches the expected size
if totalSizeRead != str.Size() {
t.Errorf("unexpected total number of bytes read from FullStream, got: %d, want: %d", totalSizeRead, str.Size())
}
if err := str.Close(); err != nil {
t.Fatalf("unexpected error closing FullStream: %v", err)
}
}
func mustWriteToTemp(b []byte) string {
f, err := os.CreateTemp("", "snapshot-enc-dec-test-*")
if err != nil {
panic(err)
}
defer f.Close()
if _, err := f.Write(b); err != nil {
panic(err)
}
return f.Name()
}

Binary file not shown.

@ -1,34 +0,0 @@
import sqlite3
import shutil
# Database file
db_file = 'mydatabase.db'
# Open a connection to the SQLite database
conn = sqlite3.connect(db_file)
cursor = conn.cursor()
# Enable WAL mode and disable automatic checkpointing
cursor.execute("PRAGMA journal_mode=WAL;")
cursor.execute("PRAGMA wal_autocheckpoint=0;")
cursor.execute("CREATE TABLE foo (id INTEGER PRIMARY KEY, value TEXT);")
conn.commit()
# Checkpoint the WAL file so we've got just a SQLite file
conn.execute("PRAGMA wal_checkpoint(TRUNCATE);")
shutil.copy(db_file, 'backup.db')
for i in range(0, 4):
    # Write a new row
    cursor.execute(f"INSERT INTO foo (value) VALUES ('Row {i}');")
    conn.commit()
    # Copy the newly-created WAL
    shutil.copy(db_file + '-wal', f'wal-{i:02}')
    # Checkpoint the WAL file
    conn.execute("PRAGMA wal_checkpoint(TRUNCATE);")
    conn.commit()
conn.close()

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

@ -1 +0,0 @@
{"Version":1,"ID":"2-18-1686659761026","Index":18,"Term":2,"Peers":"ka5sb2NhbGhvc3Q6NDAwMg==","Configuration":{"Servers":[{"Suffrage":0,"ID":"node1","Address":"localhost:4002"}]},"ConfigurationIndex":1,"Size":293,"CRC":"QhU1tAtfgpo="}

@ -1 +0,0 @@
{"Version":1,"ID":"2-8-1686659756627","Index":8,"Term":2,"Peers":"ka5sb2NhbGhvc3Q6NDAwMg==","Configuration":{"Servers":[{"Suffrage":0,"ID":"node1","Address":"localhost:4002"}]},"ConfigurationIndex":1,"Size":250,"CRC":"oInhBEZSzOo="}

@ -1,221 +0,0 @@
package snapshot
import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"github.com/hashicorp/raft"
"github.com/rqlite/rqlite/db"
)
const (
v7StateFile = "state.bin"
)
// Upgrade writes a copy of the 7.x-format Snapshot directory at 'old' to a
// new Snapshot directory at 'new'. If the upgrade is successful, the
// 'old' directory is removed before the function returns.
func Upgrade(old, new string, logger *log.Logger) error {
newTmpDir := tmpName(new)
newGenerationDir := filepath.Join(newTmpDir, generationsDir, firstGeneration)
// If a temporary version of the new snapshot exists, remove it. This implies a
// previous upgrade attempt was interrupted. We will need to start over.
if dirExists(newTmpDir) {
if err := os.RemoveAll(newTmpDir); err != nil {
return fmt.Errorf("failed to remove temporary upgraded snapshot directory %s: %s", newTmpDir, err)
}
logger.Println("detected temporary upgraded snapshot directory, removing")
}
if dirExists(old) {
oldIsEmpty, err := dirIsEmpty(old)
if err != nil {
return fmt.Errorf("failed to check if old snapshot directory %s is empty: %s", old, err)
}
if oldIsEmpty {
logger.Printf("old snapshot directory %s is empty, nothing to upgrade", old)
if err := os.RemoveAll(old); err != nil {
return fmt.Errorf("failed to remove old snapshot directory %s: %s", old, err)
}
return nil
}
if dirExists(new) {
logger.Printf("new snapshot directory %s exists", old)
if err := os.RemoveAll(old); err != nil {
return fmt.Errorf("failed to remove old snapshot directory %s: %s", old, err)
}
logger.Printf("removed old snapshot directory %s as no upgrade is needed", old)
return nil
}
} else {
logger.Printf("old snapshot directory %s does not exist, nothing to upgrade", old)
return nil
}
// Start the upgrade process.
if err := os.MkdirAll(newTmpDir, 0755); err != nil {
return fmt.Errorf("failed to create temporary snapshot directory %s: %s", newTmpDir, err)
}
oldMeta, err := getNewest7Snapshot(old)
if err != nil {
return fmt.Errorf("failed to get newest snapshot from old snapshots directory %s: %s", old, err)
}
if oldMeta == nil {
// No snapshot to upgrade, this shouldn't happen since we checked for an empty old
// directory earlier.
return fmt.Errorf("no snapshot to upgrade in old snapshots directory %s", old)
}
// Write out the new meta file.
newSnapshotPath := filepath.Join(newGenerationDir, oldMeta.ID)
if err := os.MkdirAll(newSnapshotPath, 0755); err != nil {
return fmt.Errorf("failed to create new snapshot directory %s: %s", newSnapshotPath, err)
}
newMeta := &Meta{
SnapshotMeta: *oldMeta,
Full: true,
}
if err := writeMeta(newSnapshotPath, newMeta); err != nil {
return fmt.Errorf("failed to write new snapshot meta file: %s", err)
}
// Ensure all file handles are closed before any directory is renamed or removed.
if err := func() error {
// Write SQLite data into generation directory, as the base SQLite file.
newSqliteBasePath := filepath.Join(newGenerationDir, baseSqliteFile)
newSqliteFd, err := os.Create(newSqliteBasePath)
if err != nil {
return fmt.Errorf("failed to create new SQLite file %s: %s", newSqliteBasePath, err)
}
defer newSqliteFd.Close()
// Copy the old state file into the new generation directory.
oldStatePath := filepath.Join(old, oldMeta.ID, v7StateFile)
stateFd, err := os.Open(oldStatePath)
if err != nil {
return fmt.Errorf("failed to open old state file %s: %s", oldStatePath, err)
}
defer stateFd.Close()
// Skip past the header and length of the old state file.
if _, err := stateFd.Seek(16, 0); err != nil {
return fmt.Errorf("failed to seek to beginning of old SQLite data %s: %s", oldStatePath, err)
}
gzipReader, err := gzip.NewReader(stateFd)
if err != nil {
return fmt.Errorf("failed to create gzip reader for new SQLite file %s: %s", newSqliteBasePath, err)
}
defer gzipReader.Close()
if _, err := io.Copy(newSqliteFd, gzipReader); err != nil {
return fmt.Errorf("failed to copy old SQLite file %s to new SQLite file %s: %s", oldStatePath,
newSqliteBasePath, err)
}
// Sanity-check the SQLite data.
if !db.IsValidSQLiteFile(newSqliteBasePath) {
return fmt.Errorf("migrated SQLite file %s is not valid", newSqliteBasePath)
}
return nil
}(); err != nil {
return err
}
// Move the upgraded snapshot directory into place.
if err := os.Rename(newTmpDir, new); err != nil {
return fmt.Errorf("failed to move temporary snapshot directory %s to %s: %s", newTmpDir, new, err)
}
if err := syncDirParentMaybe(new); err != nil {
return fmt.Errorf("failed to sync parent directory of new snapshot directory %s: %s", new, err)
}
// We're done! Remove old.
if err := removeDirSync(old); err != nil {
return fmt.Errorf("failed to remove old snapshot directory %s: %s", old, err)
}
logger.Printf("upgraded snapshot directory %s to %s", old, new)
return nil
}
// getNewest7Snapshot returns the newest snapshot Raft meta in the given directory.
func getNewest7Snapshot(dir string) (*raft.SnapshotMeta, error) {
entries, err := os.ReadDir(dir)
if err != nil {
return nil, err
}
var snapshots []*raft.SnapshotMeta
for _, entry := range entries {
if !entry.IsDir() {
continue
}
metaPath := filepath.Join(dir, entry.Name(), metaFileName)
if !fileExists(metaPath) {
continue
}
fh, err := os.Open(metaPath)
if err != nil {
return nil, err
}
meta := &raft.SnapshotMeta{}
dec := json.NewDecoder(fh)
err = dec.Decode(meta)
fh.Close() // Close in the loop; a deferred close would hold every handle until the function returns.
if err != nil {
return nil, err
}
snapshots = append(snapshots, meta)
}
if len(snapshots) == 0 {
return nil, nil
}
return raftMetaSlice(snapshots).Newest(), nil
}
func dirIsEmpty(dir string) (bool, error) {
files, err := os.ReadDir(dir)
if err != nil {
return false, err
}
return len(files) == 0, nil
}
// raftMetaSlice is a sortable slice of Raft Meta, which are sorted
// by term, index, and then ID. Snapshots are sorted from oldest to newest.
type raftMetaSlice []*raft.SnapshotMeta
func (s raftMetaSlice) Newest() *raft.SnapshotMeta {
if len(s) == 0 {
return nil
}
sort.Sort(s)
return s[len(s)-1]
}
func (s raftMetaSlice) Len() int {
return len(s)
}
func (s raftMetaSlice) Less(i, j int) bool {
if s[i].Term != s[j].Term {
return s[i].Term < s[j].Term
}
if s[i].Index != s[j].Index {
return s[i].Index < s[j].Index
}
return s[i].ID < s[j].ID
}
func (s raftMetaSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
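A minimal sketch of invoking the upgrade at node startup; the paths and the [upgrade] log prefix are hypothetical, not taken from the removed code.
package main
import (
	"log"
	"os"
	"github.com/rqlite/rqlite/snapshot"
)
func main() {
	logger := log.New(os.Stderr, "[upgrade] ", log.LstdFlags)
	// Hypothetical paths: the 7.x-format store and the location for the new-format store.
	if err := snapshot.Upgrade("/data/snapshots", "/data/rsnapshots", logger); err != nil {
		log.Fatalf("snapshot upgrade failed: %s", err)
	}
	// Upgrade is a no-op if the old directory does not exist or is empty.
}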

@ -1,202 +0,0 @@
package snapshot
import (
"fmt"
"io"
"io/fs"
"log"
"os"
"path/filepath"
"testing"
)
func Test_Upgrade_NothingToDo(t *testing.T) {
logger := log.New(os.Stderr, "[snapshot-store-upgrader] ", 0)
if err := Upgrade("/does/not/exist", "/does/not/exist/either", logger); err != nil {
t.Fatalf("failed to upgrade non-existent directories: %s", err)
}
oldEmpty := t.TempDir()
newEmpty := t.TempDir()
if err := Upgrade(oldEmpty, newEmpty, logger); err != nil {
t.Fatalf("failed to upgrade empty directories: %s", err)
}
}
func Test_Upgrade_OK(t *testing.T) {
logger := log.New(os.Stderr, "[snapshot-store-upgrader] ", 0)
v7Snapshot := "testdata/upgrade/v7.20.3-snapshots"
v7SnapshotID := "2-18-1686659761026"
oldTemp := filepath.Join(t.TempDir(), "snapshots")
newTemp := filepath.Join(t.TempDir(), "rsnapshots")
// Copy the directory because a successful test run will delete it.
copyDir(v7Snapshot, oldTemp)
// Upgrade it.
if err := Upgrade(oldTemp, newTemp, logger); err != nil {
t.Fatalf("failed to upgrade empty directories: %s", err)
}
// Create new SnapshotStore from the upgraded directory.
store, err := NewStore(newTemp)
if err != nil {
t.Fatalf("failed to create new snapshot store: %s", err)
}
currGen, ok, err := store.GetCurrentGenerationDir()
if err != nil {
t.Fatalf("failed to get current generation directory: %s", err)
}
if !ok {
t.Fatalf("no current generation directory")
}
if exp, got := firstGeneration, filepath.Base(currGen); exp != got {
t.Fatalf("expected current generation directory %s, got %s", exp, got)
}
snapshots, err := store.List()
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if len(snapshots) != 1 {
t.Fatalf("expected 1 snapshot, got %d", len(snapshots))
}
if got, exp := snapshots[0].ID, v7SnapshotID; got != exp {
t.Fatalf("expected snapshot ID %s, got %s", exp, got)
}
meta, rc, err := store.Open(snapshots[0].ID)
if err != nil {
t.Fatalf("failed to open snapshot: %s", err)
}
rc.Close() // Removing test resources, when running on Windows, will fail otherwise.
if exp, got := v7SnapshotID, meta.ID; exp != got {
t.Fatalf("expected meta ID %s, got %s", exp, got)
}
}
/* MIT License
*
* Copyright (c) 2017 Roland Singer [roland.singer@desertbit.com]
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
// copyFile copies the contents of the file named src to the file named
// by dst. The file will be created if it does not already exist. If the
// destination file exists, all its contents will be replaced by the contents
// of the source file. The file mode will be copied from the source and
// the copied data is synced/flushed to stable storage.
func copyFile(src, dst string) (err error) {
in, err := os.Open(src)
if err != nil {
return
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return
}
defer func() {
if e := out.Close(); e != nil {
err = e
}
}()
_, err = io.Copy(out, in)
if err != nil {
return
}
err = out.Sync()
if err != nil {
return
}
si, err := os.Stat(src)
if err != nil {
return
}
err = os.Chmod(dst, si.Mode())
if err != nil {
return
}
return
}
// copyDir recursively copies a directory tree, attempting to preserve permissions.
// Source directory must exist, destination directory must *not* exist.
// Symlinks are ignored and skipped.
func copyDir(src string, dst string) (err error) {
src = filepath.Clean(src)
dst = filepath.Clean(dst)
si, err := os.Stat(src)
if err != nil {
return err
}
if !si.IsDir() {
return fmt.Errorf("source is not a directory")
}
_, err = os.Stat(dst)
if err != nil && !os.IsNotExist(err) {
return
}
if err == nil {
return fmt.Errorf("destination already exists")
}
err = os.MkdirAll(dst, si.Mode())
if err != nil {
return
}
entries, err := os.ReadDir(src)
if err != nil {
return
}
for _, entry := range entries {
srcPath := filepath.Join(src, entry.Name())
dstPath := filepath.Join(dst, entry.Name())
if entry.IsDir() {
err = copyDir(srcPath, dstPath)
if err != nil {
return
}
} else {
// Skip symlinks.
if entry.Type()&fs.ModeSymlink != 0 {
continue
}
err = copyFile(srcPath, dstPath)
if err != nil {
return
}
}
}
return
}

@ -1,159 +0,0 @@
package snapshot
import (
"bytes"
"compress/gzip"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math"
"unsafe"
)
// V1Encoder encodes a snapshot in the V1 format.
type V1Encoder struct {
data []byte
}
// NewV1Encoder returns an initialized V1 encoder
func NewV1Encoder(b []byte) *V1Encoder {
return &V1Encoder{
data: b,
}
}
// WriteTo writes the snapshot to the given writer.
func (v *V1Encoder) WriteTo(w io.Writer) (int64, error) {
var totalN int64
// Indicate that the data is compressed by writing max uint64 value first.
if err := binary.Write(w, binary.LittleEndian, uint64(math.MaxUint64)); err != nil {
return 0, fmt.Errorf("failed to write max uint64: %w", err)
}
totalN += 8 // 8 bytes for uint64
// Get compressed copy of data.
cdata, err := v.compressedData()
if err != nil {
return 0, fmt.Errorf("failed to get compressed data: %w", err)
}
// Write size of compressed data.
if err := binary.Write(w, binary.LittleEndian, uint64(len(cdata))); err != nil {
return 0, fmt.Errorf("failed to write compressed data size: %w", err)
}
totalN += 8 // 8 bytes for uint64
if len(cdata) != 0 {
// Write compressed data.
n, err := w.Write(cdata)
if err != nil {
return 0, fmt.Errorf("failed to write compressed data: %w", err)
}
totalN += int64(n)
}
return totalN, nil
}
func (v *V1Encoder) compressedData() ([]byte, error) {
if v.data == nil {
return nil, nil
}
var buf bytes.Buffer
gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
if err != nil {
return nil, err
}
if _, err := gz.Write(v.data); err != nil {
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// V1Decoder reads a V1 snapshot.
type V1Decoder struct {
r io.Reader
}
// NewV1Decoder returns an initialized V1 decoder
func NewV1Decoder(r io.Reader) *V1Decoder {
return &V1Decoder{
r: r,
}
}
// WriteTo writes the decoded snapshot data to the given writer.
func (v *V1Decoder) WriteTo(w io.Writer) (int64, error) {
var uint64Size uint64
inc := int64(unsafe.Sizeof(uint64Size))
// Read all the data into RAM, since we have to decode known-length
// chunks of various forms.
var offset int64
b, err := ioutil.ReadAll(v.r)
if err != nil {
return 0, fmt.Errorf("readall: %s", err)
}
// Get size of data, checking for compression.
compressed := false
sz, err := readUint64(b[offset : offset+inc])
if err != nil {
return 0, fmt.Errorf("read compression check: %s", err)
}
offset = offset + inc
if sz == math.MaxUint64 {
compressed = true
// Data is actually compressed, read actual size next.
sz, err = readUint64(b[offset : offset+inc])
if err != nil {
return 0, fmt.Errorf("read compressed size: %s", err)
}
offset = offset + inc
}
// Now read in the data, decompressing if necessary.
var totalN int64
if sz > 0 {
if compressed {
gz, err := gzip.NewReader(bytes.NewReader(b[offset : offset+int64(sz)]))
if err != nil {
return 0, err
}
n, err := io.Copy(w, gz)
if err != nil {
return 0, fmt.Errorf("data decompress: %s", err)
}
totalN += n
if err := gz.Close(); err != nil {
return 0, err
}
} else {
// write the data directly
n, err := w.Write(b[offset : offset+int64(sz)])
if err != nil {
return 0, fmt.Errorf("uncompressed data write: %s", err)
}
totalN += int64(n)
}
}
return totalN, nil
}
func readUint64(b []byte) (uint64, error) {
var sz uint64
if err := binary.Read(bytes.NewReader(b), binary.LittleEndian, &sz); err != nil {
return 0, err
}
return sz, nil
}
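A minimal round-trip sketch of the V1 format above (compression flag, compressed length, gzip data), again assuming the github.com/rqlite/rqlite/snapshot import path.
package main
import (
	"bytes"
	"fmt"
	"github.com/rqlite/rqlite/snapshot"
)
func main() {
	// Encode: writes [MaxUint64 flag][compressed size][gzip data].
	var buf bytes.Buffer
	if _, err := snapshot.NewV1Encoder([]byte("hello")).WriteTo(&buf); err != nil {
		panic(err)
	}
	// Decode: recovers the original bytes.
	var out bytes.Buffer
	if _, err := snapshot.NewV1Decoder(&buf).WriteTo(&out); err != nil {
		panic(err)
	}
	fmt.Println(out.String()) // hello
}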

@ -1,129 +0,0 @@
package snapshot
import (
"bytes"
"compress/gzip"
"encoding/binary"
"io"
"io/ioutil"
"math"
"testing"
)
func Test_V1EncoderCreate(t *testing.T) {
// Original data to compress and write.
data := []byte("test data")
// Create new V1 snapshot.
snap := NewV1Encoder(data)
// Write snapshot to buffer.
var buf bytes.Buffer
n, err := snap.WriteTo(&buf)
if err != nil {
t.Fatalf("failed to write snapshot: %v", err)
}
// Read back the data from the buffer.
r := bytes.NewReader(buf.Bytes())
// Read and verify the compression flag.
var flag uint64
if err := binary.Read(r, binary.LittleEndian, &flag); err != nil {
t.Fatalf("failed to read compression flag: %v", err)
}
if flag != math.MaxUint64 {
t.Fatalf("compression flag is wrong")
}
// Read and verify the size of the compressed data.
var size uint64
if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
t.Fatalf("failed to read compressed data size: %v", err)
}
if size != uint64(n-16) { // Subtract 16 bytes for the flag and size.
t.Fatalf("unexpected compressed data size; got %v, want %v", size, uint64(n-16))
}
// Read and verify the compressed data.
cdata := make([]byte, size)
if _, err := io.ReadFull(r, cdata); err != nil {
t.Fatalf("failed to read compressed data: %v", err)
}
gr, err := gzip.NewReader(bytes.NewReader(cdata))
if err != nil {
t.Fatalf("failed to create gzip reader: %v", err)
}
decData, err := ioutil.ReadAll(gr)
if err != nil {
t.Fatalf("failed to decompress data: %v", err)
}
if !bytes.Equal(decData, data) {
t.Fatalf("unexpected decompressed data; got %q, want %q", decData, data)
}
}
func Test_V1EncoderNilSlice(t *testing.T) {
v := NewV1Encoder(nil)
var buf bytes.Buffer
n, err := v.WriteTo(&buf)
if err != nil {
t.Fatalf("failed to write to buffer: %v", err)
}
if n != 16 { // 16 bytes for the flag and size.
t.Errorf("unexpected number of bytes written; got %d, want %d", n, 16)
}
r := bytes.NewReader(buf.Bytes())
// Read and verify the compression flag.
var flag uint64
if err := binary.Read(r, binary.LittleEndian, &flag); err != nil {
t.Fatalf("failed to read compression flag: %v", err)
}
if flag != math.MaxUint64 {
t.Errorf("unexpected compression flag")
}
// Read and verify the size of the compressed data.
var size uint64
if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
t.Fatalf("failed to read compressed data size: %v", err)
}
if size != 0 { // The compressed data size should be 0.
t.Errorf("unexpected compressed data size; got %d, want %d", size, 0)
}
// Verify that there is no more data.
if r.Len() != 0 {
t.Errorf("unexpected remaining data; got %d, want %d", r.Len(), 0)
}
}
func Test_V1Decoder(t *testing.T) {
// Create test data.
data := []byte("This is a test data.")
// Create a new V1Encoder and compress the data.
encoder := NewV1Encoder(data)
var encBuf bytes.Buffer
_, err := encoder.WriteTo(&encBuf)
if err != nil {
t.Fatalf("Failed to write to encoder: %v", err)
}
// Create a new V1Decoder and decode the compressed data.
decoder := NewV1Decoder(&encBuf)
var decBuf bytes.Buffer
_, err = decoder.WriteTo(&decBuf)
if err != nil {
t.Fatalf("Failed to write to decoder: %v", err)
}
// The original and decoded data should match.
if !bytes.Equal(data, decBuf.Bytes()) {
t.Fatalf("Data mismatch; got %s, want %s", decBuf.Bytes(), data)
}
}