diff --git a/snapshot/sink.go b/snapshot/sink.go index 69b7c6f7..8fdb558c 100644 --- a/snapshot/sink.go +++ b/snapshot/sink.go @@ -81,7 +81,10 @@ func (s *Sink) Close() error { return err } - return s.str.Reap() + if !s.str.noAutoreap { + return s.str.Reap() + } + return nil } func (s *Sink) processSnapshotData() error { @@ -218,7 +221,7 @@ func (s *Sink) processFullSnapshot(fullSnap *FullSnapshot) error { func (s *Sink) writeMeta(dir string, full bool) error { fh, err := os.Create(filepath.Join(dir, metaFileName)) if err != nil { - return err + return fmt.Errorf("error creating meta file: %v", err) } defer fh.Close() s.meta.Full = full @@ -226,7 +229,7 @@ func (s *Sink) writeMeta(dir string, full bool) error { // Write out as JSON enc := json.NewEncoder(fh) if err = enc.Encode(s.meta); err != nil { - return err + return fmt.Errorf("failed to encode meta: %v", err) } if err := fh.Sync(); err != nil { @@ -275,7 +278,7 @@ func moveFromTmpSync(src string) (string, error) { // Sync parent directory to ensure snapshot is visible, but it's only // needed on *nix style file systems. if runtime.GOOS != "windows" { - if err := syncFile(parentDir(dst)); err != nil { + if err := syncDir(parentDir(dst)); err != nil { return "", err } } diff --git a/snapshot/sink_test.go b/snapshot/sink_test.go index 51cdcefa..e97a97d4 100644 --- a/snapshot/sink_test.go +++ b/snapshot/sink_test.go @@ -70,6 +70,10 @@ func Test_SinkFullSnapshot(t *testing.T) { if !fileExists(filepath.Join(nextGenDir, baseSqliteFile)) { t.Fatalf("next generation directory %s does not contain base SQLite file", nextGenDir) } + expMetaPath := filepath.Join(nextGenDir, "snap-1234", metaFileName) + if !fileExists(expMetaPath) { + t.Fatalf("meta file does not exist at %s", expMetaPath) + } // Check SQLite database has been created correctly. db, err := db.Open(filepath.Join(nextGenDir, baseSqliteFile), false, false) @@ -84,8 +88,6 @@ func Test_SinkFullSnapshot(t *testing.T) { if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[4]]}]`, asJSON(rows); exp != got { t.Fatalf("unexpected results for query, expected %s, got %s", exp, got) } - - // confirm that snapshot is valid (full?, contains meta?) } func Test_SinkIncrementalSnapshot(t *testing.T) { @@ -130,7 +132,10 @@ func Test_SinkIncrementalSnapshot(t *testing.T) { t.Fatalf("WAL file data does not match") } - // confirm that snapshot is valid (incremental?, contains meta?) + expMetaPath := filepath.Join(currGenDir, "snap-1234", metaFileName) + if !fileExists(expMetaPath) { + t.Fatalf("meta file does not exist at %s", expMetaPath) + } } func mustNewStoreForSinkTest(t *testing.T) *Store { diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index 3f8cc0cc..8302f06d 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -1,6 +1,8 @@ package snapshot -import "io" +import ( + "io" +) type Snapshot struct { walData []byte @@ -19,7 +21,7 @@ func NewFullSnapshot(files ...string) *Snapshot { } } -func (s *Snapshot) Persist(sink *Sink) error { +func (s *Snapshot) Persist(sink io.Writer) error { stream, err := s.OpenStream() if err != nil { return err diff --git a/snapshot/store.go b/snapshot/store.go index 7b3f98eb..dca5ed78 100644 --- a/snapshot/store.go +++ b/snapshot/store.go @@ -50,7 +50,9 @@ type Meta struct { type Store struct { rootDir string generationsDir string - logger *log.Logger + + noAutoreap bool + logger *log.Logger } // NewStore creates a new Store object. @@ -250,16 +252,18 @@ func (s *Store) GetCurrentGenerationDir() (string, bool, error) { // Reap reaps old generations, and reaps snapshots within the remaining generation. func (s *Store) Reap() error { if _, err := s.ReapGenerations(); err != nil { - return err + return fmt.Errorf("failed to reap generations during reap: %s", err) } currDir, ok, err := s.GetCurrentGenerationDir() if err != nil { - return err + return fmt.Errorf("failed to get current generation directory during reap: %s", err) } if ok { _, err = s.ReapSnapshots(currDir, 2) - return err + if err != nil { + return fmt.Errorf("failed to reap snapshots during reap: %s", err) + } } return nil } @@ -290,16 +294,13 @@ func (s *Store) ReapGenerations() (int, error) { // returns the number of snapshots removed, or an error. The retain parameter // specifies the number of snapshots to retain. func (s *Store) ReapSnapshots(dir string, retain int) (int, error) { - // s.mu.Lock() - // defer s.mu.Unlock() - if retain < minSnapshotRetain { return 0, ErrRetainCountTooLow } snapshots, err := s.getSnapshots(dir) if err != nil { - s.logger.Printf("failed to get snapshots: %s", err) + s.logger.Printf("failed to get snapshots in directory %s: %s", dir, err) return 0, err } @@ -341,7 +342,7 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) { s.logger.Printf("failed to delete snapshot %s: %s", snap.ID, err) return n, err } - if err := syncFile(dir); err != nil { + if err := syncDir(dir); err != nil { s.logger.Printf("failed to sync directory containing snapshots: %s", err) } @@ -387,15 +388,14 @@ func (s *Store) getSnapshots(dir string) ([]*Meta, error) { } // Ignore any temporary snapshots - snapName := snap.Name() - if isTmpName(snapName) { + if isTmpName(snap.Name()) { continue } // Try to read the meta data - meta, err := s.readMeta(snapName) + meta, err := s.readMeta(filepath.Join(dir, snap.Name())) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to read meta for snapshot %s: %s", snap.Name(), err) } snapMeta = append(snapMeta, meta) } @@ -406,10 +406,10 @@ func (s *Store) getSnapshots(dir string) ([]*Meta, error) { return snapMeta, nil } -// readMeta is used to read the meta data for a given named backup -func (s *Store) readMeta(name string) (*Meta, error) { +// readMeta is used to read the meta data in a given snapshot directory. +func (s *Store) readMeta(dir string) (*Meta, error) { // Open the meta file - metaPath := filepath.Join(s.rootDir, name, metaFileName) + metaPath := filepath.Join(dir, metaFileName) fh, err := os.Open(metaPath) if err != nil { return nil, err @@ -470,6 +470,19 @@ func syncFile(path string) error { return fd.Sync() } +func syncDir(dir string) error { + fh, err := os.Open(dir) + if err != nil { + return err + } + defer fh.Close() + + if err := fh.Sync(); err != nil { + return err + } + return fh.Close() +} + // snapshotName generates a name for the snapshot. func snapshotName(term, index uint64) string { now := time.Now() diff --git a/snapshot/store_test.go b/snapshot/store_test.go index 90831215..b464d83a 100644 --- a/snapshot/store_test.go +++ b/snapshot/store_test.go @@ -3,6 +3,8 @@ package snapshot import ( "strings" "testing" + + "github.com/hashicorp/raft" ) func Test_NewStore(t *testing.T) { @@ -57,6 +59,80 @@ func Test_NewStore_ListEmpty(t *testing.T) { } } +// Test_WALSnapshotStore_CreateFull performs detailed testing of the +// snapshot creation process. +func Test_Store_CreateFullThenIncremental(t *testing.T) { + + checkSnapshotCount := func(s *Store, exp int) *raft.SnapshotMeta { + snaps, err := s.List() + if err != nil { + t.Fatalf("failed to list snapshots: %s", err) + } + if exp, got := exp, len(snaps); exp != got { + t.Fatalf("expected %d snapshots, got %d", exp, got) + } + if len(snaps) == 0 { + return nil + } + return snaps[0] + } + + dir := t.TempDir() + str, err := NewStore(dir) + if err != nil { + t.Fatalf("failed to create snapshot store: %s", err) + } + if !str.FullNeeded() { + t.Fatalf("expected full snapshots to be needed") + } + + testConfig1 := makeTestConfiguration("1", "2") + sink, err := str.Create(1, 22, 33, testConfig1, 4, nil) + if err != nil { + t.Fatalf("failed to create 1st snapshot sink: %s", err) + } + + // Create a full snapshot and write it to the sink. + fullSnap := NewFullSnapshot("testdata/db-and-wals/backup.db") + if err := fullSnap.Persist(sink); err != nil { + t.Fatalf("failed to persist full snapshot: %s", err) + } + if err := sink.Close(); err != nil { + t.Fatalf("failed to close sink: %s", err) + } + if str.FullNeeded() { + t.Fatalf("full snapshot still needed") + } + meta := checkSnapshotCount(str, 1) + if meta.Index != 22 || meta.Term != 33 { + t.Fatalf("unexpected snapshot metadata: %+v", meta) + } + + // Open the latest snapshot and check that it's correct. + _, _, err = str.Open(meta.ID) + if err != nil { + t.Fatalf("failed to open snapshot %s: %s", meta.ID, err) + } + + // Incremental snapshot next + sink, err = str.Create(2, 55, 66, testConfig1, 4, nil) + if err != nil { + t.Fatalf("failed to create 2nd snapshot sink: %s", err) + } + walData := mustReadFile("testdata/db-and-wals/wal-00") + incSnap := NewWALSnapshot(walData) + if err := incSnap.Persist(sink); err != nil { + t.Fatalf("failed to persist incremental snapshot: %s", err) + } + if err := sink.Close(); err != nil { + t.Fatalf("failed to close sink: %s", err) + } + meta = checkSnapshotCount(str, 1) + if meta.Index != 55 || meta.Term != 66 { + t.Fatalf("unexpected snapshot metadata: %+v", meta) + } +} + func Test_Store_ReapGenerations(t *testing.T) { dir := t.TempDir() s, err := NewStore(dir)