1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

226 lines
5.9 KiB
Go

package snapshot
import (
"fmt"
"os"
"path/filepath"
"github.com/hashicorp/raft"
"github.com/rqlite/rqlite/v8/db"
)
// Sink is a sink for writing snapshot data to a Snapshot store.
type Sink struct {
str *Store
meta *raft.SnapshotMeta
snapDirPath string
snapTmpDirPath string
dataFD *os.File
opened bool
}
// NewSink creates a new Sink object.
func NewSink(str *Store, meta *raft.SnapshotMeta) *Sink {
return &Sink{
str: str,
meta: meta,
}
}
// Open opens the sink for writing.
func (s *Sink) Open() error {
if s.opened {
return nil
}
s.opened = true
// Make temp snapshot directory
s.snapDirPath = filepath.Join(s.str.Dir(), s.meta.ID)
s.snapTmpDirPath = tmpName(s.snapDirPath)
if err := os.MkdirAll(s.snapTmpDirPath, 0755); err != nil {
return err
}
dataPath := filepath.Join(s.snapTmpDirPath, s.meta.ID+".data")
dataFD, err := os.Create(dataPath)
if err != nil {
return err
}
s.dataFD = dataFD
return nil
}
// Write writes snapshot data to the sink. The snapshot is not in place
// until Close is called.
func (s *Sink) Write(p []byte) (n int, err error) {
return s.dataFD.Write(p)
}
// ID returns the ID of the snapshot being written.
func (s *Sink) ID() string {
return s.meta.ID
}
// Cancel cancels the snapshot. Cancel must be called if the snapshot is not
// going to be closed.
func (s *Sink) Cancel() error {
if !s.opened {
return nil
}
s.opened = false
if err := s.dataFD.Close(); err != nil {
return err
}
s.dataFD = nil
return RemoveAllTmpSnapshotData(s.str.Dir())
}
// Close closes the sink, and finalizes creation of the snapshot. It is critical
// that Close is called, or the snapshot will not be in place. It is OK to call
// Close without every calling Write. In that case the Snapshot will be finalized
// as usual, but will effectively be the same as the previously created snapshot.
func (s *Sink) Close() error {
if !s.opened {
return nil
}
s.opened = false
if err := s.dataFD.Close(); err != nil {
return err
}
// Write meta data
if err := s.writeMeta(s.snapTmpDirPath); err != nil {
return err
}
if err := s.processSnapshotData(); err != nil {
return err
}
// Get size of SQLite file and set in meta.
dbPath, err := s.str.getDBPath()
if err != nil {
return err
}
fi, err := os.Stat(dbPath)
if err != nil {
return err
}
if err := updateMetaSize(s.snapDirPath, fi.Size()); err != nil {
return fmt.Errorf("failed to update snapshot meta size: %s", err.Error())
}
if err := s.str.unsetFullNeeded(); err != nil {
return err
}
_, err = s.str.Reap()
return err
}
func (s *Sink) processSnapshotData() (retErr error) {
defer func() {
if retErr != nil {
RemoveAllTmpSnapshotData(s.str.Dir())
}
}()
// Check the state of the store before processing this new snapshot. This
// allows us to perform some sanity checks on the incoming snapshot data.
snapshots, err := s.str.getSnapshots()
if err != nil {
return err
}
if len(snapshots) == 0 && !db.IsValidSQLiteFile(s.dataFD.Name()) {
// We have no snapshots yet, so the incomding data must be a valid SQLite file.
return fmt.Errorf("data for first snapshot must be a valid SQLite file")
}
dataSz, err := fileSize(s.dataFD.Name())
if err != nil {
return err
}
// Writing zero data for a snapshot is acceptable, and indicates the snapshot
// is empty. This could happen if lots of entries were written to the Raft log,
// which would trigger a Raft snapshot, but those entries didn't actually change
// the database. Otherwise, the data must be a valid SQLite file or WAL file.
if dataSz != 0 {
if db.IsValidSQLiteFile(s.dataFD.Name()) {
if err := os.Rename(s.dataFD.Name(), filepath.Join(s.str.Dir(), s.meta.ID+".db")); err != nil {
return err
}
} else if db.IsValidSQLiteWALFile(s.dataFD.Name()) {
// With WAL data incoming, then we must have a valid SQLite file from the previous snapshot.
snapPrev := snapshots[len(snapshots)-1]
snapPrevDB := filepath.Join(s.str.Dir(), snapPrev.ID+".db")
if !db.IsValidSQLiteFile(snapPrevDB) {
return fmt.Errorf("previous snapshot data is not a SQLite file: %s", snapPrevDB)
}
if err := os.Rename(s.dataFD.Name(), filepath.Join(s.str.Dir(), s.meta.ID+".db-wal")); err != nil {
return err
}
} else {
return fmt.Errorf("invalid snapshot data file: %s", s.dataFD.Name())
}
}
// Indicate snapshot data been successfully persisted to disk by renaming
// the temp directory to a non-temporary name.
if err := os.Rename(s.snapTmpDirPath, s.snapDirPath); err != nil {
return err
}
if err := syncDirMaybe(s.str.Dir()); err != nil {
return err
}
// Now check if we need to replay any WAL file into the previous SQLite file. This is
// the final step of any snapshot process.
snapshots, err = s.str.getSnapshots()
if err != nil {
return err
}
if len(snapshots) >= 2 {
snapPrev := snapshots[len(snapshots)-2]
snapNew := snapshots[len(snapshots)-1]
snapPrevDB := filepath.Join(s.str.Dir(), snapPrev.ID+".db")
snapNewDB := filepath.Join(s.str.Dir(), snapNew.ID+".db")
snapNewWAL := filepath.Join(s.str.Dir(), snapNew.ID+".db-wal")
if db.IsValidSQLiteWALFile(snapNewWAL) || dataSz == 0 {
// One of two things have happened. Either the snapshot data is empty, in which
// case we can just make the existing SQLite file the new snapshot, or the snapshot
// data is a valid WAL file, in which case we need to replay it into the existing
// SQLite file.
if err := os.Rename(snapPrevDB, snapNewDB); err != nil {
return err
}
// An open-close cycle checkpoints and removes any WAL file.
if err := openCloseDB(snapNewDB); err != nil {
return err
}
}
}
if err := syncDirMaybe(s.str.Dir()); err != nil {
return err
}
s.str.Reap()
return nil
}
func (s *Sink) writeMeta(dir string) error {
return writeMeta(dir, s.meta)
}
func fileSize(path string) (int64, error) {
stat, err := os.Stat(path)
if err != nil {
return 0, err
}
return stat.Size(), nil
}