
Flesh out Store check()

master
Philip O'Toole 1 year ago
parent 2590f9daed
commit e4d5ffb8a9

@@ -322,7 +322,6 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
snapDirPath := filepath.Join(dir, snap.ID) // Path to the snapshot directory
snapWALFilePath := filepath.Join(snapDirPath, snapWALFile) // Path to the WAL file in the snapshot
walToCheckpointFilePath := filepath.Join(dir, baseSqliteWALFile) // Path to the WAL file to checkpoint
snapWALFilePathCopy := walToCheckpointFilePath + snap.ID
// If the snapshot directory doesn't contain a WAL file, then the base SQLite
// file is the snapshot state, and there is no checkpointing to do.
@@ -332,22 +331,11 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
// we've copied it out fully. Renaming is not atomic on every OS, so let's be sure. We
// also use a temporary file name, so we know where the WAL came from if we exit here
// and need to clean up on a restart.
if err := copyFileSync(snapWALFilePath, snapWALFilePathCopy); err != nil {
if err := copyWALFromSnapshot(snapDirPath, walToCheckpointFilePath); err != nil {
s.logger.Printf("failed to copy WAL file from snapshot %s: %s", snapWALFilePath, err)
return n, err
}
// Delete the snapshot directory, since we have what we need now.
if err := removeDirSync(snapDirPath); err != nil {
s.logger.Printf("failed to remove incremental snapshot directory %s: %s", snapDirPath, err)
}
// Move the WAL file to the correct name for checkpointing.
if err := os.Rename(snapWALFilePathCopy, walToCheckpointFilePath); err != nil {
s.logger.Printf("failed to move WAL file %s: %s", snapWALFilePath, err)
return n, err
}
// Checkpoint the WAL file into the base SQLite file
if err := db.ReplayWAL(baseSqliteFilePath, []string{walToCheckpointFilePath}, false); err != nil {
s.logger.Printf("failed to checkpoint WAL file %s: %s", walToCheckpointFilePath, err)
@@ -426,6 +414,120 @@ func (s *Store) readMeta(dir string) (*Meta, error) {
}
func (s *Store) check() error {
// Simplify logic by reaping generations first.
if _, err := s.ReapGenerations(); err != nil {
return err
}
// Remove any temporary generational directories. They represent operations
// that were interrupted.
entries, err := os.ReadDir(s.generationsDir)
if err != nil {
return err
}
for _, entry := range entries {
if !isTmpName(entry.Name()) {
continue
}
if err := os.RemoveAll(filepath.Join(s.generationsDir, entry.Name())); err != nil {
return err
}
}
// Remove any temporary files in the current generation.
currGenDir, ok, err := s.GetCurrentGenerationDir()
if err != nil {
return err
}
if !ok {
return nil
}
entries, err = os.ReadDir(currGenDir)
if err != nil {
return err
}
for _, entry := range entries {
if isTmpName(entry.Name()) {
if err := os.RemoveAll(filepath.Join(currGenDir, entry.Name())); err != nil {
return err
}
}
}
baseSqliteFilePath := filepath.Join(currGenDir, baseSqliteFile)
baseSqliteWALFilePath := filepath.Join(currGenDir, baseSqliteWALFile)
// Any snapshots in the current generation?
snapshots, err := s.getSnapshots(currGenDir)
if err != nil {
return err
}
if len(snapshots) == 0 {
// An empty current generation is useless.
return os.RemoveAll(currGenDir)
}
// If we have no base file, we shouldn't have any snapshot directories. If we
// do, it's an inconsistent state which we cannot repair, and needs to be flagged.
if !fileExists(baseSqliteFilePath) {
return ErrSnapshotBaseMissing
}
// If there is a WAL file in the current generation whose name ends with the same ID
// as the oldest snapshot, then the copy of the WAL from that snapshot didn't complete.
// Complete it now.
walSnapshotCopy := filepath.Join(currGenDir, baseSqliteWALFile+snapshots[0].ID)
snapDirPath := filepath.Join(currGenDir, snapshots[0].ID)
if fileExists(walSnapshotCopy) {
if err := os.Remove(walSnapshotCopy); err != nil {
return err
}
if err := copyWALFromSnapshot(snapDirPath, baseSqliteWALFilePath); err != nil {
s.logger.Printf("failed to copy WAL file from snapshot %s: %s", snapshots[0].ID, err)
return err
}
}
// If we have a base SQLite file, and a WAL file sitting beside it, this implies
// that we were interrupted before completing a checkpoint operation, as part of
// reaping snapshots. Complete the checkpoint operation now.
if fileExists(baseSqliteFilePath) && fileExists(baseSqliteWALFilePath) {
if err := db.ReplayWAL(baseSqliteFilePath, []string{baseSqliteWALFilePath},
false); err != nil {
return err
}
if err := os.Remove(baseSqliteWALFilePath); err != nil {
return err
}
}
return nil
}
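
check() leans on a couple of small helpers that are not part of this diff. A minimal sketch of what they might look like, purely for illustration — the ".tmp" suffix convention here is an assumption, not something this commit defines:

// Sketch only: isTmpName and fileExists are assumed by check() but live
// elsewhere in the package. The ".tmp" suffix is an assumed convention.
func isTmpName(name string) bool {
	return filepath.Ext(name) == ".tmp"
}

func fileExists(path string) bool {
	fi, err := os.Stat(path)
	return err == nil && !fi.IsDir()
}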
func hasBaseSQLiteFile(dir string) bool {
return fileExists(filepath.Join(dir, baseSqliteFile))
}
func hasBaseSQLiteWALFile(dir string) bool {
return fileExists(filepath.Join(dir, baseSqliteWALFile))
}
func copyWALFromSnapshot(snapDirPath string, dstWALPath string) error {
snapWALFilePath := filepath.Join(snapDirPath, snapWALFile)
// Name the temporary copy after the destination WAL file plus the snapshot ID,
// so an interrupted copy can be recognized and cleaned up on restart.
snapWALFilePathCopy := dstWALPath + filepath.Base(snapDirPath)
if err := copyFileSync(snapWALFilePath, snapWALFilePathCopy); err != nil {
return fmt.Errorf("failed to copy WAL file from snapshot %s: %s", snapWALFilePath, err)
}
// Delete the snapshot directory, since we have what we need now.
if err := removeDirSync(snapDirPath); err != nil {
return fmt.Errorf("failed to remove incremental snapshot directory %s: %s", snapDirPath, err)
}
// Move the WAL file to the correct name for checkpointing.
if err := os.Rename(snapWALFilePathCopy, dstWALPath); err != nil {
return fmt.Errorf("failed to move WAL file %s: %s", snapWALFilePath, err)
}
return nil
}
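
copyFileSync and removeDirSync are also referenced above without being shown. A hedged sketch, under the assumption that both sync their results to disk before returning, which is what the "Renaming is not atomic on every OS" comment implies the code cares about:

// Sketch only: plausible shapes for the syncing helpers used above.
func copyFileSync(src, dst string) error {
	data, err := os.ReadFile(src)
	if err != nil {
		return err
	}
	f, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer f.Close()
	if _, err := f.Write(data); err != nil {
		return err
	}
	return f.Sync()
}

func removeDirSync(dir string) error {
	if err := os.RemoveAll(dir); err != nil {
		return err
	}
	// Sync the parent directory so the removal itself is durable.
	parent, err := os.Open(filepath.Dir(dir))
	if err != nil {
		return err
	}
	defer parent.Close()
	return parent.Sync()
}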

@@ -79,6 +79,8 @@ const (
trailingScale = 1.25
observerChanLen = 50
checkpointTimeout = 10 * time.Second
defaultChunkSize = 512 * 1024 * 1024 // 512 MB
)
@@ -198,6 +200,7 @@ type Store struct {
raftLog raft.LogStore // Persistent log store.
raftStable raft.StableStore // Persistent k-v store.
boltStore *rlog.Log // Physical store.
snapshotStore *snapshot.Store // Snapshot store.
// Raft changes observer
leaderObserversMu sync.RWMutex
@@ -369,12 +372,13 @@ func (s *Store) Open() (retErr error) {
config := s.raftConfig()
config.LocalID = raft.ServerID(s.raftID)
// Create the snapshot store. This allows Raft to truncate the log.
snapshots, err := raft.NewFileSnapshotStore(s.raftDir, retainSnapshotCount, os.Stderr)
snapshotStore, err := snapshot.NewStore(filepath.Join(s.raftDir, "rsnapshots"))
if err != nil {
return fmt.Errorf("file snapshot store: %s", err)
return fmt.Errorf("snapshot store: %s", err)
}
snaps, err := snapshots.List()
s.snapshotStore = snapshotStore
snaps, err := s.snapshotStore.List()
if err != nil {
return fmt.Errorf("list snapshots: %s", err)
}
@@ -398,7 +402,7 @@ func (s *Store) Open() (retErr error) {
if err != nil {
return fmt.Errorf("failed to read peers file: %s", err.Error())
}
if err = RecoverNode(s.raftDir, s.logger, s.raftLog, s.boltStore, snapshots, s.raftTn, config); err != nil {
if err = RecoverNode(s.raftDir, s.logger, s.raftLog, s.boltStore, s.snapshotStore, s.raftTn, config); err != nil {
return fmt.Errorf("failed to recover node: %s", err.Error())
}
if err := os.Rename(s.peersPath, s.peersInfoPath); err != nil {
@@ -422,7 +426,7 @@ func (s *Store) Open() (retErr error) {
s.logger.Printf("created on-disk database at open")
// Instantiate the Raft system.
ra, err := raft.NewRaft(config, s, s.raftLog, s.raftStable, snapshots, s.raftTn)
ra, err := raft.NewRaft(config, s, s.raftLog, s.raftStable, s.snapshotStore, s.raftTn)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
@@ -1617,13 +1621,34 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
s.queryTxMu.Lock()
defer s.queryTxMu.Unlock()
fsm := NewFSMSnapshot(s.db, s.logger)
dur := time.Since(fsm.startT)
var fsmSnapshot raft.FSMSnapshot
if s.snapshotStore.FullNeeded() {
if err := s.db.Checkpoint(checkpointTimeout); err != nil {
return nil, err
}
fsmSnapshot = snapshot.NewFullSnapshot(s.dbPath)
} else {
b, err := os.ReadFile("xxxx")
if err != nil {
return nil, err
}
// XXX Handle no WAL data
fsmSnapshot = snapshot.NewWALSnapshot(b)
if err != nil {
return nil, err
}
if err := s.db.Checkpoint(checkpointTimeout); err != nil {
return nil, err
}
}
//dur := time.Since(fsm.startT)
stats.Add(numSnaphots, 1)
stats.Get(snapshotCreateDuration).(*expvar.Int).Set(dur.Milliseconds())
stats.Get(snapshotDBSerializedSize).(*expvar.Int).Set(int64(len(fsm.database)))
s.logger.Printf("node snapshot created in %s", dur)
return fsm, nil
//stats.Get(snapshotCreateDuration).(*expvar.Int).Set(dur.Milliseconds())
//stats.Get(snapshotDBSerializedSize).(*expvar.Int).Set(int64(len(fsm.database)))
//s.logger.Printf("node snapshot created in %s", dur)
return fsmSnapshot, nil
}
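
The incremental branch above still reads from a placeholder path and carries a leftover error check. A hedged sketch of where it appears to be heading — the WALPath accessor is assumed here for illustration and does not appear in this diff:

// Hypothetical sketch of the incremental-snapshot branch; s.db.WALPath()
// is an assumption and is not part of this commit.
walData, err := os.ReadFile(s.db.WALPath())
if err != nil && !os.IsNotExist(err) {
	return nil, err
}
// An empty WAL would mean nothing has changed since the last snapshot,
// which still needs explicit handling (the "XXX Handle no WAL data" note).
fsmSnapshot = snapshot.NewWALSnapshot(walData)
if err := s.db.Checkpoint(checkpointTimeout); err != nil {
	return nil, err
}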
// Restore restores the node to a previous state. The Hashicorp docs state this
@@ -1631,6 +1656,23 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
// is not necessary.
func (s *Store) Restore(rc io.ReadCloser) error {
startT := time.Now()
strHdr, _, err := snapshot.NewStreamHeaderFromReader(rc)
if err != nil {
return fmt.Errorf("error reading stream header: %v", err)
}
// if strHdr.GetVersion() != streamVersion {
// return fmt.Errorf("unsupported snapshot version %d", strHdr.GetVersion())
// }
fullSnap := strHdr.GetFullSnapshot()
if fullSnap == nil {
return fmt.Errorf("got nil FullSnapshot")
}
if err := snapshot.ReplayDB(fullSnap, rc, s.db.Path()); err != nil {
return fmt.Errorf("error replaying DB: %v", err)
}
b, err := dbBytesFromSnapshot(rc)
if err != nil {
return fmt.Errorf("restore failed: %s", err.Error())
