From a6a3567a4d3620f73296a52a93615906f130dd50 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 15:33:40 -0400 Subject: [PATCH 01/13] DB layer returns in-memory status --- db/db.go | 5 +++++ db/db_test.go | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/db/db.go b/db/db.go index 021a3467..be677aa7 100644 --- a/db/db.go +++ b/db/db.go @@ -318,6 +318,11 @@ func (db *DB) FileSize() (int64, error) { return fi.Size(), nil } +// InMemory returns whether this database is in-memory. +func (db *DB) InMemory() bool { + return db.memory +} + // CompileOptions returns the SQLite compilation options. func (db *DB) CompileOptions() ([]string, error) { res, err := db.QueryStringStmt("PRAGMA compile_options") diff --git a/db/db_test.go b/db/db_test.go index f8afdc17..8d9adfae 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -32,6 +32,9 @@ func Test_DbFileCreation(t *testing.T) { if db == nil { t.Fatal("database is nil") } + if db.InMemory() { + t.Fatal("on-disk database marked as in-memory") + } if _, err := os.Stat(dbPath); os.IsNotExist(err) { t.Fatalf("%s does not exist after open", dbPath) @@ -97,6 +100,10 @@ func Test_TableCreationInMemory(t *testing.T) { db := mustCreateInMemoryDatabase() defer db.Close() + if !db.InMemory() { + t.Fatal("in-memory database marked as not in-memory") + } + r, err := db.ExecuteStringStmt("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)") if err != nil { t.Fatalf("failed to create table: %s", err.Error()) From 2729a1727963fa1983a21750f47f0689ff45333c Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 16:31:33 -0400 Subject: [PATCH 02/13] Move some functions to standalone form --- store/store.go | 62 +++++++++++++++++++++++++------------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/store/store.go b/store/store.go index f961a837..1ac610f6 100644 --- a/store/store.go +++ b/store/store.go @@ -355,7 +355,7 @@ func (s *Store) Open() (retErr error) { // can also happen if the user explicitly disables the startup optimization of // building the SQLite database in memory, before switching to disk. if s.StartupOnDisk || (!s.dbConf.Memory && !s.snapsExistOnOpen && s.lastCommandIdxOnOpen == 0) { - s.db, err = s.createOnDisk(nil) + s.db, err = createOnDisk(nil, s.dbPath, s.dbConf.FKConstraints) if err != nil { return fmt.Errorf("failed to create on-disk database") } @@ -363,7 +363,7 @@ func (s *Store) Open() (retErr error) { s.logger.Printf("created on-disk database at open") } else { // We need an in-memory database, at least for bootstrapping purposes. - s.db, err = s.createInMemory(nil) + s.db, err = createInMemory(nil, s.dbConf.FKConstraints) if err != nil { return fmt.Errorf("failed to create in-memory database") } @@ -987,32 +987,6 @@ func (s *Store) Noop(id string) error { return nil } -// createInMemory returns an in-memory database. If b is non-nil and non-empty, -// then the database will be initialized with the contents of b. -func (s *Store) createInMemory(b []byte) (db *sql.DB, err error) { - if b == nil || len(b) == 0 { - db, err = sql.OpenInMemory(s.dbConf.FKConstraints) - } else { - db, err = sql.DeserializeIntoMemory(b, s.dbConf.FKConstraints) - } - return -} - -// createOnDisk opens an on-disk database file at the Store's configured path. If -// b is non-nil, any preexisting file will first be overwritten with those contents. -// Otherwise, any preexisting file will be removed before the database is opened. -func (s *Store) createOnDisk(b []byte) (*sql.DB, error) { - if err := os.Remove(s.dbPath); err != nil && !os.IsNotExist(err) { - return nil, err - } - if b != nil { - if err := ioutil.WriteFile(s.dbPath, b, 0660); err != nil { - return nil, err - } - } - return sql.Open(s.dbPath, s.dbConf.FKConstraints) -} - // setLogInfo records some key indexs about the log. func (s *Store) setLogInfo() error { var err error @@ -1122,7 +1096,7 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) { return } // Open a new on-disk database. - s.db, err = s.createOnDisk(b) + s.db, err = createOnDisk(b, s.dbPath, s.dbConf.FKConstraints) if err != nil { e = &fsmGenericResponse{error: fmt.Errorf("open on-disk failed: %s", err)} return @@ -1213,7 +1187,7 @@ func (s *Store) Restore(rc io.ReadCloser) error { // Therefore, this is the last opportunity to create the on-disk database // before Raft starts. This could also happen because the user has explicitly // disabled the build-on-disk-database-in-memory-first optimization. - db, err = s.createOnDisk(b) + db, err = createOnDisk(b, s.dbPath, s.dbConf.FKConstraints) if err != nil { return fmt.Errorf("open on-disk file during restore: %s", err) } @@ -1225,7 +1199,7 @@ func (s *Store) Restore(rc io.ReadCloser) error { // command entries in the log. So by sticking with an in-memory database // those entries will be applied in the fastest possible manner. We will // defer creation of any database on disk until the Apply function. - db, err = s.createInMemory(b) + db, err = createInMemory(b, s.dbConf.FKConstraints) if err != nil { return fmt.Errorf("createInMemory: %s", err) } @@ -1669,6 +1643,32 @@ func checkRaftConfiguration(configuration raft.Configuration) error { return nil } +// createInMemory returns an in-memory database. If b is non-nil and non-empty, +// then the database will be initialized with the contents of b. +func createInMemory(b []byte, fkConstraints bool) (db *sql.DB, err error) { + if b == nil || len(b) == 0 { + db, err = sql.OpenInMemory(fkConstraints) + } else { + db, err = sql.DeserializeIntoMemory(b, fkConstraints) + } + return +} + +// createOnDisk opens an on-disk database file at the Store's configured path. If +// b is non-nil, any preexisting file will first be overwritten with those contents. +// Otherwise, any preexisting file will be removed before the database is opened. +func createOnDisk(b []byte, path string, fkConstraints bool) (*sql.DB, error) { + if err := os.Remove(path); err != nil && !os.IsNotExist(err) { + return nil, err + } + if b != nil { + if err := ioutil.WriteFile(path, b, 0660); err != nil { + return nil, err + } + } + return sql.Open(path, fkConstraints) +} + func readUint64(b []byte) (uint64, error) { var sz uint64 if err := binary.Read(bytes.NewReader(b), binary.LittleEndian, &sz); err != nil { From c91ca45ba45c21064cb0c05cc57e3da0037fd91f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 17:46:58 -0400 Subject: [PATCH 03/13] Expose more DB attributes --- db/db.go | 44 ++++++++++++++++++++++++++++---------------- db/db_test.go | 9 +++++++++ 2 files changed, 37 insertions(+), 16 deletions(-) diff --git a/db/db.go b/db/db.go index be677aa7..d44419df 100644 --- a/db/db.go +++ b/db/db.go @@ -52,8 +52,9 @@ func init() { // DB is the SQL database. type DB struct { - path string // Path to database file, if running on-disk. - memory bool // In-memory only. + path string // Path to database file, if running on-disk. + memory bool // In-memory only. + fkEnabled bool // Foreign key constraints enabled rwDB *sql.DB // Database connection for database reads and writes. roDB *sql.DB // Database connection database reads. @@ -107,11 +108,12 @@ func Open(dbPath string, fkEnabled bool) (*DB, error) { roDB.SetConnMaxLifetime(0) return &DB{ - path: dbPath, - rwDB: rwDB, - roDB: roDB, - rwDSN: rwDSN, - roDSN: roDSN, + path: dbPath, + fkEnabled: fkEnabled, + rwDB: rwDB, + roDB: roDB, + rwDSN: rwDSN, + roDSN: roDSN, }, nil } @@ -158,11 +160,13 @@ func OpenInMemory(fkEnabled bool) (*DB, error) { } return &DB{ - memory: true, - rwDB: rwDB, - roDB: roDB, - rwDSN: rwDSN, - roDSN: roDSN, + memory: true, + path: ":memory:", + fkEnabled: fkEnabled, + rwDB: rwDB, + roDB: roDB, + rwDSN: rwDSN, + roDSN: roDSN, }, nil } @@ -283,10 +287,8 @@ func (db *DB) Stats() (map[string]interface{}, error) { "conn_pool_stats": connPoolStats, } - if db.memory { - stats["path"] = ":memory:" - } else { - stats["path"] = db.path + stats["path"] = db.path + if !db.memory { if stats["size"], err = db.FileSize(); err != nil { return nil, err } @@ -323,6 +325,16 @@ func (db *DB) InMemory() bool { return db.memory } +// FKEnabled returns whether Foreign Key constraints are enabled. +func (db *DB) FKEnabled() bool { + return db.fkEnabled +} + +// Path returns the path of this database. +func (db *DB) Path() string { + return db.path +} + // CompileOptions returns the SQLite compilation options. func (db *DB) CompileOptions() ([]string, error) { res, err := db.QueryStringStmt("PRAGMA compile_options") diff --git a/db/db_test.go b/db/db_test.go index 8d9adfae..91b892de 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -35,6 +35,12 @@ func Test_DbFileCreation(t *testing.T) { if db.InMemory() { t.Fatal("on-disk database marked as in-memory") } + if db.FKEnabled() { + t.Fatal("FK constraints marked as enabled") + } + if db.Path() != dbPath { + t.Fatal("database path is incorrect") + } if _, err := os.Stat(dbPath); os.IsNotExist(err) { t.Fatalf("%s does not exist after open", dbPath) @@ -157,6 +163,9 @@ func Test_TableCreationInMemoryFK(t *testing.T) { // Now, do same testing with FK constraints enabled. dbFK := mustCreateInMemoryDatabaseFK() defer dbFK.Close() + if !dbFK.FKEnabled() { + t.Fatal("FK constraints not marked as enabled") + } r, err = dbFK.ExecuteStringStmt(createTableFoo) if err != nil { From 304124952c0169249bf803256971b285bca2a09b Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 17:58:58 -0400 Subject: [PATCH 04/13] Add Store level changes Needs unit testing, and HTTP testing won't compile. --- command/command.pb.go | 131 +++++++++++++++++++++++++++++++----------- command/command.proto | 5 ++ command/marshal.go | 11 ++++ http/service.go | 57 ++++++++++++------ store/store.go | 68 +++++++++++++++++++++- 5 files changed, 219 insertions(+), 53 deletions(-) diff --git a/command/command.pb.go b/command/command.pb.go index 8897e696..96c1f529 100644 --- a/command/command.pb.go +++ b/command/command.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.27.1 +// protoc-gen-go v1.28.0 // protoc v3.6.1 // source: command.proto @@ -76,6 +76,7 @@ const ( Command_COMMAND_TYPE_QUERY Command_Type = 1 Command_COMMAND_TYPE_EXECUTE Command_Type = 2 Command_COMMAND_TYPE_NOOP Command_Type = 3 + Command_COMMAND_TYPE_LOAD Command_Type = 4 ) // Enum value maps for Command_Type. @@ -85,12 +86,14 @@ var ( 1: "COMMAND_TYPE_QUERY", 2: "COMMAND_TYPE_EXECUTE", 3: "COMMAND_TYPE_NOOP", + 4: "COMMAND_TYPE_LOAD", } Command_Type_value = map[string]int32{ "COMMAND_TYPE_UNKNOWN": 0, "COMMAND_TYPE_QUERY": 1, "COMMAND_TYPE_EXECUTE": 2, "COMMAND_TYPE_NOOP": 3, + "COMMAND_TYPE_LOAD": 4, } ) @@ -118,7 +121,7 @@ func (x Command_Type) Number() protoreflect.EnumNumber { // Deprecated: Use Command_Type.Descriptor instead. func (Command_Type) EnumDescriptor() ([]byte, []int) { - return file_command_proto_rawDescGZIP(), []int{9, 0} + return file_command_proto_rawDescGZIP(), []int{10, 0} } type Parameter struct { @@ -684,6 +687,53 @@ func (x *ExecuteResult) GetTime() float64 { return 0 } +type LoadRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *LoadRequest) Reset() { + *x = LoadRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_command_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LoadRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LoadRequest) ProtoMessage() {} + +func (x *LoadRequest) ProtoReflect() protoreflect.Message { + mi := &file_command_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LoadRequest.ProtoReflect.Descriptor instead. +func (*LoadRequest) Descriptor() ([]byte, []int) { + return file_command_proto_rawDescGZIP(), []int{8} +} + +func (x *LoadRequest) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + type Noop struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -695,7 +745,7 @@ type Noop struct { func (x *Noop) Reset() { *x = Noop{} if protoimpl.UnsafeEnabled { - mi := &file_command_proto_msgTypes[8] + mi := &file_command_proto_msgTypes[9] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -708,7 +758,7 @@ func (x *Noop) String() string { func (*Noop) ProtoMessage() {} func (x *Noop) ProtoReflect() protoreflect.Message { - mi := &file_command_proto_msgTypes[8] + mi := &file_command_proto_msgTypes[9] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -721,7 +771,7 @@ func (x *Noop) ProtoReflect() protoreflect.Message { // Deprecated: Use Noop.ProtoReflect.Descriptor instead. func (*Noop) Descriptor() ([]byte, []int) { - return file_command_proto_rawDescGZIP(), []int{8} + return file_command_proto_rawDescGZIP(), []int{9} } func (x *Noop) GetId() string { @@ -744,7 +794,7 @@ type Command struct { func (x *Command) Reset() { *x = Command{} if protoimpl.UnsafeEnabled { - mi := &file_command_proto_msgTypes[9] + mi := &file_command_proto_msgTypes[10] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -757,7 +807,7 @@ func (x *Command) String() string { func (*Command) ProtoMessage() {} func (x *Command) ProtoReflect() protoreflect.Message { - mi := &file_command_proto_msgTypes[9] + mi := &file_command_proto_msgTypes[10] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -770,7 +820,7 @@ func (x *Command) ProtoReflect() protoreflect.Message { // Deprecated: Use Command.ProtoReflect.Descriptor instead. func (*Command) Descriptor() ([]byte, []int) { - return file_command_proto_rawDescGZIP(), []int{9} + return file_command_proto_rawDescGZIP(), []int{10} } func (x *Command) GetType() Command_Type { @@ -861,25 +911,29 @@ var file_command_proto_rawDesc = []byte{ 0x66, 0x66, 0x65, 0x63, 0x74, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x01, 0x52, 0x04, 0x74, 0x69, 0x6d, - 0x65, 0x22, 0x16, 0x0a, 0x04, 0x4e, 0x6f, 0x6f, 0x70, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xe0, 0x01, 0x0a, 0x07, 0x43, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x43, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x65, 0x64, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x65, - 0x64, 0x22, 0x69, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, - 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, - 0x4e, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, - 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, - 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, - 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, - 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x4f, 0x50, 0x10, 0x03, 0x42, 0x22, 0x5a, 0x20, - 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, - 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x22, 0x21, 0x0a, 0x0b, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x22, 0x16, 0x0a, 0x04, 0x4e, 0x6f, 0x6f, 0x70, 0x12, 0x0e, 0x0a, 0x02, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xf8, 0x01, 0x0a, + 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, + 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, + 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, + 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, + 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, + 0x73, 0x73, 0x65, 0x64, 0x22, 0x80, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, + 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, + 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, + 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x01, 0x12, + 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, + 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, + 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x4f, 0x50, 0x10, 0x03, + 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, + 0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x04, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, + 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( @@ -895,7 +949,7 @@ func file_command_proto_rawDescGZIP() []byte { } var file_command_proto_enumTypes = make([]protoimpl.EnumInfo, 2) -var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 10) +var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 11) var file_command_proto_goTypes = []interface{}{ (QueryRequest_Level)(0), // 0: command.QueryRequest.Level (Command_Type)(0), // 1: command.Command.Type @@ -907,8 +961,9 @@ var file_command_proto_goTypes = []interface{}{ (*QueryRows)(nil), // 7: command.QueryRows (*ExecuteRequest)(nil), // 8: command.ExecuteRequest (*ExecuteResult)(nil), // 9: command.ExecuteResult - (*Noop)(nil), // 10: command.Noop - (*Command)(nil), // 11: command.Command + (*LoadRequest)(nil), // 10: command.LoadRequest + (*Noop)(nil), // 11: command.Noop + (*Command)(nil), // 12: command.Command } var file_command_proto_depIdxs = []int32{ 2, // 0: command.Statement.parameters:type_name -> command.Parameter @@ -1029,7 +1084,7 @@ func file_command_proto_init() { } } file_command_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Noop); i { + switch v := v.(*LoadRequest); i { case 0: return &v.state case 1: @@ -1041,6 +1096,18 @@ func file_command_proto_init() { } } file_command_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Noop); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_command_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Command); i { case 0: return &v.state @@ -1066,7 +1133,7 @@ func file_command_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_command_proto_rawDesc, NumEnums: 2, - NumMessages: 10, + NumMessages: 11, NumExtensions: 0, NumServices: 0, }, diff --git a/command/command.proto b/command/command.proto index fbce3890..82e2839e 100644 --- a/command/command.proto +++ b/command/command.proto @@ -60,6 +60,10 @@ message ExecuteResult { double time = 4; } +message LoadRequest { + bytes data = 1; +} + message Noop { string id = 1; } @@ -70,6 +74,7 @@ message Command { COMMAND_TYPE_QUERY = 1; COMMAND_TYPE_EXECUTE = 2; COMMAND_TYPE_NOOP = 3; + COMMAND_TYPE_LOAD = 4; } Type type = 1; bytes sub_command = 2; diff --git a/command/marshal.go b/command/marshal.go index 8b308e9c..ee61028b 100644 --- a/command/marshal.go +++ b/command/marshal.go @@ -151,6 +151,17 @@ func UnmarshalNoop(b []byte, c *Noop) error { return proto.Unmarshal(b, c) } +// MarshalLoadRequest marshals a LoadRequest command +func MarshalLoadRequest(lr *LoadRequest) ([]byte, error) { + // XXX Compress the SQLIte data! + return proto.Marshal(lr) +} + +// UnmarshalLoadRequest unmarshals a LoadRequest command +func UnmarshalLoadRequest(b []byte, lr *LoadRequest) error { + return proto.Unmarshal(b, lr) +} + // UnmarshalSubCommand unmarshalls a sub command m. It assumes that // m is the correct type. func UnmarshalSubCommand(c *Command, m proto.Message) error { diff --git a/http/service.go b/http/service.go index c5562e39..b948eb58 100644 --- a/http/service.go +++ b/http/service.go @@ -50,6 +50,9 @@ type Database interface { // is true, then all queries will take place while a read transaction // is held on the database. Query(qr *command.QueryRequest) ([]*command.QueryRows, error) + + // Load loads a SQLite file into the system + Load(lr *command.LoadRequest) error } // Store is the interface the Raft-based database must implement. @@ -590,30 +593,48 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) { } r.Body.Close() - // No JSON structure expected for this API. - queries := []string{string(b)} - er := executeRequestFromStrings(queries, timings, false) - - results, err := s.store.Execute(er) + fmt, err := fmtParam(r) if err != nil { - if err == store.ErrNotLeader { - leaderAPIAddr := s.LeaderAPIAddr() - if leaderAPIAddr == "" { - stats.Add(numLeaderNotFound, 1) - http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) - return - } + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - redirect := s.FormRedirect(r, leaderAPIAddr) - http.Redirect(w, r, redirect, http.StatusMovedPermanently) + if strings.ToLower(fmt) == "binary" { + lr := &command.LoadRequest{ + Data: b, + } + err := s.store.Load(lr) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) return } - resp.Error = err.Error() + s.writeResponse(w, r, resp) } else { - resp.Results.ExecuteResult = results + // No JSON structure expected for this API. + queries := []string{string(b)} + er := executeRequestFromStrings(queries, timings, false) + + results, err := s.store.Execute(er) + if err != nil { + if err == store.ErrNotLeader { + leaderAPIAddr := s.LeaderAPIAddr() + if leaderAPIAddr == "" { + stats.Add(numLeaderNotFound, 1) + http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) + return + } + + redirect := s.FormRedirect(r, leaderAPIAddr) + http.Redirect(w, r, redirect, http.StatusMovedPermanently) + return + } + resp.Error = err.Error() + } else { + resp.Results.ExecuteResult = results + } + resp.end = time.Now() + s.writeResponse(w, r, resp) } - resp.end = time.Now() - s.writeResponse(w, r, resp) } // handleStatus returns status on the system. diff --git a/store/store.go b/store/store.go index 1ac610f6..79f44cb3 100644 --- a/store/store.go +++ b/store/store.go @@ -70,6 +70,7 @@ const ( const ( numSnaphots = "num_snapshots" numBackups = "num_backups" + numLoads = "num_loads" numRestores = "num_restores" numRecoveries = "num_recoveries" numUncompressedCommands = "num_uncompressed_commands" @@ -843,6 +844,40 @@ func (s *Store) Backup(leader bool, fmt BackupFormat, dst io.Writer) error { return nil } +func (s *Store) Load(lr *command.LoadRequest) error { + startT := time.Now() + + b, err := command.MarshalLoadRequest(lr) + if err != nil { + return err + } + + c := &command.Command{ + Type: command.Command_COMMAND_TYPE_LOAD, + SubCommand: b, + } + + b, err = command.Marshal(c) + if err != nil { + return err + } + + af := s.raft.Apply(b, s.ApplyTimeout).(raft.ApplyFuture) + if af.Error() != nil { + if af.Error() == raft.ErrNotLeader { + return ErrNotLeader + } + return af.Error() + } + + s.dbAppliedIndexMu.Lock() + s.dbAppliedIndex = af.Index() + s.dbAppliedIndexMu.Unlock() + stats.Add(numLoads, 1) + s.logger.Printf("node loaded in %s", time.Since(startT)) + return af.Error() +} + // Notify notifies this Store that a node is ready for bootstrapping at the // given address. Once the number of known nodes reaches the expected level // bootstrapping will be attempted using this Store. "Expected level" includes @@ -1112,7 +1147,7 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) { s.firstLogAppliedT = time.Now() } - typ, r := applyCommand(l.Data, s.db) + typ, r := applyCommand(l.Data, &s.db) if typ == command.Command_COMMAND_TYPE_NOOP { s.numNoops++ } @@ -1492,7 +1527,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable return fmt.Errorf("failed to get log at index %d: %v", index, err) } if entry.Type == raft.LogCommand { - applyCommand(entry.Data, db) + applyCommand(entry.Data, &db) } lastIndex = entry.Index lastTerm = entry.Term @@ -1583,8 +1618,9 @@ func dbBytesFromSnapshot(rc io.ReadCloser) ([]byte, error) { return database, nil } -func applyCommand(data []byte, db *sql.DB) (command.Command_Type, interface{}) { +func applyCommand(data []byte, pDB **sql.DB) (command.Command_Type, interface{}) { var c command.Command + db := *pDB if err := command.Unmarshal(data, &c); err != nil { panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error())) @@ -1605,6 +1641,32 @@ func applyCommand(data []byte, db *sql.DB) (command.Command_Type, interface{}) { } r, err := db.Execute(er.Request, er.Timings) return c.Type, &fsmExecuteResponse{results: r, error: err} + case command.Command_COMMAND_TYPE_LOAD: + var lr command.LoadRequest + if err := command.UnmarshalLoadRequest(c.SubCommand, &lr); err != nil { + panic(fmt.Sprintf("failed to unmarshal load subcommand: %s", err.Error())) + } + + var newDB *sql.DB + var err error + if db.InMemory() { + newDB, err = createInMemory(lr.Data, db.FKEnabled()) + if err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to create in-memory database: %s", err)} + } + } else { + newDB, err = createOnDisk(lr.Data, db.Path(), db.FKEnabled()) + if err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)} + } + } + + // Swap the underlying database to the new one. + if err := db.Close(); err != nil { + return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)} + } + *pDB = newDB + return c.Type, &fsmGenericResponse{} case command.Command_COMMAND_TYPE_NOOP: return c.Type, &fsmGenericResponse{} default: From b03ed99edaafac27a1e056da9bea5c0646c08126 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 18:00:55 -0400 Subject: [PATCH 05/13] Update comment --- store/store.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/store.go b/store/store.go index 79f44cb3..163e957f 100644 --- a/store/store.go +++ b/store/store.go @@ -1716,8 +1716,8 @@ func createInMemory(b []byte, fkConstraints bool) (db *sql.DB, err error) { return } -// createOnDisk opens an on-disk database file at the Store's configured path. If -// b is non-nil, any preexisting file will first be overwritten with those contents. +// createOnDisk opens an on-disk database file at the configured path. If b is +// non-nil, any preexisting file will first be overwritten with those contents. // Otherwise, any preexisting file will be removed before the database is opened. func createOnDisk(b []byte, path string, fkConstraints bool) (*sql.DB, error) { if err := os.Remove(path); err != nil && !os.IsNotExist(err) { From 89b132ac26e63dac186bac9aed34a752f411cbed Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 21:16:44 -0400 Subject: [PATCH 06/13] Fix Store-level unit tests --- http/service_test.go | 4 ++ store/store.go | 2 + store/store_test.go | 133 +++++++++++++++++++++++++++---------- store/testdata/load.sqlite | Bin 0 -> 8192 bytes 4 files changed, 103 insertions(+), 36 deletions(-) create mode 100644 store/testdata/load.sqlite diff --git a/http/service_test.go b/http/service_test.go index 32d5dcdf..9cf88896 100644 --- a/http/service_test.go +++ b/http/service_test.go @@ -1068,6 +1068,10 @@ func (m *MockStore) Query(qr *command.QueryRequest) ([]*command.QueryRows, error return nil, nil } +func (m *MockStore) Load(lr *command.LoadRequest) error { + return nil +} + func (m *MockStore) Join(id, addr string, voter bool) error { return nil } diff --git a/store/store.go b/store/store.go index 163e957f..e283ece3 100644 --- a/store/store.go +++ b/store/store.go @@ -844,6 +844,8 @@ func (s *Store) Backup(leader bool, fmt BackupFormat, dst io.Writer) error { return nil } +// Loads an entire SQLite file into the database, sending the request +// through the Raft log. func (s *Store) Load(lr *command.LoadRequest) error { startT := time.Now() diff --git a/store/store_test.go b/store/store_test.go index 8f774e86..66d23fc4 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -441,7 +441,6 @@ func Test_SingleNodeSQLitePath(t *testing.T) { if !pathExists(path) { t.Fatalf("SQLite file does not exist at %s", path) } - } func Test_SingleNodeBackupBinary(t *testing.T) { @@ -545,7 +544,7 @@ COMMIT; } } -func Test_SingleNodeLoad(t *testing.T) { +func Test_SingleNodeSingleCommandTrigger(t *testing.T) { s, ln := mustNewStore(true) defer os.RemoveAll(s.Path()) defer ln.Close() @@ -563,31 +562,35 @@ func Test_SingleNodeLoad(t *testing.T) { dump := `PRAGMA foreign_keys=OFF; BEGIN TRANSACTION; -CREATE TABLE foo (id integer not null primary key, name text); -INSERT INTO "foo" VALUES(1,'fiona'); +CREATE TABLE foo (id integer primary key asc, name text); +INSERT INTO "foo" VALUES(1,'bob'); +INSERT INTO "foo" VALUES(2,'alice'); +INSERT INTO "foo" VALUES(3,'eve'); +CREATE TABLE bar (nameid integer, age integer); +INSERT INTO "bar" VALUES(1,44); +INSERT INTO "bar" VALUES(2,46); +INSERT INTO "bar" VALUES(3,8); +CREATE VIEW foobar as select name as Person, Age as age from foo inner join bar on foo.id == bar.nameid; +CREATE TRIGGER new_foobar instead of insert on foobar begin insert into foo (name) values (new.Person); insert into bar (nameid, age) values ((select id from foo where name == new.Person), new.Age); end; COMMIT; ` _, err := s.Execute(executeRequestFromString(dump, false, false)) if err != nil { - t.Fatalf("failed to load simple dump: %s", err.Error()) + t.Fatalf("failed to load dump with trigger: %s", err.Error()) } - // Check that data were loaded correctly. - qr := queryRequestFromString("SELECT * FROM foo", false, true) - qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG - r, err := s.Query(qr) + // Check that the VIEW and TRIGGER are OK by using both. + er := executeRequestFromString("INSERT INTO foobar VALUES('jason', 16)", false, true) + r, err := s.Execute(er) if err != nil { - t.Fatalf("failed to query single node: %s", err.Error()) - } - if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got { - t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + t.Fatalf("failed to insert into view on single node: %s", err.Error()) } - if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got { - t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + if exp, got := int64(3), r[0].GetLastInsertId(); exp != got { + t.Fatalf("unexpected results for query\nexp: %d\ngot: %d", exp, got) } } -func Test_SingleNodeSingleCommandTrigger(t *testing.T) { +func Test_SingleNodeLoadText(t *testing.T) { s, ln := mustNewStore(true) defer os.RemoveAll(s.Path()) defer ln.Close() @@ -605,35 +608,31 @@ func Test_SingleNodeSingleCommandTrigger(t *testing.T) { dump := `PRAGMA foreign_keys=OFF; BEGIN TRANSACTION; -CREATE TABLE foo (id integer primary key asc, name text); -INSERT INTO "foo" VALUES(1,'bob'); -INSERT INTO "foo" VALUES(2,'alice'); -INSERT INTO "foo" VALUES(3,'eve'); -CREATE TABLE bar (nameid integer, age integer); -INSERT INTO "bar" VALUES(1,44); -INSERT INTO "bar" VALUES(2,46); -INSERT INTO "bar" VALUES(3,8); -CREATE VIEW foobar as select name as Person, Age as age from foo inner join bar on foo.id == bar.nameid; -CREATE TRIGGER new_foobar instead of insert on foobar begin insert into foo (name) values (new.Person); insert into bar (nameid, age) values ((select id from foo where name == new.Person), new.Age); end; +CREATE TABLE foo (id integer not null primary key, name text); +INSERT INTO "foo" VALUES(1,'fiona'); COMMIT; ` _, err := s.Execute(executeRequestFromString(dump, false, false)) if err != nil { - t.Fatalf("failed to load dump with trigger: %s", err.Error()) + t.Fatalf("failed to load simple dump: %s", err.Error()) } - // Check that the VIEW and TRIGGER are OK by using both. - er := executeRequestFromString("INSERT INTO foobar VALUES('jason', 16)", false, true) - r, err := s.Execute(er) + // Check that data were loaded correctly. + qr := queryRequestFromString("SELECT * FROM foo", false, true) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err := s.Query(qr) if err != nil { - t.Fatalf("failed to insert into view on single node: %s", err.Error()) + t.Fatalf("failed to query single node: %s", err.Error()) } - if exp, got := int64(3), r[0].GetLastInsertId(); exp != got { - t.Fatalf("unexpected results for query\nexp: %d\ngot: %d", exp, got) + if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) } } -func Test_SingleNodeLoadNoStatements(t *testing.T) { +func Test_SingleNodeLoadTextNoStatements(t *testing.T) { s, ln := mustNewStore(true) defer os.RemoveAll(s.Path()) defer ln.Close() @@ -659,7 +658,7 @@ COMMIT; } } -func Test_SingleNodeLoadEmpty(t *testing.T) { +func Test_SingleNodeLoadTextEmpty(t *testing.T) { s, ln := mustNewStore(true) defer os.RemoveAll(s.Path()) defer ln.Close() @@ -682,7 +681,7 @@ func Test_SingleNodeLoadEmpty(t *testing.T) { } } -func Test_SingleNodeLoadChinook(t *testing.T) { +func Test_SingleNodeLoadTextChinook(t *testing.T) { s, ln := mustNewStore(true) defer os.RemoveAll(s.Path()) defer ln.Close() @@ -745,6 +744,54 @@ func Test_SingleNodeLoadChinook(t *testing.T) { } } +func Test_SingleNodeLoadBinary(t *testing.T) { + s, ln := mustNewStore(true) + defer os.RemoveAll(s.Path()) + defer ln.Close() + + if err := s.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + defer s.Close(true) + if _, err := s.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + err := s.Load(loadRequestFromFile(filepath.Join("testdata", "load.sqlite"))) + if err != nil { + t.Fatalf("failed to load SQLite file: %s", err.Error()) + } + + // Check that data were loaded correctly. + qr := queryRequestFromString("SELECT * FROM foo WHERE id=2", false, true) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err := s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + if exp, got := `[[2,"fiona"]]`, asJSON(r[0].Values); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + qr = queryRequestFromString("SELECT count(*) FROM foo", false, true) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err = s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + if exp, got := `[[3]]`, asJSON(r[0].Values); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } +} + // Test_SingleNodeRecoverNoChange tests a node recovery that doesn't // actually change anything. func Test_SingleNodeRecoverNoChange(t *testing.T) { @@ -1943,6 +1990,14 @@ func mustWriteFile(path, contents string) { } } +func mustReadFile(path string) []byte { + b, err := ioutil.ReadFile(path) + if err != nil { + panic("failed to read file") + } + return b +} + func mustTempDir() string { var err error path, err := ioutil.TempDir("", "rqlilte-test-") @@ -2002,6 +2057,12 @@ func queryRequestFromStrings(s []string, timings, tx bool) *command.QueryRequest } } +func loadRequestFromFile(path string) *command.LoadRequest { + return &command.LoadRequest{ + Data: mustReadFile(path), + } +} + // waitForLeaderID waits until the Store's LeaderID is set, or the timeout // expires. Because setting Leader ID requires Raft to set the cluster // configuration, it's not entirely deterministic when it will be set. diff --git a/store/testdata/load.sqlite b/store/testdata/load.sqlite new file mode 100644 index 0000000000000000000000000000000000000000..db96de17e7301f2b59f948de1dc67a6b5906ced9 GIT binary patch literal 8192 zcmeI#zY4-I5C-r|E9fE>9J=19ATB{8w(^JVH6dCLQRU~QZ_ zs*;U$Y%Y1u=TaB4toWp=5gS=3cIxVe^==+UPm12r%O3~?AOHafKmY;|fB*y_009U< V00Mt0& Date: Mon, 9 May 2022 21:26:50 -0400 Subject: [PATCH 07/13] Unit test that pre-existing data is gone --- store/store_test.go | 43 ++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/store/store_test.go b/store/store_test.go index 66d23fc4..c772e521 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -760,13 +760,20 @@ func Test_SingleNodeLoadBinary(t *testing.T) { t.Fatalf("Error waiting for leader: %s", err) } - err := s.Load(loadRequestFromFile(filepath.Join("testdata", "load.sqlite"))) + // Load a dataset, to check it's erased by the SQLite file load. + dump := `PRAGMA foreign_keys=OFF; +BEGIN TRANSACTION; +CREATE TABLE bar (id integer not null primary key, name text); +INSERT INTO "bar" VALUES(1,'declan'); +COMMIT; +` + _, err := s.Execute(executeRequestFromString(dump, false, false)) if err != nil { - t.Fatalf("failed to load SQLite file: %s", err.Error()) + t.Fatalf("failed to load simple dump: %s", err.Error()) } // Check that data were loaded correctly. - qr := queryRequestFromString("SELECT * FROM foo WHERE id=2", false, true) + qr := queryRequestFromString("SELECT * FROM bar", false, true) qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG r, err := s.Query(qr) if err != nil { @@ -775,6 +782,25 @@ func Test_SingleNodeLoadBinary(t *testing.T) { if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got { t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) } + if exp, got := `[[1,"declan"]]`, asJSON(r[0].Values); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + + err = s.Load(loadRequestFromFile(filepath.Join("testdata", "load.sqlite"))) + if err != nil { + t.Fatalf("failed to load SQLite file: %s", err.Error()) + } + + // Check that data were loaded correctly. + qr = queryRequestFromString("SELECT * FROM foo WHERE id=2", false, true) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err = s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } if exp, got := `[[2,"fiona"]]`, asJSON(r[0].Values); exp != got { t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) } @@ -790,6 +816,17 @@ func Test_SingleNodeLoadBinary(t *testing.T) { if exp, got := `[[3]]`, asJSON(r[0].Values); exp != got { t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) } + + // Check pre-existing data is gone. + qr = queryRequestFromString("SELECT * FROM bar", false, true) + qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG + r, err = s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `{"error":"no such table: bar"}`, asJSON(r[0]); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } } // Test_SingleNodeRecoverNoChange tests a node recovery that doesn't From c8b9fd72daca8837003b94043f94d4441a58b1a5 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 22:07:36 -0400 Subject: [PATCH 08/13] Sanity check supplied SQLite data --- db/db.go | 2 +- http/service.go | 15 +++++++++++++-- system_test/full_system_test.py | 7 +++++-- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/db/db.go b/db/db.go index d44419df..8991b7ce 100644 --- a/db/db.go +++ b/db/db.go @@ -219,7 +219,7 @@ func DeserializeIntoMemory(b []byte, fkEnabled bool) (retDB *DB, retErr error) { } defer func() { // Don't leak a database if deserialization fails. - if retErr != nil { + if retDB != nil && retErr != nil { retDB.Close() } }() diff --git a/http/service.go b/http/service.go index b948eb58..f01df823 100644 --- a/http/service.go +++ b/http/service.go @@ -593,13 +593,18 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) { } r.Body.Close() - fmt, err := fmtParam(r) + format, err := fmtParam(r) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - if strings.ToLower(fmt) == "binary" { + if strings.ToLower(format) == "binary" { + if !validateSQLiteFile(b) { + http.Error(w, "invalid SQLite database file", http.StatusBadRequest) + return + } + lr := &command.LoadRequest{ Data: b, } @@ -1542,3 +1547,9 @@ func queryRequestFromStrings(s []string, timings, tx bool) *command.QueryRequest Timings: timings, } } + +// validateSQLiteFile checks that the supplied data looks like a SQLite database +// file. See https://www.sqlite.org/fileformat.html +func validateSQLiteFile(b []byte) bool { + return string(b[0:13]) == "SQLite format" +} diff --git a/system_test/full_system_test.py b/system_test/full_system_test.py index efafb639..38fdcad9 100644 --- a/system_test/full_system_test.py +++ b/system_test/full_system_test.py @@ -510,8 +510,11 @@ class Node(object): return 'http://' + self.APIAddr() + '/db/execute' + rd def _backup_url(self): return 'http://' + self.APIAddr() + '/db/backup' - def _load_url(self): - return 'http://' + self.APIAddr() + '/db/load' + def _load_url(self, fmt=None): + f = "" + if fmt is not None: + f = '?fmt=%s' % (fmt) + return 'http://' + self.APIAddr() + '/db/load' + f def __eq__(self, other): return self.node_id == other.node_id def __str__(self): From a068c39d19e2b24a415e0816111e770f9a3383cf Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 22:26:24 -0400 Subject: [PATCH 09/13] End-to-end test of restore using a SQLite file --- DOC/RESTORE_FROM_SQLITE.md | 26 -------------------------- system_test/full_system_test.py | 24 +++++++++++++++++++----- 2 files changed, 19 insertions(+), 31 deletions(-) diff --git a/DOC/RESTORE_FROM_SQLITE.md b/DOC/RESTORE_FROM_SQLITE.md index dba031a6..6fb39686 100644 --- a/DOC/RESTORE_FROM_SQLITE.md +++ b/DOC/RESTORE_FROM_SQLITE.md @@ -31,32 +31,6 @@ database restored successfully | 1 | fiona | +----+-------+ ``` -### HTTP - _Be sure to set the Content-type header as shown._ - -```bash -~ $ sqlite3 restore.sqlite -SQLite version 3.14.1 2016-08-11 18:53:32 -Enter ".help" for usage hints. -sqlite> CREATE TABLE foo (id integer not null primary key, name text); -sqlite> INSERT INTO "foo" VALUES(1,'fiona'); -sqlite> -~ $ echo '.dump' | sqlite3 restore.sqlite > restore.dump # Convert SQLite database file to set of SQL commands. -~ $ curl -XPOST localhost:4001/db/load -H "Content-type: text/plain" --data-binary @restore.dump -``` - -Let's connect to the node, and check that the data has been loaded correctly. -```bash -$ rqlite -127.0.0.1:4001> SELECT * FROM foo -+----+-------+ -| id | name | -+----+-------+ -| 1 | fiona | -+----+-------+ -``` - -**Note that you must convert the SQLite file (in the above examples the file named `restore.sqlite`) to the list of SQL commands**. You cannot restore using the actual SQLite database file. ## Caveats The behavior of the restore operation when data already exists on the cluster is undefined -- you should only restore to a cluster that has no data, or a brand-new cluster. Also, please **note that SQLite dump files normally contain a command to disable Foreign Key constraints**. If you are running with Foreign Key Constraints enabled, and wish to re-enable this, this is the one time you should explicitly re-enable those constraints via the following `curl` command: diff --git a/system_test/full_system_test.py b/system_test/full_system_test.py index 38fdcad9..92ec34ed 100644 --- a/system_test/full_system_test.py +++ b/system_test/full_system_test.py @@ -468,12 +468,18 @@ class Node(object): raise_for_status(r) fd.write(r.content) - def restore(self, file): + def restore(self, file, fmt=None): # This is the one API that doesn't expect JSON. - conn = sqlite3.connect(file) - r = requests.post(self._load_url(), data='\n'.join(conn.iterdump())) - raise_for_status(r) - conn.close() + if fmt != "binary": + conn = sqlite3.connect(file) + r = requests.post(self._load_url(fmt), data='\n'.join(conn.iterdump())) + raise_for_status(r) + conn.close() + else: + with open(file, 'rb') as f: + data = f.read() + r = requests.post(self._load_url(fmt), data=data, headers={'Content-Type': 'application/octet-stream'}) + raise_for_status(r) return r.json() def redirect_addr(self): @@ -1287,6 +1293,14 @@ class TestEndToEndBackupRestore(unittest.TestCase): j = self.node1.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) + self.node2 = Node(RQLITED_PATH, '1') + self.node2.start() + self.node2.wait_for_leader() + j = self.node2.restore(self.db_file, fmt='binary') + self.assertEqual(j, d_("{'results': []}")) + j = self.node2.query('SELECT * FROM foo') + self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) + def tearDown(self): if hasattr(self, 'node0'): deprovision_node(self.node0) From 3473685c3ea51922d54d00cc2bf644626b47e848 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 22:27:30 -0400 Subject: [PATCH 10/13] Update docs --- DOC/RESTORE_FROM_SQLITE.md | 34 +++++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/DOC/RESTORE_FROM_SQLITE.md b/DOC/RESTORE_FROM_SQLITE.md index 6fb39686..e95d3f30 100644 --- a/DOC/RESTORE_FROM_SQLITE.md +++ b/DOC/RESTORE_FROM_SQLITE.md @@ -1,11 +1,43 @@ # Restoring from a SQLite dump file -rqlite supports loading a node directly from a SQLite dump file. This is a fast and efficient manner to initialize a system from an existing SQLite database, or to restore from an existing [node backup](https://github.com/rqlite/rqlite/blob/master/DOC/BACKUPS.md). An example restore is shown below. +rqlite supports loading a node directly from two sources: +- An actual SQLite database file. This is the fastest way to initialize a rqlite node from an existing SQLite database. +- SQLite dump file. This is another convenient manner to initialize a system from an existing SQLite database, or to restore from an existing [node backup](https://github.com/rqlite/rqlite/blob/master/DOC/BACKUPS.md). But if your database is large, it can be slow. ## Examples The following examples show a trivial database being generated by `sqlite3`, the SQLite file being backed up, converted to the corresponding list of SQL commands, and then loaded into a rqlite node listening on localhost. +### HTTP + _Be sure to set the Content-type header as shown in each case._ + +```bash +~ $ sqlite3 restore.sqlite +SQLite version 3.14.1 2016-08-11 18:53:32 +Enter ".help" for usage hints. +sqlite> CREATE TABLE foo (id integer not null primary key, name text); +sqlite> INSERT INTO "foo" VALUES(1,'fiona'); +sqlite> +# Load directly from the SQLite file. +~ $ curl -v -XPOST localhost:4001/db/load?fmt=binary -H "Content-type: application/octet-stream" --data-binary @restore.sqlite + +# Convert SQLite database file to set of SQL commands and then load +~ $ echo '.dump' | sqlite3 restore.sqlite > restore.dump +~ $ curl -XPOST localhost:4001/db/load -H "Content-type: text/plain" --data-binary @restore.dump +``` + +After either command, we can connect to the node, and check that the data has been loaded correctly. +```bash +$ rqlite +127.0.0.1:4001> SELECT * FROM foo ++----+-------+ +| id | name | ++----+-------+ +| 1 | fiona | ++----+-------+ +``` + ### rqlite CLI +Note that the CLI currently only supports loading from a SQLite dump file. ``` ~ $ sqlite3 restore.sqlite SQLite version 3.22.0 2018-01-22 18:45:57 From 0aabfbff6a85014dc13a2d96719091d59f01dafe Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 22:43:16 -0400 Subject: [PATCH 11/13] Use compression for LoadRequest marshalling --- command/marshal.go | 81 +++++++++++++++++++++++++++++++--------------- 1 file changed, 55 insertions(+), 26 deletions(-) diff --git a/command/marshal.go b/command/marshal.go index ee61028b..c29060d8 100644 --- a/command/marshal.go +++ b/command/marshal.go @@ -89,23 +89,15 @@ func (m *RequestMarshaler) Marshal(r Requester) ([]byte, bool, error) { if compress { // Let's try compression. - var buf bytes.Buffer - gzw, err := gzip.NewWriterLevel(&buf, gzip.BestCompression) + gzData, err := gzCompress(b) if err != nil { - return nil, false, fmt.Errorf("gzip new writer: %s", err) - } - - if _, err := gzw.Write(b); err != nil { - return nil, false, fmt.Errorf("gzip Write: %s", err) - } - if err := gzw.Close(); err != nil { - return nil, false, fmt.Errorf("gzip Close: %s", err) + return nil, false, err } // Is compression better? - if ubz > len(buf.Bytes()) || m.ForceCompression { + if ubz > len(gzData) || m.ForceCompression { // Yes! Let's keep it. - b = buf.Bytes() + b = gzData stats.Add(numCompressedRequests, 1) stats.Add(numCompressedBytes, int64(len(b))) } else { @@ -153,13 +145,25 @@ func UnmarshalNoop(b []byte, c *Noop) error { // MarshalLoadRequest marshals a LoadRequest command func MarshalLoadRequest(lr *LoadRequest) ([]byte, error) { - // XXX Compress the SQLIte data! + b, err := gzCompress(lr.Data) + if err != nil { + return nil, err + } + lr.Data = b return proto.Marshal(lr) } // UnmarshalLoadRequest unmarshals a LoadRequest command func UnmarshalLoadRequest(b []byte, lr *LoadRequest) error { - return proto.Unmarshal(b, lr) + if err := proto.Unmarshal(b, lr); err != nil { + return err + } + u, err := gzUncompress(lr.Data) + if err != nil { + return err + } + lr.Data = u + return nil } // UnmarshalSubCommand unmarshalls a sub command m. It assumes that @@ -167,20 +171,11 @@ func UnmarshalLoadRequest(b []byte, lr *LoadRequest) error { func UnmarshalSubCommand(c *Command, m proto.Message) error { b := c.SubCommand if c.Compressed { - gz, err := gzip.NewReader(bytes.NewReader(b)) - if err != nil { - return fmt.Errorf("unmarshal sub gzip NewReader: %s", err) - } - - ub, err := ioutil.ReadAll(gz) + var err error + b, err = gzUncompress(b) if err != nil { - return fmt.Errorf("unmarshal sub gzip ReadAll: %s", err) + return fmt.Errorf("unmarshal sub uncompress: %s", err) } - - if err := gz.Close(); err != nil { - return fmt.Errorf("unmarshal sub gzip Close: %s", err) - } - b = ub } if err := proto.Unmarshal(b, m); err != nil { @@ -188,3 +183,37 @@ func UnmarshalSubCommand(c *Command, m proto.Message) error { } return nil } + +// gzCompress compresses the given byte slice. +func gzCompress(b []byte) ([]byte, error) { + var buf bytes.Buffer + gzw, err := gzip.NewWriterLevel(&buf, gzip.BestCompression) + if err != nil { + return nil, fmt.Errorf("gzip new writer: %s", err) + } + + if _, err := gzw.Write(b); err != nil { + return nil, fmt.Errorf("gzip Write: %s", err) + } + if err := gzw.Close(); err != nil { + return nil, fmt.Errorf("gzip Close: %s", err) + } + return buf.Bytes(), nil +} + +func gzUncompress(b []byte) ([]byte, error) { + gz, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return nil, fmt.Errorf("unmarshal gzip NewReader: %s", err) + } + + ub, err := ioutil.ReadAll(gz) + if err != nil { + return nil, fmt.Errorf("unmarshal gzip ReadAll: %s", err) + } + + if err := gz.Close(); err != nil { + return nil, fmt.Errorf("unmarshal gzip Close: %s", err) + } + return ub, nil +} From 32a2d8e7bf52498d2a0f192b9a94ae87919c2cde Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 22:47:53 -0400 Subject: [PATCH 12/13] CHANGELOG and docs --- CHANGELOG.md | 5 ++++- DOC/RESTORE_FROM_SQLITE.md | 6 +++--- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index be9c2301..cd534201 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ -## 7.3.3 (unreleased) +## 7.4.0 (unreleased) +## New features +- [PR #1017](https://github.com/rqlite/rqlite/pull/1017): Support restoring from SQLite data files. Fixes [issue #1005](https://github.com/rqlite/rqlite/issues/1016). +### Implementation changes and bug fixes - [PR #1015](https://github.com/rqlite/rqlite/pull/1015): go mod (dependencies) updates. ## 7.3.2 (March 1st 2022) diff --git a/DOC/RESTORE_FROM_SQLITE.md b/DOC/RESTORE_FROM_SQLITE.md index e95d3f30..1ab53330 100644 --- a/DOC/RESTORE_FROM_SQLITE.md +++ b/DOC/RESTORE_FROM_SQLITE.md @@ -1,11 +1,11 @@ # Restoring from a SQLite dump file -rqlite supports loading a node directly from two sources: +rqlite supports loading a node directly from two sources, either of which can be used to restore from an existing [node backup](https://github.com/rqlite/rqlite/blob/master/DOC/BACKUPS.md): - An actual SQLite database file. This is the fastest way to initialize a rqlite node from an existing SQLite database. -- SQLite dump file. This is another convenient manner to initialize a system from an existing SQLite database, or to restore from an existing [node backup](https://github.com/rqlite/rqlite/blob/master/DOC/BACKUPS.md). But if your database is large, it can be slow. +- SQLite dump file. This is another convenient manner to initialize a system from an existing SQLite database, or to restore from an . But if your database is large, it can be slow. ## Examples -The following examples show a trivial database being generated by `sqlite3`, the SQLite file being backed up, converted to the corresponding list of SQL commands, and then loaded into a rqlite node listening on localhost. +The following examples show a trivial database being generated by `sqlite3`, the SQLite file being backed up, converted to the corresponding list of SQL commands, and then loaded into a rqlite node listening on localhost using each form. ### HTTP _Be sure to set the Content-type header as shown in each case._ From ce4cc5282327cd879150a8ffaeb126afc189104b Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 9 May 2022 22:54:06 -0400 Subject: [PATCH 13/13] Simpler LoadRequest marshaling --- command/marshal.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/command/marshal.go b/command/marshal.go index c29060d8..1c978625 100644 --- a/command/marshal.go +++ b/command/marshal.go @@ -145,25 +145,20 @@ func UnmarshalNoop(b []byte, c *Noop) error { // MarshalLoadRequest marshals a LoadRequest command func MarshalLoadRequest(lr *LoadRequest) ([]byte, error) { - b, err := gzCompress(lr.Data) + b, err := proto.Marshal(lr) if err != nil { return nil, err } - lr.Data = b - return proto.Marshal(lr) + return gzCompress(b) } // UnmarshalLoadRequest unmarshals a LoadRequest command func UnmarshalLoadRequest(b []byte, lr *LoadRequest) error { - if err := proto.Unmarshal(b, lr); err != nil { - return err - } - u, err := gzUncompress(lr.Data) + u, err := gzUncompress(b) if err != nil { return err } - lr.Data = u - return nil + return proto.Unmarshal(u, lr) } // UnmarshalSubCommand unmarshalls a sub command m. It assumes that