1
0
Fork 0

Restore from file uses chunking

master
Philip O'Toole 1 year ago
parent 5ca5ce342e
commit b7633143cc

@ -79,6 +79,8 @@ const (
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 50
defaultChunkSize = 512 * 1024 * 1024 // 512 MB
)
const (
@ -163,8 +165,9 @@ type Store struct {
peersPath string
peersInfoPath string
restorePath string
restoreDoneCh chan struct{}
restoreChunkSize int64
restorePath string
restoreDoneCh chan struct{}
raft *raft.Raft // The consensus mechanism.
ln Listener
@ -274,19 +277,20 @@ func New(ln Listener, c *Config) *Store {
}
return &Store{
ln: ln,
raftDir: c.Dir,
peersPath: filepath.Join(c.Dir, peersPath),
peersInfoPath: filepath.Join(c.Dir, peersInfoPath),
restoreDoneCh: make(chan struct{}),
raftID: c.ID,
dbConf: c.DBConf,
dbPath: dbPath,
leaderObservers: make([]chan<- struct{}, 0),
reqMarshaller: command.NewRequestMarshaler(),
logger: logger,
notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout,
ln: ln,
raftDir: c.Dir,
peersPath: filepath.Join(c.Dir, peersPath),
peersInfoPath: filepath.Join(c.Dir, peersInfoPath),
restoreChunkSize: defaultChunkSize,
restoreDoneCh: make(chan struct{}),
raftID: c.ID,
dbConf: c.DBConf,
dbPath: dbPath,
leaderObservers: make([]chan<- struct{}, 0),
reqMarshaller: command.NewRequestMarshaler(),
logger: logger,
notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout,
}
}
@ -315,6 +319,12 @@ func (s *Store) SetRestorePath(path string) error {
return nil
}
// SetRestoreChunkSize sets the chunk size to use when restoring a database.
// If not set, the default chunk size is used.
func (s *Store) SetRestoreChunkSize(size int64) {
s.restoreChunkSize = size
}
// Open opens the Store.
func (s *Store) Open() (retErr error) {
defer func() {
@ -1153,7 +1163,7 @@ func (s *Store) Provide(path string) error {
// LoadFromReader reads data from r chunk-by-chunk, and loads it into the
// database.
func (s *Store) LoadFromReader(r io.Reader) error {
func (s *Store) LoadFromReader(r io.Reader, chunkSize int64) error {
if !s.open {
return ErrNotOpen
}
@ -1162,7 +1172,7 @@ func (s *Store) LoadFromReader(r io.Reader) error {
return ErrNotReady
}
chunker := chunking.NewChunker(r, 1024*1024)
chunker := chunking.NewChunker(r, chunkSize)
for {
chunk, err := chunker.Next()
if err != nil {
@ -1784,14 +1794,7 @@ func (s *Store) installRestore() error {
return err
}
defer f.Close()
b, err := io.ReadAll(f)
if err != nil {
return err
}
lr := &command.LoadRequest{
Data: b,
}
return s.load(lr)
return s.LoadFromReader(f, s.restoreChunkSize)
}
// logSize returns the size of the Raft log on disk.

@ -1021,7 +1021,7 @@ COMMIT;
t.Fatalf("failed to open SQLite file: %s", err.Error())
}
defer f.Close()
err = s.LoadFromReader(f)
err = s.LoadFromReader(f, 1024*1024)
if err != nil {
t.Fatalf("failed to load SQLite file via Reader: %s", err.Error())
}

Loading…
Cancel
Save