From 140f106acc1399adc812f4bf344080a8f6dea9a6 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 16 Dec 2023 10:01:35 -0500 Subject: [PATCH] Add ReadFrom to Store --- db/db.go | 2 +- store/store.go | 75 +++++++++++++++ store/store_test.go | 152 ++++++++++++++++++++++++++++++ store/testdata/wal-enabled.sqlite | Bin 0 -> 8192 bytes 4 files changed, 228 insertions(+), 1 deletion(-) create mode 100644 store/testdata/wal-enabled.sqlite diff --git a/db/db.go b/db/db.go index d33c0d46..a9a2a2fa 100644 --- a/db/db.go +++ b/db/db.go @@ -199,7 +199,7 @@ func IsDELETEModeEnabledSQLiteFile(path string) bool { return IsDELETEModeEnabled(b) } -// IsDELETEModeEnabled checks that the supplied path looks like a SQLite file +// IsDELETEModeEnabled checks that the supplied data looks like a SQLite file // with DELETE mode enabled. func IsDELETEModeEnabled(b []byte) bool { return len(b) >= 20 && b[18] == 1 && b[19] == 1 diff --git a/store/store.go b/store/store.go index 73f9a830..d2560a1f 100644 --- a/store/store.go +++ b/store/store.go @@ -42,6 +42,10 @@ var ( // operation. ErrNotLeader = errors.New("not leader") + // ErrNotSingleNode is returned when a node attempts to execute a single-node + // only operation. + ErrNotSingleNode = errors.New("not single-node") + // ErrStaleRead is returned if the executing the query would violate the // requested freshness. ErrStaleRead = errors.New("stale read") @@ -96,6 +100,7 @@ const ( numSnapshotsFull = "num_snapshots_full" numSnapshotsIncremental = "num_snapshots_incremental" numProvides = "num_provides" + numBoots = "num_boots" numBackups = "num_backups" numLoads = "num_loads" numRestores = "num_restores" @@ -135,6 +140,7 @@ func ResetStats() { stats.Add(numUserSnapshots, 0) stats.Add(numSnapshotsFull, 0) stats.Add(numSnapshotsIncremental, 0) + stats.Add(numBoots, 0) stats.Add(numProvides, 0) stats.Add(numBackups, 0) stats.Add(numLoads, 0) @@ -1253,6 +1259,75 @@ func (s *Store) load(lr *command.LoadRequest) error { return nil } +// ReadFrom reads data from r, and loads it into the database, bypassing Raft consensus. +// Once the data is loaded, a snapshot is triggered, which then results in a system as +// if the data had been loaded through Raft consensus. +func (s *Store) ReadFrom(r io.Reader) (int64, error) { + // Check the constraints. + if s.raft.State() != raft.Leader { + return 0, ErrNotLeader + } + nodes, err := s.Nodes() + if err != nil { + return 0, err + } + if len(nodes) != 1 { + return 0, ErrNotSingleNode + } + + // Write the data to a temporary file.. + f, err := os.CreateTemp("", "rqlite-boot-*") + if err != nil { + return 0, err + } + defer f.Close() + defer os.Remove(f.Name()) + n, err := io.Copy(f, r) + if err != nil { + return n, err + } + f.Close() + + // Confirm the data is a valid SQLite database. + if !sql.IsValidSQLiteFile(f.Name()) { + return n, fmt.Errorf("invalid SQLite data") + } + if !sql.IsDELETEModeEnabledSQLiteFile(f.Name()) { + return n, fmt.Errorf("SQLite file does not have DELETE mode enabled") + } + + // Close the database, remove the database file, and then rename the temporary + if err := s.db.Close(); err != nil { + return n, err + } + if err := sql.RemoveFiles(s.dbPath); err != nil { + return n, err + } + if err := os.Rename(f.Name(), s.dbPath); err != nil { + return n, err + } + db, err := sql.Open(s.dbPath, s.dbConf.FKConstraints, true) + if err != nil { + return n, err + } + s.db = db + + // Raft won't snapshot unless there is at least one unsnappshotted log entry. + if af, err := s.Noop("boot"); err != nil { + return n, err + } else if err := af.Error(); err != nil { + return n, err + } + if err := s.snapshotStore.SetFullNeeded(); err != nil { + return n, err + } + if err := s.Snapshot(); err != nil { + return n, err + } + stats.Add(numBoots, 1) + return n, nil +} + // Notify notifies this Store that a node is ready for bootstrapping at the // given address. Once the number of known nodes reaches the expected level // bootstrapping will be attempted using this Store. "Expected level" includes diff --git a/store/store_test.go b/store/store_test.go index 200e7512..89b320fb 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2,6 +2,7 @@ package store import ( "bytes" + "crypto/rand" "errors" "expvar" "fmt" @@ -1313,6 +1314,142 @@ func Test_SingleNodeSetRestoreFailBadFile(t *testing.T) { } } +// Test_SingleNodeBoot tests that a Store correctly boots from SQLite data. +func Test_SingleNodeBoot(t *testing.T) { + s, ln := mustNewStore(t) + defer ln.Close() + + if err := s.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + defer s.Close(true) + if _, err := s.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + // Load a dataset, to check it's erased by the SQLite file load. + dump := `PRAGMA foreign_keys=OFF; +BEGIN TRANSACTION; +CREATE TABLE bar (id integer not null primary key, name text); +INSERT INTO "bar" VALUES(1,'declan'); +COMMIT; +` + _, err := s.Execute(executeRequestFromString(dump, false, false)) + if err != nil { + t.Fatalf("failed to load simple dump: %s", err.Error()) + } + + // Check that data were loaded correctly. + qr := queryRequestFromString("SELECT * FROM bar", false, true) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err := s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + if exp, got := `[[1,"declan"]]`, asJSON(r[0].Values); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + + f, err := os.Open(filepath.Join("testdata", "load.sqlite")) + if err != nil { + t.Fatalf("failed to open SQLite file: %s", err.Error()) + } + defer f.Close() + + // Load the SQLite file in bypass mode, check that the right number + // of snapshots were created, and that the right amount of data was + // loaded. + numSnapshots := s.numSnapshots + n, err := s.ReadFrom(f) + if err != nil { + t.Fatalf("failed to load SQLite file via Reader: %s", err.Error()) + } + sz := mustFileSize(filepath.Join("testdata", "load.sqlite")) + if n != sz { + t.Fatalf("expected %d bytes to be read, got %d", sz, n) + } + if s.numSnapshots != numSnapshots+1 { + t.Fatalf("expected 1 extra snapshot, got %d", s.numSnapshots) + } + + // Check that data were loaded correctly. + qr = queryRequestFromString("SELECT * FROM foo WHERE id=2", false, true) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err = s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + if exp, got := `[[2,"fiona"]]`, asJSON(r[0].Values); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + qr = queryRequestFromString("SELECT count(*) FROM foo", false, true) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err = s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + if exp, got := `[[3]]`, asJSON(r[0].Values); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + + // Check pre-existing data is gone. + qr = queryRequestFromString("SELECT * FROM bar", false, true) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err = s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `{"error":"no such table: bar"}`, asJSON(r[0]); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } +} + +func Test_SingleNodeBoot_Fail(t *testing.T) { + s, ln := mustNewStore(t) + defer ln.Close() + + if err := s.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + defer s.Close(true) + if _, err := s.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + // Ensure invalid SQLite data is not accepted. + b := make([]byte, 1024) + mustReadRandom(b) + r := bytes.NewReader(b) + if _, err := s.ReadFrom(r); err == nil { + t.Fatalf("expected error reading from invalid SQLite file") + } + + // Ensure WAL-enabled SQLite file is not accepted. + f, err := os.Open(filepath.Join("testdata", "wal-enabled.sqlite")) + if err != nil { + t.Fatalf("failed to open SQLite file: %s", err.Error()) + } + defer f.Close() + if _, err := s.ReadFrom(f); err == nil { + t.Fatalf("expected error reading from WAL-enabled SQLite file") + } +} + // Test_SingleNodeRecoverNoChange tests a node recovery that doesn't // actually change anything. func Test_SingleNodeRecoverNoChange(t *testing.T) { @@ -2711,6 +2848,13 @@ func mustReadFile(path string) []byte { return b } +func mustReadRandom(b []byte) { + _, err := rand.Read(b) + if err != nil { + panic("failed to read random bytes") + } +} + func mustCopyFileToTempFile(path string) string { f := mustCreateTempFile() mustWriteFile(f, string(mustReadFile(path))) @@ -2725,6 +2869,14 @@ func mustParseDuration(t string) time.Duration { return d } +func mustFileSize(path string) int64 { + n, err := fileSize(path) + if err != nil { + panic("failed to get file size") + } + return n +} + func executeRequestFromString(s string, timings, tx bool) *command.ExecuteRequest { return executeRequestFromStrings([]string{s}, timings, tx) } diff --git a/store/testdata/wal-enabled.sqlite b/store/testdata/wal-enabled.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..7ee3518604c0795df63733dddbd18d7c5f4e705c GIT binary patch literal 8192 zcmeI#u?oU45C-5&5ITuM2N#DM6~x6Cuu70(tQsTONh-Ebum+?r;0yVTrqaE;{6Cjm z0yllzosPP!b1vMg%4%NFkffv~GZD%5`1XtNcdvant@(FF()>E^`zbN_kS_!R5P$## zAOHafKmY;|fB*y_0D(?{g9t`pDBeMx-5z=2oG-aHDzb`gw9?8i@{T9vo#Vu+jWV3< zEhlHK`C#H*WK!O$bjHuD%Gs)moi@9