diff --git a/CHANGELOG.md b/CHANGELOG.md index 377fe549..aaf2d69a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ## 7.8.0 (unreleased) +### New features +- [PR #1081](https://github.com/rqlite/rqlite/pull/1081): Transparently forward Backup requests to Leaders. + ### Implementation changes and bug fixes - [PR #1079](https://github.com/rqlite/rqlite/pull/1079): Use a Protobuf model for Backup requests. - [PR #1078](https://github.com/rqlite/rqlite/pull/1078): Decrease bootstrap polling interval from 5 seconds to 2 seconds. diff --git a/DOC/BACKUPS.md b/DOC/BACKUPS.md index 2f270ba0..8db2f18d 100644 --- a/DOC/BACKUPS.md +++ b/DOC/BACKUPS.md @@ -9,9 +9,11 @@ This command will write the SQLite database file to `bak.sqlite3`. You can also access the rqlite API directly, via a HTTP `GET` request to the endpoint `/db/backup`. For example, using `curl`, and assuming the node is listening on `localhost:4001`, you could retrieve a backup as follows: ```bash -curl -s -L -XGET localhost:4001/db/backup -o bak.sqlite3 +curl -s -XGET localhost:4001/db/backup -o bak.sqlite3 ``` -Note that if the node is not the Leader, a HTTP 301 response will be returned with the Leader's address. The `curl` command above will automatically follow the Leader, due to the presence of the option `-L`. +Note that if the node is not the Leader, the node will transparently forward the request to the Leader, wait for the backup data from the Leader, and return it to the client. If, instead, you want a backup of the SQLite database of the actual node that receives the request, add `noleader` to the URL as a query parameter. + +If you do not wish a Follower to transparently forward a backup request to the Leader, add `redirect` to the URL as a query parameter. In that case, if a Follower receives a backup request, it will respond with [HTTP 301 Moved Permanently](https://en.wikipedia.org/wiki/HTTP_301) and include the address of the Leader as the `Location` header in the response. It is then up to the client to re-issue the request to the Leader. In either case the generated file can then be used to restore a node (or cluster) using the [restore API](https://github.com/rqlite/rqlite/blob/master/DOC/RESTORE_FROM_SQLITE.md). @@ -27,6 +29,6 @@ curl -s -L -XGET localhost:4001/db/backup?fmt=sql -o bak.sql ``` ## Backup isolation level -The isolation offered by backups is `READ COMMITTED`. This means that any changes due to transactions to the database, that take place during the backup, will be reflected immediately once the transaction is committed, but not before. +The isolation offered by binary backups is `READ COMMITTED`. This means that any changes made to the database by transactions that take place during the backup will be reflected immediately once those transactions are committed, but not before. See the [SQLite documentation](https://www.sqlite.org/isolation.html) for more details.
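Editor's note: for clients that set `redirect` and therefore need to handle the 301 themselves, here is a minimal Go sketch of the flow described in the updated BACKUPS.md, assuming a node listening on `localhost:4001`; the output file name is just an example.

```go
package main

import (
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	// Do not follow redirects automatically, so the 301 a Follower returns
	// (when the redirect query parameter is supplied) can be handled explicitly.
	client := &http.Client{
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}

	resp, err := client.Get("http://localhost:4001/db/backup?redirect")
	if err != nil {
		log.Fatal(err)
	}

	// If a Follower answered, re-issue the request against the Leader's
	// address taken from the Location header.
	if resp.StatusCode == http.StatusMovedPermanently {
		leaderURL := resp.Header.Get("Location")
		resp.Body.Close()
		if resp, err = client.Get(leaderURL); err != nil {
			log.Fatal(err)
		}
	}
	defer resp.Body.Close()

	// Stream the SQLite backup to disk.
	f, err := os.Create("bak.sqlite3")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	if _, err := io.Copy(f, resp.Body); err != nil {
		log.Fatal(err)
	}
}
```
Without `redirect`, the default behaviour is simpler: the Follower forwards the request to the Leader and streams the backup back, so a plain GET suffices.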
diff --git a/cluster/client.go b/cluster/client.go index f7676225..9e7eb271 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -1,6 +1,8 @@ package cluster import ( + "bytes" + "compress/gzip" "encoding/binary" "errors" "fmt" @@ -293,6 +295,96 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, creds *Credent return a.Rows, nil } +// Backup retrieves a backup from a remote node and writes to the io.Writer +func (c *Client) Backup(br *command.BackupRequest, nodeAddr string, creds *Credentials, timeout time.Duration, w io.Writer) error { + conn, err := c.dial(nodeAddr, c.timeout) + if err != nil { + return err + } + defer conn.Close() + + // Send the request + command := &Command{ + Type: Command_COMMAND_TYPE_BACKUP, + Request: &Command_BackupRequest{ + BackupRequest: br, + }, + Credentials: creds, + } + p, err := proto.Marshal(command) + if err != nil { + return fmt.Errorf("command marshal: %s", err) + } + + // Write length of Protobuf + b := make([]byte, 4) + binary.LittleEndian.PutUint16(b[0:], uint16(len(p))) + if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil { + handleConnError(conn) + return err + } + _, err = conn.Write(b) + if err != nil { + handleConnError(conn) + return fmt.Errorf("write protobuf length: %s", err) + } + + // Now write backup request proto itself. + if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil { + handleConnError(conn) + return err + } + _, err = conn.Write(p) + if err != nil { + handleConnError(conn) + return fmt.Errorf("write protobuf: %s", err) + } + + // Read the backup response + if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil { + handleConnError(conn) + return err + } + + // Read length of response. + _, err = io.ReadFull(conn, b) + if err != nil { + handleConnError(conn) + return err + } + sz := binary.LittleEndian.Uint32(b[0:]) + + // Read in the actual response. + p = make([]byte, sz) + _, err = io.ReadFull(conn, p) + if err != nil { + handleConnError(conn) + return err + } + + // Decompress.... + p, err = gzUncompress(p) + if err != nil { + handleConnError(conn) + return fmt.Errorf("backup decompress: %s", err) + } + + resp := &CommandBackupResponse{} + err = proto.Unmarshal(p, resp) + if err != nil { + return fmt.Errorf("backup unmarshal: %s", err) + } + + if resp.Error != "" { + return errors.New(resp.Error) + } + + if _, err := w.Write(resp.Data); err != nil { + return fmt.Errorf("backup write: %s", err) + } + return nil +} + // Stats returns stats on the Client instance func (c *Client) Stats() (map[string]interface{}, error) { c.mu.RLock() @@ -364,3 +456,20 @@ func handleConnError(conn net.Conn) { pc.MarkUnusable() } } + +func gzUncompress(b []byte) ([]byte, error) { + gz, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return nil, fmt.Errorf("unmarshal gzip NewReader: %s", err) + } + + ub, err := io.ReadAll(gz) + if err != nil { + return nil, fmt.Errorf("unmarshal gzip ReadAll: %s", err) + } + + if err := gz.Close(); err != nil { + return nil, fmt.Errorf("unmarshal gzip Close: %s", err) + } + return ub, nil +} diff --git a/cluster/message.pb.go b/cluster/message.pb.go index 1c2f1ba4..6037e47d 100644 --- a/cluster/message.pb.go +++ b/cluster/message.pb.go @@ -28,6 +28,7 @@ const ( Command_COMMAND_TYPE_GET_NODE_API_URL Command_Type = 1 Command_COMMAND_TYPE_EXECUTE Command_Type = 2 Command_COMMAND_TYPE_QUERY Command_Type = 3 + Command_COMMAND_TYPE_BACKUP Command_Type = 4 ) // Enum value maps for Command_Type. 
@@ -37,12 +38,14 @@ var ( 1: "COMMAND_TYPE_GET_NODE_API_URL", 2: "COMMAND_TYPE_EXECUTE", 3: "COMMAND_TYPE_QUERY", + 4: "COMMAND_TYPE_BACKUP", } Command_Type_value = map[string]int32{ "COMMAND_TYPE_UNKNOWN": 0, "COMMAND_TYPE_GET_NODE_API_URL": 1, "COMMAND_TYPE_EXECUTE": 2, "COMMAND_TYPE_QUERY": 3, + "COMMAND_TYPE_BACKUP": 4, } ) @@ -184,6 +187,7 @@ type Command struct { // Types that are assignable to Request: // *Command_ExecuteRequest // *Command_QueryRequest + // *Command_BackupRequest Request isCommand_Request `protobuf_oneof:"request"` Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"` } @@ -248,6 +252,13 @@ func (x *Command) GetQueryRequest() *command.QueryRequest { return nil } +func (x *Command) GetBackupRequest() *command.BackupRequest { + if x, ok := x.GetRequest().(*Command_BackupRequest); ok { + return x.BackupRequest + } + return nil +} + func (x *Command) GetCredentials() *Credentials { if x != nil { return x.Credentials @@ -267,10 +278,16 @@ type Command_QueryRequest struct { QueryRequest *command.QueryRequest `protobuf:"bytes,3,opt,name=query_request,json=queryRequest,proto3,oneof"` } +type Command_BackupRequest struct { + BackupRequest *command.BackupRequest `protobuf:"bytes,5,opt,name=backup_request,json=backupRequest,proto3,oneof"` +} + func (*Command_ExecuteRequest) isCommand_Request() {} func (*Command_QueryRequest) isCommand_Request() {} +func (*Command_BackupRequest) isCommand_Request() {} + type CommandExecuteResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -381,6 +398,61 @@ func (x *CommandQueryResponse) GetRows() []*command.QueryRows { return nil } +type CommandBackupResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *CommandBackupResponse) Reset() { + *x = CommandBackupResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CommandBackupResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommandBackupResponse) ProtoMessage() {} + +func (x *CommandBackupResponse) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommandBackupResponse.ProtoReflect.Descriptor instead. 
+func (*CommandBackupResponse) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{5} +} + +func (x *CommandBackupResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *CommandBackupResponse) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + var File_message_proto protoreflect.FileDescriptor var file_message_proto_rawDesc = []byte{ @@ -393,7 +465,7 @@ var file_message_proto_rawDesc = []byte{ 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x75, 0x72, 0x6c, 0x22, 0xf0, 0x02, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, + 0x75, 0x72, 0x6c, 0x22, 0xcb, 0x03, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x78, @@ -404,33 +476,43 @@ var file_message_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0c, - 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, - 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, - 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, - 0x69, 0x61, 0x6c, 0x73, 0x22, 0x75, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, - 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, - 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, - 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, - 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, - 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, - 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, - 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x42, 0x09, 0x0a, 0x07, 0x72, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, - 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, - 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 
0x6e, 0x73, 0x65, - 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, - 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, - 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x42, 0x22, - 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, - 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, 0x0e, + 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x42, + 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0d, + 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, + 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, + 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, + 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22, 0x8e, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, + 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, + 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, + 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, + 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, + 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, + 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, + 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x17, 0x0a, + 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x42, 0x41, + 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, + 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, + 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, + 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, + 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 
0x77, 0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x22, 0x5a, 0x20, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, + 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -446,7 +528,7 @@ func file_message_proto_rawDescGZIP() []byte { } var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_message_proto_goTypes = []interface{}{ (Command_Type)(0), // 0: cluster.Command.Type (*Credentials)(nil), // 1: cluster.Credentials @@ -454,23 +536,26 @@ var file_message_proto_goTypes = []interface{}{ (*Command)(nil), // 3: cluster.Command (*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse (*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse - (*command.ExecuteRequest)(nil), // 6: command.ExecuteRequest - (*command.QueryRequest)(nil), // 7: command.QueryRequest - (*command.ExecuteResult)(nil), // 8: command.ExecuteResult - (*command.QueryRows)(nil), // 9: command.QueryRows + (*CommandBackupResponse)(nil), // 6: cluster.CommandBackupResponse + (*command.ExecuteRequest)(nil), // 7: command.ExecuteRequest + (*command.QueryRequest)(nil), // 8: command.QueryRequest + (*command.BackupRequest)(nil), // 9: command.BackupRequest + (*command.ExecuteResult)(nil), // 10: command.ExecuteResult + (*command.QueryRows)(nil), // 11: command.QueryRows } var file_message_proto_depIdxs = []int32{ - 0, // 0: cluster.Command.type:type_name -> cluster.Command.Type - 6, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest - 7, // 2: cluster.Command.query_request:type_name -> command.QueryRequest - 1, // 3: cluster.Command.credentials:type_name -> cluster.Credentials - 8, // 4: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult - 9, // 5: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows - 6, // [6:6] is the sub-list for method output_type - 6, // [6:6] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 0, // 0: cluster.Command.type:type_name -> cluster.Command.Type + 7, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest + 8, // 2: cluster.Command.query_request:type_name -> command.QueryRequest + 9, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest + 1, // 4: cluster.Command.credentials:type_name -> cluster.Credentials + 10, // 5: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult + 11, // 6: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_message_proto_init() } 
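Editor's note: a hedged sketch of constructing the new `Command` envelope carrying a `BackupRequest`, as `Client.Backup` does before framing it on the wire. Import paths are assumed to follow the standard rqlite module layout.

```go
package main

import (
	"fmt"

	"github.com/rqlite/rqlite/cluster"
	"github.com/rqlite/rqlite/command"
	"google.golang.org/protobuf/proto"
)

func main() {
	// Wrap a binary-format BackupRequest in the Command envelope, using the
	// new COMMAND_TYPE_BACKUP type and the backup_request oneof field.
	cmd := &cluster.Command{
		Type: cluster.Command_COMMAND_TYPE_BACKUP,
		Request: &cluster.Command_BackupRequest{
			BackupRequest: &command.BackupRequest{
				Format: command.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY,
				Leader: true,
			},
		},
	}

	p, err := proto.Marshal(cmd)
	if err != nil {
		panic(err)
	}
	fmt.Printf("marshaled Command is %d bytes\n", len(p))
}
```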
@@ -539,10 +624,23 @@ func file_message_proto_init() { return nil } } + file_message_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CommandBackupResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_message_proto_msgTypes[2].OneofWrappers = []interface{}{ (*Command_ExecuteRequest)(nil), (*Command_QueryRequest)(nil), + (*Command_BackupRequest)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -550,7 +648,7 @@ func file_message_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_message_proto_rawDesc, NumEnums: 1, - NumMessages: 5, + NumMessages: 6, NumExtensions: 0, NumServices: 0, }, diff --git a/cluster/message.proto b/cluster/message.proto index 97159bf4..4390d636 100644 --- a/cluster/message.proto +++ b/cluster/message.proto @@ -20,12 +20,14 @@ message Command { COMMAND_TYPE_GET_NODE_API_URL = 1; COMMAND_TYPE_EXECUTE = 2; COMMAND_TYPE_QUERY = 3; + COMMAND_TYPE_BACKUP = 4; } Type type = 1; oneof request { command.ExecuteRequest execute_request = 2; command.QueryRequest query_request = 3; + command.BackupRequest backup_request = 5; } Credentials credentials = 4; @@ -40,3 +42,8 @@ message CommandQueryResponse { string error = 1; repeated command.QueryRows rows = 2; } + +message CommandBackupResponse { + string error = 1; + bytes data = 2; +} diff --git a/cluster/service.go b/cluster/service.go index 6ca73460..aba38267 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -1,6 +1,8 @@ package cluster import ( + "bytes" + "compress/gzip" "encoding/binary" "expvar" "fmt" @@ -25,6 +27,7 @@ const ( numGetNodeAPIResponse = "num_get_node_api_resp" numExecuteRequest = "num_execute_req" numQueryRequest = "num_query_req" + numBackupRequest = "num_backup_req" // Client stats for this package. numGetNodeAPIRequestLocal = "num_get_node_api_req_local" @@ -44,6 +47,7 @@ func init() { stats.Add(numGetNodeAPIResponse, 0) stats.Add(numExecuteRequest, 0) stats.Add(numQueryRequest, 0) + stats.Add(numBackupRequest, 0) stats.Add(numGetNodeAPIRequestLocal, 0) } @@ -62,6 +66,9 @@ type Database interface { // Query executes a slice of queries, each of which returns rows. Query(qr *command.QueryRequest) ([]*command.QueryRows, error) + + // Backup writes a backup of the database to the writer. + Backup(br *command.BackupRequest, dst io.Writer) error } // CredentialStore is the interface credential stores must support. @@ -293,6 +300,60 @@ func (s *Service) handleConn(conn net.Conn) { binary.LittleEndian.PutUint32(b[0:], uint32(len(p))) conn.Write(b) conn.Write(p) + + case Command_COMMAND_TYPE_BACKUP: + stats.Add(numBackupRequest, 1) + + resp := &CommandBackupResponse{} + + br := c.GetBackupRequest() + if br == nil { + resp.Error = "BackupRequest is nil" + } else if !s.checkCommandPerm(c, auth.PermBackup) { + resp.Error = "unauthorized" + } else { + buf := new(bytes.Buffer) + if err := s.db.Backup(br, buf); err != nil { + resp.Error = err.Error() + } else { + resp.Data = buf.Bytes() + } + } + p, err = proto.Marshal(resp) + if err != nil { + conn.Close() + return + } + + // Compress the backup for less space on the wire between nodes. + p, err = gzCompress(p) + if err != nil { + conn.Close() + return + } + + // Write length of Protobuf first, then write the actual Protobuf. + b = make([]byte, 4) + binary.LittleEndian.PutUint32(b[0:], uint32(len(p))) + conn.Write(b) + conn.Write(p) } } } + +// gzCompress compresses the given byte slice. 
+func gzCompress(b []byte) ([]byte, error) { + var buf bytes.Buffer + gzw, err := gzip.NewWriterLevel(&buf, gzip.BestCompression) + if err != nil { + return nil, fmt.Errorf("gzip new writer: %s", err) + } + + if _, err := gzw.Write(b); err != nil { + return nil, fmt.Errorf("gzip Write: %s", err) + } + if err := gzw.Close(); err != nil { + return nil, fmt.Errorf("gzip Close: %s", err) + } + return buf.Bytes(), nil +} diff --git a/cluster/service_database_test.go b/cluster/service_database_test.go index f435e9ad..6ba8890f 100644 --- a/cluster/service_database_test.go +++ b/cluster/service_database_test.go @@ -1,9 +1,11 @@ package cluster import ( + "bytes" "encoding/binary" "errors" "fmt" + "io" "os" "strings" "testing" @@ -258,6 +260,52 @@ func Test_ServiceQueryLarge(t *testing.T) { } } +func Test_ServiceBackup(t *testing.T) { + ln, mux := mustNewMux() + go mux.Serve() + tn := mux.Listen(1) // Could be any byte value. + db := mustNewMockDatabase() + cred := mustNewMockCredentialStore() + s := New(tn, db, cred) + if s == nil { + t.Fatalf("failed to create cluster service") + } + + c := NewClient(mustNewDialer(1, false, false), 30*time.Second) + + if err := s.Open(); err != nil { + t.Fatalf("failed to open cluster service: %s", err.Error()) + } + + // Ready for Backup tests now. + testData := []byte("this is SQLite data") + db.backupFn = func(br *command.BackupRequest, dst io.Writer) error { + if br.Format != command.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY { + t.Fatalf("wrong backup format requested") + } + dst.Write(testData) + return nil + } + + buf := new(bytes.Buffer) + err := c.Backup(backupRequestBinary(true), s.Addr(), NO_CREDS, longWait, buf) + if err != nil { + t.Fatalf("failed to backup database: %s", err.Error()) + } + + if bytes.Compare(buf.Bytes(), testData) != 0 { + t.Fatalf("backup data is not as expected, exp: %s, got: %s", testData, buf.Bytes()) + } + + // Clean up resources. + if err := ln.Close(); err != nil { + t.Fatalf("failed to close Mux's listener: %s", err) + } + if err := s.Close(); err != nil { + t.Fatalf("failed to close cluster service") + } +} + // Test_BinaryEncoding_Backwards ensures that software earlier than v6.6.2 // can communicate with v6.6.2+ releases. v6.6.2 increased the maximum size // of cluster responses. 
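Editor's note: the backup exchange between `Client.Backup` and the service's `COMMAND_TYPE_BACKUP` branch uses the same simple framing as the other cluster commands: a 4-byte little-endian length prefix followed by the payload (here, a gzip-compressed `CommandBackupResponse`). Below is a minimal, self-contained sketch of that framing; the helper names are illustrative, not part of the rqlite API.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeFrame writes a 4-byte little-endian length prefix followed by the payload.
func writeFrame(w io.Writer, p []byte) error {
	b := make([]byte, 4)
	binary.LittleEndian.PutUint32(b, uint32(len(p)))
	if _, err := w.Write(b); err != nil {
		return err
	}
	_, err := w.Write(p)
	return err
}

// readFrame reads one length-prefixed payload written by writeFrame.
func readFrame(r io.Reader) ([]byte, error) {
	b := make([]byte, 4)
	if _, err := io.ReadFull(r, b); err != nil {
		return nil, err
	}
	p := make([]byte, binary.LittleEndian.Uint32(b))
	if _, err := io.ReadFull(r, p); err != nil {
		return nil, err
	}
	return p, nil
}

func main() {
	var buf bytes.Buffer
	if err := writeFrame(&buf, []byte("gzip-compressed CommandBackupResponse bytes")); err != nil {
		panic(err)
	}
	p, err := readFrame(&buf)
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d-byte payload back\n", len(p))
}
```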
@@ -314,6 +362,20 @@ func queryRequestFromStrings(s []string) *command.QueryRequest { } } +func backupRequestSQL(leader bool) *command.BackupRequest { + return &command.BackupRequest{ + Format: command.BackupRequest_BACKUP_REQUEST_FORMAT_SQL, + Leader: leader, + } +} + +func backupRequestBinary(leader bool) *command.BackupRequest { + return &command.BackupRequest{ + Format: command.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY, + Leader: leader, + } +} + func asJSON(v interface{}) string { b, err := encoding.JSONMarshal(v) if err != nil { diff --git a/cluster/service_test.go b/cluster/service_test.go index 1d19b21f..62fdc6b9 100644 --- a/cluster/service_test.go +++ b/cluster/service_test.go @@ -3,6 +3,7 @@ package cluster import ( "crypto/tls" "fmt" + "io" "net" "os" "testing" @@ -310,6 +311,7 @@ func mustNewMockTLSTransport() *mockTransport { type mockDatabase struct { executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error) + backupFn func(br *command.BackupRequest, dst io.Writer) error } func (m *mockDatabase) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { @@ -320,6 +322,13 @@ func (m *mockDatabase) Query(qr *command.QueryRequest) ([]*command.QueryRows, er return m.queryFn(qr) } +func (m *mockDatabase) Backup(br *command.BackupRequest, dst io.Writer) error { + if m.backupFn == nil { + return nil + } + return m.backupFn(br, dst) +} + func mustNewMockDatabase() *mockDatabase { e := func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { return []*command.ExecuteResult{}, nil diff --git a/http/service.go b/http/service.go index a0ad0373..5e0156c4 100644 --- a/http/service.go +++ b/http/service.go @@ -94,6 +94,9 @@ type Cluster interface { // Query performs an Query Request on a remote node. Query(qr *command.QueryRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) ([]*command.QueryRows, error) + // Backup retrieves a backup from a remote node and writes to the io.Writer + Backup(br *command.BackupRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration, w io.Writer) error + // Stats returns stats on the Cluster. 
Stats() (map[string]interface{}, error) } @@ -168,6 +171,7 @@ const ( numQueries = "queries" numRemoteExecutions = "remote_executions" numRemoteQueries = "remote_queries" + numRemoteBackups = "remote_backups" numReadyz = "num_readyz" numStatus = "num_status" numBackups = "backups" @@ -200,6 +204,7 @@ func init() { stats.Add(numQueries, 0) stats.Add(numRemoteExecutions, 0) stats.Add(numRemoteQueries, 0) + stats.Add(numRemoteBackups, 0) stats.Add(numReadyz, 0) stats.Add(numStatus, 0) stats.Add(numBackups, 0) @@ -565,30 +570,69 @@ func (s *Service) handleBackup(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } + redirect, err := isRedirect(r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + format, err := backupFormat(w, r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - bf, err := backupFormat(w, r) + timeout, err := timeoutParam(r, defaultTimeout) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } br := &command.BackupRequest{ - Format: bf, + Format: format, Leader: !noLeader, } err = s.store.Backup(br, w) if err != nil { if err == store.ErrNotLeader { - leaderAPIAddr := s.LeaderAPIAddr() - if leaderAPIAddr == "" { + if redirect { + leaderAPIAddr := s.LeaderAPIAddr() + if leaderAPIAddr == "" { + stats.Add(numLeaderNotFound, 1) + http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) + return + } + + redirect := s.FormRedirect(r, leaderAPIAddr) + http.Redirect(w, r, redirect, http.StatusMovedPermanently) + return + } + + addr, err := s.store.LeaderAddr() + if err != nil { + http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), + http.StatusInternalServerError) + return + } + if addr == "" { stats.Add(numLeaderNotFound, 1) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) return } - redirect := s.FormRedirect(r, leaderAPIAddr) - http.Redirect(w, r, redirect, http.StatusMovedPermanently) + username, password, ok := r.BasicAuth() + if !ok { + username = "" + } + + backupErr := s.cluster.Backup(br, addr, makeCredentials(username, password), timeout, w) + if backupErr != nil && backupErr.Error() == "unauthorized" { + http.Error(w, "remote backup not authorized", http.StatusUnauthorized) + return + } + stats.Add(numRemoteBackups, 1) + w.Header().Add(ServedByHTTPHeader, addr) return } http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/http/service_test.go b/http/service_test.go index 32909a6f..2a2f5a62 100644 --- a/http/service_test.go +++ b/http/service_test.go @@ -579,7 +579,7 @@ func Test_BackupOK(t *testing.T) { } } -func Test_BackupFlagsNoLeader(t *testing.T) { +func Test_BackupFlagsNoLeaderRedirect(t *testing.T) { m := &MockStore{} c := &mockClusterService{ apiAddr: "http://1.2.3.4:999", @@ -602,7 +602,7 @@ func Test_BackupFlagsNoLeader(t *testing.T) { } host := fmt.Sprintf("http://%s", s.Addr().String()) - resp, err := client.Get(host + "/db/backup") + resp, err := client.Get(host + "/db/backup?redirect") if err != nil { t.Fatalf("failed to make backup request: %s", err.Error()) } @@ -611,6 +611,51 @@ func Test_BackupFlagsNoLeader(t *testing.T) { } } +func Test_BackupFlagsNoLeaderRemoteFetch(t *testing.T) { + m := &MockStore{ + leaderAddr: "foo:1234", + } + c := &mockClusterService{ + apiAddr: "http://1.2.3.4:999", + } + + s := New("127.0.0.1:0", m, c, nil) + + if err := s.Start(); err != nil { + t.Fatalf("failed to start service") + } + 
defer s.Close() + + m.backupFn = func(br *command.BackupRequest, dst io.Writer) error { + return store.ErrNotLeader + } + + backupData := "this is SQLite data" + c.backupFn = func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error { + w.Write([]byte(backupData)) + return nil + } + + client := &http.Client{} + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + } + + host := fmt.Sprintf("http://%s", s.Addr().String()) + resp, err := client.Get(host + "/db/backup") + if err != nil { + t.Fatalf("failed to make backup request: %s", err.Error()) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("failed to get expected StatusOK for remote backup fetch, got %d", resp.StatusCode) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if exp, got := backupData, string(body); exp != got { + t.Fatalf("received incorrect backup data, exp: %s, got: %s", exp, got) + } +} + func Test_BackupFlagsNoLeaderOK(t *testing.T) { m := &MockStore{} c := &mockClusterService{ @@ -1057,6 +1102,7 @@ type mockClusterService struct { apiAddr string executeFn func(er *command.ExecuteRequest, addr string, t time.Duration) ([]*command.ExecuteResult, error) queryFn func(qr *command.QueryRequest, addr string, t time.Duration) ([]*command.QueryRows, error) + backupFn func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error } func (m *mockClusterService) GetNodeAPIAddr(a string, t time.Duration) (string, error) { @@ -1077,6 +1123,13 @@ func (m *mockClusterService) Query(qr *command.QueryRequest, addr string, creds return nil, nil } +func (m *mockClusterService) Backup(br *command.BackupRequest, addr string, creds *cluster.Credentials, t time.Duration, w io.Writer) error { + if m.backupFn != nil { + return m.backupFn(br, addr, t, w) + } + return nil +} + type mockCredentialStore struct { HasPermOK bool aaFunc func(username, password, perm string) bool diff --git a/system_test/full_system_test.py b/system_test/full_system_test.py index f679cece..5719b462 100644 --- a/system_test/full_system_test.py +++ b/system_test/full_system_test.py @@ -1357,13 +1357,18 @@ class TestEndToEndBackupRestore(unittest.TestCase): fd, self.db_file = tempfile.mkstemp() os.close(fd) + # Create a two-node cluster. self.node0 = Node(RQLITED_PATH, '0') self.node0.start() self.node0.wait_for_leader() self.node0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.node0.execute('INSERT INTO foo(name) VALUES("fiona")') self.node0.wait_for_all_fsm() + self.node1 = Node(RQLITED_PATH, '1') + self.node1.start(join=self.node0.APIAddr()) + self.node1.wait_for_leader() + # Get a backup from the first node and check it. self.node0.backup(self.db_file) conn = sqlite3.connect(self.db_file) rows = conn.execute('SELECT * FROM foo').fetchall() @@ -1371,12 +1376,21 @@ class TestEndToEndBackupRestore(unittest.TestCase): self.assertEqual(rows[0], (1, 'fiona')) conn.close() - self.node1 = Node(RQLITED_PATH, '1') - self.node1.start() - self.node1.wait_for_leader() - j = self.node1.restore(self.db_file) + # Get a backup from the other node and check it too. + self.node1.backup(self.db_file) + conn = sqlite3.connect(self.db_file) + rows = conn.execute('SELECT * FROM foo').fetchall() + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0], (1, 'fiona')) + conn.close() + + # Load file into a brand new node, check the data is right. 
+ self.node2 = Node(RQLITED_PATH, '2') + self.node2.start() + self.node2.wait_for_leader() + j = self.node2.restore(self.db_file) self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) - j = self.node1.query('SELECT * FROM foo') + j = self.node2.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) def tearDown(self): @@ -1384,6 +1398,8 @@ class TestEndToEndBackupRestore(unittest.TestCase): deprovision_node(self.node0) if hasattr(self, 'node1'): deprovision_node(self.node1) + if hasattr(self, 'node2'): + deprovision_node(self.node2) os.remove(self.db_file) class TestEndToEndSnapRestoreSingle(unittest.TestCase):
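Editor's note: finally, a hedged sketch of driving the new `Client.Backup` API directly, modeled on `Test_ServiceBackup` above. The `fetchBackup` helper and its arguments are illustrative, and the nil credentials assume an unauthenticated cluster.

```go
package example

import (
	"os"
	"time"

	"github.com/rqlite/rqlite/cluster"
	"github.com/rqlite/rqlite/command"
)

// fetchBackup streams a binary backup from the cluster service at nodeAddr
// into the file at path, asking that the data come from the Leader.
func fetchBackup(c *cluster.Client, nodeAddr, path string) error {
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	br := &command.BackupRequest{
		Format: command.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY,
		Leader: true,
	}
	// nil credentials assume an unauthenticated cluster; the timeout covers
	// the whole request/response exchange.
	return c.Backup(br, nodeAddr, nil, 30*time.Second, f)
}
```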