From b0f46a60baaf414748c9d0a64dcfcdedf7de852f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 20 Aug 2023 10:32:12 -0400 Subject: [PATCH] More reaping and related unit tests --- snapshot/sink.go | 15 +-- snapshot/store.go | 79 ++++++++++--- snapshot/store_test.go | 250 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 314 insertions(+), 30 deletions(-) create mode 100644 snapshot/store_test.go diff --git a/snapshot/sink.go b/snapshot/sink.go index 3ef40e11..69b7c6f7 100644 --- a/snapshot/sink.go +++ b/snapshot/sink.go @@ -275,22 +275,9 @@ func moveFromTmpSync(src string) (string, error) { // Sync parent directory to ensure snapshot is visible, but it's only // needed on *nix style file systems. if runtime.GOOS != "windows" { - if err := syncDir(parentDir(dst)); err != nil { + if err := syncFile(parentDir(dst)); err != nil { return "", err } } return dst, nil } - -func syncDir(dir string) error { - fh, err := os.Open(dir) - if err != nil { - return err - } - defer fh.Close() - - if err := fh.Sync(); err != nil { - return err - } - return fh.Close() -} diff --git a/snapshot/store.go b/snapshot/store.go index 94cfd2a4..7b3f98eb 100644 --- a/snapshot/store.go +++ b/snapshot/store.go @@ -249,7 +249,7 @@ func (s *Store) GetCurrentGenerationDir() (string, bool, error) { // Reap reaps old generations, and reaps snapshots within the remaining generation. func (s *Store) Reap() error { - if err := s.ReapGenerations(); err != nil { + if _, err := s.ReapGenerations(); err != nil { return err } @@ -264,21 +264,25 @@ func (s *Store) Reap() error { return nil } -func (s *Store) ReapGenerations() error { +// ReapGenerations removes old generations. It returns the number of generations +// removed, or an error. 
+func (s *Store) ReapGenerations() (int, error) {
 	generations, err := s.GetGenerations()
 	if err != nil {
-		return err
+		return 0, err
 	}
 	if len(generations) == 0 {
-		return nil
+		return 0, nil
 	}
+	n := 0
 	for i := 0; i < len(generations)-1; i++ {
 		genDir := filepath.Join(s.generationsDir, generations[i])
 		if err := os.RemoveAll(genDir); err != nil {
-			return err
+			return n, err
 		}
+		n++
 	}
-	return nil
+	return n, nil
 }
 
 // ReapSnapshots removes snapshots that are no longer needed. It does this by
@@ -311,16 +315,38 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
 	sort.Sort(metaSlice(snapshots))
 
 	n := 0
+	baseSqliteFilePath := filepath.Join(dir, baseSqliteFile)
+
 	for _, snap := range snapshots[0 : len(snapshots)-retain] {
-		baseSqliteFilePath := filepath.Join(dir, baseSqliteFile)
-		snapDirPath := filepath.Join(dir, snap.ID)
-		snapWALFilePath := filepath.Join(snapDirPath, snapWALFile)
-		walToCheckpointFilePath := filepath.Join(dir, baseSqliteWALFile)
+		snapDirPath := filepath.Join(dir, snap.ID)                       // Path to the snapshot directory
+		snapWALFilePath := filepath.Join(snapDirPath, snapWALFile)       // Path to the WAL file in the snapshot
+		walToCheckpointFilePath := filepath.Join(dir, baseSqliteWALFile) // Path to the WAL file to checkpoint
+		snapWALFilePathCopy := walToCheckpointFilePath + snap.ID
 
 		// If the snapshot directory doesn't contain a WAL file, then the base SQLite
 		// file is the snapshot state, and there is no checkpointing to do.
 		if fileExists(snapWALFilePath) {
-			// Move the WAL file to beside the base SQLite file
-			if err := os.Rename(snapWALFilePath, walToCheckpointFilePath); err != nil {
-				s.logger.Printf("failed to move WAL file %s: %s", snapWALFilePath, err)
+			// Copy the WAL file from the snapshot to a temporary location beside the base SQLite file.
+			// We do this so that we only delete the snapshot directory once we can be sure that
+			// we've copied it out fully. Renaming is not atomic on every OS, so let's be sure. We
+			// also use a temporary file name, so we know where the WAL came from if we exit here
+			// and need to clean up on a restart.
+			if err := copyFileSync(snapWALFilePath, snapWALFilePathCopy); err != nil {
+				s.logger.Printf("failed to copy WAL file from snapshot %s: %s", snapWALFilePath, err)
+				return n, err
+			}
+
+			// Delete the snapshot directory, since we have what we need now.
+			if err := os.RemoveAll(snapDirPath); err != nil {
+				s.logger.Printf("failed to delete snapshot %s: %s", snap.ID, err)
+				return n, err
+			}
+			if err := syncFile(dir); err != nil {
+				s.logger.Printf("failed to sync directory containing snapshots: %s", err)
+			}
+
+			// Move the copied WAL file to the correct name for checkpointing. The source must
+			// be the copy, because the original was just deleted with the snapshot directory.
+			if err := os.Rename(snapWALFilePathCopy, walToCheckpointFilePath); err != nil {
+				s.logger.Printf("failed to move WAL file %s: %s", snapWALFilePathCopy, err)
 				return n, err
@@ -333,11 +359,12 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
 			}
-		}
-
-		// Delete the snapshot directory, since the state is now in the base SQLite file.
-		if err := os.RemoveAll(snapDirPath); err != nil {
-			s.logger.Printf("failed to delete snapshot %s: %s", snap.ID, err)
-			return n, err
+		} else {
+			// There is no WAL file, so nothing needs checkpointing, but the snapshot
+			// directory must still be deleted or it is never actually reaped.
+			if err := os.RemoveAll(snapDirPath); err != nil {
+				s.logger.Printf("failed to delete snapshot %s: %s", snap.ID, err)
+				return n, err
+			}
 		}
 		n++
 		s.logger.Printf("reaped snapshot %s successfully", snap.ID)
 	}
@@ -423,6 +450,46 @@ func dirExists(path string) bool {
 	return err == nil && stat.IsDir()
 }
 
+// copyFileSync copies the file at src to dst, creating dst if necessary and
+// truncating it otherwise. dst is synced to disk only after all data has been
+// written, and an error from closing dst is reported if nothing failed earlier.
+func copyFileSync(src, dst string) (retErr error) {
+	srcFd, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+	defer srcFd.Close()
+
+	dstFd, err := os.Create(dst)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// Don't let a failed close hide behind a nil return.
+		if err := dstFd.Close(); err != nil && retErr == nil {
+			retErr = err
+		}
+	}()
+
+	if _, err := io.Copy(dstFd, srcFd); err != nil {
+		return err
+	}
+	// Sync after the copy; syncing the still-empty file first would flush nothing.
+	return dstFd.Sync()
+}
+
+// syncFile syncs the file or directory at path to disk. The path is opened
+// read-only because Unix-like systems refuse to open a directory for writing,
+// and the callers visible in this change pass directories.
+func syncFile(path string) error {
+	fd, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+	defer fd.Close()
+	return fd.Sync()
+}
+
 // snapshotName generates a name for the snapshot.
func snapshotName(term, index uint64) string { now := time.Now() diff --git a/snapshot/store_test.go b/snapshot/store_test.go new file mode 100644 index 00000000..90831215 --- /dev/null +++ b/snapshot/store_test.go @@ -0,0 +1,250 @@ +package snapshot + +import ( + "strings" + "testing" +) + +func Test_NewStore(t *testing.T) { + tmpDir := t.TempDir() + s, err := NewStore(tmpDir) + if err != nil { + t.Fatal(err) + } + if s == nil { + t.Fatal("expected non-nil store") + } + + generations, err := s.GetGenerations() + if err != nil { + t.Fatalf("failed to get generations: %s", err.Error()) + } + if len(generations) != 0 { + t.Fatalf("expected 0 generation, got %d", len(generations)) + } + + _, ok, err := s.GetCurrentGenerationDir() + if err != nil { + t.Fatalf("failed to get current generation dir: %s", err.Error()) + } + if ok { + t.Fatalf("expected current generation dir not to exist") + } + + nextGenDir, err := s.GetNextGenerationDir() + if err != nil { + t.Fatalf("failed to get next generation dir: %s", err.Error()) + } + if !strings.HasSuffix(nextGenDir, firstGeneration) { + t.Fatalf("expected next generation dir to be empty, got %s", nextGenDir) + } +} + +func Test_NewStore_ListEmpty(t *testing.T) { + dir := t.TempDir() + s, err := NewStore(dir) + if err != nil { + t.Fatalf("failed to create snapshot store: %s", err) + } + if !s.FullNeeded() { + t.Fatalf("expected full snapshots to be needed") + } + + if snaps, err := s.List(); err != nil { + t.Fatalf("failed to list snapshots: %s", err) + } else if len(snaps) != 0 { + t.Fatalf("expected 1 snapshots, got %d", len(snaps)) + } +} + +func Test_Store_ReapGenerations(t *testing.T) { + dir := t.TempDir() + s, err := NewStore(dir) + if err != nil { + t.Fatalf("failed to create snapshot store: %s", err) + } + + testCurrGenDirIs := func(exp string) string { + curGenDir, ok, err := s.GetCurrentGenerationDir() + if err != nil { + t.Fatalf("failed to get current generation dir: %s", err.Error()) + } + if !ok { + 
t.Fatalf("expected current generation dir to exist") + } + if curGenDir != exp { + t.Fatalf("expected current generation dir to be %s, got %s", exp, curGenDir) + } + return curGenDir + } + + testGenCountIs := func(exp int) { + generations, err := s.GetGenerations() + if err != nil { + t.Fatalf("failed to get generations: %s", err.Error()) + } + if exp, got := exp, len(generations); exp != got { + t.Fatalf("expected %d generations, got %d", exp, got) + } + } + + testReapsOK := func(expN int) { + n, err := s.ReapGenerations() + if err != nil { + t.Fatalf("reaping failed: %s", err.Error()) + } + if n != expN { + t.Fatalf("expected %d generations to be reaped, got %d", expN, n) + } + } + + var nextGenDir string + + nextGenDir, err = s.GetNextGenerationDir() + if err != nil { + t.Fatalf("failed to get next generation dir: %s", err.Error()) + } + mustCreateDir(nextGenDir) + testCurrGenDirIs(nextGenDir) + testReapsOK(0) + + // Create another generation and then tell the Store to reap. + nextGenDir, err = s.GetNextGenerationDir() + if err != nil { + t.Fatalf("failed to get next generation dir: %s", err.Error()) + } + mustCreateDir(nextGenDir) + testGenCountIs(2) + testReapsOK(1) + testCurrGenDirIs(nextGenDir) + + // Finally, test reaping lots of generations. + for i := 0; i < 10; i++ { + nextGenDir, err = s.GetNextGenerationDir() + if err != nil { + t.Fatalf("failed to get next generation dir: %s", err.Error()) + } + mustCreateDir(nextGenDir) + } + testGenCountIs(11) + testReapsOK(10) + testGenCountIs(1) + testCurrGenDirIs(nextGenDir) +} + +// Test_Store_Reaping tests that the snapshot store correctly +// reaps snapshots that are no longer needed. Because it's critical that +// reaping is done correctly, this test checks internal implementation +// details. 
+// func Test_Store_Reaping(t *testing.T) { +// dir := t.TempDir() +// str, err := NewStore(dir) +// if err != nil { +// t.Fatalf("failed to create snapshot store: %s", err) +// } +// //str.noAutoReap = true + +// testConfig := makeTestConfiguration("1", "2") + +// createSnapshot := func(index, term uint64, file string) { +// b, err := os.ReadFile(file) +// if err != nil { +// t.Fatalf("failed to read file: %s", err) +// } +// sink, err := str.Create(1, index, term, testConfig, 4, nil) +// if err != nil { +// t.Fatalf("failed to create 2nd snapshot: %s", err) +// } +// if _, err = sink.Write(b); err != nil { +// t.Fatalf("failed to write to sink: %s", err) +// } +// sink.Close() +// } + +// createSnapshot(1, 1, "testdata/reaping/backup.db") +// createSnapshot(3, 2, "testdata/reaping/wal-00") +// createSnapshot(5, 3, "testdata/reaping/wal-01") +// createSnapshot(7, 4, "testdata/reaping/wal-02") +// createSnapshot(9, 5, "testdata/reaping/wal-03") + +// // There should be 5 snapshot directories, one of which should be +// // a full, and the rest incremental. +// snaps, err := str.getSnapshots() +// if err != nil { +// t.Fatalf("failed to list snapshots: %s", err) +// } +// if exp, got := 5, len(snaps); exp != got { +// t.Fatalf("expected %d snapshots, got %d", exp, got) +// } +// for _, snap := range snaps[0:4] { +// if snap.Full { +// t.Fatalf("snapshot %s is full", snap.ID) +// } +// } +// if !snaps[4].Full { +// t.Fatalf("snapshot %s is incremental", snaps[4].ID) +// } + +// // Reap just the first snapshot, which is full. 
+// n, err := str.ReapSnapshots(4) +// if err != nil { +// t.Fatalf("failed to reap full snapshot: %s", err) +// } +// if exp, got := 1, n; exp != got { +// t.Fatalf("expected %d snapshots to be reaped, got %d", exp, got) +// } +// snaps, err = str.getSnapshots() +// if err != nil { +// t.Fatalf("failed to list snapshots: %s", err) +// } +// if exp, got := 4, len(snaps); exp != got { +// t.Fatalf("expected %d snapshots, got %d", exp, got) +// } + +// // Reap all but the last two snapshots. The remaining snapshots +// // should all be incremental. +// n, err = str.ReapSnapshots(2) +// if err != nil { +// t.Fatalf("failed to reap snapshots: %s", err) +// } +// if exp, got := 2, n; exp != got { +// t.Fatalf("expected %d snapshots to be reaped, got %d", exp, got) +// } +// snaps, err = str.getSnapshots() +// if err != nil { +// t.Fatalf("failed to list snapshots: %s", err) +// } +// if exp, got := 2, len(snaps); exp != got { +// t.Fatalf("expected %d snapshots, got %d", exp, got) +// } +// for _, snap := range snaps { +// if snap.Full { +// t.Fatalf("snapshot %s is full", snap.ID) +// } +// } +// if snaps[0].Index != 9 && snaps[1].Term != 5 { +// t.Fatalf("snap 0 is wrong") +// } +// if snaps[1].Index != 7 && snaps[1].Term != 3 { +// t.Fatalf("snap 1 is wrong") +// } + +// // Check the contents of the remaining snapshots by creating a new +// // SQLite from the Store +// dbPath, err := str.ReplayWALs() +// if err != nil { +// t.Fatalf("failed to replay WALs: %s", err) +// } +// db, err := db.Open(dbPath, false, true) +// if err != nil { +// t.Fatalf("failed to open database: %s", err) +// } +// defer db.Close() +// rows, err := db.QueryStringStmt("SELECT COUNT(*) FROM foo") +// if err != nil { +// t.Fatalf("failed to query database: %s", err) +// } +// if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[4]]}]`, asJSON(rows); exp != got { +// t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) +// } +// }