1
0
Fork 0

Fill out methods on SnapshotStore v2

master
Philip O'Toole 12 months ago
parent 7c420407c2
commit 5e3d481a6e

@ -10,14 +10,13 @@ If the SS needs an incremental snapshot
- checkpoint WAL into SQLite file.
Persist()
- create <snap>.tmp directory in SS (perhaps already created by Sink)
- create <snap>.tmp directory in SS (already created by Sink Open())
- check type of snapshot
- if full:
- copy SQLite file to SS root, named as <snap>.db
- rename <snap>.tmp to <snap>
- move SQLite file to SS root, named as <snap>.db
- delete <snap-1>.db, if it exists, and any earlier snap directories too.
- else:
- copy WAL file to beside existing SQLite file. Name it <snap>.db-wal. Sync everything.
- move WAL file to beside existing SQLite file. Name it <snap>.db-wal. Sync everything.
- rename <snap>.tmp to <snap>
- rename <snap-1>.db to <snap>.db
- checkpoint <snap>.db-wal <snap>.db

@ -1,9 +1,7 @@
package snapshot2
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
@ -69,7 +67,10 @@ func (s *Sink) ID() string {
// going to be closed.
func (s *Sink) Cancel() error {
s.closed = true
return nil
if err := s.dataFD.Close(); err != nil {
return err
}
return RemoveAllTmpSnapshotData(s.str.Dir())
}
// Close closes the sink, and finalizes creation of the snapshot. It is critical
@ -79,25 +80,24 @@ func (s *Sink) Close() error {
return nil
}
s.closed = true
s.dataFD.Close()
// Write meta data
if err := s.writeMeta(s.snapTmpDirPath); err != nil {
return err
}
s.dataFD.Close()
if err := s.processSnapshotData(); err != nil {
return err
}
// XXX Add autoreap check here.
return s.str.Reap()
}
func (s *Sink) processSnapshotData() (retErr error) {
defer func() {
if retErr != nil {
s.removeTmpSnapshotData()
RemoveAllTmpSnapshotData(s.str.Dir())
}
}()
@ -158,63 +158,14 @@ func (s *Sink) processSnapshotData() (retErr error) {
// SQLite file for the latest snapshot. This is an invalid state.
return ErrInvalidStore
}
// // Remove all snapshots, and all associated data, except the newest one.
// for _, snap := range snapshots[:len(snapshots)-1] {
// if err := removeAllPrefix(s.str.Dir(), snap); err != nil {
// return err
// }
// }
} else {
return ErrInvalidStore
}
// At this point we have one of the following situations:
// - a single directory and a single DB file. They should have the same base name.
// - two directories and a single DB file and a wal file. The single DB file should be
// named after the older snapshot. The wal file should be named after the newer snapshot.
s.str.Reap()
return nil
}
func (s *Sink) removeTmpSnapshotData() {
// Get a list of all snapshots in the directory which end with tmpSuffix.
// These are snapshots which were not successfully persisted -- all resources
// associated with them should be removed.
// List all directories in the snapshot directory.
directories, err := os.ReadDir(s.str.Dir())
if err != nil {
return
}
for _, d := range directories {
if d.IsDir() && isTmpName(d.Name()) {
os.RemoveAll(filepath.Join(s.str.Dir(), d.Name()))
// get a list of all files that reg match d.Name()
// remove all of them
}
}
}
func (s *Sink) writeMeta(dir string) error {
return writeMeta(dir, s.meta)
}
func writeMeta(dir string, meta *raft.SnapshotMeta) error {
fh, err := os.Create(filepath.Join(dir, metaFileName))
if err != nil {
return fmt.Errorf("error creating meta file: %v", err)
}
defer fh.Close()
// Write out as JSON
enc := json.NewEncoder(fh)
if err = enc.Encode(meta); err != nil {
return fmt.Errorf("failed to encode meta: %v", err)
}
if err := fh.Sync(); err != nil {
return err
}
return fh.Close()
}

@ -1,6 +1,7 @@
package snapshot2
import (
"encoding/json"
"expvar"
"fmt"
"io"
@ -69,14 +70,36 @@ func (s *Store) Create(version raft.SnapshotVersion, index, term uint64, configu
return sink, nil
}
// List returns a list of all the snapshots in the Store.
// List returns a list of all the snapshots in the Store. It returns the snapshots
// in newest to oldest order.
func (s *Store) List() ([]*raft.SnapshotMeta, error) {
return nil, nil
snapshots, err := s.getSnapshots()
if err != nil {
return nil, err
}
var snapMeta []*raft.SnapshotMeta
if len(snapshots) > 0 {
snapshotDir := filepath.Join(s.dir, snapshots[0])
meta, err := readMeta(snapshotDir)
if err != nil {
return nil, err
}
snapMeta = append(snapMeta, meta)
}
return snapMeta, nil
}
// Open opens the snapshot with the given ID.
func (s *Store) Open(id string) (*raft.SnapshotMeta, io.ReadCloser, error) {
return nil, nil, nil
meta, err := readMeta(filepath.Join(s.dir, id))
if err != nil {
return nil, nil, err
}
fd, err := os.Open(filepath.Join(s.dir, id+".db"))
if err != nil {
return nil, nil, err
}
return meta, fd, nil
}
// Stats returns stats about the Snapshot Store.
@ -86,21 +109,41 @@ func (s *Store) Stats() (map[string]interface{}, error) {
// Reap reaps old snapshots.
func (s *Store) Reap() error {
snapshots, err := s.getSnapshots()
if err != nil {
return err
}
// Remove all snapshots, and all associated data, except the newest one.
for _, snap := range snapshots[:len(snapshots)-1] {
if err := removeAllPrefix(s.dir, snap); err != nil {
return err
}
}
return nil
}
// FullNeeded returns true if the next type of snapshot needed
// by the Store is a full snapshot.
func (s *Store) FullNeeded() bool {
return true
}
// Dir returns the directory where the snapshots are stored.
func (s *Store) Dir() string {
return s.dir
}
// RemoveAllTmpSnapshotData removes all temporary Snapshot data from the store.
// getSnapshots returns a list of all snapshots in the store, sorted
// from oldest to newest.
func (s *Store) getSnapshots() ([]string, error) {
directories, err := os.ReadDir(s.dir)
if err != nil {
return nil, err
}
var snapshots []string
for _, d := range directories {
if !isTmpName(d.Name()) {
snapshots = append(snapshots, d.Name())
}
}
return snapshots, nil
}
// RemoveAllTmpSnapshotData removes all temporary Snapshot data from the directory.
// This process is defined as follows: for every directory in dir, if the directory
// is a temporary directory, remove the directory. Then remove all other files
// that contain the name of a temporary directory, minus the temporary suffix,
@ -131,22 +174,6 @@ func RemoveAllTmpSnapshotData(dir string) error {
return nil
}
// getSnapshots returns a list of all snapshots in the store, sorted
// from oldest to newest.
func (s *Store) getSnapshots() ([]string, error) {
directories, err := os.ReadDir(s.dir)
if err != nil {
return nil, err
}
var snapshots []string
for _, d := range directories {
if !isTmpName(d.Name()) {
snapshots = append(snapshots, d.Name())
}
}
return snapshots, nil
}
// snapshotName generates a name for the snapshot.
func snapshotName(term, index uint64) string {
now := time.Now()
@ -183,11 +210,6 @@ func syncDirMaybe(dir string) error {
return syncDir(dir)
}
func fileExists(path string) bool {
_, err := os.Stat(path)
return !os.IsNotExist(err)
}
// removeAllPrefix removes all files in the given directory that have the given prefix.
func removeAllPrefix(path, prefix string) error {
files, err := filepath.Glob(filepath.Join(path, prefix) + "*")
@ -201,3 +223,39 @@ func removeAllPrefix(path, prefix string) error {
}
return nil
}
// readMeta is used to read the meta data in a given snapshot directory.
func readMeta(dir string) (*raft.SnapshotMeta, error) {
metaPath := filepath.Join(dir, metaFileName)
fh, err := os.Open(metaPath)
if err != nil {
return nil, err
}
defer fh.Close()
meta := &raft.SnapshotMeta{}
dec := json.NewDecoder(fh)
if err := dec.Decode(meta); err != nil {
return nil, err
}
return meta, nil
}
func writeMeta(dir string, meta *raft.SnapshotMeta) error {
fh, err := os.Create(filepath.Join(dir, metaFileName))
if err != nil {
return fmt.Errorf("error creating meta file: %v", err)
}
defer fh.Close()
// Write out as JSON
enc := json.NewEncoder(fh)
if err = enc.Encode(meta); err != nil {
return fmt.Errorf("failed to encode meta: %v", err)
}
if err := fh.Sync(); err != nil {
return err
}
return fh.Close()
}

@ -153,9 +153,6 @@ func ResetStats() {
type SnapshotStore interface {
raft.SnapshotStore
// FullNeeded returns whether the Snapshot Store needs a full snapshot.
FullNeeded() bool
// Stats returns stats about the Snapshot Store.
Stats() (map[string]interface{}, error)
}
@ -1643,8 +1640,13 @@ func (s *Store) Database(leader bool) ([]byte, error) {
func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
startT := time.Now()
fNeeded := s.snapshotStore.FullNeeded()
fPLog := fullPretty(fNeeded)
currSnaps, err := s.snapshotStore.List()
if err != nil {
return nil, err
}
fullNeeded := len(currSnaps) == 0
fPLog := fullPretty(fullNeeded)
s.logger.Printf("initiating %s snapshot on node ID %s", fPLog, s.raftID)
defer func() {
s.numSnapshotsMu.Lock()
@ -1656,7 +1658,7 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
defer s.queryTxMu.Unlock()
var fsmSnapshot raft.FSMSnapshot
if fNeeded {
if fullNeeded {
if err := s.db.Checkpoint(); err != nil {
return nil, err
}

Loading…
Cancel
Save