1
0
Fork 0

Merge pull request #1081 from rqlite/backup-forwarding

Backup forwarding
master
Philip O'Toole 2 years ago committed by GitHub
commit e4f1de95bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,4 +1,7 @@
## 7.8.0 (unreleased) ## 7.8.0 (unreleased)
### New features
- [PR #1081](https://github.com/rqlite/rqlite/pull/1081): Transparently forward Backup requests to Leaders.
### Implementation changes and bug fixes ### Implementation changes and bug fixes
- [PR #1079](https://github.com/rqlite/rqlite/pull/1079): Use a Protobuf model for Backup requests. - [PR #1079](https://github.com/rqlite/rqlite/pull/1079): Use a Protobuf model for Backup requests.
- [PR #1078](https://github.com/rqlite/rqlite/pull/1078): Decrease bootstrap polling interval from 5 seconds to 2 seconds. - [PR #1078](https://github.com/rqlite/rqlite/pull/1078): Decrease bootstrap polling interval from 5 seconds to 2 seconds.

@ -9,9 +9,11 @@ This command will write the SQLite database file to `bak.sqlite3`.
You can also access the rqlite API directly, via a HTTP `GET` request to the endpoint `/db/backup`. For example, using `curl`, and assuming the node is listening on `localhost:4001`, you could retrieve a backup as follows: You can also access the rqlite API directly, via a HTTP `GET` request to the endpoint `/db/backup`. For example, using `curl`, and assuming the node is listening on `localhost:4001`, you could retrieve a backup as follows:
```bash ```bash
curl -s -L -XGET localhost:4001/db/backup -o bak.sqlite3 curl -s -XGET localhost:4001/db/backup -o bak.sqlite3
``` ```
Note that if the node is not the Leader, a HTTP 301 response will be returned with the Leader's address. The `curl` command above will automatically follow the Leader, due to the presence of the option `-L`. Note that if the node is not the Leader, the node will transparently forward the request to Leader, wait for the backup data from the Leader, and return it to the client. If, instead, you want a backup of SQLite database of the actual node that receives the request, add `noleader` to the URL as a query parameter.
If you do not wish a Follower to transparently forward a backup request to a Leader, add `redirect` to the URL as a query parameter. In that case if a Follower receives a backup request the Follower will respond with [HTTP 301 Moved Permanently](https://en.wikipedia.org/wiki/HTTP_301) and include the address of the Leader as the `Location` header in the response. It is then up the clients to re-issue the command to the Leader.
In either case the generated file can then be used to restore a node (or cluster) using the [restore API](https://github.com/rqlite/rqlite/blob/master/DOC/RESTORE_FROM_SQLITE.md). In either case the generated file can then be used to restore a node (or cluster) using the [restore API](https://github.com/rqlite/rqlite/blob/master/DOC/RESTORE_FROM_SQLITE.md).
@ -27,6 +29,6 @@ curl -s -L -XGET localhost:4001/db/backup?fmt=sql -o bak.sql
``` ```
## Backup isolation level ## Backup isolation level
The isolation offered by backups is `READ COMMITTED`. This means that any changes due to transactions to the database, that take place during the backup, will be reflected immediately once the transaction is committed, but not before. The isolation offered by binary backups is `READ COMMITTED`. This means that any changes due to transactions to the database, that take place during the backup, will be reflected immediately once the transaction is committed, but not before.
See the [SQLite documentation](https://www.sqlite.org/isolation.html) for more details. See the [SQLite documentation](https://www.sqlite.org/isolation.html) for more details.

@ -1,6 +1,8 @@
package cluster package cluster
import ( import (
"bytes"
"compress/gzip"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
@ -293,6 +295,96 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, creds *Credent
return a.Rows, nil return a.Rows, nil
} }
// Backup retrieves a backup from a remote node and writes to the io.Writer
func (c *Client) Backup(br *command.BackupRequest, nodeAddr string, creds *Credentials, timeout time.Duration, w io.Writer) error {
conn, err := c.dial(nodeAddr, c.timeout)
if err != nil {
return err
}
defer conn.Close()
// Send the request
command := &Command{
Type: Command_COMMAND_TYPE_BACKUP,
Request: &Command_BackupRequest{
BackupRequest: br,
},
Credentials: creds,
}
p, err := proto.Marshal(command)
if err != nil {
return fmt.Errorf("command marshal: %s", err)
}
// Write length of Protobuf
b := make([]byte, 4)
binary.LittleEndian.PutUint16(b[0:], uint16(len(p)))
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
handleConnError(conn)
return err
}
_, err = conn.Write(b)
if err != nil {
handleConnError(conn)
return fmt.Errorf("write protobuf length: %s", err)
}
// Now write backup request proto itself.
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
handleConnError(conn)
return err
}
_, err = conn.Write(p)
if err != nil {
handleConnError(conn)
return fmt.Errorf("write protobuf: %s", err)
}
// Read the backup response
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
handleConnError(conn)
return err
}
// Read length of response.
_, err = io.ReadFull(conn, b)
if err != nil {
handleConnError(conn)
return err
}
sz := binary.LittleEndian.Uint32(b[0:])
// Read in the actual response.
p = make([]byte, sz)
_, err = io.ReadFull(conn, p)
if err != nil {
handleConnError(conn)
return err
}
// Decompress....
p, err = gzUncompress(p)
if err != nil {
handleConnError(conn)
return fmt.Errorf("backup decompress: %s", err)
}
resp := &CommandBackupResponse{}
err = proto.Unmarshal(p, resp)
if err != nil {
return fmt.Errorf("backup unmarshal: %s", err)
}
if resp.Error != "" {
return errors.New(resp.Error)
}
if _, err := w.Write(resp.Data); err != nil {
return fmt.Errorf("backup write: %s", err)
}
return nil
}
// Stats returns stats on the Client instance // Stats returns stats on the Client instance
func (c *Client) Stats() (map[string]interface{}, error) { func (c *Client) Stats() (map[string]interface{}, error) {
c.mu.RLock() c.mu.RLock()
@ -364,3 +456,20 @@ func handleConnError(conn net.Conn) {
pc.MarkUnusable() pc.MarkUnusable()
} }
} }
func gzUncompress(b []byte) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("unmarshal gzip NewReader: %s", err)
}
ub, err := io.ReadAll(gz)
if err != nil {
return nil, fmt.Errorf("unmarshal gzip ReadAll: %s", err)
}
if err := gz.Close(); err != nil {
return nil, fmt.Errorf("unmarshal gzip Close: %s", err)
}
return ub, nil
}

@ -28,6 +28,7 @@ const (
Command_COMMAND_TYPE_GET_NODE_API_URL Command_Type = 1 Command_COMMAND_TYPE_GET_NODE_API_URL Command_Type = 1
Command_COMMAND_TYPE_EXECUTE Command_Type = 2 Command_COMMAND_TYPE_EXECUTE Command_Type = 2
Command_COMMAND_TYPE_QUERY Command_Type = 3 Command_COMMAND_TYPE_QUERY Command_Type = 3
Command_COMMAND_TYPE_BACKUP Command_Type = 4
) )
// Enum value maps for Command_Type. // Enum value maps for Command_Type.
@ -37,12 +38,14 @@ var (
1: "COMMAND_TYPE_GET_NODE_API_URL", 1: "COMMAND_TYPE_GET_NODE_API_URL",
2: "COMMAND_TYPE_EXECUTE", 2: "COMMAND_TYPE_EXECUTE",
3: "COMMAND_TYPE_QUERY", 3: "COMMAND_TYPE_QUERY",
4: "COMMAND_TYPE_BACKUP",
} }
Command_Type_value = map[string]int32{ Command_Type_value = map[string]int32{
"COMMAND_TYPE_UNKNOWN": 0, "COMMAND_TYPE_UNKNOWN": 0,
"COMMAND_TYPE_GET_NODE_API_URL": 1, "COMMAND_TYPE_GET_NODE_API_URL": 1,
"COMMAND_TYPE_EXECUTE": 2, "COMMAND_TYPE_EXECUTE": 2,
"COMMAND_TYPE_QUERY": 3, "COMMAND_TYPE_QUERY": 3,
"COMMAND_TYPE_BACKUP": 4,
} }
) )
@ -184,6 +187,7 @@ type Command struct {
// Types that are assignable to Request: // Types that are assignable to Request:
// *Command_ExecuteRequest // *Command_ExecuteRequest
// *Command_QueryRequest // *Command_QueryRequest
// *Command_BackupRequest
Request isCommand_Request `protobuf_oneof:"request"` Request isCommand_Request `protobuf_oneof:"request"`
Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"` Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"`
} }
@ -248,6 +252,13 @@ func (x *Command) GetQueryRequest() *command.QueryRequest {
return nil return nil
} }
func (x *Command) GetBackupRequest() *command.BackupRequest {
if x, ok := x.GetRequest().(*Command_BackupRequest); ok {
return x.BackupRequest
}
return nil
}
func (x *Command) GetCredentials() *Credentials { func (x *Command) GetCredentials() *Credentials {
if x != nil { if x != nil {
return x.Credentials return x.Credentials
@ -267,10 +278,16 @@ type Command_QueryRequest struct {
QueryRequest *command.QueryRequest `protobuf:"bytes,3,opt,name=query_request,json=queryRequest,proto3,oneof"` QueryRequest *command.QueryRequest `protobuf:"bytes,3,opt,name=query_request,json=queryRequest,proto3,oneof"`
} }
type Command_BackupRequest struct {
BackupRequest *command.BackupRequest `protobuf:"bytes,5,opt,name=backup_request,json=backupRequest,proto3,oneof"`
}
func (*Command_ExecuteRequest) isCommand_Request() {} func (*Command_ExecuteRequest) isCommand_Request() {}
func (*Command_QueryRequest) isCommand_Request() {} func (*Command_QueryRequest) isCommand_Request() {}
func (*Command_BackupRequest) isCommand_Request() {}
type CommandExecuteResponse struct { type CommandExecuteResponse struct {
state protoimpl.MessageState state protoimpl.MessageState
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
@ -381,6 +398,61 @@ func (x *CommandQueryResponse) GetRows() []*command.QueryRows {
return nil return nil
} }
type CommandBackupResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *CommandBackupResponse) Reset() {
*x = CommandBackupResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_message_proto_msgTypes[5]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *CommandBackupResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CommandBackupResponse) ProtoMessage() {}
func (x *CommandBackupResponse) ProtoReflect() protoreflect.Message {
mi := &file_message_proto_msgTypes[5]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CommandBackupResponse.ProtoReflect.Descriptor instead.
func (*CommandBackupResponse) Descriptor() ([]byte, []int) {
return file_message_proto_rawDescGZIP(), []int{5}
}
func (x *CommandBackupResponse) GetError() string {
if x != nil {
return x.Error
}
return ""
}
func (x *CommandBackupResponse) GetData() []byte {
if x != nil {
return x.Data
}
return nil
}
var File_message_proto protoreflect.FileDescriptor var File_message_proto protoreflect.FileDescriptor
var file_message_proto_rawDesc = []byte{ var file_message_proto_rawDesc = []byte{
@ -393,7 +465,7 @@ var file_message_proto_rawDesc = []byte{
0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61,
0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
0x75, 0x72, 0x6c, 0x22, 0xf0, 0x02, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x75, 0x72, 0x6c, 0x22, 0xcb, 0x03, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12,
0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e,
0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e,
0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x78, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x78,
@ -404,33 +476,43 @@ var file_message_proto_rawDesc = []byte{
0x0a, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x0a, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18,
0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e,
0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0c,
0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, 0x0e,
0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x05,
0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x42,
0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0d,
0x69, 0x61, 0x6c, 0x73, 0x22, 0x75, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a,
0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01,
0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65,
0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e,
0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22, 0x8e, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18,
0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55,
0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d,
0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43,
0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43,
0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x17, 0x0a,
0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x42, 0x41,
0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63,
0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65,
0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03,
0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65,
0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x42, 0x22, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75,
0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75,
0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65,
0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f,
0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32,
0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52,
0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61,
0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x22, 0x5a, 0x20,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74,
0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (
@ -446,7 +528,7 @@ func file_message_proto_rawDescGZIP() []byte {
} }
var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1) var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 5) var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_message_proto_goTypes = []interface{}{ var file_message_proto_goTypes = []interface{}{
(Command_Type)(0), // 0: cluster.Command.Type (Command_Type)(0), // 0: cluster.Command.Type
(*Credentials)(nil), // 1: cluster.Credentials (*Credentials)(nil), // 1: cluster.Credentials
@ -454,23 +536,26 @@ var file_message_proto_goTypes = []interface{}{
(*Command)(nil), // 3: cluster.Command (*Command)(nil), // 3: cluster.Command
(*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse (*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse
(*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse (*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse
(*command.ExecuteRequest)(nil), // 6: command.ExecuteRequest (*CommandBackupResponse)(nil), // 6: cluster.CommandBackupResponse
(*command.QueryRequest)(nil), // 7: command.QueryRequest (*command.ExecuteRequest)(nil), // 7: command.ExecuteRequest
(*command.ExecuteResult)(nil), // 8: command.ExecuteResult (*command.QueryRequest)(nil), // 8: command.QueryRequest
(*command.QueryRows)(nil), // 9: command.QueryRows (*command.BackupRequest)(nil), // 9: command.BackupRequest
(*command.ExecuteResult)(nil), // 10: command.ExecuteResult
(*command.QueryRows)(nil), // 11: command.QueryRows
} }
var file_message_proto_depIdxs = []int32{ var file_message_proto_depIdxs = []int32{
0, // 0: cluster.Command.type:type_name -> cluster.Command.Type 0, // 0: cluster.Command.type:type_name -> cluster.Command.Type
6, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest 7, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest
7, // 2: cluster.Command.query_request:type_name -> command.QueryRequest 8, // 2: cluster.Command.query_request:type_name -> command.QueryRequest
1, // 3: cluster.Command.credentials:type_name -> cluster.Credentials 9, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest
8, // 4: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult 1, // 4: cluster.Command.credentials:type_name -> cluster.Credentials
9, // 5: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows 10, // 5: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult
6, // [6:6] is the sub-list for method output_type 11, // 6: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows
6, // [6:6] is the sub-list for method input_type 7, // [7:7] is the sub-list for method output_type
6, // [6:6] is the sub-list for extension type_name 7, // [7:7] is the sub-list for method input_type
6, // [6:6] is the sub-list for extension extendee 7, // [7:7] is the sub-list for extension type_name
0, // [0:6] is the sub-list for field type_name 7, // [7:7] is the sub-list for extension extendee
0, // [0:7] is the sub-list for field type_name
} }
func init() { file_message_proto_init() } func init() { file_message_proto_init() }
@ -539,10 +624,23 @@ func file_message_proto_init() {
return nil return nil
} }
} }
file_message_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CommandBackupResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
} }
file_message_proto_msgTypes[2].OneofWrappers = []interface{}{ file_message_proto_msgTypes[2].OneofWrappers = []interface{}{
(*Command_ExecuteRequest)(nil), (*Command_ExecuteRequest)(nil),
(*Command_QueryRequest)(nil), (*Command_QueryRequest)(nil),
(*Command_BackupRequest)(nil),
} }
type x struct{} type x struct{}
out := protoimpl.TypeBuilder{ out := protoimpl.TypeBuilder{
@ -550,7 +648,7 @@ func file_message_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_message_proto_rawDesc, RawDescriptor: file_message_proto_rawDesc,
NumEnums: 1, NumEnums: 1,
NumMessages: 5, NumMessages: 6,
NumExtensions: 0, NumExtensions: 0,
NumServices: 0, NumServices: 0,
}, },

@ -20,12 +20,14 @@ message Command {
COMMAND_TYPE_GET_NODE_API_URL = 1; COMMAND_TYPE_GET_NODE_API_URL = 1;
COMMAND_TYPE_EXECUTE = 2; COMMAND_TYPE_EXECUTE = 2;
COMMAND_TYPE_QUERY = 3; COMMAND_TYPE_QUERY = 3;
COMMAND_TYPE_BACKUP = 4;
} }
Type type = 1; Type type = 1;
oneof request { oneof request {
command.ExecuteRequest execute_request = 2; command.ExecuteRequest execute_request = 2;
command.QueryRequest query_request = 3; command.QueryRequest query_request = 3;
command.BackupRequest backup_request = 5;
} }
Credentials credentials = 4; Credentials credentials = 4;
@ -40,3 +42,8 @@ message CommandQueryResponse {
string error = 1; string error = 1;
repeated command.QueryRows rows = 2; repeated command.QueryRows rows = 2;
} }
message CommandBackupResponse {
string error = 1;
bytes data = 2;
}

@ -1,6 +1,8 @@
package cluster package cluster
import ( import (
"bytes"
"compress/gzip"
"encoding/binary" "encoding/binary"
"expvar" "expvar"
"fmt" "fmt"
@ -25,6 +27,7 @@ const (
numGetNodeAPIResponse = "num_get_node_api_resp" numGetNodeAPIResponse = "num_get_node_api_resp"
numExecuteRequest = "num_execute_req" numExecuteRequest = "num_execute_req"
numQueryRequest = "num_query_req" numQueryRequest = "num_query_req"
numBackupRequest = "num_backup_req"
// Client stats for this package. // Client stats for this package.
numGetNodeAPIRequestLocal = "num_get_node_api_req_local" numGetNodeAPIRequestLocal = "num_get_node_api_req_local"
@ -44,6 +47,7 @@ func init() {
stats.Add(numGetNodeAPIResponse, 0) stats.Add(numGetNodeAPIResponse, 0)
stats.Add(numExecuteRequest, 0) stats.Add(numExecuteRequest, 0)
stats.Add(numQueryRequest, 0) stats.Add(numQueryRequest, 0)
stats.Add(numBackupRequest, 0)
stats.Add(numGetNodeAPIRequestLocal, 0) stats.Add(numGetNodeAPIRequestLocal, 0)
} }
@ -62,6 +66,9 @@ type Database interface {
// Query executes a slice of queries, each of which returns rows. // Query executes a slice of queries, each of which returns rows.
Query(qr *command.QueryRequest) ([]*command.QueryRows, error) Query(qr *command.QueryRequest) ([]*command.QueryRows, error)
// Backup writes a backup of the database to the writer.
Backup(br *command.BackupRequest, dst io.Writer) error
} }
// CredentialStore is the interface credential stores must support. // CredentialStore is the interface credential stores must support.
@ -293,6 +300,60 @@ func (s *Service) handleConn(conn net.Conn) {
binary.LittleEndian.PutUint32(b[0:], uint32(len(p))) binary.LittleEndian.PutUint32(b[0:], uint32(len(p)))
conn.Write(b) conn.Write(b)
conn.Write(p) conn.Write(p)
case Command_COMMAND_TYPE_BACKUP:
stats.Add(numBackupRequest, 1)
resp := &CommandBackupResponse{}
br := c.GetBackupRequest()
if br == nil {
resp.Error = "BackupRequest is nil"
} else if !s.checkCommandPerm(c, auth.PermBackup) {
resp.Error = "unauthorized"
} else {
buf := new(bytes.Buffer)
if err := s.db.Backup(br, buf); err != nil {
resp.Error = err.Error()
} else {
resp.Data = buf.Bytes()
}
}
p, err = proto.Marshal(resp)
if err != nil {
conn.Close()
return
}
// Compress the backup for less space on the wire between nodes.
p, err = gzCompress(p)
if err != nil {
conn.Close()
return
}
// Write length of Protobuf first, then write the actual Protobuf.
b = make([]byte, 4)
binary.LittleEndian.PutUint32(b[0:], uint32(len(p)))
conn.Write(b)
conn.Write(p)
} }
} }
} }
// gzCompress compresses the given byte slice.
func gzCompress(b []byte) ([]byte, error) {
var buf bytes.Buffer
gzw, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
if err != nil {
return nil, fmt.Errorf("gzip new writer: %s", err)
}
if _, err := gzw.Write(b); err != nil {
return nil, fmt.Errorf("gzip Write: %s", err)
}
if err := gzw.Close(); err != nil {
return nil, fmt.Errorf("gzip Close: %s", err)
}
return buf.Bytes(), nil
}

@ -1,9 +1,11 @@
package cluster package cluster
import ( import (
"bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"io"
"os" "os"
"strings" "strings"
"testing" "testing"
@ -258,6 +260,52 @@ func Test_ServiceQueryLarge(t *testing.T) {
} }
} }
func Test_ServiceBackup(t *testing.T) {
ln, mux := mustNewMux()
go mux.Serve()
tn := mux.Listen(1) // Could be any byte value.
db := mustNewMockDatabase()
cred := mustNewMockCredentialStore()
s := New(tn, db, cred)
if s == nil {
t.Fatalf("failed to create cluster service")
}
c := NewClient(mustNewDialer(1, false, false), 30*time.Second)
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service: %s", err.Error())
}
// Ready for Backup tests now.
testData := []byte("this is SQLite data")
db.backupFn = func(br *command.BackupRequest, dst io.Writer) error {
if br.Format != command.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY {
t.Fatalf("wrong backup format requested")
}
dst.Write(testData)
return nil
}
buf := new(bytes.Buffer)
err := c.Backup(backupRequestBinary(true), s.Addr(), NO_CREDS, longWait, buf)
if err != nil {
t.Fatalf("failed to backup database: %s", err.Error())
}
if bytes.Compare(buf.Bytes(), testData) != 0 {
t.Fatalf("backup data is not as expected, exp: %s, got: %s", testData, buf.Bytes())
}
// Clean up resources.
if err := ln.Close(); err != nil {
t.Fatalf("failed to close Mux's listener: %s", err)
}
if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}
// Test_BinaryEncoding_Backwards ensures that software earlier than v6.6.2 // Test_BinaryEncoding_Backwards ensures that software earlier than v6.6.2
// can communicate with v6.6.2+ releases. v6.6.2 increased the maximum size // can communicate with v6.6.2+ releases. v6.6.2 increased the maximum size
// of cluster responses. // of cluster responses.
@ -314,6 +362,20 @@ func queryRequestFromStrings(s []string) *command.QueryRequest {
} }
} }
func backupRequestSQL(leader bool) *command.BackupRequest {
return &command.BackupRequest{
Format: command.BackupRequest_BACKUP_REQUEST_FORMAT_SQL,
Leader: leader,
}
}
func backupRequestBinary(leader bool) *command.BackupRequest {
return &command.BackupRequest{
Format: command.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY,
Leader: leader,
}
}
func asJSON(v interface{}) string { func asJSON(v interface{}) string {
b, err := encoding.JSONMarshal(v) b, err := encoding.JSONMarshal(v)
if err != nil { if err != nil {

@ -3,6 +3,7 @@ package cluster
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"io"
"net" "net"
"os" "os"
"testing" "testing"
@ -310,6 +311,7 @@ func mustNewMockTLSTransport() *mockTransport {
type mockDatabase struct { type mockDatabase struct {
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error) queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)
backupFn func(br *command.BackupRequest, dst io.Writer) error
} }
func (m *mockDatabase) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { func (m *mockDatabase) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
@ -320,6 +322,13 @@ func (m *mockDatabase) Query(qr *command.QueryRequest) ([]*command.QueryRows, er
return m.queryFn(qr) return m.queryFn(qr)
} }
func (m *mockDatabase) Backup(br *command.BackupRequest, dst io.Writer) error {
if m.backupFn == nil {
return nil
}
return m.backupFn(br, dst)
}
func mustNewMockDatabase() *mockDatabase { func mustNewMockDatabase() *mockDatabase {
e := func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { e := func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
return []*command.ExecuteResult{}, nil return []*command.ExecuteResult{}, nil

@ -94,6 +94,9 @@ type Cluster interface {
// Query performs an Query Request on a remote node. // Query performs an Query Request on a remote node.
Query(qr *command.QueryRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) ([]*command.QueryRows, error) Query(qr *command.QueryRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) ([]*command.QueryRows, error)
// Backup retrieves a backup from a remote node and writes to the io.Writer
Backup(br *command.BackupRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration, w io.Writer) error
// Stats returns stats on the Cluster. // Stats returns stats on the Cluster.
Stats() (map[string]interface{}, error) Stats() (map[string]interface{}, error)
} }
@ -168,6 +171,7 @@ const (
numQueries = "queries" numQueries = "queries"
numRemoteExecutions = "remote_executions" numRemoteExecutions = "remote_executions"
numRemoteQueries = "remote_queries" numRemoteQueries = "remote_queries"
numRemoteBackups = "remote_backups"
numReadyz = "num_readyz" numReadyz = "num_readyz"
numStatus = "num_status" numStatus = "num_status"
numBackups = "backups" numBackups = "backups"
@ -200,6 +204,7 @@ func init() {
stats.Add(numQueries, 0) stats.Add(numQueries, 0)
stats.Add(numRemoteExecutions, 0) stats.Add(numRemoteExecutions, 0)
stats.Add(numRemoteQueries, 0) stats.Add(numRemoteQueries, 0)
stats.Add(numRemoteBackups, 0)
stats.Add(numReadyz, 0) stats.Add(numReadyz, 0)
stats.Add(numStatus, 0) stats.Add(numStatus, 0)
stats.Add(numBackups, 0) stats.Add(numBackups, 0)
@ -565,30 +570,69 @@ func (s *Service) handleBackup(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
redirect, err := isRedirect(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
format, err := backupFormat(w, r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
bf, err := backupFormat(w, r) timeout, err := timeoutParam(r, defaultTimeout)
if err != nil { if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
br := &command.BackupRequest{ br := &command.BackupRequest{
Format: bf, Format: format,
Leader: !noLeader, Leader: !noLeader,
} }
err = s.store.Backup(br, w) err = s.store.Backup(br, w)
if err != nil { if err != nil {
if err == store.ErrNotLeader { if err == store.ErrNotLeader {
leaderAPIAddr := s.LeaderAPIAddr() if redirect {
if leaderAPIAddr == "" { leaderAPIAddr := s.LeaderAPIAddr()
if leaderAPIAddr == "" {
stats.Add(numLeaderNotFound, 1)
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
return
}
redirect := s.FormRedirect(r, leaderAPIAddr)
http.Redirect(w, r, redirect, http.StatusMovedPermanently)
return
}
addr, err := s.store.LeaderAddr()
if err != nil {
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
http.StatusInternalServerError)
return
}
if addr == "" {
stats.Add(numLeaderNotFound, 1) stats.Add(numLeaderNotFound, 1)
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
return return
} }
redirect := s.FormRedirect(r, leaderAPIAddr) username, password, ok := r.BasicAuth()
http.Redirect(w, r, redirect, http.StatusMovedPermanently) if !ok {
username = ""
}
backupErr := s.cluster.Backup(br, addr, makeCredentials(username, password), timeout, w)
if backupErr != nil && backupErr.Error() == "unauthorized" {
http.Error(w, "remote backup not authorized", http.StatusUnauthorized)
return
}
stats.Add(numRemoteBackups, 1)
w.Header().Add(ServedByHTTPHeader, addr)
return return
} }
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)

@ -579,7 +579,7 @@ func Test_BackupOK(t *testing.T) {
} }
} }
func Test_BackupFlagsNoLeader(t *testing.T) { func Test_BackupFlagsNoLeaderRedirect(t *testing.T) {
m := &MockStore{} m := &MockStore{}
c := &mockClusterService{ c := &mockClusterService{
apiAddr: "http://1.2.3.4:999", apiAddr: "http://1.2.3.4:999",
@ -602,7 +602,7 @@ func Test_BackupFlagsNoLeader(t *testing.T) {
} }
host := fmt.Sprintf("http://%s", s.Addr().String()) host := fmt.Sprintf("http://%s", s.Addr().String())
resp, err := client.Get(host + "/db/backup") resp, err := client.Get(host + "/db/backup?redirect")
if err != nil { if err != nil {
t.Fatalf("failed to make backup request: %s", err.Error()) t.Fatalf("failed to make backup request: %s", err.Error())
} }
@ -611,6 +611,51 @@ func Test_BackupFlagsNoLeader(t *testing.T) {
} }
} }
func Test_BackupFlagsNoLeaderRemoteFetch(t *testing.T) {
m := &MockStore{
leaderAddr: "foo:1234",
}
c := &mockClusterService{
apiAddr: "http://1.2.3.4:999",
}
s := New("127.0.0.1:0", m, c, nil)
if err := s.Start(); err != nil {
t.Fatalf("failed to start service")
}
defer s.Close()
m.backupFn = func(br *command.BackupRequest, dst io.Writer) error {
return store.ErrNotLeader
}
backupData := "this is SQLite data"
c.backupFn = func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error {
w.Write([]byte(backupData))
return nil
}
client := &http.Client{}
client.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}
host := fmt.Sprintf("http://%s", s.Addr().String())
resp, err := client.Get(host + "/db/backup")
if err != nil {
t.Fatalf("failed to make backup request: %s", err.Error())
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("failed to get expected StatusOK for remote backup fetch, got %d", resp.StatusCode)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if exp, got := backupData, string(body); exp != got {
t.Fatalf("received incorrect backup data, exp: %s, got: %s", exp, got)
}
}
func Test_BackupFlagsNoLeaderOK(t *testing.T) { func Test_BackupFlagsNoLeaderOK(t *testing.T) {
m := &MockStore{} m := &MockStore{}
c := &mockClusterService{ c := &mockClusterService{
@ -1057,6 +1102,7 @@ type mockClusterService struct {
apiAddr string apiAddr string
executeFn func(er *command.ExecuteRequest, addr string, t time.Duration) ([]*command.ExecuteResult, error) executeFn func(er *command.ExecuteRequest, addr string, t time.Duration) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest, addr string, t time.Duration) ([]*command.QueryRows, error) queryFn func(qr *command.QueryRequest, addr string, t time.Duration) ([]*command.QueryRows, error)
backupFn func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error
} }
func (m *mockClusterService) GetNodeAPIAddr(a string, t time.Duration) (string, error) { func (m *mockClusterService) GetNodeAPIAddr(a string, t time.Duration) (string, error) {
@ -1077,6 +1123,13 @@ func (m *mockClusterService) Query(qr *command.QueryRequest, addr string, creds
return nil, nil return nil, nil
} }
func (m *mockClusterService) Backup(br *command.BackupRequest, addr string, creds *cluster.Credentials, t time.Duration, w io.Writer) error {
if m.backupFn != nil {
return m.backupFn(br, addr, t, w)
}
return nil
}
type mockCredentialStore struct { type mockCredentialStore struct {
HasPermOK bool HasPermOK bool
aaFunc func(username, password, perm string) bool aaFunc func(username, password, perm string) bool

@ -1357,13 +1357,18 @@ class TestEndToEndBackupRestore(unittest.TestCase):
fd, self.db_file = tempfile.mkstemp() fd, self.db_file = tempfile.mkstemp()
os.close(fd) os.close(fd)
# Create a two-node cluster.
self.node0 = Node(RQLITED_PATH, '0') self.node0 = Node(RQLITED_PATH, '0')
self.node0.start() self.node0.start()
self.node0.wait_for_leader() self.node0.wait_for_leader()
self.node0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.node0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.node0.execute('INSERT INTO foo(name) VALUES("fiona")') self.node0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.node0.wait_for_all_fsm() self.node0.wait_for_all_fsm()
self.node1 = Node(RQLITED_PATH, '1')
self.node1.start(join=self.node0.APIAddr())
self.node1.wait_for_leader()
# Get a backup from the first node and check it.
self.node0.backup(self.db_file) self.node0.backup(self.db_file)
conn = sqlite3.connect(self.db_file) conn = sqlite3.connect(self.db_file)
rows = conn.execute('SELECT * FROM foo').fetchall() rows = conn.execute('SELECT * FROM foo').fetchall()
@ -1371,12 +1376,21 @@ class TestEndToEndBackupRestore(unittest.TestCase):
self.assertEqual(rows[0], (1, 'fiona')) self.assertEqual(rows[0], (1, 'fiona'))
conn.close() conn.close()
self.node1 = Node(RQLITED_PATH, '1') # Get a backup from the other node and check it too.
self.node1.start() self.node1.backup(self.db_file)
self.node1.wait_for_leader() conn = sqlite3.connect(self.db_file)
j = self.node1.restore(self.db_file) rows = conn.execute('SELECT * FROM foo').fetchall()
self.assertEqual(len(rows), 1)
self.assertEqual(rows[0], (1, 'fiona'))
conn.close()
# Load file into a brand new node, check the data is right.
self.node2 = Node(RQLITED_PATH, '1')
self.node2.start()
self.node2.wait_for_leader()
j = self.node2.restore(self.db_file)
self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}"))
j = self.node1.query('SELECT * FROM foo') j = self.node2.query('SELECT * FROM foo')
self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}"))
def tearDown(self): def tearDown(self):
@ -1384,6 +1398,8 @@ class TestEndToEndBackupRestore(unittest.TestCase):
deprovision_node(self.node0) deprovision_node(self.node0)
if hasattr(self, 'node1'): if hasattr(self, 'node1'):
deprovision_node(self.node1) deprovision_node(self.node1)
if hasattr(self, 'node2'):
deprovision_node(self.node2)
os.remove(self.db_file) os.remove(self.db_file)
class TestEndToEndSnapRestoreSingle(unittest.TestCase): class TestEndToEndSnapRestoreSingle(unittest.TestCase):

Loading…
Cancel
Save