diff --git a/db/swappable_db.go b/db/swappable_db.go new file mode 100644 index 00000000..079de59a --- /dev/null +++ b/db/swappable_db.go @@ -0,0 +1,133 @@ +package db + +import ( + "fmt" + "io" + "os" + "sync" + + command "github.com/rqlite/rqlite/v8/command/proto" +) + +type SwappableDB struct { + db *DB + dbMu sync.RWMutex +} + +func OpenSwappable(dbPath string, fkEnabled, wal bool) (*SwappableDB, error) { + db, err := Open(dbPath, fkEnabled, wal) + if err != nil { + return nil, err + } + return &SwappableDB{db: db}, nil +} + +func (s *SwappableDB) Swap(path string, fkConstraints, walEnabled bool) error { + if !IsValidSQLiteFile(path) { + return fmt.Errorf("invalid SQLite data") + } + + s.dbMu.Lock() + defer s.dbMu.Unlock() + if err := s.db.Close(); err != nil { + return fmt.Errorf("failed to close: %s", err) + } + if err := RemoveFiles(s.db.Path()); err != nil { + return fmt.Errorf("failed to remove files: %s", err) + } + if err := os.Rename(path, s.db.Path()); err != nil { + return fmt.Errorf("failed to rename database: %s", err) + } + + var db *DB + db, err := Open(s.db.Path(), fkConstraints, walEnabled) + if err != nil { + return fmt.Errorf("open SQLite file failed: %s", err) + } + s.db = db + return nil +} + +func (s *SwappableDB) Close() error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Close() +} + +func (s *SwappableDB) Stats() (map[string]interface{}, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Stats() +} + +func (s *SwappableDB) Request(req *command.Request, xTime bool) ([]*command.ExecuteQueryResponse, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Request(req, xTime) +} + +func (s *SwappableDB) Execute(ex *command.Request, xTime bool) ([]*command.ExecuteResult, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Execute(ex, xTime) +} + +func (s *SwappableDB) Query(q *command.Request, xTime bool) ([]*command.QueryRows, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Query(q, xTime) +} + +func (s *SwappableDB) VacuumInto(path string) error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.VacuumInto(path) +} + +func (s *SwappableDB) Backup(path string, vacuum bool) error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Backup(path, vacuum) +} + +func (s *SwappableDB) Serialize() ([]byte, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Serialize() +} + +func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.StmtReadOnly(sql) +} + +func (s *SwappableDB) Checkpoint() error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Checkpoint() +} + +func (s *SwappableDB) Path() string { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Path() +} + +func (s *SwappableDB) Dump(w io.Writer) error { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.Dump(w) +} + +func (s *SwappableDB) FKEnabled() bool { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.fkEnabled +} + +func (s *SwappableDB) WALEnabled() bool { + s.dbMu.RLock() + defer s.dbMu.RUnlock() + return s.db.wal +} diff --git a/store/command_processor.go b/store/command_processor.go index c8c51043..c1110b5a 100644 --- a/store/command_processor.go +++ b/store/command_processor.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "os" + "path/filepath" "github.com/rqlite/rqlite/v8/command" "github.com/rqlite/rqlite/v8/command/chunking" @@ -41,7 +42,7 @@ func NewCommandProcessor(logger *log.Logger, dm *chunking.DechunkerManager) *Com } // Process processes the given command against the given database. -func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, bool, interface{}) { +func (c *CommandProcessor) Process(data []byte, pDB *sql.SwappableDB) (*proto.Command, bool, interface{}) { db := *pDB cmd := &proto.Command{} if err := command.Unmarshal(data, cmd); err != nil { @@ -76,20 +77,23 @@ func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, b panic(fmt.Sprintf("failed to unmarshal load subcommand: %s", err.Error())) } - // Swap the underlying database to the new one. - if err := db.Close(); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)} - } - if err := sql.RemoveFiles(db.Path()); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)} + // create a scratch file in the same directory as s.db.Path() + fd, err := os.CreateTemp(filepath.Dir(pDB.Path()), "rqlilte-load-") + if err != nil { + return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to create temporary database file: %s", err)} } - - newDB, err := createOnDisk(lr.Data, db.Path(), db.FKEnabled(), db.WALEnabled()) + defer os.Remove(fd.Name()) + defer fd.Close() + _, err = fd.Write(lr.Data) if err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)} + return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to write to temporary database file: %s", err)} } + fd.Close() - *pDB = newDB + // Swap the underlying database to the new one. + if err := pDB.Swap(fd.Name(), pDB.FKEnabled(), pDB.WALEnabled()); err != nil { + return cmd, false, &fsmGenericResponse{error: fmt.Errorf("error swapping databases: %s", err)} + } return cmd, true, &fsmGenericResponse{} case proto.Command_COMMAND_TYPE_LOAD_CHUNK: var lcr proto.LoadChunkRequest @@ -129,25 +133,9 @@ func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, b c.logger.Printf("invalid chunked database file - ignoring") return cmd, false, &fsmGenericResponse{error: fmt.Errorf("invalid chunked database file - ignoring")} } - - // Close the underlying database before we overwrite it. - if err := db.Close(); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)} + if err := pDB.Swap(path, pDB.FKEnabled(), pDB.WALEnabled()); err != nil { + return cmd, false, &fsmGenericResponse{error: fmt.Errorf("error swapping databases: %s", err)} } - if err := sql.RemoveFiles(db.Path()); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)} - } - - if err := os.Rename(path, db.Path()); err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to rename temporary database file: %s", err)} - } - newDB, err := sql.Open(db.Path(), db.FKEnabled(), db.WALEnabled()) - if err != nil { - return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to open new on-disk database: %s", err)} - } - - // Swap the underlying database to the new one. - *pDB = newDB } } return cmd, true, &fsmGenericResponse{} diff --git a/store/state.go b/store/state.go index c1164699..dd27280c 100644 --- a/store/state.go +++ b/store/state.go @@ -104,7 +104,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable } // Now, open the database so we can replay any outstanding Raft log entries. - db, err := sql.Open(tmpDBPath, false, true) + db, err := sql.OpenSwappable(tmpDBPath, false, true) if err != nil { return fmt.Errorf("failed to open temporary database: %s", err) } @@ -135,7 +135,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable return fmt.Errorf("failed to get log at index %d: %v", index, err) } if entry.Type == raft.LogCommand { - cmdProc.Process(entry.Data, &db) + cmdProc.Process(entry.Data, db) } lastIndex = entry.Index lastTerm = entry.Term diff --git a/store/store.go b/store/store.go index 07d27735..06859f61 100644 --- a/store/store.go +++ b/store/store.go @@ -238,12 +238,12 @@ type Store struct { raft *raft.Raft // The consensus mechanism. ly Layer raftTn *NodeTransport - raftID string // Node ID. - dbConf *DBConfig // SQLite database config. - dbPath string // Path to underlying SQLite file. - walPath string // Path to WAL file. - dbDir string // Path to directory containing SQLite file. - db *sql.DB // The underlying SQLite store. + raftID string // Node ID. + dbConf *DBConfig // SQLite database config. + dbPath string // Path to underlying SQLite file. + walPath string // Path to WAL file. + dbDir string // Path to directory containing SQLite file. + db *sql.SwappableDB // The underlying SQLite store. dechunkManager *chunking.DechunkerManager cmdProc *CommandProcessor @@ -1388,20 +1388,9 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) { } // Swap in new database file. - if err := s.db.Close(); err != nil { - return n, err - } - if err := sql.RemoveFiles(s.dbPath); err != nil { - return n, err + if err := s.db.Swap(f.Name(), s.dbConf.FKConstraints, true); err != nil { + return n, fmt.Errorf("error swapping database file: %v", err) } - if err := os.Rename(f.Name(), s.dbPath); err != nil { - return n, err - } - db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true) - if err != nil { - return n, err - } - s.db = db // Snapshot, so we load the new database into the Raft system. if err := s.snapshotStore.SetFullNeeded(); err != nil { @@ -1435,20 +1424,9 @@ func (s *Store) Vacuum() error { } // Swap in new database file. - if err := s.db.Close(); err != nil { - return err - } - if err := sql.RemoveFiles(s.dbPath); err != nil { - return err + if err := s.db.Swap(fd.Name(), s.dbConf.FKConstraints, true); err != nil { + return fmt.Errorf("error swapping database file: %v", err) } - if err := os.Rename(fd.Name(), s.dbPath); err != nil { - return err - } - db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true) - if err != nil { - return err - } - s.db = db if err := s.snapshotStore.SetFullNeeded(); err != nil { return err @@ -1812,7 +1790,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) { s.logger.Printf("first log applied since node start, log at index %d", l.Index) } - cmd, mutated, r := s.cmdProc.Process(l.Data, &s.db) + cmd, mutated, r := s.cmdProc.Process(l.Data, s.db) if mutated { s.dbAppliedIdxMu.Lock() s.dbAppliedIdx = l.Index @@ -1980,27 +1958,9 @@ func (s *Store) fsmRestore(rc io.ReadCloser) (retErr error) { return fmt.Errorf("error creating temporary file for restore operation: %v", err) } - // Must wipe out all pre-existing state if being asked to do a restore, and put - // the new database in place. - if !sql.IsValidSQLiteFile(tmpFile.Name()) { - return fmt.Errorf("invalid SQLite data") - } - if err := s.db.Close(); err != nil { - return fmt.Errorf("failed to close pre-restore database: %s", err) - } - if err := sql.RemoveFiles(s.db.Path()); err != nil { - return fmt.Errorf("failed to remove pre-restore database files: %s", err) - } - if err := os.Rename(tmpFile.Name(), s.db.Path()); err != nil { - return fmt.Errorf("failed to rename restored database: %s", err) - } - - var db *sql.DB - db, err = sql.Open(s.dbPath, s.dbConf.FKConstraints, true) - if err != nil { - return fmt.Errorf("open SQLite file during restore: %s", err) + if err := s.db.Swap(tmpFile.Name(), s.dbConf.FKConstraints, true); err != nil { + return fmt.Errorf("error swapping database file: %v", err) } - s.db = db s.logger.Printf("successfully opened database at %s due to restore", s.db.Path()) // Take conservative approach and assume that everything has changed, so update @@ -2270,7 +2230,7 @@ func (s *Store) autoVacNeeded(t time.Time) (bool, error) { // createOnDisk opens an on-disk database file at the configured path. If b is // non-nil, any preexisting file will first be overwritten with those contents. // Otherwise, any preexisting file will be removed before the database is opened. -func createOnDisk(b []byte, path string, fkConstraints, wal bool) (*sql.DB, error) { +func createOnDisk(b []byte, path string, fkConstraints, wal bool) (*sql.SwappableDB, error) { if err := sql.RemoveFiles(path); err != nil { return nil, err } @@ -2279,7 +2239,7 @@ func createOnDisk(b []byte, path string, fkConstraints, wal bool) (*sql.DB, erro return nil, err } } - return sql.Open(path, fkConstraints, wal) + return sql.OpenSwappable(path, fkConstraints, wal) } func copyFromReaderToFile(path string, r io.Reader) (int64, error) {