1
0
Fork 0

Add ReadFrom to Store

master
Philip O'Toole 9 months ago
parent 6163987b9d
commit 140f106acc

@ -199,7 +199,7 @@ func IsDELETEModeEnabledSQLiteFile(path string) bool {
return IsDELETEModeEnabled(b) return IsDELETEModeEnabled(b)
} }
// IsDELETEModeEnabled checks that the supplied path looks like a SQLite file // IsDELETEModeEnabled checks that the supplied data looks like a SQLite file
// with DELETE mode enabled. // with DELETE mode enabled.
func IsDELETEModeEnabled(b []byte) bool { func IsDELETEModeEnabled(b []byte) bool {
return len(b) >= 20 && b[18] == 1 && b[19] == 1 return len(b) >= 20 && b[18] == 1 && b[19] == 1

@ -42,6 +42,10 @@ var (
// operation. // operation.
ErrNotLeader = errors.New("not leader") ErrNotLeader = errors.New("not leader")
// ErrNotSingleNode is returned when a node attempts to execute a single-node
// only operation.
ErrNotSingleNode = errors.New("not single-node")
// ErrStaleRead is returned if the executing the query would violate the // ErrStaleRead is returned if the executing the query would violate the
// requested freshness. // requested freshness.
ErrStaleRead = errors.New("stale read") ErrStaleRead = errors.New("stale read")
@ -96,6 +100,7 @@ const (
numSnapshotsFull = "num_snapshots_full" numSnapshotsFull = "num_snapshots_full"
numSnapshotsIncremental = "num_snapshots_incremental" numSnapshotsIncremental = "num_snapshots_incremental"
numProvides = "num_provides" numProvides = "num_provides"
numBoots = "num_boots"
numBackups = "num_backups" numBackups = "num_backups"
numLoads = "num_loads" numLoads = "num_loads"
numRestores = "num_restores" numRestores = "num_restores"
@ -135,6 +140,7 @@ func ResetStats() {
stats.Add(numUserSnapshots, 0) stats.Add(numUserSnapshots, 0)
stats.Add(numSnapshotsFull, 0) stats.Add(numSnapshotsFull, 0)
stats.Add(numSnapshotsIncremental, 0) stats.Add(numSnapshotsIncremental, 0)
stats.Add(numBoots, 0)
stats.Add(numProvides, 0) stats.Add(numProvides, 0)
stats.Add(numBackups, 0) stats.Add(numBackups, 0)
stats.Add(numLoads, 0) stats.Add(numLoads, 0)
@ -1253,6 +1259,75 @@ func (s *Store) load(lr *command.LoadRequest) error {
return nil return nil
} }
// ReadFrom reads data from r, and loads it into the database, bypassing Raft consensus.
// Once the data is loaded, a snapshot is triggered, which then results in a system as
// if the data had been loaded through Raft consensus.
func (s *Store) ReadFrom(r io.Reader) (int64, error) {
// Check the constraints.
if s.raft.State() != raft.Leader {
return 0, ErrNotLeader
}
nodes, err := s.Nodes()
if err != nil {
return 0, err
}
if len(nodes) != 1 {
return 0, ErrNotSingleNode
}
// Write the data to a temporary file..
f, err := os.CreateTemp("", "rqlite-boot-*")
if err != nil {
return 0, err
}
defer f.Close()
defer os.Remove(f.Name())
n, err := io.Copy(f, r)
if err != nil {
return n, err
}
f.Close()
// Confirm the data is a valid SQLite database.
if !sql.IsValidSQLiteFile(f.Name()) {
return n, fmt.Errorf("invalid SQLite data")
}
if !sql.IsDELETEModeEnabledSQLiteFile(f.Name()) {
return n, fmt.Errorf("SQLite file does not have DELETE mode enabled")
}
// Close the database, remove the database file, and then rename the temporary
if err := s.db.Close(); err != nil {
return n, err
}
if err := sql.RemoveFiles(s.dbPath); err != nil {
return n, err
}
if err := os.Rename(f.Name(), s.dbPath); err != nil {
return n, err
}
db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true)
if err != nil {
return n, err
}
s.db = db
// Raft won't snapshot unless there is at least one unsnappshotted log entry.
if af, err := s.Noop("boot"); err != nil {
return n, err
} else if err := af.Error(); err != nil {
return n, err
}
if err := s.snapshotStore.SetFullNeeded(); err != nil {
return n, err
}
if err := s.Snapshot(); err != nil {
return n, err
}
stats.Add(numBoots, 1)
return n, nil
}
// Notify notifies this Store that a node is ready for bootstrapping at the // Notify notifies this Store that a node is ready for bootstrapping at the
// given address. Once the number of known nodes reaches the expected level // given address. Once the number of known nodes reaches the expected level
// bootstrapping will be attempted using this Store. "Expected level" includes // bootstrapping will be attempted using this Store. "Expected level" includes

@ -2,6 +2,7 @@ package store
import ( import (
"bytes" "bytes"
"crypto/rand"
"errors" "errors"
"expvar" "expvar"
"fmt" "fmt"
@ -1313,6 +1314,142 @@ func Test_SingleNodeSetRestoreFailBadFile(t *testing.T) {
} }
} }
// Test_SingleNodeBoot tests that a Store correctly boots from SQLite data.
func Test_SingleNodeBoot(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
defer s.Close(true)
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
// Load a dataset, to check it's erased by the SQLite file load.
dump := `PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE bar (id integer not null primary key, name text);
INSERT INTO "bar" VALUES(1,'declan');
COMMIT;
`
_, err := s.Execute(executeRequestFromString(dump, false, false))
if err != nil {
t.Fatalf("failed to load simple dump: %s", err.Error())
}
// Check that data were loaded correctly.
qr := queryRequestFromString("SELECT * FROM bar", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err := s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"declan"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
f, err := os.Open(filepath.Join("testdata", "load.sqlite"))
if err != nil {
t.Fatalf("failed to open SQLite file: %s", err.Error())
}
defer f.Close()
// Load the SQLite file in bypass mode, check that the right number
// of snapshots were created, and that the right amount of data was
// loaded.
numSnapshots := s.numSnapshots
n, err := s.ReadFrom(f)
if err != nil {
t.Fatalf("failed to load SQLite file via Reader: %s", err.Error())
}
sz := mustFileSize(filepath.Join("testdata", "load.sqlite"))
if n != sz {
t.Fatalf("expected %d bytes to be read, got %d", sz, n)
}
if s.numSnapshots != numSnapshots+1 {
t.Fatalf("expected 1 extra snapshot, got %d", s.numSnapshots)
}
// Check that data were loaded correctly.
qr = queryRequestFromString("SELECT * FROM foo WHERE id=2", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[2,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
qr = queryRequestFromString("SELECT count(*) FROM foo", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[3]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Check pre-existing data is gone.
qr = queryRequestFromString("SELECT * FROM bar", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err = s.Query(qr)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `{"error":"no such table: bar"}`, asJSON(r[0]); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_SingleNodeBoot_Fail(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
if err := s.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
}
defer s.Close(true)
if _, err := s.WaitForLeader(10 * time.Second); err != nil {
t.Fatalf("Error waiting for leader: %s", err)
}
// Ensure invalid SQLite data is not accepted.
b := make([]byte, 1024)
mustReadRandom(b)
r := bytes.NewReader(b)
if _, err := s.ReadFrom(r); err == nil {
t.Fatalf("expected error reading from invalid SQLite file")
}
// Ensure WAL-enabled SQLite file is not accepted.
f, err := os.Open(filepath.Join("testdata", "wal-enabled.sqlite"))
if err != nil {
t.Fatalf("failed to open SQLite file: %s", err.Error())
}
defer f.Close()
if _, err := s.ReadFrom(f); err == nil {
t.Fatalf("expected error reading from WAL-enabled SQLite file")
}
}
// Test_SingleNodeRecoverNoChange tests a node recovery that doesn't // Test_SingleNodeRecoverNoChange tests a node recovery that doesn't
// actually change anything. // actually change anything.
func Test_SingleNodeRecoverNoChange(t *testing.T) { func Test_SingleNodeRecoverNoChange(t *testing.T) {
@ -2711,6 +2848,13 @@ func mustReadFile(path string) []byte {
return b return b
} }
func mustReadRandom(b []byte) {
_, err := rand.Read(b)
if err != nil {
panic("failed to read random bytes")
}
}
func mustCopyFileToTempFile(path string) string { func mustCopyFileToTempFile(path string) string {
f := mustCreateTempFile() f := mustCreateTempFile()
mustWriteFile(f, string(mustReadFile(path))) mustWriteFile(f, string(mustReadFile(path)))
@ -2725,6 +2869,14 @@ func mustParseDuration(t string) time.Duration {
return d return d
} }
func mustFileSize(path string) int64 {
n, err := fileSize(path)
if err != nil {
panic("failed to get file size")
}
return n
}
func executeRequestFromString(s string, timings, tx bool) *command.ExecuteRequest { func executeRequestFromString(s string, timings, tx bool) *command.ExecuteRequest {
return executeRequestFromStrings([]string{s}, timings, tx) return executeRequestFromStrings([]string{s}, timings, tx)
} }

Binary file not shown.
Loading…
Cancel
Save