diff --git a/snapshot/store.go b/snapshot/store.go
index 9adcff06..7884573c 100644
--- a/snapshot/store.go
+++ b/snapshot/store.go
@@ -322,7 +322,6 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
 		snapDirPath := filepath.Join(dir, snap.ID)                       // Path to the snapshot directory
 		snapWALFilePath := filepath.Join(snapDirPath, snapWALFile)       // Path to the WAL file in the snapshot
 		walToCheckpointFilePath := filepath.Join(dir, baseSqliteWALFile) // Path to the WAL file to checkpoint
-		snapWALFilePathCopy := walToCheckpointFilePath + snap.ID
 
 		// If the snapshot directory doesn't contain a WAL file, then the base SQLite
 		// file is the snapshot state, and there is no checkpointing to do.
@@ -332,22 +331,11 @@ func (s *Store) ReapSnapshots(dir string, retain int) (int, error) {
 		// we've copied it out fully. Renaming is not atomic on every OS, so let's be sure. We
 		// also use a temporary file name, so we know where the WAL came from if we exit here
 		// and need to clean up on a restart.
-		if err := copyFileSync(snapWALFilePath, snapWALFilePathCopy); err != nil {
+		if err := copyWALFromSnapshot(snapDirPath, walToCheckpointFilePath); err != nil {
 			s.logger.Printf("failed to copy WAL file from snapshot %s: %s", snapWALFilePath, err)
 			return n, err
 		}
 
-		// Delete the snapshot directory, since we have what we need now.
-		if err := removeDirSync(snapDirPath); err != nil {
-			s.logger.Printf("failed to remove incremental snapshot directory %s: %s", snapDirPath, err)
-		}
-
-		// Move the WAL file to the correct name for checkpointing.
-		if err := os.Rename(snapWALFilePathCopy, walToCheckpointFilePath); err != nil {
-			s.logger.Printf("failed to move WAL file %s: %s", snapWALFilePath, err)
-			return n, err
-		}
-
 		// Checkpoint the WAL file into the base SQLite file
 		if err := db.ReplayWAL(baseSqliteFilePath, []string{walToCheckpointFilePath}, false); err != nil {
 			s.logger.Printf("failed to checkpoint WAL file %s: %s", walToCheckpointFilePath, err)
@@ -426,6 +414,120 @@ func (s *Store) readMeta(dir string) (*Meta, error) {
 }
 
 func (s *Store) check() error {
+	// Simplify logic by reaping generations first.
+	if _, err := s.ReapGenerations(); err != nil {
+		return err
+	}
+
+	// Remove any temporary generational directories. They represent operations
+	// that were interrupted.
+	entries, err := os.ReadDir(s.generationsDir)
+	if err != nil {
+		return err
+	}
+	for _, entry := range entries {
+		if !isTmpName(entry.Name()) {
+			continue
+		}
+		if err := os.RemoveAll(filepath.Join(s.generationsDir, entry.Name())); err != nil {
+			return err
+		}
+	}
+
+	// Remove any temporary files in the current generation.
+	currGenDir, ok, err := s.GetCurrentGenerationDir()
+	if err != nil {
+		return err
+	}
+	if !ok {
+		return nil
+	}
+	entries, err = os.ReadDir(currGenDir)
+	if err != nil {
+		return err
+	}
+	for _, entry := range entries {
+		if isTmpName(entry.Name()) {
+			if err := os.RemoveAll(filepath.Join(currGenDir, entry.Name())); err != nil {
+				return err
+			}
+		}
+	}
+
+	baseSqliteFilePath := filepath.Join(currGenDir, baseSqliteFile)
+	baseSqliteWALFilePath := filepath.Join(currGenDir, baseSqliteWALFile)
+
+	// Any snapshots in the current generation?
+	snapshots, err := s.getSnapshots(currGenDir)
+	if err != nil {
+		return err
+	}
+	if len(snapshots) == 0 {
+		// An empty current generation is useless.
+		return os.RemoveAll(currGenDir)
+	}
+
+	// If we have no base file, we shouldn't have any snapshot directories. If we
+	// do, it's an inconsistent state which we cannot repair, and it needs to be flagged.
+	if !fileExists(baseSqliteFilePath) {
+		return ErrSnapshotBaseMissing
+	}
+
+	// If we have a WAL file in the current generation which ends with the same ID as
+	// the oldest snapshot, then the copy of the WAL from the snapshot didn't complete.
+	// Complete it now.
+	walSnapshotCopy := filepath.Join(currGenDir, baseSqliteWALFile+snapshots[0].ID)
+	snapDirPath := filepath.Join(currGenDir, snapshots[0].ID)
+	if fileExists(walSnapshotCopy) {
+		if err := os.Remove(walSnapshotCopy); err != nil {
+			return err
+		}
+		if err := copyWALFromSnapshot(snapDirPath, baseSqliteWALFilePath); err != nil {
+			s.logger.Printf("failed to copy WAL file from snapshot %s: %s", snapshots[0].ID, err)
+			return err
+		}
+	}
+
+	// If we have a base SQLite file, and a WAL file sitting beside it, this implies
+	// that we were interrupted before completing a checkpoint operation, as part of
+	// reaping snapshots. Complete the checkpoint operation now.
+	if fileExists(baseSqliteFilePath) && fileExists(baseSqliteWALFilePath) {
+		if err := db.ReplayWAL(baseSqliteFilePath, []string{baseSqliteWALFilePath},
+			false); err != nil {
+			return err
+		}
+		if err := os.Remove(baseSqliteWALFilePath); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func hasBaseSQLiteFile(dir string) bool {
+	return fileExists(filepath.Join(dir, baseSqliteFile))
+}
+
+func hasBaseSQLiteWALFile(dir string) bool {
+	return fileExists(filepath.Join(dir, baseSqliteWALFile))
+}
+
+func copyWALFromSnapshot(snapDirPath string, dstWALPath string) error {
+	snapWALFilePath := filepath.Join(snapDirPath, snapWALFile)
+	snapWALFilePathCopy := dstWALPath + filepath.Base(snapDirPath)
+
+	if err := copyFileSync(snapWALFilePath, snapWALFilePathCopy); err != nil {
+		return fmt.Errorf("failed to copy WAL file from snapshot %s: %s", snapWALFilePath, err)
+	}
+
+	// Delete the snapshot directory, since we have what we need now.
+	if err := removeDirSync(snapDirPath); err != nil {
+		return fmt.Errorf("failed to remove incremental snapshot directory %s: %s", snapDirPath, err)
+	}
+
+	// Move the WAL file to the correct name for checkpointing.
+	if err := os.Rename(snapWALFilePathCopy, dstWALPath); err != nil {
+		return fmt.Errorf("failed to move WAL file %s: %s", snapWALFilePathCopy, err)
+	}
 	return nil
 }
 
diff --git a/store/store.go b/store/store.go
index 5b758532..9e16a2ae 100644
--- a/store/store.go
+++ b/store/store.go
@@ -79,6 +79,8 @@ const (
 	trailingScale   = 1.25
 	observerChanLen = 50
 
+	checkpointTimeout = 10 * time.Second
+
 	defaultChunkSize = 512 * 1024 * 1024 // 512 MB
 )
 
@@ -198,6 +200,7 @@ type Store struct {
 	raftLog    raft.LogStore    // Persistent log store.
 	raftStable raft.StableStore // Persistent k-v store.
 	boltStore  *rlog.Log        // Physical store.
+	snapshotStore *snapshot.Store // Snapshot store.
 
 	// Raft changes observer
 	leaderObserversMu sync.RWMutex
@@ -369,12 +372,13 @@ func (s *Store) Open() (retErr error) {
 	config := s.raftConfig()
 	config.LocalID = raft.ServerID(s.raftID)
 
-	// Create the snapshot store. This allows Raft to truncate the log.
-	snapshots, err := raft.NewFileSnapshotStore(s.raftDir, retainSnapshotCount, os.Stderr)
+	snapshotStore, err := snapshot.NewStore(filepath.Join(s.raftDir, "rsnapshots"))
 	if err != nil {
-		return fmt.Errorf("file snapshot store: %s", err)
+		return fmt.Errorf("snapshot store: %s", err)
 	}
-	snaps, err := snapshots.List()
+	s.snapshotStore = snapshotStore
+
+	snaps, err := s.snapshotStore.List()
 	if err != nil {
 		return fmt.Errorf("list snapshots: %s", err)
 	}
@@ -398,7 +402,7 @@ func (s *Store) Open() (retErr error) {
 		if err != nil {
 			return fmt.Errorf("failed to read peers file: %s", err.Error())
 		}
-		if err = RecoverNode(s.raftDir, s.logger, s.raftLog, s.boltStore, snapshots, s.raftTn, config); err != nil {
+		if err = RecoverNode(s.raftDir, s.logger, s.raftLog, s.boltStore, s.snapshotStore, s.raftTn, config); err != nil {
 			return fmt.Errorf("failed to recover node: %s", err.Error())
 		}
 		if err := os.Rename(s.peersPath, s.peersInfoPath); err != nil {
@@ -422,7 +426,7 @@ func (s *Store) Open() (retErr error) {
 	s.logger.Printf("created on-disk database at open")
 
 	// Instantiate the Raft system.
-	ra, err := raft.NewRaft(config, s, s.raftLog, s.raftStable, snapshots, s.raftTn)
+	ra, err := raft.NewRaft(config, s, s.raftLog, s.raftStable, s.snapshotStore, s.raftTn)
 	if err != nil {
 		return fmt.Errorf("new raft: %s", err)
 	}
@@ -1617,13 +1621,34 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
 	s.queryTxMu.Lock()
 	defer s.queryTxMu.Unlock()
 
-	fsm := NewFSMSnapshot(s.db, s.logger)
-	dur := time.Since(fsm.startT)
+
+	var fsmSnapshot raft.FSMSnapshot
+	if s.snapshotStore.FullNeeded() {
+		if err := s.db.Checkpoint(checkpointTimeout); err != nil {
+			return nil, err
+		}
+		fsmSnapshot = snapshot.NewFullSnapshot(s.dbPath)
+	} else {
+		b, err := os.ReadFile("xxxx")
+		if err != nil {
+			return nil, err
+		}
+		// XXX Handle no WAL data
+		fsmSnapshot = snapshot.NewWALSnapshot(b)
+		if err != nil {
+			return nil, err
+		}
+		if err := s.db.Checkpoint(checkpointTimeout); err != nil {
+			return nil, err
+		}
+	}
+
+	//dur := time.Since(fsm.startT)
 	stats.Add(numSnaphots, 1)
-	stats.Get(snapshotCreateDuration).(*expvar.Int).Set(dur.Milliseconds())
-	stats.Get(snapshotDBSerializedSize).(*expvar.Int).Set(int64(len(fsm.database)))
-	s.logger.Printf("node snapshot created in %s", dur)
-	return fsm, nil
+	//stats.Get(snapshotCreateDuration).(*expvar.Int).Set(dur.Milliseconds())
+	//stats.Get(snapshotDBSerializedSize).(*expvar.Int).Set(int64(len(fsm.database)))
+	//s.logger.Printf("node snapshot created in %s", dur)
+	return fsmSnapshot, nil
 }
 
 // Restore restores the node to a previous state. The Hashicorp docs state this
@@ -1631,6 +1656,23 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
 // is not necessary.
 func (s *Store) Restore(rc io.ReadCloser) error {
 	startT := time.Now()
+
+	strHdr, _, err := snapshot.NewStreamHeaderFromReader(rc)
+	if err != nil {
+		return fmt.Errorf("error reading stream header: %v", err)
+	}
+	// if strHdr.GetVersion() != streamVersion {
+	// 	return fmt.Errorf("unsupported snapshot version %d", strHdr.GetVersion())
+	// }
+
+	fullSnap := strHdr.GetFullSnapshot()
+	if fullSnap == nil {
+		return fmt.Errorf("got nil FullSnapshot")
+	}
+	if err := snapshot.ReplayDB(fullSnap, rc, s.db.Path()); err != nil {
+		return fmt.Errorf("error replaying DB: %v", err)
+	}
+
 	b, err := dbBytesFromSnapshot(rc)
 	if err != nil {
 		return fmt.Errorf("restore failed: %s", err.Error())
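
For illustration, a package-internal test sketch of copyWALFromSnapshot's contract, reusing the snapWALFile, baseSqliteWALFile, fileExists, copyFileSync and removeDirSync identifiers shown above. It assumes the package is named snapshot; the snapshot directory name and WAL contents are made up. After the call, the WAL should sit at the destination path and the snapshot directory should be gone.

// copy_wal_sketch_test.go - illustrative sketch only, not part of this change.
package snapshot

import (
	"os"
	"path/filepath"
	"testing"
)

func Test_CopyWALFromSnapshot_Sketch(t *testing.T) {
	genDir := t.TempDir()

	// Lay out a fake snapshot directory containing a WAL file. The ID is illustrative.
	snapDir := filepath.Join(genDir, "snap-1690000000")
	if err := os.MkdirAll(snapDir, 0755); err != nil {
		t.Fatal(err)
	}
	if err := os.WriteFile(filepath.Join(snapDir, snapWALFile), []byte("wal data"), 0644); err != nil {
		t.Fatal(err)
	}

	// Copy the WAL out of the snapshot to the name used for checkpointing.
	dst := filepath.Join(genDir, baseSqliteWALFile)
	if err := copyWALFromSnapshot(snapDir, dst); err != nil {
		t.Fatal(err)
	}

	// The snapshot directory should be gone, and the WAL should now sit at dst.
	if _, err := os.Stat(snapDir); !os.IsNotExist(err) {
		t.Fatalf("expected snapshot directory %s to be removed", snapDir)
	}
	if !fileExists(dst) {
		t.Fatalf("expected WAL file at %s", dst)
	}
}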