1
0
Fork 0

More reaping and related unit tests

master
Philip O'Toole 1 year ago
parent be1d59588d
commit b0f46a60ba

@ -275,22 +275,9 @@ func moveFromTmpSync(src string) (string, error) {
// Sync parent directory to ensure snapshot is visible, but it's only // Sync parent directory to ensure snapshot is visible, but it's only
// needed on *nix style file systems. // needed on *nix style file systems.
if runtime.GOOS != "windows" { if runtime.GOOS != "windows" {
if err := syncDir(parentDir(dst)); err != nil { if err := syncFile(parentDir(dst)); err != nil {
return "", err return "", err
} }
} }
return dst, nil return dst, nil
} }
func syncDir(dir string) error {
fh, err := os.Open(dir)
if err != nil {
return err
}
defer fh.Close()
if err := fh.Sync(); err != nil {
return err
}
return fh.Close()
}

@ -249,7 +249,7 @@ func (s *Store) GetCurrentGenerationDir() (string, bool, error) {
// Reap reaps old generations, and reaps snapshots within the remaining generation. // Reap reaps old generations, and reaps snapshots within the remaining generation.
func (s *Store) Reap() error { func (s *Store) Reap() error {
if err := s.ReapGenerations(); err != nil { if _, err := s.ReapGenerations(); err != nil {
return err return err
} }
@ -264,21 +264,25 @@ func (s *Store) Reap() error {
return nil return nil
} }
func (s *Store) ReapGenerations() error { // ReapGenerations removes old generations. It returns the number of generations
// removed, or an error.
func (s *Store) ReapGenerations() (int, error) {
generations, err := s.GetGenerations() generations, err := s.GetGenerations()
if err != nil { if err != nil {
return err return 0, err
} }
if len(generations) == 0 { if len(generations) == 0 {
return nil return 0, nil
} }
n := 0
for i := 0; i < len(generations)-1; i++ { for i := 0; i < len(generations)-1; i++ {
genDir := filepath.Join(s.generationsDir, generations[i]) genDir := filepath.Join(s.generationsDir, generations[i])
if err := os.RemoveAll(genDir); err != nil { if err := os.RemoveAll(genDir); err != nil {
return err return n, err
} }
n++
} }
return nil return n, nil
} }
// ReapSnapshots removes snapshots that are no longer needed. It does this by // ReapSnapshots removes snapshots that are no longer needed. It does this by
@ -311,16 +315,37 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
sort.Sort(metaSlice(snapshots)) sort.Sort(metaSlice(snapshots))
n := 0 n := 0
for _, snap := range snapshots[0 : len(snapshots)-retain] {
baseSqliteFilePath := filepath.Join(dir, baseSqliteFile) baseSqliteFilePath := filepath.Join(dir, baseSqliteFile)
snapDirPath := filepath.Join(dir, snap.ID)
snapWALFilePath := filepath.Join(snapDirPath, snapWALFile) for _, snap := range snapshots[0 : len(snapshots)-retain] {
walToCheckpointFilePath := filepath.Join(dir, baseSqliteWALFile) snapDirPath := filepath.Join(dir, snap.ID) // Path to the snapshot directory
snapWALFilePath := filepath.Join(snapDirPath, snapWALFile) // Path to the WAL file in the snapshot
walToCheckpointFilePath := filepath.Join(dir, baseSqliteWALFile) // Path to the WAL file to checkpoint
snapWALFilePathCopy := walToCheckpointFilePath + snap.ID
// If the snapshot directory doesn't contain a WAL file, then the base SQLite // If the snapshot directory doesn't contain a WAL file, then the base SQLite
// file is the snapshot state, and there is no checkpointing to do. // file is the snapshot state, and there is no checkpointing to do.
if fileExists(snapWALFilePath) { if fileExists(snapWALFilePath) {
// Move the WAL file to beside the base SQLite file // Copy the WAL file from the snapshot to a temporary location beside the base SQLite file.
// We do this so that we only delete the snapshot directory once we can be sure that
// we've copied it out fully. Renaming is not atomic on every OS, so let's be sure. We
// also use a temporary file name, so we know where the WAL came from if we exit here
// and need to clean up on a restart.
if err := copyFileSync(snapWALFilePath, snapWALFilePathCopy); err != nil {
s.logger.Printf("failed to copy WAL file from snapshot %s: %s", snapWALFilePath, err)
return n, err
}
// Delete the snapshot directory, since we have what we need now.
if err := os.RemoveAll(snapDirPath); err != nil {
s.logger.Printf("failed to delete snapshot %s: %s", snap.ID, err)
return n, err
}
if err := syncFile(dir); err != nil {
s.logger.Printf("failed to sync directory containing snapshots: %s", err)
}
// Move the WAL file to the correct name for checkpointing.
if err := os.Rename(snapWALFilePath, walToCheckpointFilePath); err != nil { if err := os.Rename(snapWALFilePath, walToCheckpointFilePath); err != nil {
s.logger.Printf("failed to move WAL file %s: %s", snapWALFilePath, err) s.logger.Printf("failed to move WAL file %s: %s", snapWALFilePath, err)
return n, err return n, err
@ -333,11 +358,6 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
} }
} }
// Delete the snapshot directory, since the state is now in the base SQLite file.
if err := os.RemoveAll(snapDirPath); err != nil {
s.logger.Printf("failed to delete snapshot %s: %s", snap.ID, err)
return n, err
}
n++ n++
s.logger.Printf("reaped snapshot %s successfully", snap.ID) s.logger.Printf("reaped snapshot %s successfully", snap.ID)
} }
@ -423,6 +443,33 @@ func dirExists(path string) bool {
return err == nil && stat.IsDir() return err == nil && stat.IsDir()
} }
func copyFileSync(src, dst string) error {
srcFd, err := os.Open(src)
if err != nil {
return err
}
defer srcFd.Close()
dstFd, err := os.Create(dst)
if err != nil {
return err
}
defer dstFd.Close()
if err := dstFd.Sync(); err != nil {
return err
}
_, err = io.Copy(dstFd, srcFd)
return err
}
func syncFile(path string) error {
fd, err := os.OpenFile(path, os.O_RDWR, 0644)
if err != nil {
return err
}
defer fd.Close()
return fd.Sync()
}
// snapshotName generates a name for the snapshot. // snapshotName generates a name for the snapshot.
func snapshotName(term, index uint64) string { func snapshotName(term, index uint64) string {
now := time.Now() now := time.Now()

@ -0,0 +1,250 @@
package snapshot
import (
"strings"
"testing"
)
func Test_NewStore(t *testing.T) {
tmpDir := t.TempDir()
s, err := NewStore(tmpDir)
if err != nil {
t.Fatal(err)
}
if s == nil {
t.Fatal("expected non-nil store")
}
generations, err := s.GetGenerations()
if err != nil {
t.Fatalf("failed to get generations: %s", err.Error())
}
if len(generations) != 0 {
t.Fatalf("expected 0 generation, got %d", len(generations))
}
_, ok, err := s.GetCurrentGenerationDir()
if err != nil {
t.Fatalf("failed to get current generation dir: %s", err.Error())
}
if ok {
t.Fatalf("expected current generation dir not to exist")
}
nextGenDir, err := s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
if !strings.HasSuffix(nextGenDir, firstGeneration) {
t.Fatalf("expected next generation dir to be empty, got %s", nextGenDir)
}
}
func Test_NewStore_ListEmpty(t *testing.T) {
dir := t.TempDir()
s, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
if !s.FullNeeded() {
t.Fatalf("expected full snapshots to be needed")
}
if snaps, err := s.List(); err != nil {
t.Fatalf("failed to list snapshots: %s", err)
} else if len(snaps) != 0 {
t.Fatalf("expected 1 snapshots, got %d", len(snaps))
}
}
func Test_Store_ReapGenerations(t *testing.T) {
dir := t.TempDir()
s, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
testCurrGenDirIs := func(exp string) string {
curGenDir, ok, err := s.GetCurrentGenerationDir()
if err != nil {
t.Fatalf("failed to get current generation dir: %s", err.Error())
}
if !ok {
t.Fatalf("expected current generation dir to exist")
}
if curGenDir != exp {
t.Fatalf("expected current generation dir to be %s, got %s", exp, curGenDir)
}
return curGenDir
}
testGenCountIs := func(exp int) {
generations, err := s.GetGenerations()
if err != nil {
t.Fatalf("failed to get generations: %s", err.Error())
}
if exp, got := exp, len(generations); exp != got {
t.Fatalf("expected %d generations, got %d", exp, got)
}
}
testReapsOK := func(expN int) {
n, err := s.ReapGenerations()
if err != nil {
t.Fatalf("reaping failed: %s", err.Error())
}
if n != expN {
t.Fatalf("expected %d generations to be reaped, got %d", expN, n)
}
}
var nextGenDir string
nextGenDir, err = s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
mustCreateDir(nextGenDir)
testCurrGenDirIs(nextGenDir)
testReapsOK(0)
// Create another generation and then tell the Store to reap.
nextGenDir, err = s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
mustCreateDir(nextGenDir)
testGenCountIs(2)
testReapsOK(1)
testCurrGenDirIs(nextGenDir)
// Finally, test reaping lots of generations.
for i := 0; i < 10; i++ {
nextGenDir, err = s.GetNextGenerationDir()
if err != nil {
t.Fatalf("failed to get next generation dir: %s", err.Error())
}
mustCreateDir(nextGenDir)
}
testGenCountIs(11)
testReapsOK(10)
testGenCountIs(1)
testCurrGenDirIs(nextGenDir)
}
// Test_Store_Reaping tests that the snapshot store correctly
// reaps snapshots that are no longer needed. Because it's critical that
// reaping is done correctly, this test checks internal implementation
// details.
// func Test_Store_Reaping(t *testing.T) {
// dir := t.TempDir()
// str, err := NewStore(dir)
// if err != nil {
// t.Fatalf("failed to create snapshot store: %s", err)
// }
// //str.noAutoReap = true
// testConfig := makeTestConfiguration("1", "2")
// createSnapshot := func(index, term uint64, file string) {
// b, err := os.ReadFile(file)
// if err != nil {
// t.Fatalf("failed to read file: %s", err)
// }
// sink, err := str.Create(1, index, term, testConfig, 4, nil)
// if err != nil {
// t.Fatalf("failed to create 2nd snapshot: %s", err)
// }
// if _, err = sink.Write(b); err != nil {
// t.Fatalf("failed to write to sink: %s", err)
// }
// sink.Close()
// }
// createSnapshot(1, 1, "testdata/reaping/backup.db")
// createSnapshot(3, 2, "testdata/reaping/wal-00")
// createSnapshot(5, 3, "testdata/reaping/wal-01")
// createSnapshot(7, 4, "testdata/reaping/wal-02")
// createSnapshot(9, 5, "testdata/reaping/wal-03")
// // There should be 5 snapshot directories, one of which should be
// // a full, and the rest incremental.
// snaps, err := str.getSnapshots()
// if err != nil {
// t.Fatalf("failed to list snapshots: %s", err)
// }
// if exp, got := 5, len(snaps); exp != got {
// t.Fatalf("expected %d snapshots, got %d", exp, got)
// }
// for _, snap := range snaps[0:4] {
// if snap.Full {
// t.Fatalf("snapshot %s is full", snap.ID)
// }
// }
// if !snaps[4].Full {
// t.Fatalf("snapshot %s is incremental", snaps[4].ID)
// }
// // Reap just the first snapshot, which is full.
// n, err := str.ReapSnapshots(4)
// if err != nil {
// t.Fatalf("failed to reap full snapshot: %s", err)
// }
// if exp, got := 1, n; exp != got {
// t.Fatalf("expected %d snapshots to be reaped, got %d", exp, got)
// }
// snaps, err = str.getSnapshots()
// if err != nil {
// t.Fatalf("failed to list snapshots: %s", err)
// }
// if exp, got := 4, len(snaps); exp != got {
// t.Fatalf("expected %d snapshots, got %d", exp, got)
// }
// // Reap all but the last two snapshots. The remaining snapshots
// // should all be incremental.
// n, err = str.ReapSnapshots(2)
// if err != nil {
// t.Fatalf("failed to reap snapshots: %s", err)
// }
// if exp, got := 2, n; exp != got {
// t.Fatalf("expected %d snapshots to be reaped, got %d", exp, got)
// }
// snaps, err = str.getSnapshots()
// if err != nil {
// t.Fatalf("failed to list snapshots: %s", err)
// }
// if exp, got := 2, len(snaps); exp != got {
// t.Fatalf("expected %d snapshots, got %d", exp, got)
// }
// for _, snap := range snaps {
// if snap.Full {
// t.Fatalf("snapshot %s is full", snap.ID)
// }
// }
// if snaps[0].Index != 9 && snaps[1].Term != 5 {
// t.Fatalf("snap 0 is wrong")
// }
// if snaps[1].Index != 7 && snaps[1].Term != 3 {
// t.Fatalf("snap 1 is wrong")
// }
// // Check the contents of the remaining snapshots by creating a new
// // SQLite from the Store
// dbPath, err := str.ReplayWALs()
// if err != nil {
// t.Fatalf("failed to replay WALs: %s", err)
// }
// db, err := db.Open(dbPath, false, true)
// if err != nil {
// t.Fatalf("failed to open database: %s", err)
// }
// defer db.Close()
// rows, err := db.QueryStringStmt("SELECT COUNT(*) FROM foo")
// if err != nil {
// t.Fatalf("failed to query database: %s", err)
// }
// if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[4]]}]`, asJSON(rows); exp != got {
// t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
// }
// }
Loading…
Cancel
Save