|
|
@ -77,7 +77,9 @@ func (s *Sink) Cancel() error {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Close closes the sink, and finalizes creation of the snapshot. It is critical
|
|
|
|
// Close closes the sink, and finalizes creation of the snapshot. It is critical
|
|
|
|
// that Close is called, or the snapshot will not be in place.
|
|
|
|
// that Close is called, or the snapshot will not be in place. It is OK to call
|
|
|
|
|
|
|
|
// Close without every calling Write. In that case the Snapshot will be finalized
|
|
|
|
|
|
|
|
// as usual, but will effectively be the same as the previously created snapshot.
|
|
|
|
func (s *Sink) Close() error {
|
|
|
|
func (s *Sink) Close() error {
|
|
|
|
if !s.opened {
|
|
|
|
if !s.opened {
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
@ -132,28 +134,38 @@ func (s *Sink) processSnapshotData() (retErr error) {
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if db.IsValidSQLiteFile(s.dataFD.Name()) {
|
|
|
|
if len(snapshots) == 0 && !db.IsValidSQLiteFile(s.dataFD.Name()) {
|
|
|
|
if err := os.Rename(s.dataFD.Name(), filepath.Join(s.str.Dir(), s.meta.ID+".db")); err != nil {
|
|
|
|
// We have no snapshots yet, so the incomding data must be a valid SQLite file.
|
|
|
|
return err
|
|
|
|
return fmt.Errorf("data for first snapshot must be a valid SQLite file")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else if db.IsValidSQLiteWALFile(s.dataFD.Name()) {
|
|
|
|
|
|
|
|
if len(snapshots) == 0 {
|
|
|
|
dataSz, err := fileSize(s.dataFD.Name())
|
|
|
|
// We are trying to create our first snapshot from a WAL file, which is invalid.
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("data for first snapshot is a WAL file")
|
|
|
|
return err
|
|
|
|
} else {
|
|
|
|
}
|
|
|
|
// We have at least one previous snapshot. That means we should have a valid SQLite file
|
|
|
|
|
|
|
|
// for the previous snapshot.
|
|
|
|
// Writing zero data for a snapshot is acceptable, and indicates the snapshot
|
|
|
|
|
|
|
|
// is empty. This could happen if lots of entries were written to the Raft log,
|
|
|
|
|
|
|
|
// which would trigger a Raft snapshot, but those entries didn't actually change
|
|
|
|
|
|
|
|
// the database. Otherwise, the data must be a valid SQLite file or WAL file.
|
|
|
|
|
|
|
|
if dataSz != 0 {
|
|
|
|
|
|
|
|
if db.IsValidSQLiteFile(s.dataFD.Name()) {
|
|
|
|
|
|
|
|
if err := os.Rename(s.dataFD.Name(), filepath.Join(s.str.Dir(), s.meta.ID+".db")); err != nil {
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} else if db.IsValidSQLiteWALFile(s.dataFD.Name()) {
|
|
|
|
|
|
|
|
// With WAL data incoming, then we must have a valid SQLite file from the previous snapshot.
|
|
|
|
snapPrev := snapshots[len(snapshots)-1]
|
|
|
|
snapPrev := snapshots[len(snapshots)-1]
|
|
|
|
snapPrevDB := filepath.Join(s.str.Dir(), snapPrev.ID+".db")
|
|
|
|
snapPrevDB := filepath.Join(s.str.Dir(), snapPrev.ID+".db")
|
|
|
|
if !db.IsValidSQLiteFile(snapPrevDB) {
|
|
|
|
if !db.IsValidSQLiteFile(snapPrevDB) {
|
|
|
|
return fmt.Errorf("previous snapshot data is not a SQLite file: %s", snapPrevDB)
|
|
|
|
return fmt.Errorf("previous snapshot data is not a SQLite file: %s", snapPrevDB)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := os.Rename(s.dataFD.Name(), filepath.Join(s.str.Dir(), s.meta.ID+".db-wal")); err != nil {
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
return fmt.Errorf("invalid snapshot data file: %s", s.dataFD.Name())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err := os.Rename(s.dataFD.Name(), filepath.Join(s.str.Dir(), s.meta.ID+".db-wal")); err != nil {
|
|
|
|
|
|
|
|
return err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
return fmt.Errorf("invalid snapshot data file: %s", s.dataFD.Name())
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Indicate snapshot data been successfully persisted to disk by renaming
|
|
|
|
// Indicate snapshot data been successfully persisted to disk by renaming
|
|
|
@ -178,12 +190,15 @@ func (s *Sink) processSnapshotData() (retErr error) {
|
|
|
|
snapNewDB := filepath.Join(s.str.Dir(), snapNew.ID+".db")
|
|
|
|
snapNewDB := filepath.Join(s.str.Dir(), snapNew.ID+".db")
|
|
|
|
snapNewWAL := filepath.Join(s.str.Dir(), snapNew.ID+".db-wal")
|
|
|
|
snapNewWAL := filepath.Join(s.str.Dir(), snapNew.ID+".db-wal")
|
|
|
|
|
|
|
|
|
|
|
|
if db.IsValidSQLiteWALFile(snapNewWAL) {
|
|
|
|
if db.IsValidSQLiteWALFile(snapNewWAL) || dataSz == 0 {
|
|
|
|
// The most recent snapshot was created from a WAL file, so we need to replay
|
|
|
|
// One of two things have happened. Either the snapshot data is empty, in which
|
|
|
|
// that WAL file into the previous SQLite file.
|
|
|
|
// case we can just make the existing SQLite file the new snapshot, or the snapshot
|
|
|
|
|
|
|
|
// data is a valid WAL file, in which case we need to replay it into the existing
|
|
|
|
|
|
|
|
// SQLite file.
|
|
|
|
if err := os.Rename(snapPrevDB, snapNewDB); err != nil {
|
|
|
|
if err := os.Rename(snapPrevDB, snapNewDB); err != nil {
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// An open-close cycle checkpoints and removes any WAL file.
|
|
|
|
if err := openCloseDB(snapNewDB); err != nil {
|
|
|
|
if err := openCloseDB(snapNewDB); err != nil {
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -200,3 +215,11 @@ func (s *Sink) processSnapshotData() (retErr error) {
|
|
|
|
func (s *Sink) writeMeta(dir string) error {
|
|
|
|
func (s *Sink) writeMeta(dir string) error {
|
|
|
|
return writeMeta(dir, s.meta)
|
|
|
|
return writeMeta(dir, s.meta)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func fileSize(path string) (int64, error) {
|
|
|
|
|
|
|
|
stat, err := os.Stat(path)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
return 0, err
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return stat.Size(), nil
|
|
|
|
|
|
|
|
}
|
|
|
|