
Compress SQLite database in Raft snapshot

Before Raft truncates its log, it retrieves a snapshot of the SQLite database to represent the log entries removed by that truncation, and then writes that snapshot to disk. To reduce both disk IO and disk space, this change compresses the SQLite snapshot before it is written to disk and decompresses it during a restore operation.

This change also handles older SQLite snapshots, which were not compressed.
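After this change a snapshot is laid out as: an 8-byte little-endian sentinel equal to the maximum uint64 value, the 8-byte little-endian length of the gzip-compressed database, the compressed database bytes, and finally the JSON-encoded cluster metadata. Older snapshots begin directly with the 8-byte length of the uncompressed database, which is how Restore tells the two formats apart. The following standalone Go sketch (decodeSnapshot and the sample metadata are illustrative, not rqlite's actual API) demonstrates encoding and decoding this layout:

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"fmt"
	"io"
	"math"
)

// decodeSnapshot decodes the illustrative layout described above: an
// optional max-uint64 sentinel, an 8-byte little-endian database length,
// the (possibly gzip-compressed) database bytes, and the trailing cluster
// metadata. It returns the decompressed database and the metadata bytes.
func decodeSnapshot(b []byte) (database, meta []byte, err error) {
	offset := 0
	readUint64 := func() (uint64, error) {
		if len(b)-offset < 8 {
			return 0, fmt.Errorf("short read at offset %d", offset)
		}
		v := binary.LittleEndian.Uint64(b[offset : offset+8])
		offset += 8
		return v, nil
	}

	// First 8 bytes: either the database size, or the compression sentinel.
	sz, err := readUint64()
	if err != nil {
		return nil, nil, err
	}
	compressed := false
	if sz == math.MaxUint64 {
		compressed = true
		// Sentinel found; the next 8 bytes hold the compressed size.
		if sz, err = readUint64(); err != nil {
			return nil, nil, err
		}
	}
	if offset+int(sz) > len(b) {
		return nil, nil, fmt.Errorf("database length %d exceeds snapshot size", sz)
	}

	raw := b[offset : offset+int(sz)]
	offset += int(sz)
	if compressed {
		gz, gerr := gzip.NewReader(bytes.NewReader(raw))
		if gerr != nil {
			return nil, nil, gerr
		}
		defer gz.Close()
		if database, err = io.ReadAll(gz); err != nil {
			return nil, nil, err
		}
	} else {
		database = raw
	}
	// Everything after the database is the JSON cluster metadata.
	return database, b[offset:], nil
}

func main() {
	db := []byte("pretend this is a SQLite database file")
	meta := []byte(`{"1":"localhost:4002"}`)

	// Build a compressed-format snapshot: sentinel, size, gzip data, meta.
	var cdb bytes.Buffer
	gz := gzip.NewWriter(&cdb)
	gz.Write(db)
	gz.Close()

	var snap bytes.Buffer
	binary.Write(&snap, binary.LittleEndian, uint64(math.MaxUint64))
	binary.Write(&snap, binary.LittleEndian, uint64(cdb.Len()))
	snap.Write(cdb.Bytes())
	snap.Write(meta)

	gotDB, gotMeta, err := decodeSnapshot(snap.Bytes())
	fmt.Printf("db=%q meta=%s err=%v\n", gotDB, gotMeta, err)
}

The max-uint64 sentinel is safe as a format marker because no real length prefix can take that value, so old and new snapshots remain distinguishable from their first eight bytes alone.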
master
Philip O'Toole 4 years ago committed by GitHub
parent 8f194717f1
commit 6d77d904e7

@@ -5,6 +5,7 @@ package store
import (
"bytes"
"compress/gzip"
"encoding/binary"
"encoding/json"
"errors"
@@ -13,12 +14,14 @@ import (
"io"
"io/ioutil"
"log"
"math"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
"unsafe"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
@@ -962,20 +965,67 @@ func (s *Store) Restore(rc io.ReadCloser) error {
return err
}
// Get size of database.
var sz uint64
if err := binary.Read(rc, binary.LittleEndian, &sz); err != nil {
return err
var uint64_size uint64
inc := int64(unsafe.Sizeof(uint64_size))
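// inc is the width in bytes (8) of each uint64 length prefix read below.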
// Read all the data into RAM, since we have to decode known-length
// chunks of various forms.
var offset int64
b, err := ioutil.ReadAll(rc)
if err != nil {
return fmt.Errorf("readall: %s", err)
}
readUint64 := func(bb []byte) (uint64, error) {
var sz uint64
if err := binary.Read(bytes.NewReader(bb), binary.LittleEndian, &sz); err != nil {
return 0, fmt.Errorf("read initial size: %s", err)
}
return sz, nil
}
// Get size of database, checking for compression.
compressed := false
sz, err := readUint64(b[offset : offset+inc])
if err != nil {
return fmt.Errorf("read compression check: %s", err)
}
offset = offset + inc
if sz == math.MaxUint64 {
compressed = true
// Database is actually compressed, read actual size next.
sz, err = readUint64(b[offset : offset+inc])
if err != nil {
return fmt.Errorf("read compressed size: %s", err)
}
offset = offset + inc
}
// Now read in the database file data and restore.
// Now read in the database file data, decompress if necessary, and restore.
database := make([]byte, sz)
if _, err := io.ReadFull(rc, database); err != nil {
return err
if compressed {
buf := new(bytes.Buffer)
gz, err := gzip.NewReader(bytes.NewReader(b[offset : offset+int64(sz)]))
if err != nil {
return err
}
if _, err := io.Copy(buf, gz); err != nil {
return fmt.Errorf("SQLite database decompress: %s", err)
}
if err := gz.Close(); err != nil {
return err
}
database = buf.Bytes()
offset += int64(sz)
} else {
database = b[offset : offset+int64(sz)]
offset += int64(sz)
}
var db *sql.DB
var err error
if !s.dbConf.Memory {
// Write snapshot over any existing database file.
if err := ioutil.WriteFile(s.dbPath, database, 0660); err != nil {
@@ -985,7 +1035,7 @@ func (s *Store) Restore(rc io.ReadCloser) error {
// Re-open it.
db, err = sql.OpenWithDSN(s.dbPath, s.dbConf.DSN)
if err != nil {
return err
return fmt.Errorf("open with DSN: %s", err)
}
} else {
// In memory. Copy to temporary file, and then load memory from file.
@@ -1003,24 +1053,19 @@ func (s *Store) Restore(rc io.ReadCloser) error {
// Load an in-memory database from the snapshot now on disk.
db, err = sql.LoadInMemoryWithDSN(f.Name(), s.dbConf.DSN)
if err != nil {
return err
return fmt.Errorf("load into memory with DSN: %s", err)
}
}
s.db = db
// Read remaining bytes, and set to cluster meta.
b, err := ioutil.ReadAll(rc)
if err != nil {
return err
}
// Unmarshal remaining bytes, and set to cluster meta.
err = func() error {
s.metaMu.Lock()
defer s.metaMu.Unlock()
return json.Unmarshal(b, &s.meta)
return json.Unmarshal(b[offset:], &s.meta)
}()
if err != nil {
return err
return fmt.Errorf("cluster metadata unmarshal: %s", err)
}
stats.Add(numRestores, 1)
return nil
@@ -1053,23 +1098,44 @@ type fsmSnapshot struct {
// Persist writes the snapshot to the given sink.
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
err := func() error {
// Start by writing size of database.
b := new(bytes.Buffer)
sz := uint64(len(f.database))
err := binary.Write(b, binary.LittleEndian, sz)
var b *bytes.Buffer
var sz uint64
// Flag compressed database by writing max uint64 value first.
// No SQLite database written by earlier versions will have this
// as a size. *Surely*.
b = new(bytes.Buffer)
err := binary.Write(b, binary.LittleEndian, uint64(math.MaxUint64))
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
b.Reset() // Clear state of buffer for future use.
// Get compressed copy of database.
cdb, err := f.compressedDatabase()
if err != nil {
return err
}
// Next write database to sink.
if _, err := sink.Write(f.database); err != nil {
// Write size of compressed database.
sz = uint64(len(cdb))
err = binary.Write(b, binary.LittleEndian, sz)
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
// Finally write the meta.
// Write compressed database to sink.
if _, err := sink.Write(cdb); err != nil {
return err
}
// Write the cluster metadata.
if _, err := sink.Write(f.meta); err != nil {
return err
}
@@ -1086,6 +1152,22 @@ func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
return nil
}
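// compressedDatabase returns a copy of the SQLite database file,
// gzip-compressed at the best-compression level.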
func (f *fsmSnapshot) compressedDatabase() ([]byte, error) {
var buf bytes.Buffer
gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
if err != nil {
return nil, err
}
if _, err := gz.Write(f.database); err != nil {
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Database copies contents of the underlying SQLite database to dst
func (s *Store) database(leader bool, dst io.Writer) error {
if leader && s.raft.State() != raft.Leader {

@@ -1105,6 +1105,39 @@ func Test_SingleNodeSnapshotInMem(t *testing.T) {
}
}
func Test_SingleNodeRestoreNoncompressed(t *testing.T) {
s := mustNewStore(false)
defer os.RemoveAll(s.Path())
if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
s.WaitForLeader(10 * time.Second)
// Check restoration from a SQLite database snapshot that predates compression.
// This is to test for backwards compatibility of this code.
f, err := os.Open(filepath.Join("testdata", "noncompressed-sqlite-snap.bin"))
if err != nil {
t.Fatalf("failed to open snapshot file: %s", err.Error())
}
if err := s.Restore(f); err != nil {
t.Fatalf("failed to restore noncompressed snapshot from disk: %s", err.Error())
}
// Ensure database is back in the expected state.
r, err := s.Query(queryRequestFromString("SELECT count(*) FROM foo", false, false))
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[5000]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_MetadataMultinode(t *testing.T) {
s0 := mustNewStore(true)
if err := s0.Open(true); err != nil {

Binary file not shown (testdata/noncompressed-sqlite-snap.bin, the non-compressed snapshot fixture used by the new test).