From f3d7a5b8d4205057a21ddfe604e01f39cdbdc050 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 19 Aug 2023 12:26:07 -0400 Subject: [PATCH] More Sink tweaks --- snapshot/sink.go | 11 +++++++++-- snapshot/sink_test.go | 17 ++++++++++++++++- snapshot/store.go | 21 +++++++++++++++++---- 3 files changed, 42 insertions(+), 7 deletions(-) diff --git a/snapshot/sink.go b/snapshot/sink.go index 8d8474de..2ebc8b3f 100644 --- a/snapshot/sink.go +++ b/snapshot/sink.go @@ -20,7 +20,8 @@ type Sink struct { nextGenDir string meta *Meta - dataFD *os.File + nWritten int64 + dataFD *os.File logger *log.Logger closed bool @@ -51,7 +52,9 @@ func (s *Sink) Open() error { // Write writes snapshot data to the sink. The snapshot is not in place // until Close is called. func (s *Sink) Write(p []byte) (n int, err error) { - return s.dataFD.Write(p) + n, err = s.dataFD.Write(p) + s.nWritten += int64(n) + return } // ID returns the ID of the snapshot being written. @@ -76,6 +79,10 @@ func (s *Sink) Close() error { } func (s *Sink) processSnapshotData() error { + if s.nWritten == 0 { + return nil + } + if _, err := s.dataFD.Seek(0, 0); err != nil { return err } diff --git a/snapshot/sink_test.go b/snapshot/sink_test.go index 5dfdd344..09d7a13f 100644 --- a/snapshot/sink_test.go +++ b/snapshot/sink_test.go @@ -6,7 +6,22 @@ import ( "testing" ) -func Test_NewSinkOpenOK(t *testing.T) { +func Test_NewSinkOpenCloseOK(t *testing.T) { + tmpDir := t.TempDir() + workDir := filepath.Join(tmpDir, "work") + mustCreateDir(workDir) + currGenDir := filepath.Join(tmpDir, "curr") + nextGenDir := filepath.Join(tmpDir, "next") + s := NewSink(workDir, currGenDir, nextGenDir, &Meta{}) + if err := s.Open(); err != nil { + t.Fatal(err) + } + if err := s.Close(); err != nil { + t.Fatal(err) + } +} + +func Test_SinkFullSnapshot(t *testing.T) { tmpDir := t.TempDir() workDir := filepath.Join(tmpDir, "work") mustCreateDir(workDir) diff --git a/snapshot/store.go b/snapshot/store.go index ef8a1feb..2fe9261f 100644 --- a/snapshot/store.go +++ b/snapshot/store.go @@ -75,13 +75,26 @@ func (s *Store) Create(version raft.SnapshotVersion, index, term uint64, configu if err != nil { return nil, err } - snapshotName := snapshotName(term, index) - snapshotPath := filepath.Join(currGenDir, snapshotName+tmpSuffix) - if err := os.MkdirAll(snapshotPath, 0755); err != nil { + nextGenDir, err := s.GetNextGenerationDir() + if err != nil { return nil, err } - return nil, nil + meta := &Meta{ + SnapshotMeta: raft.SnapshotMeta{ + Index: index, + Term: term, + Configuration: configuration, + ConfigurationIndex: configurationIndex, + Version: version, + }, + } + + sink := NewSink(s.rootDir, currGenDir, nextGenDir, meta) + if err := sink.Open(); err != nil { + return nil, fmt.Errorf("failed to open Sink: %v", err) + } + return sink, nil } // List returns a list of all the snapshots in the Store.