1
0
Fork 0

Introduce a synchronized, swappable DB

Type prevents races between queries and operations (such as auto-vacuum)
which change the DB object out.
master
Philip O'Toole 8 months ago
parent 1d43c8f81e
commit 3ed29b5c31

@ -0,0 +1,133 @@
package db
import (
"fmt"
"io"
"os"
"sync"
command "github.com/rqlite/rqlite/v8/command/proto"
)
type SwappableDB struct {
db *DB
dbMu sync.RWMutex
}
func OpenSwappable(dbPath string, fkEnabled, wal bool) (*SwappableDB, error) {
db, err := Open(dbPath, fkEnabled, wal)
if err != nil {
return nil, err
}
return &SwappableDB{db: db}, nil
}
func (s *SwappableDB) Swap(path string, fkConstraints, walEnabled bool) error {
if !IsValidSQLiteFile(path) {
return fmt.Errorf("invalid SQLite data")
}
s.dbMu.Lock()
defer s.dbMu.Unlock()
if err := s.db.Close(); err != nil {
return fmt.Errorf("failed to close: %s", err)
}
if err := RemoveFiles(s.db.Path()); err != nil {
return fmt.Errorf("failed to remove files: %s", err)
}
if err := os.Rename(path, s.db.Path()); err != nil {
return fmt.Errorf("failed to rename database: %s", err)
}
var db *DB
db, err := Open(s.db.Path(), fkConstraints, walEnabled)
if err != nil {
return fmt.Errorf("open SQLite file failed: %s", err)
}
s.db = db
return nil
}
func (s *SwappableDB) Close() error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Close()
}
func (s *SwappableDB) Stats() (map[string]interface{}, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Stats()
}
func (s *SwappableDB) Request(req *command.Request, xTime bool) ([]*command.ExecuteQueryResponse, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Request(req, xTime)
}
func (s *SwappableDB) Execute(ex *command.Request, xTime bool) ([]*command.ExecuteResult, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Execute(ex, xTime)
}
func (s *SwappableDB) Query(q *command.Request, xTime bool) ([]*command.QueryRows, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Query(q, xTime)
}
func (s *SwappableDB) VacuumInto(path string) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.VacuumInto(path)
}
func (s *SwappableDB) Backup(path string, vacuum bool) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Backup(path, vacuum)
}
func (s *SwappableDB) Serialize() ([]byte, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Serialize()
}
func (s *SwappableDB) StmtReadOnly(sql string) (bool, error) {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.StmtReadOnly(sql)
}
func (s *SwappableDB) Checkpoint() error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Checkpoint()
}
func (s *SwappableDB) Path() string {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Path()
}
func (s *SwappableDB) Dump(w io.Writer) error {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.Dump(w)
}
func (s *SwappableDB) FKEnabled() bool {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.fkEnabled
}
func (s *SwappableDB) WALEnabled() bool {
s.dbMu.RLock()
defer s.dbMu.RUnlock()
return s.db.wal
}

@ -4,6 +4,7 @@ import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/rqlite/rqlite/v8/command"
"github.com/rqlite/rqlite/v8/command/chunking"
@ -41,7 +42,7 @@ func NewCommandProcessor(logger *log.Logger, dm *chunking.DechunkerManager) *Com
}
// Process processes the given command against the given database.
func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, bool, interface{}) {
func (c *CommandProcessor) Process(data []byte, pDB *sql.SwappableDB) (*proto.Command, bool, interface{}) {
db := *pDB
cmd := &proto.Command{}
if err := command.Unmarshal(data, cmd); err != nil {
@ -76,20 +77,23 @@ func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, b
panic(fmt.Sprintf("failed to unmarshal load subcommand: %s", err.Error()))
}
// Swap the underlying database to the new one.
if err := db.Close(); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
}
if err := sql.RemoveFiles(db.Path()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)}
// create a scratch file in the same directory as s.db.Path()
fd, err := os.CreateTemp(filepath.Dir(pDB.Path()), "rqlilte-load-")
if err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to create temporary database file: %s", err)}
}
newDB, err := createOnDisk(lr.Data, db.Path(), db.FKEnabled(), db.WALEnabled())
defer os.Remove(fd.Name())
defer fd.Close()
_, err = fd.Write(lr.Data)
if err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)}
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to write to temporary database file: %s", err)}
}
fd.Close()
*pDB = newDB
// Swap the underlying database to the new one.
if err := pDB.Swap(fd.Name(), pDB.FKEnabled(), pDB.WALEnabled()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("error swapping databases: %s", err)}
}
return cmd, true, &fsmGenericResponse{}
case proto.Command_COMMAND_TYPE_LOAD_CHUNK:
var lcr proto.LoadChunkRequest
@ -129,25 +133,9 @@ func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*proto.Command, b
c.logger.Printf("invalid chunked database file - ignoring")
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("invalid chunked database file - ignoring")}
}
// Close the underlying database before we overwrite it.
if err := db.Close(); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
if err := pDB.Swap(path, pDB.FKEnabled(), pDB.WALEnabled()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("error swapping databases: %s", err)}
}
if err := sql.RemoveFiles(db.Path()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)}
}
if err := os.Rename(path, db.Path()); err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to rename temporary database file: %s", err)}
}
newDB, err := sql.Open(db.Path(), db.FKEnabled(), db.WALEnabled())
if err != nil {
return cmd, false, &fsmGenericResponse{error: fmt.Errorf("failed to open new on-disk database: %s", err)}
}
// Swap the underlying database to the new one.
*pDB = newDB
}
}
return cmd, true, &fsmGenericResponse{}

@ -104,7 +104,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
}
// Now, open the database so we can replay any outstanding Raft log entries.
db, err := sql.Open(tmpDBPath, false, true)
db, err := sql.OpenSwappable(tmpDBPath, false, true)
if err != nil {
return fmt.Errorf("failed to open temporary database: %s", err)
}
@ -135,7 +135,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
return fmt.Errorf("failed to get log at index %d: %v", index, err)
}
if entry.Type == raft.LogCommand {
cmdProc.Process(entry.Data, &db)
cmdProc.Process(entry.Data, db)
}
lastIndex = entry.Index
lastTerm = entry.Term

@ -238,12 +238,12 @@ type Store struct {
raft *raft.Raft // The consensus mechanism.
ly Layer
raftTn *NodeTransport
raftID string // Node ID.
dbConf *DBConfig // SQLite database config.
dbPath string // Path to underlying SQLite file.
walPath string // Path to WAL file.
dbDir string // Path to directory containing SQLite file.
db *sql.DB // The underlying SQLite store.
raftID string // Node ID.
dbConf *DBConfig // SQLite database config.
dbPath string // Path to underlying SQLite file.
walPath string // Path to WAL file.
dbDir string // Path to directory containing SQLite file.
db *sql.SwappableDB // The underlying SQLite store.
dechunkManager *chunking.DechunkerManager
cmdProc *CommandProcessor
@ -1388,20 +1388,9 @@ func (s *Store) ReadFrom(r io.Reader) (int64, error) {
}
// Swap in new database file.
if err := s.db.Close(); err != nil {
return n, err
}
if err := sql.RemoveFiles(s.dbPath); err != nil {
return n, err
if err := s.db.Swap(f.Name(), s.dbConf.FKConstraints, true); err != nil {
return n, fmt.Errorf("error swapping database file: %v", err)
}
if err := os.Rename(f.Name(), s.dbPath); err != nil {
return n, err
}
db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true)
if err != nil {
return n, err
}
s.db = db
// Snapshot, so we load the new database into the Raft system.
if err := s.snapshotStore.SetFullNeeded(); err != nil {
@ -1435,20 +1424,9 @@ func (s *Store) Vacuum() error {
}
// Swap in new database file.
if err := s.db.Close(); err != nil {
return err
}
if err := sql.RemoveFiles(s.dbPath); err != nil {
return err
if err := s.db.Swap(fd.Name(), s.dbConf.FKConstraints, true); err != nil {
return fmt.Errorf("error swapping database file: %v", err)
}
if err := os.Rename(fd.Name(), s.dbPath); err != nil {
return err
}
db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true)
if err != nil {
return err
}
s.db = db
if err := s.snapshotStore.SetFullNeeded(); err != nil {
return err
@ -1812,7 +1790,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
s.logger.Printf("first log applied since node start, log at index %d", l.Index)
}
cmd, mutated, r := s.cmdProc.Process(l.Data, &s.db)
cmd, mutated, r := s.cmdProc.Process(l.Data, s.db)
if mutated {
s.dbAppliedIdxMu.Lock()
s.dbAppliedIdx = l.Index
@ -1980,27 +1958,9 @@ func (s *Store) fsmRestore(rc io.ReadCloser) (retErr error) {
return fmt.Errorf("error creating temporary file for restore operation: %v", err)
}
// Must wipe out all pre-existing state if being asked to do a restore, and put
// the new database in place.
if !sql.IsValidSQLiteFile(tmpFile.Name()) {
return fmt.Errorf("invalid SQLite data")
}
if err := s.db.Close(); err != nil {
return fmt.Errorf("failed to close pre-restore database: %s", err)
}
if err := sql.RemoveFiles(s.db.Path()); err != nil {
return fmt.Errorf("failed to remove pre-restore database files: %s", err)
}
if err := os.Rename(tmpFile.Name(), s.db.Path()); err != nil {
return fmt.Errorf("failed to rename restored database: %s", err)
}
var db *sql.DB
db, err = sql.Open(s.dbPath, s.dbConf.FKConstraints, true)
if err != nil {
return fmt.Errorf("open SQLite file during restore: %s", err)
if err := s.db.Swap(tmpFile.Name(), s.dbConf.FKConstraints, true); err != nil {
return fmt.Errorf("error swapping database file: %v", err)
}
s.db = db
s.logger.Printf("successfully opened database at %s due to restore", s.db.Path())
// Take conservative approach and assume that everything has changed, so update
@ -2270,7 +2230,7 @@ func (s *Store) autoVacNeeded(t time.Time) (bool, error) {
// createOnDisk opens an on-disk database file at the configured path. If b is
// non-nil, any preexisting file will first be overwritten with those contents.
// Otherwise, any preexisting file will be removed before the database is opened.
func createOnDisk(b []byte, path string, fkConstraints, wal bool) (*sql.DB, error) {
func createOnDisk(b []byte, path string, fkConstraints, wal bool) (*sql.SwappableDB, error) {
if err := sql.RemoveFiles(path); err != nil {
return nil, err
}
@ -2279,7 +2239,7 @@ func createOnDisk(b []byte, path string, fkConstraints, wal bool) (*sql.DB, erro
return nil, err
}
}
return sql.Open(path, fkConstraints, wal)
return sql.OpenSwappable(path, fkConstraints, wal)
}
func copyFromReaderToFile(path string, r io.Reader) (int64, error) {

Loading…
Cancel
Save