
Test_SingleNodeLoadBinaryFromReader passes

That's good, but much more extensive testing is needed, and this needs to
replace the existing load path -- and then we move to 8.0.
Branch: master
Author: Philip O'Toole, 1 year ago
Parent: 93a752ae0e
Commit: 1c8e52c5f7

@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"os"
+	"sync"
 
 	"github.com/rqlite/rqlite/command"
 )
@@ -68,3 +69,53 @@ func (d *Dechunker) Close() (string, error) {
 	}
 	return d.filePath, nil
 }
+
+// DechunkerManager manages Dechunkers.
+type DechunkerManager struct {
+	dir string
+	mu  sync.Mutex
+	m   map[string]*Dechunker
+}
+
+// NewDechunkerManager returns a new DechunkerManager.
+func NewDechunkerManager(dir string) (*DechunkerManager, error) {
+	// Test that we can use the given directory.
+	file, err := os.CreateTemp(dir, "dechunker-manager-test-*")
+	if err != nil {
+		return nil, fmt.Errorf("failed to create test file in dir %s: %v", dir, err)
+	}
+	file.Close()
+	os.Remove(file.Name())
+
+	return &DechunkerManager{
+		dir: dir,
+	}, nil
+}
+
+// Get returns the Dechunker for the given stream ID. If the Dechunker does not
+// exist, it is created.
+func (d *DechunkerManager) Get(id string) (*Dechunker, error) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	if d.m == nil {
+		d.m = make(map[string]*Dechunker)
+	}
+
+	if _, ok := d.m[id]; !ok {
+		dechunker, err := NewDechunker(d.dir)
+		if err != nil {
+			return nil, err
+		}
+		d.m[id] = dechunker
+	}
+
+	return d.m[id], nil
+}
+
+// Delete deletes the Dechunker for the given stream ID.
+func (d *DechunkerManager) Delete(id string) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	delete(d.m, id)
+}
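The manager hands out one Dechunker per stream ID: Get lazily initializes the map and creates the Dechunker on first use, and Delete forgets the stream so a later Get starts fresh. A minimal usage sketch of that contract, assuming the package is imported from github.com/rqlite/rqlite/command/chunking (the stream ID here is illustrative):

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/rqlite/rqlite/command/chunking" // assumed import path
)

func main() {
	// The manager needs a writable directory for the reassembled files.
	mgr, err := chunking.NewDechunkerManager(os.TempDir())
	if err != nil {
		log.Fatalf("failed to create manager: %v", err)
	}

	// Get creates the Dechunker for a stream on first use...
	d1, _ := mgr.Get("stream-1")
	// ...and returns the same instance on subsequent calls.
	d2, _ := mgr.Get("stream-1")
	fmt.Println(d1 == d2) // true

	// Delete forgets the stream; a later Get starts a fresh Dechunker.
	mgr.Delete("stream-1")
}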

@@ -309,6 +309,91 @@ func Test_ReassemblyOfLargeData(t *testing.T) {
 	}
 }
+
+func Test_CreateDechunkerManager(t *testing.T) {
+	dir := os.TempDir()
+	manager, err := NewDechunkerManager(dir)
+	if err != nil {
+		t.Fatalf("failed to create DechunkerManager: %v", err)
+	}
+	if manager == nil {
+		t.Fatalf("expected DechunkerManager instance, got nil")
+	}
+	if manager.dir != dir {
+		t.Errorf("unexpected dir in manager: expected %s, got %s", dir, manager.dir)
+	}
+	if manager.m != nil {
+		t.Errorf("expected manager map to be nil, got: %v", manager.m)
+	}
+}
+
+func Test_CreateDechunkerManagerFail(t *testing.T) {
+	manager, err := NewDechunkerManager("/does/not/exist/surely")
+	if err == nil {
+		t.Fatalf("expected an error but got none")
+	}
+	if manager != nil {
+		t.Fatalf("expected nil manager but got one")
+	}
+}
+
+func Test_GetDechunker(t *testing.T) {
+	dir := os.TempDir()
+	manager, err := NewDechunkerManager(dir)
+	if err != nil {
+		t.Fatalf("failed to create DechunkerManager: %v", err)
+	}
+
+	dechunker1, err := manager.Get("test1")
+	if err != nil {
+		t.Fatalf("failed to get dechunker: %v", err)
+	}
+	if dechunker1 == nil {
+		t.Fatalf("expected Dechunker instance, got nil")
+	}
+
+	// Make sure we get the same dechunker.
+	dechunker2, err := manager.Get("test1")
+	if err != nil {
+		t.Fatalf("failed to get dechunker: %v", err)
+	}
+	if dechunker2 == nil {
+		t.Fatalf("expected Dechunker instance, got nil")
+	}
+	if dechunker1 != dechunker2 {
+		t.Errorf("expected same dechunker instance, got different instances")
+	}
+}
+
+func Test_DeleteDechunker(t *testing.T) {
+	dir := os.TempDir()
+	manager, err := NewDechunkerManager(dir)
+	if err != nil {
+		t.Fatalf("failed to create DechunkerManager: %v", err)
+	}
+
+	dechunker1, err := manager.Get("test1")
+	if err != nil {
+		t.Fatalf("failed to get dechunker: %v", err)
+	}
+	if dechunker1 == nil {
+		t.Fatalf("expected Dechunker instance, got nil")
+	}
+
+	manager.Delete("test1")
+
+	// Attempt to get the Dechunker for the ID "test1".
+	dechunker2, err := manager.Get("test1")
+	if err != nil {
+		t.Fatalf("failed to get dechunker: %v", err)
+	}
+	if dechunker2 == nil {
+		t.Fatalf("expected Dechunker instance, got nil")
+	}
+	if dechunker1 == dechunker2 {
+		t.Errorf("expected different dechunker instances, got same instance")
+	}
+}
+
 func mustCompressData(data []byte) []byte {
 	var buf bytes.Buffer
 	gz := gzip.NewWriter(&buf)
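These tests cover creation, failure, reuse, and deletion, but not concurrent access, even though DechunkerManager is mutex-guarded. A sketch of a test that could be added in the same package, assuming the test file also imports sync (the test name and stream ID are hypothetical):

func Test_GetDechunkerConcurrent(t *testing.T) {
	manager, err := NewDechunkerManager(t.TempDir())
	if err != nil {
		t.Fatalf("failed to create DechunkerManager: %v", err)
	}

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All goroutines ask for the same stream; the mutex in Get
			// must ensure only one Dechunker is ever created for it.
			if _, err := manager.Get("concurrent-test"); err != nil {
				t.Errorf("failed to get dechunker: %v", err)
			}
		}()
	}
	wg.Wait()
}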

@@ -180,6 +180,8 @@ type Store struct {
 	dbAppliedIndex       uint64
 	appliedIdxUpdateDone chan struct{}
 
+	dechunkManager *chunking.DechunkerManager
+
 	// Channels that must be closed for the Store to be considered ready.
 	readyChans             []<-chan struct{}
 	numClosedReadyChannels int
@@ -348,6 +350,11 @@ func (s *Store) Open() (retErr error) {
 	if err := os.MkdirAll(filepath.Dir(s.peersPath), 0755); err != nil {
 		return err
 	}
+	decMgmr, err := chunking.NewDechunkerManager(filepath.Dir(s.dbPath))
+	if err != nil {
+		return err
+	}
+	s.dechunkManager = decMgmr
 
 	// Create Raft-compatible network layer.
 	s.raftTn = raft.NewNetworkTransport(NewTransport(s.ln), connectionPoolCount, connectionTimeout, nil)
@@ -1550,7 +1557,7 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) {
 		s.firstLogAppliedT = time.Now()
 	}
 
-	typ, r := applyCommand(l.Data, &s.db)
+	typ, r := applyCommand(l.Data, &s.db, s.dechunkManager)
 	if typ == command.Command_COMMAND_TYPE_NOOP {
 		s.numNoops++
 	}
@@ -1858,6 +1865,12 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
 	}
 	defer db.Close()
 
+	// Need a dechunker manager to handle any chunked load requests.
+	decMgmr, err := chunking.NewDechunkerManager(dataDir)
+	if err != nil {
+		return fmt.Errorf("failed to create dechunker manager: %s", err.Error())
+	}
+
 	// The snapshot information is the best known end point for the data
 	// until we play back the Raft log entries.
 	lastIndex := snapshotIndex
@@ -1876,7 +1889,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
 			return fmt.Errorf("failed to get log at index %d: %v", index, err)
 		}
 		if entry.Type == raft.LogCommand {
-			applyCommand(entry.Data, &db)
+			applyCommand(entry.Data, &db, decMgmr)
 		}
 		lastIndex = entry.Index
 		lastTerm = entry.Term
@@ -1925,7 +1938,7 @@ func dbBytesFromSnapshot(rc io.ReadCloser) ([]byte, error) {
 	return database.Bytes(), nil
 }
 
-func applyCommand(data []byte, pDB **sql.DB) (command.Command_Type, interface{}) {
+func applyCommand(data []byte, pDB **sql.DB, decMgmr *chunking.DechunkerManager) (command.Command_Type, interface{}) {
 	var c command.Command
 
 	db := *pDB
@@ -1987,6 +2000,47 @@ func applyCommand(data []byte, pDB **sql.DB) (command.Command_Type, interface{})
 			panic(fmt.Sprintf("failed to unmarshal load-chunk subcommand: %s", err.Error()))
 		}
+
+		dec, err := decMgmr.Get(lcr.StreamId)
+		if err != nil {
+			return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to get dechunker: %s", err)}
+		}
+		last, err := dec.WriteChunk(&lcr)
+		if err != nil {
+			return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to write chunk: %s", err)}
+		}
+
+		if last {
+			path, err := dec.Close()
+			if err != nil {
+				return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to close dechunker: %s", err)}
+			}
+			decMgmr.Delete(lcr.StreamId)
+
+			// Read all the data at path into a byte slice.
+			b, err := ioutil.ReadFile(path)
+			if err != nil {
+				return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to read chunked data: %s", err)}
+			}
+			os.Remove(path)
+
+			var newDB *sql.DB
+			if db.InMemory() {
+				newDB, err = createInMemory(b, db.FKEnabled())
+				if err != nil {
+					return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to create in-memory database: %s", err)}
+				}
+			} else {
+				newDB, err = createOnDisk(b, db.Path(), db.FKEnabled(), db.WALEnabled())
+				if err != nil {
+					return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)}
+				}
+			}
+
+			// Swap the underlying database to the new one.
+			if err := db.Close(); err != nil {
+				return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
+			}
+			*pDB = newDB
+		}
 		return c.Type, &fsmGenericResponse{}
 	case command.Command_COMMAND_TYPE_NOOP:
 		return c.Type, &fsmGenericResponse{}
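The apply path above is the consuming half of chunked loading: each chunk for a stream is appended to a temporary file via WriteChunk, and when the last chunk arrives the reassembled bytes replace the node's database wholesale. For orientation, a sketch of what the producing side might look like. Only StreamId is visible in this diff; the IsLast and Data fields, the gzip compression (suggested by mustCompressData in the tests), and the chunkFile helper itself are assumptions, not part of this commit:

package chunkexample

// Hypothetical producer-side helper; not part of this commit. Assumes
// command.LoadChunkRequest carries IsLast and Data fields alongside the
// StreamId field used by the FSM above.

import (
	"bytes"
	"compress/gzip"
	"io"

	"github.com/rqlite/rqlite/command"
)

// chunkFile splits r into gzip-compressed LoadChunkRequests of at most
// chunkSize uncompressed bytes each, marking the final request with IsLast.
func chunkFile(r io.Reader, streamID string, chunkSize int) ([]*command.LoadChunkRequest, error) {
	var chunks []*command.LoadChunkRequest
	buf := make([]byte, chunkSize)
	for {
		n, err := io.ReadFull(r, buf)
		if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
			return nil, err
		}
		last := err != nil // EOF or short read means this is the final chunk

		// Compress this chunk's payload, mirroring what the tests'
		// mustCompressData helper does with gzip.
		var gz bytes.Buffer
		w := gzip.NewWriter(&gz)
		if _, werr := w.Write(buf[:n]); werr != nil {
			return nil, werr
		}
		if werr := w.Close(); werr != nil {
			return nil, werr
		}

		chunks = append(chunks, &command.LoadChunkRequest{
			StreamId: streamID,
			IsLast:   last,
			Data:     gz.Bytes(),
		})
		if last {
			return chunks, nil
		}
	}
}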
