From 1c8e52c5f714e45776bba84e3232aca306fd4ddf Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 13 Jul 2023 10:07:50 -0400 Subject: [PATCH] Test_SingleNodeLoadBinaryFromReader passes That's good, but much more extensive testing needed, and this needs to replace the existing load path -- and then we move to 8.0. --- command/chunking/dechunker.go | 51 ++++++++++++++++++ command/chunking/dechunker_test.go | 85 ++++++++++++++++++++++++++++++ store/store.go | 60 +++++++++++++++++++-- 3 files changed, 193 insertions(+), 3 deletions(-) diff --git a/command/chunking/dechunker.go b/command/chunking/dechunker.go index 2d92c240..0c6106b1 100644 --- a/command/chunking/dechunker.go +++ b/command/chunking/dechunker.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "os" + "sync" "github.com/rqlite/rqlite/command" ) @@ -68,3 +69,53 @@ func (d *Dechunker) Close() (string, error) { } return d.filePath, nil } + +// DechunkerManager manages Dechunkers. +type DechunkerManager struct { + dir string + mu sync.Mutex + m map[string]*Dechunker +} + +// NewDechunkerManager returns a new DechunkerManager. +func NewDechunkerManager(dir string) (*DechunkerManager, error) { + // Test that we can use the given directory. + file, err := os.CreateTemp(dir, "dechunker-manager-test-*") + if err != nil { + return nil, fmt.Errorf("failed to test file in dir %s: %v", dir, err) + } + file.Close() + os.Remove(file.Name()) + + return &DechunkerManager{ + dir: dir, + }, nil +} + +// Get returns the Dechunker for the given stream ID. If the Dechunker does not +// exist, it is created. +func (d *DechunkerManager) Get(id string) (*Dechunker, error) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.m == nil { + d.m = make(map[string]*Dechunker) + } + + if _, ok := d.m[id]; !ok { + dechunker, err := NewDechunker(d.dir) + if err != nil { + return nil, err + } + d.m[id] = dechunker + } + + return d.m[id], nil +} + +// Delete deletes the Dechunker for the given stream ID. +func (d *DechunkerManager) Delete(id string) { + d.mu.Lock() + defer d.mu.Unlock() + delete(d.m, id) +} diff --git a/command/chunking/dechunker_test.go b/command/chunking/dechunker_test.go index 4ef44097..51b2c368 100644 --- a/command/chunking/dechunker_test.go +++ b/command/chunking/dechunker_test.go @@ -309,6 +309,91 @@ func Test_ReassemblyOfLargeData(t *testing.T) { } } +func Test_CreateDechunkerManager(t *testing.T) { + dir := os.TempDir() + manager, err := NewDechunkerManager(dir) + if err != nil { + t.Fatalf("failed to create DechunkerManager: %v", err) + } + if manager == nil { + t.Fatalf("expected DechunkerManager instance, got nil") + } + if manager.dir != dir { + t.Errorf("unexpected dir in manager: expected %s, got %s", dir, manager.dir) + } + if manager.m != nil { + t.Errorf("expected manager map to be nil, got: %v", manager.m) + } +} + +func Test_CreateDechunkerManagerFail(t *testing.T) { + manager, err := NewDechunkerManager("/does/not/exist/surely") + if err == nil { + t.Fatalf("expected an error but got none") + } + if manager != nil { + t.Fatalf("expected nil manager but got one") + } +} + +func Test_GetDechunker(t *testing.T) { + dir := os.TempDir() + manager, err := NewDechunkerManager(dir) + if err != nil { + t.Fatalf("failed to create DechunkerManager: %v", err) + } + + dechunker1, err := manager.Get("test1") + if err != nil { + t.Fatalf("failed to get dechunker: %v", err) + } + if dechunker1 == nil { + t.Fatalf("expected Dechunker instance, got nil") + } + + // Make sure we get the same dechunker. + dechunker2, err := manager.Get("test1") + if err != nil { + t.Fatalf("failed to get dechunker: %v", err) + } + if dechunker2 == nil { + t.Fatalf("expected Dechunker instance, got nil") + } + if dechunker1 != dechunker2 { + t.Errorf("expected same dechunker instance, got different instances") + } +} + +func Test_DeleteDechunker(t *testing.T) { + dir := os.TempDir() + manager, err := NewDechunkerManager(dir) + if err != nil { + t.Fatalf("failed to create DechunkerManager: %v", err) + } + + dechunker1, err := manager.Get("test1") + if err != nil { + t.Fatalf("failed to get dechunker: %v", err) + } + if dechunker1 == nil { + t.Fatalf("expected Dechunker instance, got nil") + } + + manager.Delete("test1") + + // Attempt to get the Dechunker for the ID "test1" + dechunker2, err := manager.Get("test1") + if err != nil { + t.Fatalf("failed to get dechunker: %v", err) + } + if dechunker2 == nil { + t.Fatalf("expected Dechunker instance, got nil") + } + if dechunker1 == dechunker2 { + t.Errorf("expected different dechunker instances, got same instance") + } +} + func mustCompressData(data []byte) []byte { var buf bytes.Buffer gz := gzip.NewWriter(&buf) diff --git a/store/store.go b/store/store.go index ba89962b..7dea1dc9 100644 --- a/store/store.go +++ b/store/store.go @@ -180,6 +180,8 @@ type Store struct { dbAppliedIndex uint64 appliedIdxUpdateDone chan struct{} + dechunkManager *chunking.DechunkerManager + // Channels that must be closed for the Store to be considered ready. readyChans []<-chan struct{} numClosedReadyChannels int @@ -348,6 +350,11 @@ func (s *Store) Open() (retErr error) { if err := os.MkdirAll(filepath.Dir(s.peersPath), 0755); err != nil { return err } + decMgmr, err := chunking.NewDechunkerManager(filepath.Dir(s.dbPath)) + if err != nil { + return err + } + s.dechunkManager = decMgmr // Create Raft-compatible network layer. s.raftTn = raft.NewNetworkTransport(NewTransport(s.ln), connectionPoolCount, connectionTimeout, nil) @@ -1550,7 +1557,7 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) { s.firstLogAppliedT = time.Now() } - typ, r := applyCommand(l.Data, &s.db) + typ, r := applyCommand(l.Data, &s.db, s.dechunkManager) if typ == command.Command_COMMAND_TYPE_NOOP { s.numNoops++ } @@ -1858,6 +1865,12 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable } defer db.Close() + // Need a dechunker manager to handle any chunked load requests. + decMgmr, err := chunking.NewDechunkerManager(dataDir) + if err != nil { + return fmt.Errorf("failed to create dechunker manager: %s", err.Error()) + } + // The snapshot information is the best known end point for the data // until we play back the Raft log entries. lastIndex := snapshotIndex @@ -1876,7 +1889,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable return fmt.Errorf("failed to get log at index %d: %v", index, err) } if entry.Type == raft.LogCommand { - applyCommand(entry.Data, &db) + applyCommand(entry.Data, &db, decMgmr) } lastIndex = entry.Index lastTerm = entry.Term @@ -1925,7 +1938,7 @@ func dbBytesFromSnapshot(rc io.ReadCloser) ([]byte, error) { return database.Bytes(), nil } -func applyCommand(data []byte, pDB **sql.DB) (command.Command_Type, interface{}) { +func applyCommand(data []byte, pDB **sql.DB, decMgmr *chunking.DechunkerManager) (command.Command_Type, interface{}) { var c command.Command db := *pDB @@ -1987,6 +2000,47 @@ func applyCommand(data []byte, pDB **sql.DB) (command.Command_Type, interface{}) panic(fmt.Sprintf("failed to unmarshal load-chunk subcommand: %s", err.Error())) } + dec, err := decMgmr.Get(lcr.StreamId) + if err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to get dechunker: %s", err)} + } + last, err := dec.WriteChunk(&lcr) + if err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to write chunk: %s", err)} + } + if last { + path, err := dec.Close() + if err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to close dechunker: %s", err)} + } + decMgmr.Delete(lcr.StreamId) + + // Read all the data at path into a byte slice. + b, err := ioutil.ReadFile(path) + if err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to read chunked data: %s", err)} + } + os.Remove(path) + + var newDB *sql.DB + if db.InMemory() { + newDB, err = createInMemory(b, db.FKEnabled()) + if err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to create in-memory database: %s", err)} + } + } else { + newDB, err = createOnDisk(b, db.Path(), db.FKEnabled(), db.WALEnabled()) + if err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)} + } + } + + // Swap the underlying database to the new one. + if err := db.Close(); err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)} + } + *pDB = newDB + } return c.Type, &fsmGenericResponse{} case command.Command_COMMAND_TYPE_NOOP: return c.Type, &fsmGenericResponse{}