From c782b43f2d45e98f2a0478d7b4314fe61b18826c Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 1 Feb 2021 23:55:32 -0500 Subject: [PATCH] Build on-disk databases in-memory first With this change, rqlite nodes running in "on-disk" mode build the database in memory first, and move it to the disk just before the Raft system starts. This means that on-disk nodes now initialize almost as quickly as in-memory nodes. --- cmd/rqlited/main.go | 5 - {store => log}/log.go | 58 ++++-------- {store => log}/log_test.go | 153 ++++++++++++------------------ store/store.go | 187 ++++++++++++++++++++++++++----------- 4 files changed, 212 insertions(+), 191 deletions(-) rename {store => log}/log.go (53%) rename {store => log}/log_test.go (64%) diff --git a/cmd/rqlited/main.go b/cmd/rqlited/main.go index 700b6027..5b2fe648 100644 --- a/cmd/rqlited/main.go +++ b/cmd/rqlited/main.go @@ -234,11 +234,6 @@ func main() { enableBootstrap = true // New node, so we may be bootstrapping } else { log.Printf("preexisting node state detected in %s", dataPath) - fi, li, err := store.NewLog(dataPath).Indexes() - if err != nil { - log.Fatalf("failed to get Log first and last indexes: %s", err.Error()) - } - log.Printf("first log index is %d, last log index is %d", fi, li) } // Determine join addresses diff --git a/store/log.go b/log/log.go similarity index 53% rename from store/log.go rename to log/log.go index b6484aa1..c62fecf0 100644 --- a/store/log.go +++ b/log/log.go @@ -1,8 +1,7 @@ -package store +package log import ( "fmt" - "path/filepath" "github.com/hashicorp/raft" "github.com/hashicorp/raft-boltdb" @@ -10,34 +9,29 @@ import ( // Log is an object that can return information about the Raft log. type Log struct { - path string + *raftboltdb.BoltStore } // NewLog returns an instantiated Log object. 
-func NewLog(dir string) *Log { - return &Log{ - path: filepath.Join(dir, raftDBPath), +func NewLog(path string) (*Log, error) { + bs, err := raftboltdb.NewBoltStore(path) + if err != nil { + return nil, fmt.Errorf("new bolt store: %s", err) } + return &Log{bs}, nil } -// FirstIndex returns the first index written. 0 for no entries. -func (l *Log) FirstIndex() (uint64, error) { - bs, err := raftboltdb.NewBoltStore(l.path) +// Indexes returns the first and last indexes. +func (l *Log) Indexes() (uint64, uint64, error) { + fi, err := l.FirstIndex() if err != nil { - return 0, fmt.Errorf("new bolt store: %s", err) + return 0, 0, fmt.Errorf("failed to get first index: %s", err) } - defer bs.Close() - return bs.FirstIndex() -} - -// LastIndex returns the last index written. 0 for no entries. -func (l *Log) LastIndex() (uint64, error) { - bs, err := raftboltdb.NewBoltStore(l.path) + li, err := l.LastIndex() if err != nil { - return 0, fmt.Errorf("new bolt store: %s", err) + return 0, 0, fmt.Errorf("failed to get last index: %s", err) } - defer bs.Close() - return bs.LastIndex() + return fi, li, nil } // LastCommandIndex returns the index of the last Command @@ -49,16 +43,15 @@ func (l *Log) LastCommandIndex() (uint64, error) { return 0, fmt.Errorf("get indexes: %s", err) } - bs, err := raftboltdb.NewBoltStore(l.path) - if err != nil { - return 0, fmt.Errorf("new bolt store: %s", err) + // Check for empty log. + if li == 0 { + return 0, nil } - defer bs.Close() var rl raft.Log for i := li; i >= fi; i-- { - if err := bs.GetLog(i, &rl); err != nil { - return 0, fmt.Errorf("get log: %s", err) + if err := l.GetLog(i, &rl); err != nil { + return 0, fmt.Errorf("get log at index %d: %s", i, err) } if rl.Type == raft.LogCommand { return i, nil @@ -66,16 +59,3 @@ func (l *Log) LastCommandIndex() (uint64, error) { } return 0, nil } - -// Indexes returns the first and last indexes. 
-func (l *Log) Indexes() (uint64, uint64, error) { - fi, err := l.FirstIndex() - if err != nil { - return 0, 0, fmt.Errorf("failed to get first index: %s", err) - } - li, err := l.LastIndex() - if err != nil { - return 0, 0, fmt.Errorf("failed to get last index: %s", err) - } - return fi, li, nil -} diff --git a/store/log_test.go b/log/log_test.go similarity index 64% rename from store/log_test.go rename to log/log_test.go index 7071e43d..8694ca90 100644 --- a/store/log_test.go +++ b/log/log_test.go @@ -1,26 +1,22 @@ -package store +package log import ( + "io/ioutil" "os" - "path/filepath" "testing" "github.com/hashicorp/raft" "github.com/hashicorp/raft-boltdb" ) -func Test_LogNew(t *testing.T) { - l := NewLog("/some/path") - if l == nil { - t.Fatal("got nil pointer for log") - } -} - -func Test_LogNewNotExist(t *testing.T) { - path := mustTempDir() +func Test_LogNewEmpty(t *testing.T) { + path := mustTempFile() defer os.Remove(path) - l := NewLog(path) + l, err := NewLog(path) + if err != nil { + t.Fatalf("failed to create log: %s", err) + } fi, err := l.FirstIndex() if err != nil { t.Fatalf("failed to get first index: %s", err) @@ -36,44 +32,23 @@ func Test_LogNewNotExist(t *testing.T) { if li != 0 { t.Fatalf("got non-zero value for last index of empty log: %d", li) } -} - -func Test_LogNewExistEmpty(t *testing.T) { - path := mustTempDir() - defer os.Remove(path) - - // Precreate an empty BoltDB store. 
- bs, err := raftboltdb.NewBoltStore(filepath.Join(path, raftDBPath)) - if err != nil { - t.Fatalf("failed to create bolt store: %s", err) - } - bs.Close() - - l := NewLog(path) - fi, err := l.FirstIndex() + lci, err := l.LastCommandIndex() if err != nil { - t.Fatalf("failed to get first index: %s", err) + t.Fatalf("failed to get last command index: %s", err) } - if fi != 0 { - t.Fatalf("got non-zero value for first index of empty log: %d", fi) + if lci != 0 { + t.Fatalf("got wrong value for last command index of not empty log: %d", lci) } - li, err := l.LastIndex() - if err != nil { - t.Fatalf("failed to get last index: %s", err) - } - if li != 0 { - t.Fatalf("got non-zero value for last index of empty log: %d", li) - } } func Test_LogNewExistNotEmpty(t *testing.T) { - path := mustTempDir() + path := mustTempFile() defer os.Remove(path) // Write some entries directory to the BoltDB Raft store. - bs, err := raftboltdb.NewBoltStore(filepath.Join(path, raftDBPath)) + bs, err := raftboltdb.NewBoltStore(path) if err != nil { t.Fatalf("failed to create bolt store: %s", err) } @@ -88,7 +63,10 @@ func Test_LogNewExistNotEmpty(t *testing.T) { t.Fatalf("failed to close bolt db: %s", err) } - l := NewLog(path) + l, err := NewLog(path) + if err != nil { + t.Fatalf("failed to create new log: %s", err) + } fi, err := l.FirstIndex() if err != nil { @@ -114,8 +92,12 @@ func Test_LogNewExistNotEmpty(t *testing.T) { t.Fatalf("got wrong value for last command index of not empty log: %d", lci) } + if err := l.Close(); err != nil { + t.Fatalf("failed to close log: %s", err) + } + // Delete an entry, recheck index functionality. 
- bs, err = raftboltdb.NewBoltStore(filepath.Join(path, raftDBPath)) + bs, err = raftboltdb.NewBoltStore(path) if err != nil { t.Fatalf("failed to re-open bolt store: %s", err) } @@ -126,6 +108,11 @@ func Test_LogNewExistNotEmpty(t *testing.T) { t.Fatalf("failed to close bolt db: %s", err) } + l, err = NewLog(path) + if err != nil { + t.Fatalf("failed to create new log: %s", err) + } + fi, err = l.FirstIndex() if err != nil { t.Fatalf("failed to get first index: %s", err) @@ -152,64 +139,18 @@ func Test_LogNewExistNotEmpty(t *testing.T) { if li != 4 { t.Fatalf("got wrong value for last index of empty log: %d", li) } -} - -func Test_LogLastCommandIndex(t *testing.T) { - path := mustTempDir() - defer os.Remove(path) - - // Write some entries directory to the BoltDB Raft store. - bs, err := raftboltdb.NewBoltStore(filepath.Join(path, raftDBPath)) - if err != nil { - t.Fatalf("failed to create bolt store: %s", err) - } - - for i := 1; i < 3; i++ { - if err := bs.StoreLog(&raft.Log{ - Index: uint64(i), - Type: raft.LogCommand, - }); err != nil { - t.Fatalf("failed to write entry to raft log: %s", err) - } - } - if err := bs.StoreLog(&raft.Log{ - Index: uint64(3), - Type: raft.LogNoop, - }); err != nil { - t.Fatalf("failed to write entry to raft log: %s", err) - } - - if err := bs.Close(); err != nil { - t.Fatalf("failed to close bolt db: %s", err) - } - - l := NewLog(path) - lci, err := l.LastCommandIndex() - if err != nil { - t.Fatalf("failed to get last command index: %s", err) - } - if lci != 2 { - t.Fatalf("got wrong value for last command index of not empty log: %d", lci) - } - fi, li, err := l.Indexes() - if err != nil { - t.Fatalf("failed to get indexes: %s", err) - } - if fi != 1 { - t.Fatalf("got wrong value for first index of empty log: %d", fi) - } - if li != 3 { - t.Fatalf("got wrong for last index of empty log: %d", li) + if err := l.Close(); err != nil { + t.Fatalf("failed to close log: %s", err) } } func Test_LogLastCommandIndexNotExist(t *testing.T) { - 
path := mustTempDir() + path := mustTempFile() defer os.Remove(path) // Write some entries directory to the BoltDB Raft store. - bs, err := raftboltdb.NewBoltStore(filepath.Join(path, raftDBPath)) + bs, err := raftboltdb.NewBoltStore(path) if err != nil { t.Fatalf("failed to create bolt store: %s", err) } @@ -225,7 +166,10 @@ func Test_LogLastCommandIndexNotExist(t *testing.T) { t.Fatalf("failed to close bolt db: %s", err) } - l := NewLog(path) + l, err := NewLog(path) + if err != nil { + t.Fatalf("failed to create new log: %s", err) + } fi, err := l.FirstIndex() if err != nil { @@ -248,11 +192,15 @@ func Test_LogLastCommandIndexNotExist(t *testing.T) { t.Fatalf("failed to get last command index: %s", err) } if lci != 0 { - t.Fatalf("got wrong for last command index of not empty log: %d", lci) + t.Fatalf("got wrong value for last command index of not empty log: %d", lci) + } + + if err := l.Close(); err != nil { + t.Fatalf("failed to close log: %s", err) } // Delete first log. - bs, err = raftboltdb.NewBoltStore(filepath.Join(path, raftDBPath)) + bs, err = raftboltdb.NewBoltStore(path) if err != nil { t.Fatalf("failed to re-open bolt store: %s", err) } @@ -263,12 +211,27 @@ func Test_LogLastCommandIndexNotExist(t *testing.T) { t.Fatalf("failed to close bolt db: %s", err) } + l, err = NewLog(path) + if err != nil { + t.Fatalf("failed to create new log: %s", err) + } + lci, err = l.LastCommandIndex() if err != nil { t.Fatalf("failed to get last command index: %s", err) } if lci != 0 { - t.Fatalf("got wrong for last command index of not empty log: %d", lci) + t.Fatalf("got wrong value for last command index of not empty log: %d", lci) } +} +// mustTempFile returns a path to a temporary file in directory dir. It is up to the +// caller to remove the file once it is no longer needed. 
+func mustTempFile() string { + tmpfile, err := ioutil.TempFile("", "rqlite-db-test") + if err != nil { + panic(err.Error()) + } + tmpfile.Close() + return tmpfile.Name() } diff --git a/store/store.go b/store/store.go index 4521ab2f..5130f135 100644 --- a/store/store.go +++ b/store/store.go @@ -24,10 +24,10 @@ import ( "unsafe" "github.com/hashicorp/raft" - "github.com/hashicorp/raft-boltdb" "github.com/rqlite/rqlite/command" legacy "github.com/rqlite/rqlite/command/legacy" sql "github.com/rqlite/rqlite/db" + rlog "github.com/rqlite/rqlite/log" ) var ( @@ -122,12 +122,16 @@ type Store struct { reqMarshaller *command.RequestMarshaler // Request marshaler for writing to log. raftLog raft.LogStore // Persistent log store. raftStable raft.StableStore // Persistent k-v store. - boltStore *raftboltdb.BoltStore // Physical store. + boltStore *rlog.Log // Physical store. - lastIdxOnOpen uint64 // Last index on log when Store opens. - firstLogAppliedT time.Time // Time first log is applied - appliedOnOpen uint64 // Number of logs applied at open. - openT time.Time // Timestamp when Store opens. + onDiskCreated bool // On disk database actually created? + snapsExistOnOpen bool // Any snaps present when store opens? + firstIdxOnOpen uint64 // First index on log when Store opens. + lastIdxOnOpen uint64 // Last index on log when Store opens. + lastCommandIdxOnOpen uint64 // Last command index on log when Store opens. + firstLogAppliedT time.Time // Time first log is applied + appliedOnOpen uint64 // Number of logs applied at open. + openT time.Time // Timestamp when Store opens. txMu sync.RWMutex // Sync between snapshots and query-level transactions. queryMu sync.RWMutex // Sync queries generally with other operations. @@ -200,13 +204,6 @@ func (s *Store) Open(enableBootstrap bool) error { return err } - // Open underlying database. - db, err := s.open() - if err != nil { - return err - } - s.db = db - // Create Raft-compatible network layer. 
s.raftTn = raft.NewNetworkTransport(NewTransport(s.ln), connectionPoolCount, connectionTimeout, nil) @@ -221,11 +218,17 @@ func (s *Store) Open(enableBootstrap bool) error { if err != nil { return fmt.Errorf("file snapshot store: %s", err) } + snaps, err := snapshots.List() + if err != nil { + return fmt.Errorf("list snapshots: %s", err) + } + s.logger.Printf("%d preexisting snapshots present", len(snaps)) + s.snapsExistOnOpen = len(snaps) > 0 // Create the log store and stable store. - s.boltStore, err = raftboltdb.NewBoltStore(filepath.Join(s.raftDir, raftDBPath)) + s.boltStore, err = rlog.NewLog(filepath.Join(s.raftDir, raftDBPath)) if err != nil { - return fmt.Errorf("new bolt store: %s", err) + return fmt.Errorf("new log store: %s", err) } s.raftStable = s.boltStore s.raftLog, err = raft.NewLogCache(raftLogCacheSize, s.boltStore) @@ -234,9 +237,29 @@ func (s *Store) Open(enableBootstrap bool) error { } // Get some info about the log, before any more entries are committed. - s.lastIdxOnOpen, err = s.raftLog.LastIndex() - if err != nil { - return fmt.Errorf("failed to get last index: %s", err) + if err := s.setLogInfo(); err != nil { + return fmt.Errorf("set log info: %s", err) + } + s.logger.Printf("first log index: %d, last log index: %d, last command log index: %d:", + s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastCommandIdxOnOpen) + + // If an on-disk database has been requested, and there are no snapshots, and + // there are no commands in the log, then this is the only opportunity to + // create that on-disk database file before Raft initializes. + if !s.dbConf.Memory && !s.snapsExistOnOpen && s.lastCommandIdxOnOpen == 0 { + db, err := s.openOnDisk() + if err != nil { + return fmt.Errorf("failed to open on-disk database") + } + s.db = db + s.onDiskCreated = true + } else { + // We need an in-memory database, at least for bootstrapping purposes. 
+ db, err := s.openInMemory() + if err != nil { + return fmt.Errorf("failed to open in-memory database") + } + s.db = db } // Instantiate the Raft system. @@ -437,13 +460,15 @@ func (s *Store) Stats() (map[string]interface{}, error) { "version": sql.DBVersion, "db_size": dbSz, } - if !s.dbConf.Memory { + if s.dbConf.Memory { + dbStatus["path"] = ":memory:" + } else { dbStatus["path"] = s.dbPath - if dbStatus["size"], err = s.db.FileSize(); err != nil { - return nil, err + if s.onDiskCreated { + if dbStatus["size"], err = s.db.FileSize(); err != nil { + return nil, err + } } - } else { - dbStatus["path"] = ":memory:" } nodes, err := s.Nodes() @@ -771,31 +796,47 @@ func (s *Store) setMetadata(id string, md map[string]string) error { return nil } -// open opens the in-memory or file-based database. -func (s *Store) open() (*sql.DB, error) { - var db *sql.DB - var err error - if s.dbConf.Memory { - db, err = sql.OpenInMemoryWithDSN(s.dbConf.DSN) - if err != nil { - return nil, err - } - s.logger.Printf("SQLite %s database opened", s.databaseTypePretty()) - } else { - // Explicitly remove any pre-existing SQLite database file as it will be - // completely rebuilt from committed log entries (and possibly a snapshot). - if err := os.Remove(s.dbPath); err != nil && !os.IsNotExist(err) { - return nil, err - } - db, err = sql.OpenWithDSN(s.dbPath, s.dbConf.DSN) - if err != nil { - return nil, err - } - s.logger.Printf("SQLite %s database opened at %s", s.databaseTypePretty(), s.dbPath) +// openInMemory opens an empty in-memory database using the Store's configured DSN. +func (s *Store) openInMemory() (*sql.DB, error) { + db, err := sql.OpenInMemoryWithDSN(s.dbConf.DSN) + if err != nil { + return nil, err + } + return db, nil +} + +// openOnDisk opens an empty on-disk database at the Store's configured path. 
+func (s *Store) openOnDisk() (*sql.DB, error) { + // Explicitly remove any pre-existing SQLite database file as it will be + // completely rebuilt from committed log entries (and possibly a snapshot). + if err := os.Remove(s.dbPath); err != nil && !os.IsNotExist(err) { + return nil, err + } + db, err := sql.OpenWithDSN(s.dbPath, s.dbConf.DSN) + if err != nil { + return nil, err } return db, nil } +// setLogInfo records some key indexes about the log. +func (s *Store) setLogInfo() error { + var err error + s.firstIdxOnOpen, err = s.boltStore.FirstIndex() + if err != nil { + return fmt.Errorf("failed to get last index: %s", err) + } + s.lastIdxOnOpen, err = s.boltStore.LastIndex() + if err != nil { + return fmt.Errorf("failed to get last index: %s", err) + } + s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex() + if err != nil { + return fmt.Errorf("failed to get last command index: %s", err) + } + return nil +} + // remove removes the node, with the given ID, from the cluster. func (s *Store) remove(id string) error { if s.raft.State() != raft.Leader { @@ -877,13 +918,48 @@ type fsmGenericResponse struct { } // Apply applies a Raft log entry to the database. -func (s *Store) Apply(l *raft.Log) interface{} { +func (s *Store) Apply(l *raft.Log) (e interface{}) { defer func() { - if l.Index <= s.lastIdxOnOpen { + if l.Index <= s.lastCommandIdxOnOpen { + // In here means at least one command entry was in the log when the Store + // opened. s.appliedOnOpen++ - if l.Index == s.lastIdxOnOpen { + if l.Index == s.lastCommandIdxOnOpen { s.logger.Printf("%d committed log entries applied in %s, took %s since open", s.appliedOnOpen, time.Since(s.firstLogAppliedT), time.Since(s.openT)) + + // Last command log applied. Time to switch to on-disk database? + if s.dbConf.Memory { + s.logger.Println("continuing use of in-memory database") + } else { + // Since we're here, it means that a) an on-disk database was requested + // *and* there were commands in the log. 
A snapshot may or may not have + // been applied, but it wouldn't have created the on-disk database in that + // case since there were commands in the log. This is the very last chance + // to do it. + b, err := s.db.Serialize() + if err != nil { + e = &fsmGenericResponse{error: fmt.Errorf("serialize failed: %s", err)} + return + } + if err := s.db.Close(); err != nil { + e = &fsmGenericResponse{error: fmt.Errorf("close failed: %s", err)} + return + } + // Write new database to file on disk + if err := ioutil.WriteFile(s.dbPath, b, 0660); err != nil { + e = &fsmGenericResponse{error: fmt.Errorf("write failed: %s", err)} + return + } + + // Re-open it. + s.db, err = sql.OpenWithDSN(s.dbPath, s.dbConf.DSN) + if err != nil { + e = &fsmGenericResponse{error: fmt.Errorf("open on-disk failed: %s", err)} + } + s.onDiskCreated = true + s.logger.Println("successfully switched to on-disk database") + } } } }() @@ -1070,8 +1146,11 @@ func (s *Store) Restore(rc io.ReadCloser) error { } var db *sql.DB - if !s.dbConf.Memory { - // Write snapshot over any existing database file. + if !s.dbConf.Memory && s.lastCommandIdxOnOpen == 0 { + // A snapshot clearly exists (this function has been called) but there + // are no command entries in the log -- so Apply will not be called. + // Therefore this is the last opportunity to create the on-disk database + // before Raft starts. if err := ioutil.WriteFile(s.dbPath, database, 0660); err != nil { return err } @@ -1081,8 +1160,14 @@ func (s *Store) Restore(rc io.ReadCloser) error { if err != nil { return fmt.Errorf("open with DSN: %s", err) } + s.onDiskCreated = true + s.logger.Println("successfully switched to on-disk database due to restore") } else { - // In memory, so directly deserialize into the database. + // Deserialize into an in-memory database because a) an in-memory database + // has been requested, or b) while there was a snapshot, there are also + // command entries in the log. 
So by sticking with an in-memory database + // those entries will be applied in the fastest possible manner. We will + // defer creation of any database on disk until the Apply function. db, err = sql.DeserializeInMemoryWithDSN(database, s.dbConf.DSN) if err != nil { return fmt.Errorf("DeserializeInMemoryWithDSN with DSN: %s", err) @@ -1093,8 +1178,6 @@ func (s *Store) Restore(rc io.ReadCloser) error { return fmt.Errorf("failed to close pre-restore database: %s", err) } s.db = db - s.logger.Printf("successfully restored %s database from snapshot", - s.databaseTypePretty()) // Unmarshal remaining bytes, and set to cluster meta. err = func() error {