1
0
Fork 0

Start full testing of snapshotting

I need a snapshot decoder, for use by the main Store. And then this
decoder could be used by the Sink too.
master
Philip O'Toole 1 year ago
parent b0f46a60ba
commit 4503187b2f

@ -81,8 +81,11 @@ func (s *Sink) Close() error {
return err
}
if !s.str.noAutoreap {
return s.str.Reap()
}
return nil
}
func (s *Sink) processSnapshotData() error {
if s.nWritten == 0 {
@ -218,7 +221,7 @@ func (s *Sink) processFullSnapshot(fullSnap *FullSnapshot) error {
func (s *Sink) writeMeta(dir string, full bool) error {
fh, err := os.Create(filepath.Join(dir, metaFileName))
if err != nil {
return err
return fmt.Errorf("error creating meta file: %v", err)
}
defer fh.Close()
s.meta.Full = full
@ -226,7 +229,7 @@ func (s *Sink) writeMeta(dir string, full bool) error {
// Write out as JSON
enc := json.NewEncoder(fh)
if err = enc.Encode(s.meta); err != nil {
return err
return fmt.Errorf("failed to encode meta: %v", err)
}
if err := fh.Sync(); err != nil {
@ -275,7 +278,7 @@ func moveFromTmpSync(src string) (string, error) {
// Sync parent directory to ensure snapshot is visible, but it's only
// needed on *nix style file systems.
if runtime.GOOS != "windows" {
if err := syncFile(parentDir(dst)); err != nil {
if err := syncDir(parentDir(dst)); err != nil {
return "", err
}
}

@ -70,6 +70,10 @@ func Test_SinkFullSnapshot(t *testing.T) {
if !fileExists(filepath.Join(nextGenDir, baseSqliteFile)) {
t.Fatalf("next generation directory %s does not contain base SQLite file", nextGenDir)
}
expMetaPath := filepath.Join(nextGenDir, "snap-1234", metaFileName)
if !fileExists(expMetaPath) {
t.Fatalf("meta file does not exist at %s", expMetaPath)
}
// Check SQLite database has been created correctly.
db, err := db.Open(filepath.Join(nextGenDir, baseSqliteFile), false, false)
@ -84,8 +88,6 @@ func Test_SinkFullSnapshot(t *testing.T) {
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[4]]}]`, asJSON(rows); exp != got {
t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
}
// confirm that snapshot is valid (full?, contains meta?)
}
func Test_SinkIncrementalSnapshot(t *testing.T) {
@ -130,7 +132,10 @@ func Test_SinkIncrementalSnapshot(t *testing.T) {
t.Fatalf("WAL file data does not match")
}
// confirm that snapshot is valid (incremental?, contains meta?)
expMetaPath := filepath.Join(currGenDir, "snap-1234", metaFileName)
if !fileExists(expMetaPath) {
t.Fatalf("meta file does not exist at %s", expMetaPath)
}
}
func mustNewStoreForSinkTest(t *testing.T) *Store {

@ -1,6 +1,8 @@
package snapshot
import "io"
import (
"io"
)
type Snapshot struct {
walData []byte
@ -19,7 +21,7 @@ func NewFullSnapshot(files ...string) *Snapshot {
}
}
func (s *Snapshot) Persist(sink *Sink) error {
func (s *Snapshot) Persist(sink io.Writer) error {
stream, err := s.OpenStream()
if err != nil {
return err

@ -50,6 +50,8 @@ type Meta struct {
type Store struct {
rootDir string
generationsDir string
noAutoreap bool
logger *log.Logger
}
@ -250,16 +252,18 @@ func (s *Store) GetCurrentGenerationDir() (string, bool, error) {
// Reap reaps old generations, and reaps snapshots within the remaining generation.
func (s *Store) Reap() error {
if _, err := s.ReapGenerations(); err != nil {
return err
return fmt.Errorf("failed to reap generations during reap: %s", err)
}
currDir, ok, err := s.GetCurrentGenerationDir()
if err != nil {
return err
return fmt.Errorf("failed to get current generation directory during reap: %s", err)
}
if ok {
_, err = s.ReapSnapshots(currDir, 2)
return err
if err != nil {
return fmt.Errorf("failed to reap snapshots during reap: %s", err)
}
}
return nil
}
@ -290,16 +294,13 @@ func (s *Store) ReapGenerations() (int, error) {
// returns the number of snapshots removed, or an error. The retain parameter
// specifies the number of snapshots to retain.
func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
// s.mu.Lock()
// defer s.mu.Unlock()
if retain < minSnapshotRetain {
return 0, ErrRetainCountTooLow
}
snapshots, err := s.getSnapshots(dir)
if err != nil {
s.logger.Printf("failed to get snapshots: %s", err)
s.logger.Printf("failed to get snapshots in directory %s: %s", dir, err)
return 0, err
}
@ -341,7 +342,7 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
s.logger.Printf("failed to delete snapshot %s: %s", snap.ID, err)
return n, err
}
if err := syncFile(dir); err != nil {
if err := syncDir(dir); err != nil {
s.logger.Printf("failed to sync directory containing snapshots: %s", err)
}
@ -387,15 +388,14 @@ func (s *Store) getSnapshots(dir string) ([]*Meta, error) {
}
// Ignore any temporary snapshots
snapName := snap.Name()
if isTmpName(snapName) {
if isTmpName(snap.Name()) {
continue
}
// Try to read the meta data
meta, err := s.readMeta(snapName)
meta, err := s.readMeta(filepath.Join(dir, snap.Name()))
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to read meta for snapshot %s: %s", snap.Name(), err)
}
snapMeta = append(snapMeta, meta)
}
@ -406,10 +406,10 @@ func (s *Store) getSnapshots(dir string) ([]*Meta, error) {
return snapMeta, nil
}
// readMeta is used to read the meta data for a given named backup
func (s *Store) readMeta(name string) (*Meta, error) {
// readMeta is used to read the meta data in a given snapshot directory.
func (s *Store) readMeta(dir string) (*Meta, error) {
// Open the meta file
metaPath := filepath.Join(s.rootDir, name, metaFileName)
metaPath := filepath.Join(dir, metaFileName)
fh, err := os.Open(metaPath)
if err != nil {
return nil, err
@ -470,6 +470,19 @@ func syncFile(path string) error {
return fd.Sync()
}
func syncDir(dir string) error {
fh, err := os.Open(dir)
if err != nil {
return err
}
defer fh.Close()
if err := fh.Sync(); err != nil {
return err
}
return fh.Close()
}
// snapshotName generates a name for the snapshot.
func snapshotName(term, index uint64) string {
now := time.Now()

@ -3,6 +3,8 @@ package snapshot
import (
"strings"
"testing"
"github.com/hashicorp/raft"
)
func Test_NewStore(t *testing.T) {
@ -57,6 +59,80 @@ func Test_NewStore_ListEmpty(t *testing.T) {
}
}
// Test_WALSnapshotStore_CreateFull performs detailed testing of the
// snapshot creation process.
func Test_Store_CreateFullThenIncremental(t *testing.T) {
checkSnapshotCount := func(s *Store, exp int) *raft.SnapshotMeta {
snaps, err := s.List()
if err != nil {
t.Fatalf("failed to list snapshots: %s", err)
}
if exp, got := exp, len(snaps); exp != got {
t.Fatalf("expected %d snapshots, got %d", exp, got)
}
if len(snaps) == 0 {
return nil
}
return snaps[0]
}
dir := t.TempDir()
str, err := NewStore(dir)
if err != nil {
t.Fatalf("failed to create snapshot store: %s", err)
}
if !str.FullNeeded() {
t.Fatalf("expected full snapshots to be needed")
}
testConfig1 := makeTestConfiguration("1", "2")
sink, err := str.Create(1, 22, 33, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 1st snapshot sink: %s", err)
}
// Create a full snapshot and write it to the sink.
fullSnap := NewFullSnapshot("testdata/db-and-wals/backup.db")
if err := fullSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist full snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
if str.FullNeeded() {
t.Fatalf("full snapshot still needed")
}
meta := checkSnapshotCount(str, 1)
if meta.Index != 22 || meta.Term != 33 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
// Open the latest snapshot and check that it's correct.
_, _, err = str.Open(meta.ID)
if err != nil {
t.Fatalf("failed to open snapshot %s: %s", meta.ID, err)
}
// Incremental snapshot next
sink, err = str.Create(2, 55, 66, testConfig1, 4, nil)
if err != nil {
t.Fatalf("failed to create 2nd snapshot sink: %s", err)
}
walData := mustReadFile("testdata/db-and-wals/wal-00")
incSnap := NewWALSnapshot(walData)
if err := incSnap.Persist(sink); err != nil {
t.Fatalf("failed to persist incremental snapshot: %s", err)
}
if err := sink.Close(); err != nil {
t.Fatalf("failed to close sink: %s", err)
}
meta = checkSnapshotCount(str, 1)
if meta.Index != 55 || meta.Term != 66 {
t.Fatalf("unexpected snapshot metadata: %+v", meta)
}
}
func Test_Store_ReapGenerations(t *testing.T) {
dir := t.TempDir()
s, err := NewStore(dir)

Loading…
Cancel
Save