1
0
Fork 0

Merge pull request #1017 from rqlite/handle-install

Support direct loading of SQLite files
master
Philip O'Toole 2 years ago committed by GitHub
commit 98adddaf0e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,4 +1,7 @@
## 7.3.3 (unreleased)
## 7.4.0 (unreleased)
## New features
- [PR #1017](https://github.com/rqlite/rqlite/pull/1017): Support restoring from SQLite data files. Fixes [issue #1016](https://github.com/rqlite/rqlite/issues/1016).
### Implementation changes and bug fixes
- [PR #1015](https://github.com/rqlite/rqlite/pull/1015): go mod (dependencies) updates.
## 7.3.2 (March 1st 2022)

@ -1,11 +1,43 @@
# Restoring from a SQLite dump file
rqlite supports loading a node directly from a SQLite dump file. This is a fast and efficient manner to initialize a system from an existing SQLite database, or to restore from an existing [node backup](https://github.com/rqlite/rqlite/blob/master/DOC/BACKUPS.md). An example restore is shown below.
rqlite supports loading a node directly from two sources, either of which can be used to restore from an existing [node backup](https://github.com/rqlite/rqlite/blob/master/DOC/BACKUPS.md):
- An actual SQLite database file. This is the fastest way to initialize a rqlite node from an existing SQLite database.
- SQLite dump file. This is another convenient manner to initialize a system from an existing SQLite database, or to restore from an existing node backup. But if your database is large, it can be slow.
## Examples
The following examples show a trivial database being generated by `sqlite3`, the SQLite file being backed up, converted to the corresponding list of SQL commands, and then loaded into a rqlite node listening on localhost.
The following examples show a trivial database being generated by `sqlite3`, the SQLite file being backed up, converted to the corresponding list of SQL commands, and then loaded into a rqlite node listening on localhost using each form.
### HTTP
_Be sure to set the Content-type header as shown in each case._
```bash
~ $ sqlite3 restore.sqlite
SQLite version 3.14.1 2016-08-11 18:53:32
Enter ".help" for usage hints.
sqlite> CREATE TABLE foo (id integer not null primary key, name text);
sqlite> INSERT INTO "foo" VALUES(1,'fiona');
sqlite>
# Load directly from the SQLite file.
~ $ curl -v -XPOST localhost:4001/db/load?fmt=binary -H "Content-type: application/octet-stream" --data-binary @restore.sqlite
# Convert SQLite database file to set of SQL commands and then load
~ $ echo '.dump' | sqlite3 restore.sqlite > restore.dump
~ $ curl -XPOST localhost:4001/db/load -H "Content-type: text/plain" --data-binary @restore.dump
```
After either command, we can connect to the node, and check that the data has been loaded correctly.
```bash
$ rqlite
127.0.0.1:4001> SELECT * FROM foo
+----+-------+
| id | name |
+----+-------+
| 1 | fiona |
+----+-------+
```
### rqlite CLI
Note that the CLI currently only supports loading from a SQLite dump file.
```
~ $ sqlite3 restore.sqlite
SQLite version 3.22.0 2018-01-22 18:45:57
@ -31,32 +63,6 @@ database restored successfully
| 1 | fiona |
+----+-------+
```
### HTTP
_Be sure to set the Content-type header as shown._
```bash
~ $ sqlite3 restore.sqlite
SQLite version 3.14.1 2016-08-11 18:53:32
Enter ".help" for usage hints.
sqlite> CREATE TABLE foo (id integer not null primary key, name text);
sqlite> INSERT INTO "foo" VALUES(1,'fiona');
sqlite>
~ $ echo '.dump' | sqlite3 restore.sqlite > restore.dump # Convert SQLite database file to set of SQL commands.
~ $ curl -XPOST localhost:4001/db/load -H "Content-type: text/plain" --data-binary @restore.dump
```
Let's connect to the node, and check that the data has been loaded correctly.
```bash
$ rqlite
127.0.0.1:4001> SELECT * FROM foo
+----+-------+
| id | name |
+----+-------+
| 1 | fiona |
+----+-------+
```
**Note that you must convert the SQLite file (in the above examples the file named `restore.sqlite`) to the list of SQL commands**. You cannot restore using the actual SQLite database file.
## Caveats
The behavior of the restore operation when data already exists on the cluster is undefined -- you should only restore to a cluster that has no data, or a brand-new cluster. Also, please **note that SQLite dump files normally contain a command to disable Foreign Key constraints**. If you are running with Foreign Key Constraints enabled, and wish to re-enable this, this is the one time you should explicitly re-enable those constraints via the following `curl` command:

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc-gen-go v1.28.0
// protoc v3.6.1
// source: command.proto
@ -76,6 +76,7 @@ const (
Command_COMMAND_TYPE_QUERY Command_Type = 1
Command_COMMAND_TYPE_EXECUTE Command_Type = 2
Command_COMMAND_TYPE_NOOP Command_Type = 3
Command_COMMAND_TYPE_LOAD Command_Type = 4
)
// Enum value maps for Command_Type.
@ -85,12 +86,14 @@ var (
1: "COMMAND_TYPE_QUERY",
2: "COMMAND_TYPE_EXECUTE",
3: "COMMAND_TYPE_NOOP",
4: "COMMAND_TYPE_LOAD",
}
Command_Type_value = map[string]int32{
"COMMAND_TYPE_UNKNOWN": 0,
"COMMAND_TYPE_QUERY": 1,
"COMMAND_TYPE_EXECUTE": 2,
"COMMAND_TYPE_NOOP": 3,
"COMMAND_TYPE_LOAD": 4,
}
)
@ -118,7 +121,7 @@ func (x Command_Type) Number() protoreflect.EnumNumber {
// Deprecated: Use Command_Type.Descriptor instead.
func (Command_Type) EnumDescriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{9, 0}
return file_command_proto_rawDescGZIP(), []int{10, 0}
}
type Parameter struct {
@ -684,6 +687,53 @@ func (x *ExecuteResult) GetTime() float64 {
return 0
}
// LoadRequest is the generated protobuf message carrying the raw bytes of a
// SQLite database file to be loaded into the system via the Raft log.
type LoadRequest struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	// Data holds the raw SQLite database file contents.
	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
}

// Reset clears the message to its zero value.
func (x *LoadRequest) Reset() {
	*x = LoadRequest{}
	if protoimpl.UnsafeEnabled {
		mi := &file_command_proto_msgTypes[8]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

// String renders the message in the protobuf text format.
func (x *LoadRequest) String() string {
	return protoimpl.X.MessageStringOf(x)
}

// ProtoMessage marks LoadRequest as a protobuf message.
func (*LoadRequest) ProtoMessage() {}

// ProtoReflect returns the reflective view of the message.
func (x *LoadRequest) ProtoReflect() protoreflect.Message {
	mi := &file_command_proto_msgTypes[8]
	if protoimpl.UnsafeEnabled && x != nil {
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		if ms.LoadMessageInfo() == nil {
			ms.StoreMessageInfo(mi)
		}
		return ms
	}
	return mi.MessageOf(x)
}

// Deprecated: Use LoadRequest.ProtoReflect.Descriptor instead.
func (*LoadRequest) Descriptor() ([]byte, []int) {
	return file_command_proto_rawDescGZIP(), []int{8}
}

// GetData returns the Data field; it is safe to call on a nil receiver.
func (x *LoadRequest) GetData() []byte {
	if x != nil {
		return x.Data
	}
	return nil
}
type Noop struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -695,7 +745,7 @@ type Noop struct {
func (x *Noop) Reset() {
*x = Noop{}
if protoimpl.UnsafeEnabled {
mi := &file_command_proto_msgTypes[8]
mi := &file_command_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -708,7 +758,7 @@ func (x *Noop) String() string {
func (*Noop) ProtoMessage() {}
func (x *Noop) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[8]
mi := &file_command_proto_msgTypes[9]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -721,7 +771,7 @@ func (x *Noop) ProtoReflect() protoreflect.Message {
// Deprecated: Use Noop.ProtoReflect.Descriptor instead.
func (*Noop) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{8}
return file_command_proto_rawDescGZIP(), []int{9}
}
func (x *Noop) GetId() string {
@ -744,7 +794,7 @@ type Command struct {
func (x *Command) Reset() {
*x = Command{}
if protoimpl.UnsafeEnabled {
mi := &file_command_proto_msgTypes[9]
mi := &file_command_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -757,7 +807,7 @@ func (x *Command) String() string {
func (*Command) ProtoMessage() {}
func (x *Command) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[9]
mi := &file_command_proto_msgTypes[10]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -770,7 +820,7 @@ func (x *Command) ProtoReflect() protoreflect.Message {
// Deprecated: Use Command.ProtoReflect.Descriptor instead.
func (*Command) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{9}
return file_command_proto_rawDescGZIP(), []int{10}
}
func (x *Command) GetType() Command_Type {
@ -861,25 +911,29 @@ var file_command_proto_rawDesc = []byte{
0x66, 0x66, 0x65, 0x63, 0x74, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72,
0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x12, 0x0a,
0x04, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x01, 0x52, 0x04, 0x74, 0x69, 0x6d,
0x65, 0x22, 0x16, 0x0a, 0x04, 0x4e, 0x6f, 0x6f, 0x70, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xe0, 0x01, 0x0a, 0x07, 0x43, 0x6f,
0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x43, 0x6f,
0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65,
0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x65, 0x64, 0x18,
0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x65,
0x64, 0x22, 0x69, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d,
0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57,
0x4e, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54,
0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43,
0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43,
0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44,
0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x4f, 0x50, 0x10, 0x03, 0x42, 0x22, 0x5a, 0x20,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74,
0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x65, 0x22, 0x21, 0x0a, 0x0b, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04,
0x64, 0x61, 0x74, 0x61, 0x22, 0x16, 0x0a, 0x04, 0x4e, 0x6f, 0x6f, 0x70, 0x12, 0x0e, 0x0a, 0x02,
0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xf8, 0x01, 0x0a,
0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74,
0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61,
0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73,
0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65,
0x73, 0x73, 0x65, 0x64, 0x22, 0x80, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e,
0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41,
0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x01, 0x12,
0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f,
0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d,
0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x4f, 0x50, 0x10, 0x03,
0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45,
0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x04, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75,
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c,
0x69, 0x74, 0x65, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x33,
}
var (
@ -895,7 +949,7 @@ func file_command_proto_rawDescGZIP() []byte {
}
var file_command_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 11)
var file_command_proto_goTypes = []interface{}{
(QueryRequest_Level)(0), // 0: command.QueryRequest.Level
(Command_Type)(0), // 1: command.Command.Type
@ -907,8 +961,9 @@ var file_command_proto_goTypes = []interface{}{
(*QueryRows)(nil), // 7: command.QueryRows
(*ExecuteRequest)(nil), // 8: command.ExecuteRequest
(*ExecuteResult)(nil), // 9: command.ExecuteResult
(*Noop)(nil), // 10: command.Noop
(*Command)(nil), // 11: command.Command
(*LoadRequest)(nil), // 10: command.LoadRequest
(*Noop)(nil), // 11: command.Noop
(*Command)(nil), // 12: command.Command
}
var file_command_proto_depIdxs = []int32{
2, // 0: command.Statement.parameters:type_name -> command.Parameter
@ -1029,7 +1084,7 @@ func file_command_proto_init() {
}
}
file_command_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Noop); i {
switch v := v.(*LoadRequest); i {
case 0:
return &v.state
case 1:
@ -1041,6 +1096,18 @@ func file_command_proto_init() {
}
}
file_command_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Noop); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_command_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Command); i {
case 0:
return &v.state
@ -1066,7 +1133,7 @@ func file_command_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_command_proto_rawDesc,
NumEnums: 2,
NumMessages: 10,
NumMessages: 11,
NumExtensions: 0,
NumServices: 0,
},

@ -60,6 +60,10 @@ message ExecuteResult {
double time = 4;
}
message LoadRequest {
bytes data = 1;
}
message Noop {
string id = 1;
}
@ -70,6 +74,7 @@ message Command {
COMMAND_TYPE_QUERY = 1;
COMMAND_TYPE_EXECUTE = 2;
COMMAND_TYPE_NOOP = 3;
COMMAND_TYPE_LOAD = 4;
}
Type type = 1;
bytes sub_command = 2;

@ -89,23 +89,15 @@ func (m *RequestMarshaler) Marshal(r Requester) ([]byte, bool, error) {
if compress {
// Let's try compression.
var buf bytes.Buffer
gzw, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
gzData, err := gzCompress(b)
if err != nil {
return nil, false, fmt.Errorf("gzip new writer: %s", err)
}
if _, err := gzw.Write(b); err != nil {
return nil, false, fmt.Errorf("gzip Write: %s", err)
}
if err := gzw.Close(); err != nil {
return nil, false, fmt.Errorf("gzip Close: %s", err)
return nil, false, err
}
// Is compression better?
if ubz > len(buf.Bytes()) || m.ForceCompression {
if ubz > len(gzData) || m.ForceCompression {
// Yes! Let's keep it.
b = buf.Bytes()
b = gzData
stats.Add(numCompressedRequests, 1)
stats.Add(numCompressedBytes, int64(len(b)))
} else {
@ -151,29 +143,72 @@ func UnmarshalNoop(b []byte, c *Noop) error {
return proto.Unmarshal(b, c)
}
// MarshalLoadRequest marshals a LoadRequest command, gzip-compressing the
// serialized form before returning it.
func MarshalLoadRequest(lr *LoadRequest) ([]byte, error) {
	raw, err := proto.Marshal(lr)
	if err != nil {
		return nil, err
	}
	return gzCompress(raw)
}
// UnmarshalLoadRequest unmarshals a LoadRequest command, first reversing the
// gzip compression applied by MarshalLoadRequest.
func UnmarshalLoadRequest(b []byte, lr *LoadRequest) error {
	decompressed, err := gzUncompress(b)
	if err != nil {
		return err
	}
	return proto.Unmarshal(decompressed, lr)
}
// UnmarshalSubCommand unmarshalls a sub command m. It assumes that
// m is the correct type.
func UnmarshalSubCommand(c *Command, m proto.Message) error {
b := c.SubCommand
if c.Compressed {
gz, err := gzip.NewReader(bytes.NewReader(b))
var err error
b, err = gzUncompress(b)
if err != nil {
return fmt.Errorf("unmarshal sub gzip NewReader: %s", err)
return fmt.Errorf("unmarshal sub uncompress: %s", err)
}
}
ub, err := ioutil.ReadAll(gz)
if err := proto.Unmarshal(b, m); err != nil {
return fmt.Errorf("proto unmarshal: %s", err)
}
return nil
}
// gzCompress compresses the given byte slice.
func gzCompress(b []byte) ([]byte, error) {
var buf bytes.Buffer
gzw, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
if err != nil {
return fmt.Errorf("unmarshal sub gzip ReadAll: %s", err)
return nil, fmt.Errorf("gzip new writer: %s", err)
}
if err := gz.Close(); err != nil {
return fmt.Errorf("unmarshal sub gzip Close: %s", err)
if _, err := gzw.Write(b); err != nil {
return nil, fmt.Errorf("gzip Write: %s", err)
}
b = ub
if err := gzw.Close(); err != nil {
return nil, fmt.Errorf("gzip Close: %s", err)
}
return buf.Bytes(), nil
}
if err := proto.Unmarshal(b, m); err != nil {
return fmt.Errorf("proto unmarshal: %s", err)
func gzUncompress(b []byte) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("unmarshal gzip NewReader: %s", err)
}
return nil
ub, err := ioutil.ReadAll(gz)
if err != nil {
return nil, fmt.Errorf("unmarshal gzip ReadAll: %s", err)
}
if err := gz.Close(); err != nil {
return nil, fmt.Errorf("unmarshal gzip Close: %s", err)
}
return ub, nil
}

@ -54,6 +54,7 @@ func init() {
type DB struct {
path string // Path to database file, if running on-disk.
memory bool // In-memory only.
fkEnabled bool // Foreign key constraints enabled
rwDB *sql.DB // Database connection for database reads and writes.
roDB *sql.DB // Database connection database reads.
@ -108,6 +109,7 @@ func Open(dbPath string, fkEnabled bool) (*DB, error) {
return &DB{
path: dbPath,
fkEnabled: fkEnabled,
rwDB: rwDB,
roDB: roDB,
rwDSN: rwDSN,
@ -159,6 +161,8 @@ func OpenInMemory(fkEnabled bool) (*DB, error) {
return &DB{
memory: true,
path: ":memory:",
fkEnabled: fkEnabled,
rwDB: rwDB,
roDB: roDB,
rwDSN: rwDSN,
@ -215,7 +219,7 @@ func DeserializeIntoMemory(b []byte, fkEnabled bool) (retDB *DB, retErr error) {
}
defer func() {
// Don't leak a database if deserialization fails.
if retErr != nil {
if retDB != nil && retErr != nil {
retDB.Close()
}
}()
@ -283,10 +287,8 @@ func (db *DB) Stats() (map[string]interface{}, error) {
"conn_pool_stats": connPoolStats,
}
if db.memory {
stats["path"] = ":memory:"
} else {
stats["path"] = db.path
if !db.memory {
if stats["size"], err = db.FileSize(); err != nil {
return nil, err
}
@ -318,6 +320,21 @@ func (db *DB) FileSize() (int64, error) {
return fi.Size(), nil
}
// InMemory returns whether this database is in-memory only, as opposed
// to being backed by a file on disk.
func (db *DB) InMemory() bool {
	return db.memory
}
// FKEnabled returns whether Foreign Key constraints were requested when
// this database was opened.
func (db *DB) FKEnabled() bool {
	return db.fkEnabled
}
// Path returns the path of this database's file. For an in-memory database
// this is the placeholder ":memory:" set at open time.
func (db *DB) Path() string {
	return db.path
}
// CompileOptions returns the SQLite compilation options.
func (db *DB) CompileOptions() ([]string, error) {
res, err := db.QueryStringStmt("PRAGMA compile_options")

@ -32,6 +32,15 @@ func Test_DbFileCreation(t *testing.T) {
if db == nil {
t.Fatal("database is nil")
}
if db.InMemory() {
t.Fatal("on-disk database marked as in-memory")
}
if db.FKEnabled() {
t.Fatal("FK constraints marked as enabled")
}
if db.Path() != dbPath {
t.Fatal("database path is incorrect")
}
if _, err := os.Stat(dbPath); os.IsNotExist(err) {
t.Fatalf("%s does not exist after open", dbPath)
@ -97,6 +106,10 @@ func Test_TableCreationInMemory(t *testing.T) {
db := mustCreateInMemoryDatabase()
defer db.Close()
if !db.InMemory() {
t.Fatal("in-memory database marked as not in-memory")
}
r, err := db.ExecuteStringStmt("CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)")
if err != nil {
t.Fatalf("failed to create table: %s", err.Error())
@ -150,6 +163,9 @@ func Test_TableCreationInMemoryFK(t *testing.T) {
// Now, do same testing with FK constraints enabled.
dbFK := mustCreateInMemoryDatabaseFK()
defer dbFK.Close()
if !dbFK.FKEnabled() {
t.Fatal("FK constraints not marked as enabled")
}
r, err = dbFK.ExecuteStringStmt(createTableFoo)
if err != nil {

@ -50,6 +50,9 @@ type Database interface {
// is true, then all queries will take place while a read transaction
// is held on the database.
Query(qr *command.QueryRequest) ([]*command.QueryRows, error)
// Load loads a SQLite file into the system
Load(lr *command.LoadRequest) error
}
// Store is the interface the Raft-based database must implement.
@ -590,6 +593,28 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
}
r.Body.Close()
format, err := fmtParam(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if strings.ToLower(format) == "binary" {
if !validateSQLiteFile(b) {
http.Error(w, "invalid SQLite database file", http.StatusBadRequest)
return
}
lr := &command.LoadRequest{
Data: b,
}
err := s.store.Load(lr)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
s.writeResponse(w, r, resp)
} else {
// No JSON structure expected for this API.
queries := []string{string(b)}
er := executeRequestFromStrings(queries, timings, false)
@ -614,6 +639,7 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
}
resp.end = time.Now()
s.writeResponse(w, r, resp)
}
}
// handleStatus returns status on the system.
@ -1521,3 +1547,9 @@ func queryRequestFromStrings(s []string, timings, tx bool) *command.QueryRequest
Timings: timings,
}
}
// validateSQLiteFile checks that the supplied data looks like a SQLite database
// file. See https://www.sqlite.org/fileformat.html
//
// A valid SQLite file begins with the 16-byte header "SQLite format 3\000";
// the first 13 bytes are sufficient to distinguish it here.
func validateSQLiteFile(b []byte) bool {
	// Guard the length first so a short (or empty) upload doesn't panic the
	// HTTP handler with an out-of-range slice.
	return len(b) >= 13 && string(b[:13]) == "SQLite format"
}

@ -1068,6 +1068,10 @@ func (m *MockStore) Query(qr *command.QueryRequest) ([]*command.QueryRows, error
return nil, nil
}
// Load is a no-op that satisfies the Database interface's Load method;
// it always reports success.
func (m *MockStore) Load(lr *command.LoadRequest) error {
	return nil
}
func (m *MockStore) Join(id, addr string, voter bool) error {
return nil
}

@ -70,6 +70,7 @@ const (
const (
numSnaphots = "num_snapshots"
numBackups = "num_backups"
numLoads = "num_loads"
numRestores = "num_restores"
numRecoveries = "num_recoveries"
numUncompressedCommands = "num_uncompressed_commands"
@ -355,7 +356,7 @@ func (s *Store) Open() (retErr error) {
// can also happen if the user explicitly disables the startup optimization of
// building the SQLite database in memory, before switching to disk.
if s.StartupOnDisk || (!s.dbConf.Memory && !s.snapsExistOnOpen && s.lastCommandIdxOnOpen == 0) {
s.db, err = s.createOnDisk(nil)
s.db, err = createOnDisk(nil, s.dbPath, s.dbConf.FKConstraints)
if err != nil {
return fmt.Errorf("failed to create on-disk database")
}
@ -363,7 +364,7 @@ func (s *Store) Open() (retErr error) {
s.logger.Printf("created on-disk database at open")
} else {
// We need an in-memory database, at least for bootstrapping purposes.
s.db, err = s.createInMemory(nil)
s.db, err = createInMemory(nil, s.dbConf.FKConstraints)
if err != nil {
return fmt.Errorf("failed to create in-memory database")
}
@ -843,6 +844,42 @@ func (s *Store) Backup(leader bool, fmt BackupFormat, dst io.Writer) error {
return nil
}
// Load loads an entire SQLite file into the database, sending the request
// through the Raft log so that every node in the cluster applies it.
// Returns ErrNotLeader if this node is not the cluster leader.
func (s *Store) Load(lr *command.LoadRequest) error {
	startT := time.Now()

	// Wrap the load request in a Raft log command.
	b, err := command.MarshalLoadRequest(lr)
	if err != nil {
		return err
	}

	c := &command.Command{
		Type:       command.Command_COMMAND_TYPE_LOAD,
		SubCommand: b,
	}

	b, err = command.Marshal(c)
	if err != nil {
		return err
	}

	af := s.raft.Apply(b, s.ApplyTimeout).(raft.ApplyFuture)
	if af.Error() != nil {
		if af.Error() == raft.ErrNotLeader {
			return ErrNotLeader
		}
		return af.Error()
	}

	// Record the Raft index at which the database was last modified.
	s.dbAppliedIndexMu.Lock()
	s.dbAppliedIndex = af.Index()
	s.dbAppliedIndexMu.Unlock()
	stats.Add(numLoads, 1)
	s.logger.Printf("node loaded in %s", time.Since(startT))
	// af.Error() is nil here -- the non-nil case returned above.
	return af.Error()
}
// Notify notifies this Store that a node is ready for bootstrapping at the
// given address. Once the number of known nodes reaches the expected level
// bootstrapping will be attempted using this Store. "Expected level" includes
@ -987,32 +1024,6 @@ func (s *Store) Noop(id string) error {
return nil
}
// createInMemory returns an in-memory database. If b is non-nil and non-empty,
// then the database will be initialized with the contents of b.
func (s *Store) createInMemory(b []byte) (db *sql.DB, err error) {
if b == nil || len(b) == 0 {
db, err = sql.OpenInMemory(s.dbConf.FKConstraints)
} else {
db, err = sql.DeserializeIntoMemory(b, s.dbConf.FKConstraints)
}
return
}
// createOnDisk opens an on-disk database file at the Store's configured path. If
// b is non-nil, any preexisting file will first be overwritten with those contents.
// Otherwise, any preexisting file will be removed before the database is opened.
func (s *Store) createOnDisk(b []byte) (*sql.DB, error) {
if err := os.Remove(s.dbPath); err != nil && !os.IsNotExist(err) {
return nil, err
}
if b != nil {
if err := ioutil.WriteFile(s.dbPath, b, 0660); err != nil {
return nil, err
}
}
return sql.Open(s.dbPath, s.dbConf.FKConstraints)
}
// setLogInfo records some key indexs about the log.
func (s *Store) setLogInfo() error {
var err error
@ -1122,7 +1133,7 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) {
return
}
// Open a new on-disk database.
s.db, err = s.createOnDisk(b)
s.db, err = createOnDisk(b, s.dbPath, s.dbConf.FKConstraints)
if err != nil {
e = &fsmGenericResponse{error: fmt.Errorf("open on-disk failed: %s", err)}
return
@ -1138,7 +1149,7 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) {
s.firstLogAppliedT = time.Now()
}
typ, r := applyCommand(l.Data, s.db)
typ, r := applyCommand(l.Data, &s.db)
if typ == command.Command_COMMAND_TYPE_NOOP {
s.numNoops++
}
@ -1213,7 +1224,7 @@ func (s *Store) Restore(rc io.ReadCloser) error {
// Therefore, this is the last opportunity to create the on-disk database
// before Raft starts. This could also happen because the user has explicitly
// disabled the build-on-disk-database-in-memory-first optimization.
db, err = s.createOnDisk(b)
db, err = createOnDisk(b, s.dbPath, s.dbConf.FKConstraints)
if err != nil {
return fmt.Errorf("open on-disk file during restore: %s", err)
}
@ -1225,7 +1236,7 @@ func (s *Store) Restore(rc io.ReadCloser) error {
// command entries in the log. So by sticking with an in-memory database
// those entries will be applied in the fastest possible manner. We will
// defer creation of any database on disk until the Apply function.
db, err = s.createInMemory(b)
db, err = createInMemory(b, s.dbConf.FKConstraints)
if err != nil {
return fmt.Errorf("createInMemory: %s", err)
}
@ -1518,7 +1529,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
return fmt.Errorf("failed to get log at index %d: %v", index, err)
}
if entry.Type == raft.LogCommand {
applyCommand(entry.Data, db)
applyCommand(entry.Data, &db)
}
lastIndex = entry.Index
lastTerm = entry.Term
@ -1609,8 +1620,9 @@ func dbBytesFromSnapshot(rc io.ReadCloser) ([]byte, error) {
return database, nil
}
func applyCommand(data []byte, db *sql.DB) (command.Command_Type, interface{}) {
func applyCommand(data []byte, pDB **sql.DB) (command.Command_Type, interface{}) {
var c command.Command
db := *pDB
if err := command.Unmarshal(data, &c); err != nil {
panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error()))
@ -1631,6 +1643,32 @@ func applyCommand(data []byte, db *sql.DB) (command.Command_Type, interface{}) {
}
r, err := db.Execute(er.Request, er.Timings)
return c.Type, &fsmExecuteResponse{results: r, error: err}
case command.Command_COMMAND_TYPE_LOAD:
var lr command.LoadRequest
if err := command.UnmarshalLoadRequest(c.SubCommand, &lr); err != nil {
panic(fmt.Sprintf("failed to unmarshal load subcommand: %s", err.Error()))
}
var newDB *sql.DB
var err error
if db.InMemory() {
newDB, err = createInMemory(lr.Data, db.FKEnabled())
if err != nil {
return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to create in-memory database: %s", err)}
}
} else {
newDB, err = createOnDisk(lr.Data, db.Path(), db.FKEnabled())
if err != nil {
return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)}
}
}
// Swap the underlying database to the new one.
if err := db.Close(); err != nil {
return c.Type, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
}
*pDB = newDB
return c.Type, &fsmGenericResponse{}
case command.Command_COMMAND_TYPE_NOOP:
return c.Type, &fsmGenericResponse{}
default:
@ -1669,6 +1707,32 @@ func checkRaftConfiguration(configuration raft.Configuration) error {
return nil
}
// createInMemory returns an in-memory database. If b is non-nil and non-empty,
// then the database will be initialized with the contents of b.
func createInMemory(b []byte, fkConstraints bool) (db *sql.DB, err error) {
	// len(nil) == 0 in Go, so a single length check covers both the nil and
	// empty cases (staticcheck S1009).
	if len(b) == 0 {
		db, err = sql.OpenInMemory(fkConstraints)
	} else {
		db, err = sql.DeserializeIntoMemory(b, fkConstraints)
	}
	return
}
// createOnDisk opens an on-disk database file at the configured path. If b is
// non-nil, any preexisting file will first be overwritten with those contents.
// Otherwise, any preexisting file will be removed before the database is opened.
func createOnDisk(b []byte, path string, fkConstraints bool) (*sql.DB, error) {
	// Clear out anything already at the target path; a missing file is fine.
	if rmErr := os.Remove(path); rmErr != nil && !os.IsNotExist(rmErr) {
		return nil, rmErr
	}
	if b != nil {
		if wErr := ioutil.WriteFile(path, b, 0660); wErr != nil {
			return nil, wErr
		}
	}
	return sql.Open(path, fkConstraints)
}
func readUint64(b []byte) (uint64, error) {
var sz uint64
if err := binary.Read(bytes.NewReader(b), binary.LittleEndian, &sz); err != nil {

@ -441,7 +441,6 @@ func Test_SingleNodeSQLitePath(t *testing.T) {
if !pathExists(path) {
t.Fatalf("SQLite file does not exist at %s", path)
}
}
func Test_SingleNodeBackupBinary(t *testing.T) {
@ -545,7 +544,7 @@ COMMIT;
}
}
func Test_SingleNodeLoad(t *testing.T) {
func Test_SingleNodeSingleCommandTrigger(t *testing.T) {
s, ln := mustNewStore(true)
defer os.RemoveAll(s.Path())
defer ln.Close()
@ -563,31 +562,35 @@ func Test_SingleNodeLoad(t *testing.T) {
dump := `PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE foo (id integer not null primary key, name text);
INSERT INTO "foo" VALUES(1,'fiona');
CREATE TABLE foo (id integer primary key asc, name text);
INSERT INTO "foo" VALUES(1,'bob');
INSERT INTO "foo" VALUES(2,'alice');
INSERT INTO "foo" VALUES(3,'eve');
CREATE TABLE bar (nameid integer, age integer);
INSERT INTO "bar" VALUES(1,44);
INSERT INTO "bar" VALUES(2,46);
INSERT INTO "bar" VALUES(3,8);
CREATE VIEW foobar as select name as Person, Age as age from foo inner join bar on foo.id == bar.nameid;
CREATE TRIGGER new_foobar instead of insert on foobar begin insert into foo (name) values (new.Person); insert into bar (nameid, age) values ((select id from foo where name == new.Person), new.Age); end;
COMMIT;
`
_, err := s.Execute(executeRequestFromString(dump, false, false))
if err != nil {
t.Fatalf("failed to load simple dump: %s", err.Error())
t.Fatalf("failed to load dump with trigger: %s", err.Error())
}
// Check that data were loaded correctly.
qr := queryRequestFromString("SELECT * FROM foo", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err := s.Query(qr)
// Check that the VIEW and TRIGGER are OK by using both.
er := executeRequestFromString("INSERT INTO foobar VALUES('jason', 16)", false, true)
r, err := s.Execute(er)
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
t.Fatalf("failed to insert into view on single node: %s", err.Error())
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
if exp, got := int64(3), r[0].GetLastInsertId(); exp != got {
t.Fatalf("unexpected results for query\nexp: %d\ngot: %d", exp, got)
}
}
func Test_SingleNodeSingleCommandTrigger(t *testing.T) {
func Test_SingleNodeLoadText(t *testing.T) {
s, ln := mustNewStore(true)
defer os.RemoveAll(s.Path())
defer ln.Close()
@ -605,35 +608,31 @@ func Test_SingleNodeSingleCommandTrigger(t *testing.T) {
dump := `PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE foo (id integer primary key asc, name text);
INSERT INTO "foo" VALUES(1,'bob');
INSERT INTO "foo" VALUES(2,'alice');
INSERT INTO "foo" VALUES(3,'eve');
CREATE TABLE bar (nameid integer, age integer);
INSERT INTO "bar" VALUES(1,44);
INSERT INTO "bar" VALUES(2,46);
INSERT INTO "bar" VALUES(3,8);
CREATE VIEW foobar as select name as Person, Age as age from foo inner join bar on foo.id == bar.nameid;
CREATE TRIGGER new_foobar instead of insert on foobar begin insert into foo (name) values (new.Person); insert into bar (nameid, age) values ((select id from foo where name == new.Person), new.Age); end;
CREATE TABLE foo (id integer not null primary key, name text);
INSERT INTO "foo" VALUES(1,'fiona');
COMMIT;
`
_, err := s.Execute(executeRequestFromString(dump, false, false))
if err != nil {
t.Fatalf("failed to load dump with trigger: %s", err.Error())
t.Fatalf("failed to load simple dump: %s", err.Error())
}
// Check that the VIEW and TRIGGER are OK by using both.
er := executeRequestFromString("INSERT INTO foobar VALUES('jason', 16)", false, true)
r, err := s.Execute(er)
// Check that data were loaded correctly.
qr := queryRequestFromString("SELECT * FROM foo", false, true)
qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
r, err := s.Query(qr)
if err != nil {
t.Fatalf("failed to insert into view on single node: %s", err.Error())
t.Fatalf("failed to query single node: %s", err.Error())
}
if exp, got := int64(3), r[0].GetLastInsertId(); exp != got {
t.Fatalf("unexpected results for query\nexp: %d\ngot: %d", exp, got)
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_SingleNodeLoadNoStatements(t *testing.T) {
func Test_SingleNodeLoadTextNoStatements(t *testing.T) {
s, ln := mustNewStore(true)
defer os.RemoveAll(s.Path())
defer ln.Close()
@ -659,7 +658,7 @@ COMMIT;
}
}
func Test_SingleNodeLoadEmpty(t *testing.T) {
func Test_SingleNodeLoadTextEmpty(t *testing.T) {
s, ln := mustNewStore(true)
defer os.RemoveAll(s.Path())
defer ln.Close()
@ -682,7 +681,7 @@ func Test_SingleNodeLoadEmpty(t *testing.T) {
}
}
func Test_SingleNodeLoadChinook(t *testing.T) {
func Test_SingleNodeLoadTextChinook(t *testing.T) {
s, ln := mustNewStore(true)
defer os.RemoveAll(s.Path())
defer ln.Close()
@ -745,6 +744,91 @@ func Test_SingleNodeLoadChinook(t *testing.T) {
}
}
// Test_SingleNodeLoadBinary tests that a single node can load a SQLite
// database file directly (binary load), and that any data already in the
// node is erased by that load.
func Test_SingleNodeLoadBinary(t *testing.T) {
	s, ln := mustNewStore(true)
	defer os.RemoveAll(s.Path())
	defer ln.Close()

	if err := s.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	if err := s.Bootstrap(NewServer(s.ID(), s.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	defer s.Close(true)
	if _, err := s.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// Load a dataset, to check it's erased by the SQLite file load.
	dump := `PRAGMA foreign_keys=OFF;
BEGIN TRANSACTION;
CREATE TABLE bar (id integer not null primary key, name text);
INSERT INTO "bar" VALUES(1,'declan');
COMMIT;
`
	_, err := s.Execute(executeRequestFromString(dump, false, false))
	if err != nil {
		t.Fatalf("failed to load simple dump: %s", err.Error())
	}

	// Check that data were loaded correctly.
	qr := queryRequestFromString("SELECT * FROM bar", false, true)
	qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
	r, err := s.Query(qr)
	if err != nil {
		t.Fatalf("failed to query single node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[1,"declan"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}

	// Load the SQLite file directly; this should fully replace the
	// node's existing database.
	err = s.Load(loadRequestFromFile(filepath.Join("testdata", "load.sqlite")))
	if err != nil {
		t.Fatalf("failed to load SQLite file: %s", err.Error())
	}

	// Check that data were loaded correctly.
	qr = queryRequestFromString("SELECT * FROM foo WHERE id=2", false, true)
	qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
	r, err = s.Query(qr)
	if err != nil {
		t.Fatalf("failed to query single node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[2,"fiona"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}

	// The loaded file's foo table should contain exactly 3 rows.
	qr = queryRequestFromString("SELECT count(*) FROM foo", false, true)
	qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
	r, err = s.Query(qr)
	if err != nil {
		t.Fatalf("failed to query single node: %s", err.Error())
	}
	if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[3]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}

	// Check pre-existing data is gone.
	qr = queryRequestFromString("SELECT * FROM bar", false, true)
	qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
	r, err = s.Query(qr)
	if err != nil {
		t.Fatalf("failed to query single node: %s", err.Error())
	}
	if exp, got := `{"error":"no such table: bar"}`, asJSON(r[0]); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
}
// Test_SingleNodeRecoverNoChange tests a node recovery that doesn't
// actually change anything.
func Test_SingleNodeRecoverNoChange(t *testing.T) {
@ -1943,6 +2027,14 @@ func mustWriteFile(path, contents string) {
}
}
func mustReadFile(path string) []byte {
b, err := ioutil.ReadFile(path)
if err != nil {
panic("failed to read file")
}
return b
}
func mustTempDir() string {
var err error
path, err := ioutil.TempDir("", "rqlilte-test-")
@ -2002,6 +2094,12 @@ func queryRequestFromStrings(s []string, timings, tx bool) *command.QueryRequest
}
}
// loadRequestFromFile returns a LoadRequest whose payload is the raw
// contents of the file at path. Panics (via mustReadFile) if the file
// cannot be read.
func loadRequestFromFile(path string) *command.LoadRequest {
	req := command.LoadRequest{
		Data: mustReadFile(path),
	}
	return &req
}
// waitForLeaderID waits until the Store's LeaderID is set, or the timeout
// expires. Because setting Leader ID requires Raft to set the cluster
// configuration, it's not entirely deterministic when it will be set.

Binary file not shown.

@ -468,12 +468,18 @@ class Node(object):
raise_for_status(r)
fd.write(r.content)
def restore(self, file, fmt=None):
    """Restore this node's database from the file at the given path.

    If fmt is "binary", the file is treated as an actual SQLite database
    file and its raw bytes are POSTed to the node's load endpoint.
    Otherwise the file is opened as a SQLite database and the SQL dump of
    its contents is POSTed instead. Returns the node's decoded JSON
    response.
    """
    # This is the one API that doesn't expect JSON.
    if fmt != "binary":
        conn = sqlite3.connect(file)
        try:
            r = requests.post(self._load_url(fmt), data='\n'.join(conn.iterdump()))
            raise_for_status(r)
        finally:
            # Always release the SQLite connection, even if the POST fails.
            conn.close()
    else:
        with open(file, 'rb') as f:
            data = f.read()
        r = requests.post(self._load_url(fmt), data=data,
                          headers={'Content-Type': 'application/octet-stream'})
        raise_for_status(r)
    return r.json()
def redirect_addr(self):
@ -510,8 +516,11 @@ class Node(object):
return 'http://' + self.APIAddr() + '/db/execute' + rd
def _backup_url(self):
    """Return the URL of this node's backup endpoint."""
    return 'http://' + self.APIAddr() + '/db/backup'
def _load_url(self):
return 'http://' + self.APIAddr() + '/db/load'
def _load_url(self, fmt=None):
f = ""
if fmt is not None:
f = '?fmt=%s' % (fmt)
return 'http://' + self.APIAddr() + '/db/load' + f
def __eq__(self, other):
return self.node_id == other.node_id
def __str__(self):
@ -1284,6 +1293,14 @@ class TestEndToEndBackupRestore(unittest.TestCase):
j = self.node1.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
self.node2 = Node(RQLITED_PATH, '1')
self.node2.start()
self.node2.wait_for_leader()
j = self.node2.restore(self.db_file, fmt='binary')
self.assertEqual(j, d_("{'results': []}"))
j = self.node2.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
def tearDown(self):
if hasattr(self, 'node0'):
deprovision_node(self.node0)

Loading…
Cancel
Save