From 1049e41b6cfae63705628482bc1e315e1387ae34 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 18 Oct 2022 22:40:31 -0400 Subject: [PATCH 01/10] WIP before moving to proto backup model --- cluster/client.go | 51 +++++++++++++++ cluster/message.pb.go | 142 +++++++++++++++++++++++++++++------------- cluster/message.proto | 4 ++ cluster/service.go | 8 +++ command/command.pb.go | 2 +- 5 files changed, 164 insertions(+), 43 deletions(-) diff --git a/cluster/client.go b/cluster/client.go index f7676225..8218f7cd 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -293,6 +293,57 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, creds *Credent return a.Rows, nil } +// BackupTo retrieves a backup from a remote node and writes to the io.Writer +func (c *Client) BackupTo(nodeAddr string, creds *Credentials, timeout time.Duration, w io.Writer) error { + conn, err := c.dial(nodeAddr, c.timeout) + if err != nil { + return err + } + defer conn.Close() + + // Send the request + command := &Command{ + Type: Command_COMMAND_TYPE_GET_BACKUP, + } + p, err := proto.Marshal(command) + if err != nil { + return fmt.Errorf("command marshal: %s", err) + } + + // Write length of Protobuf + b := make([]byte, 4) + binary.LittleEndian.PutUint16(b[0:], uint16(len(p))) + if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil { + handleConnError(conn) + return err + } + _, err = conn.Write(b) + if err != nil { + handleConnError(conn) + return fmt.Errorf("write protobuf length: %s", err) + } + if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil { + handleConnError(conn) + return err + } + _, err = conn.Write(p) + if err != nil { + handleConnError(conn) + return fmt.Errorf("write protobuf: %s", err) + } + + // Read the backup and write to the writer + if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil { + handleConnError(conn) + return err + } + + if _, err := io.Copy(w, conn); err != nil { + return fmt.Errorf("backup copy: %s", err) + } + return nil +} + // Stats returns stats on the Client instance func (c *Client) Stats() (map[string]interface{}, error) { c.mu.RLock() diff --git a/cluster/message.pb.go b/cluster/message.pb.go index f3c5e045..795b050f 100644 --- a/cluster/message.pb.go +++ b/cluster/message.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.0 +// protoc-gen-go v1.28.1 // protoc v3.6.1 // source: message.proto @@ -28,6 +28,7 @@ const ( Command_COMMAND_TYPE_GET_NODE_API_URL Command_Type = 1 Command_COMMAND_TYPE_EXECUTE Command_Type = 2 Command_COMMAND_TYPE_QUERY Command_Type = 3 + Command_COMMAND_TYPE_GET_BACKUP Command_Type = 4 ) // Enum value maps for Command_Type. @@ -37,12 +38,14 @@ var ( 1: "COMMAND_TYPE_GET_NODE_API_URL", 2: "COMMAND_TYPE_EXECUTE", 3: "COMMAND_TYPE_QUERY", + 4: "COMMAND_TYPE_GET_BACKUP", } Command_Type_value = map[string]int32{ "COMMAND_TYPE_UNKNOWN": 0, "COMMAND_TYPE_GET_NODE_API_URL": 1, "COMMAND_TYPE_EXECUTE": 2, "COMMAND_TYPE_QUERY": 3, + "COMMAND_TYPE_GET_BACKUP": 4, } ) @@ -381,6 +384,44 @@ func (x *CommandQueryResponse) GetRows() []*command.QueryRows { return nil } +type CommandBackupResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *CommandBackupResponse) Reset() { + *x = CommandBackupResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CommandBackupResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommandBackupResponse) ProtoMessage() {} + +func (x *CommandBackupResponse) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommandBackupResponse.ProtoReflect.Descriptor instead. +func (*CommandBackupResponse) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{5} +} + var File_message_proto protoreflect.FileDescriptor var file_message_proto_rawDesc = []byte{ @@ -393,7 +434,7 @@ var file_message_proto_rawDesc = []byte{ 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x75, 0x72, 0x6c, 0x22, 0xf0, 0x02, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, + 0x75, 0x72, 0x6c, 0x22, 0x8e, 0x03, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x78, @@ -408,29 +449,33 @@ var file_message_proto_rawDesc = []byte{ 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, - 0x69, 0x61, 0x6c, 0x73, 0x22, 0x75, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, - 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, - 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, - 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, - 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, - 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, - 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, - 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x42, 0x09, 0x0a, 0x07, 0x72, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, - 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, - 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, - 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, - 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x42, 0x22, - 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, - 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x69, 0x61, 0x6c, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, + 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, + 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, + 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, + 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, + 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, + 0x54, 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x1b, 0x0a, 0x17, + 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, + 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, + 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, + 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, + 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, + 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, + 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, + 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x17, 0x0a, 0x15, + 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, + 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, + 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -446,7 +491,7 @@ func file_message_proto_rawDescGZIP() []byte { } var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 6) var file_message_proto_goTypes = []interface{}{ (Command_Type)(0), // 0: cluster.Command.Type (*Credentials)(nil), // 1: cluster.Credentials @@ -454,23 +499,24 @@ var file_message_proto_goTypes = []interface{}{ (*Command)(nil), // 3: cluster.Command (*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse (*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse - (*command.ExecuteRequest)(nil), // 6: command.ExecuteRequest - (*command.QueryRequest)(nil), // 7: command.QueryRequest - (*command.ExecuteResult)(nil), // 8: command.ExecuteResult - (*command.QueryRows)(nil), // 9: command.QueryRows + (*CommandBackupResponse)(nil), // 6: cluster.CommandBackupResponse + (*command.ExecuteRequest)(nil), // 7: command.ExecuteRequest + (*command.QueryRequest)(nil), // 8: command.QueryRequest + (*command.ExecuteResult)(nil), // 9: command.ExecuteResult + (*command.QueryRows)(nil), // 10: command.QueryRows } var file_message_proto_depIdxs = []int32{ - 0, // 0: cluster.Command.type:type_name -> cluster.Command.Type - 6, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest - 7, // 2: cluster.Command.query_request:type_name -> command.QueryRequest - 1, // 3: cluster.Command.credentials:type_name -> cluster.Credentials - 8, // 4: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult - 9, // 5: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows - 6, // [6:6] is the sub-list for method output_type - 6, // [6:6] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 0, // 0: cluster.Command.type:type_name -> cluster.Command.Type + 7, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest + 8, // 2: cluster.Command.query_request:type_name -> command.QueryRequest + 1, // 3: cluster.Command.credentials:type_name -> cluster.Credentials + 9, // 4: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult + 10, // 5: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_message_proto_init() } @@ -539,6 +585,18 @@ func file_message_proto_init() { return nil } } + file_message_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CommandBackupResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_message_proto_msgTypes[2].OneofWrappers = []interface{}{ (*Command_ExecuteRequest)(nil), @@ -550,7 +608,7 @@ func file_message_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_message_proto_rawDesc, NumEnums: 1, - NumMessages: 5, + NumMessages: 6, NumExtensions: 0, NumServices: 0, }, diff --git a/cluster/message.proto b/cluster/message.proto index 97159bf4..683b2396 100644 --- a/cluster/message.proto +++ b/cluster/message.proto @@ -20,6 +20,7 @@ message Command { COMMAND_TYPE_GET_NODE_API_URL = 1; COMMAND_TYPE_EXECUTE = 2; COMMAND_TYPE_QUERY = 3; + COMMAND_TYPE_GET_BACKUP = 4; } Type type = 1; @@ -40,3 +41,6 @@ message CommandQueryResponse { string error = 1; repeated command.QueryRows rows = 2; } + +message CommandBackupResponse { +} diff --git a/cluster/service.go b/cluster/service.go index 6ca73460..2cfbb159 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -25,6 +25,7 @@ const ( numGetNodeAPIResponse = "num_get_node_api_resp" numExecuteRequest = "num_execute_req" numQueryRequest = "num_query_req" + numBackupRequest = "num_backup_req" // Client stats for this package. numGetNodeAPIRequestLocal = "num_get_node_api_req_local" @@ -44,6 +45,7 @@ func init() { stats.Add(numGetNodeAPIResponse, 0) stats.Add(numExecuteRequest, 0) stats.Add(numQueryRequest, 0) + stats.Add(numBackupRequest, 0) stats.Add(numGetNodeAPIRequestLocal, 0) } @@ -62,6 +64,8 @@ type Database interface { // Query executes a slice of queries, each of which returns rows. Query(qr *command.QueryRequest) ([]*command.QueryRows, error) + + Backup(leader bool, fmt BackupFormat, dst io.Writer) error } // CredentialStore is the interface credential stores must support. @@ -293,6 +297,10 @@ func (s *Service) handleConn(conn net.Conn) { binary.LittleEndian.PutUint32(b[0:], uint32(len(p))) conn.Write(b) conn.Write(p) + + case Command_COMMAND_TYPE_GET_BACKUP: + stats.Add(numBackupRequest, 1) + err := s.Backup(true) } } } diff --git a/command/command.pb.go b/command/command.pb.go index 96c1f529..49f4764a 100644 --- a/command/command.pb.go +++ b/command/command.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.0 +// protoc-gen-go v1.28.1 // protoc v3.6.1 // source: command.proto From e492c5f5f056164b72846beadda82e32aabfb1de Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 18 Oct 2022 23:41:57 -0400 Subject: [PATCH 02/10] More Backup-forwarding WIP --- cluster/client.go | 2 +- cluster/message.pb.go | 121 ++++++++++++++++++++++++++---------------- cluster/message.proto | 4 +- cluster/service.go | 33 ++++++++++-- 4 files changed, 110 insertions(+), 50 deletions(-) diff --git a/cluster/client.go b/cluster/client.go index 8218f7cd..11428374 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -303,7 +303,7 @@ func (c *Client) BackupTo(nodeAddr string, creds *Credentials, timeout time.Dura // Send the request command := &Command{ - Type: Command_COMMAND_TYPE_GET_BACKUP, + Type: Command_COMMAND_TYPE_BACKUP, } p, err := proto.Marshal(command) if err != nil { diff --git a/cluster/message.pb.go b/cluster/message.pb.go index 795b050f..73c24666 100644 --- a/cluster/message.pb.go +++ b/cluster/message.pb.go @@ -28,7 +28,7 @@ const ( Command_COMMAND_TYPE_GET_NODE_API_URL Command_Type = 1 Command_COMMAND_TYPE_EXECUTE Command_Type = 2 Command_COMMAND_TYPE_QUERY Command_Type = 3 - Command_COMMAND_TYPE_GET_BACKUP Command_Type = 4 + Command_COMMAND_TYPE_BACKUP Command_Type = 4 ) // Enum value maps for Command_Type. @@ -38,14 +38,14 @@ var ( 1: "COMMAND_TYPE_GET_NODE_API_URL", 2: "COMMAND_TYPE_EXECUTE", 3: "COMMAND_TYPE_QUERY", - 4: "COMMAND_TYPE_GET_BACKUP", + 4: "COMMAND_TYPE_BACKUP", } Command_Type_value = map[string]int32{ "COMMAND_TYPE_UNKNOWN": 0, "COMMAND_TYPE_GET_NODE_API_URL": 1, "COMMAND_TYPE_EXECUTE": 2, "COMMAND_TYPE_QUERY": 3, - "COMMAND_TYPE_GET_BACKUP": 4, + "COMMAND_TYPE_BACKUP": 4, } ) @@ -187,6 +187,7 @@ type Command struct { // Types that are assignable to Request: // *Command_ExecuteRequest // *Command_QueryRequest + // *Command_BackupRequest Request isCommand_Request `protobuf_oneof:"request"` Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"` } @@ -251,6 +252,13 @@ func (x *Command) GetQueryRequest() *command.QueryRequest { return nil } +func (x *Command) GetBackupRequest() *command.BackupRequest { + if x, ok := x.GetRequest().(*Command_BackupRequest); ok { + return x.BackupRequest + } + return nil +} + func (x *Command) GetCredentials() *Credentials { if x != nil { return x.Credentials @@ -270,10 +278,16 @@ type Command_QueryRequest struct { QueryRequest *command.QueryRequest `protobuf:"bytes,3,opt,name=query_request,json=queryRequest,proto3,oneof"` } +type Command_BackupRequest struct { + BackupRequest *command.BackupRequest `protobuf:"bytes,5,opt,name=backup_request,json=backupRequest,proto3,oneof"` +} + func (*Command_ExecuteRequest) isCommand_Request() {} func (*Command_QueryRequest) isCommand_Request() {} +func (*Command_BackupRequest) isCommand_Request() {} + type CommandExecuteResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -388,6 +402,8 @@ type CommandBackupResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` } func (x *CommandBackupResponse) Reset() { @@ -422,6 +438,13 @@ func (*CommandBackupResponse) Descriptor() ([]byte, []int) { return file_message_proto_rawDescGZIP(), []int{5} } +func (x *CommandBackupResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + var File_message_proto protoreflect.FileDescriptor var file_message_proto_rawDesc = []byte{ @@ -434,7 +457,7 @@ var file_message_proto_rawDesc = []byte{ 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x75, 0x72, 0x6c, 0x22, 0x8e, 0x03, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, + 0x75, 0x72, 0x6c, 0x22, 0xcb, 0x03, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x78, @@ -445,37 +468,42 @@ var file_message_proto_rawDesc = []byte{ 0x0a, 0x0d, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0c, - 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, - 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, - 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, - 0x69, 0x61, 0x6c, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, - 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, - 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, - 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, - 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, - 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, - 0x54, 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, - 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x1b, 0x0a, 0x17, - 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, - 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, - 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, - 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, - 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, - 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, - 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, - 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, - 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x17, 0x0a, 0x15, - 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, - 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, - 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, + 0x71, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, 0x0e, + 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x05, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x42, + 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0d, + 0x62, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, + 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, + 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, + 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22, 0x8e, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, + 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, + 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, + 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, + 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, + 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, + 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, + 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x17, 0x0a, + 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x42, 0x41, + 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, + 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, + 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, + 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, + 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, + 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, + 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x2d, 0x0a, 0x15, 0x43, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, + 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, + 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -502,21 +530,23 @@ var file_message_proto_goTypes = []interface{}{ (*CommandBackupResponse)(nil), // 6: cluster.CommandBackupResponse (*command.ExecuteRequest)(nil), // 7: command.ExecuteRequest (*command.QueryRequest)(nil), // 8: command.QueryRequest - (*command.ExecuteResult)(nil), // 9: command.ExecuteResult - (*command.QueryRows)(nil), // 10: command.QueryRows + (*command.BackupRequest)(nil), // 9: command.BackupRequest + (*command.ExecuteResult)(nil), // 10: command.ExecuteResult + (*command.QueryRows)(nil), // 11: command.QueryRows } var file_message_proto_depIdxs = []int32{ 0, // 0: cluster.Command.type:type_name -> cluster.Command.Type 7, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest 8, // 2: cluster.Command.query_request:type_name -> command.QueryRequest - 1, // 3: cluster.Command.credentials:type_name -> cluster.Credentials - 9, // 4: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult - 10, // 5: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows - 6, // [6:6] is the sub-list for method output_type - 6, // [6:6] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 9, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest + 1, // 4: cluster.Command.credentials:type_name -> cluster.Credentials + 10, // 5: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult + 11, // 6: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_message_proto_init() } @@ -601,6 +631,7 @@ func file_message_proto_init() { file_message_proto_msgTypes[2].OneofWrappers = []interface{}{ (*Command_ExecuteRequest)(nil), (*Command_QueryRequest)(nil), + (*Command_BackupRequest)(nil), } type x struct{} out := protoimpl.TypeBuilder{ diff --git a/cluster/message.proto b/cluster/message.proto index 683b2396..61ef0792 100644 --- a/cluster/message.proto +++ b/cluster/message.proto @@ -20,13 +20,14 @@ message Command { COMMAND_TYPE_GET_NODE_API_URL = 1; COMMAND_TYPE_EXECUTE = 2; COMMAND_TYPE_QUERY = 3; - COMMAND_TYPE_GET_BACKUP = 4; + COMMAND_TYPE_BACKUP = 4; } Type type = 1; oneof request { command.ExecuteRequest execute_request = 2; command.QueryRequest query_request = 3; + command.BackupRequest backup_request = 5; } Credentials credentials = 4; @@ -43,4 +44,5 @@ message CommandQueryResponse { } message CommandBackupResponse { + string error = 1; } diff --git a/cluster/service.go b/cluster/service.go index 2cfbb159..c417cd4f 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -65,7 +65,8 @@ type Database interface { // Query executes a slice of queries, each of which returns rows. Query(qr *command.QueryRequest) ([]*command.QueryRows, error) - Backup(leader bool, fmt BackupFormat, dst io.Writer) error + // Backup writes a backup of the database to the writer. + Backup(br *command.BackupRequest, dst io.Writer) error } // CredentialStore is the interface credential stores must support. @@ -298,9 +299,35 @@ func (s *Service) handleConn(conn net.Conn) { conn.Write(b) conn.Write(p) - case Command_COMMAND_TYPE_GET_BACKUP: + case Command_COMMAND_TYPE_BACKUP: stats.Add(numBackupRequest, 1) - err := s.Backup(true) + + resp := &CommandBackupResponse{} + + br := c.GetBackupRequest() + if br == nil { + resp.Error = "BackupRequest is nil" + } else if !s.checkCommandPerm(c, auth.PermBackup) { + resp.Error = "unauthorized" + } + + p, err = proto.Marshal(resp) + if err != nil { + // Close????? XXX + return + } + // Write length of Protobuf first, then write the actual Protobuf. + b = make([]byte, 4) + binary.LittleEndian.PutUint32(b[0:], uint32(len(p))) + conn.Write(b) + conn.Write(p) + + if resp.Error == "" { + // Finally, just stream the backup data itself (and hope for the best!) + s.db.Backup(br, conn) + } + + //Close() to signal end of data? XXX } } } From 6ab718df2d2e016c55de3937a9941acb0ac949a2 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Tue, 18 Oct 2022 23:43:05 -0400 Subject: [PATCH 03/10] Better name --- cluster/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/client.go b/cluster/client.go index 11428374..e6bc74b4 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -293,8 +293,8 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, creds *Credent return a.Rows, nil } -// BackupTo retrieves a backup from a remote node and writes to the io.Writer -func (c *Client) BackupTo(nodeAddr string, creds *Credentials, timeout time.Duration, w io.Writer) error { +// Backup retrieves a backup from a remote node and writes to the io.Writer +func (c *Client) Backup(nodeAddr string, creds *Credentials, timeout time.Duration, w io.Writer) error { conn, err := c.dial(nodeAddr, c.timeout) if err != nil { return err From 34c99141483b1464d8474a7e4add5a4523afa4b1 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 19 Oct 2022 22:52:55 -0400 Subject: [PATCH 04/10] WIP -- unit test passing --- cluster/client.go | 40 ++++++++++++++++++--- cluster/message.pb.go | 19 +++++++--- cluster/message.proto | 1 + cluster/service.go | 20 ++++++----- cluster/service_database_test.go | 62 ++++++++++++++++++++++++++++++++ cluster/service_test.go | 9 +++++ 6 files changed, 133 insertions(+), 18 deletions(-) diff --git a/cluster/client.go b/cluster/client.go index e6bc74b4..2515d65a 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -294,7 +294,7 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, creds *Credent } // Backup retrieves a backup from a remote node and writes to the io.Writer -func (c *Client) Backup(nodeAddr string, creds *Credentials, timeout time.Duration, w io.Writer) error { +func (c *Client) Backup(br *command.BackupRequest, nodeAddr string, creds *Credentials, timeout time.Duration, w io.Writer) error { conn, err := c.dial(nodeAddr, c.timeout) if err != nil { return err @@ -304,6 +304,10 @@ func (c *Client) Backup(nodeAddr string, creds *Credentials, timeout time.Durati // Send the request command := &Command{ Type: Command_COMMAND_TYPE_BACKUP, + Request: &Command_BackupRequest{ + BackupRequest: br, + }, + Credentials: creds, } p, err := proto.Marshal(command) if err != nil { @@ -322,6 +326,8 @@ func (c *Client) Backup(nodeAddr string, creds *Credentials, timeout time.Durati handleConnError(conn) return fmt.Errorf("write protobuf length: %s", err) } + + // Now write backup request proto itself. if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil { handleConnError(conn) return err @@ -332,14 +338,40 @@ func (c *Client) Backup(nodeAddr string, creds *Credentials, timeout time.Durati return fmt.Errorf("write protobuf: %s", err) } - // Read the backup and write to the writer + // Read the backup response if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil { handleConnError(conn) return err } - if _, err := io.Copy(w, conn); err != nil { - return fmt.Errorf("backup copy: %s", err) + // Read length of response. + _, err = io.ReadFull(conn, b) + if err != nil { + handleConnError(conn) + return err + } + sz := binary.LittleEndian.Uint32(b[0:]) + + // Read in the actual response. + p = make([]byte, sz) + _, err = io.ReadFull(conn, p) + if err != nil { + handleConnError(conn) + return err + } + + resp := &CommandBackupResponse{} + err = proto.Unmarshal(p, resp) + if err != nil { + return err + } + + if resp.Error != "" { + return fmt.Errorf("backup response: %s", resp.Error) + } + + if _, err := w.Write(resp.Data); err != nil { + return fmt.Errorf("backup write: %s", err) } return nil } diff --git a/cluster/message.pb.go b/cluster/message.pb.go index 73c24666..6037e47d 100644 --- a/cluster/message.pb.go +++ b/cluster/message.pb.go @@ -404,6 +404,7 @@ type CommandBackupResponse struct { unknownFields protoimpl.UnknownFields Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` } func (x *CommandBackupResponse) Reset() { @@ -445,6 +446,13 @@ func (x *CommandBackupResponse) GetError() string { return "" } +func (x *CommandBackupResponse) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + var File_message_proto protoreflect.FileDescriptor var file_message_proto_rawDesc = []byte{ @@ -497,13 +505,14 @@ var file_message_proto_rawDesc = []byte{ 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, - 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x2d, 0x0a, 0x15, 0x43, 0x6f, 0x6d, + 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, - 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x42, 0x22, 0x5a, 0x20, + 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, + 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/cluster/message.proto b/cluster/message.proto index 61ef0792..4390d636 100644 --- a/cluster/message.proto +++ b/cluster/message.proto @@ -45,4 +45,5 @@ message CommandQueryResponse { message CommandBackupResponse { string error = 1; + bytes data = 2; } diff --git a/cluster/service.go b/cluster/service.go index c417cd4f..62e8fab1 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -1,6 +1,7 @@ package cluster import ( + "bytes" "encoding/binary" "expvar" "fmt" @@ -309,25 +310,26 @@ func (s *Service) handleConn(conn net.Conn) { resp.Error = "BackupRequest is nil" } else if !s.checkCommandPerm(c, auth.PermBackup) { resp.Error = "unauthorized" - } + } else { + buf := new(bytes.Buffer) + if err := s.db.Backup(br, buf); err != nil { + resp.Error = err.Error() + } else { + resp.Data = buf.Bytes() + } + } p, err = proto.Marshal(resp) if err != nil { - // Close????? XXX + conn.Close() return } + // Write length of Protobuf first, then write the actual Protobuf. b = make([]byte, 4) binary.LittleEndian.PutUint32(b[0:], uint32(len(p))) conn.Write(b) conn.Write(p) - - if resp.Error == "" { - // Finally, just stream the backup data itself (and hope for the best!) - s.db.Backup(br, conn) - } - - //Close() to signal end of data? XXX } } } diff --git a/cluster/service_database_test.go b/cluster/service_database_test.go index f435e9ad..6ba8890f 100644 --- a/cluster/service_database_test.go +++ b/cluster/service_database_test.go @@ -1,9 +1,11 @@ package cluster import ( + "bytes" "encoding/binary" "errors" "fmt" + "io" "os" "strings" "testing" @@ -258,6 +260,52 @@ func Test_ServiceQueryLarge(t *testing.T) { } } +func Test_ServiceBackup(t *testing.T) { + ln, mux := mustNewMux() + go mux.Serve() + tn := mux.Listen(1) // Could be any byte value. + db := mustNewMockDatabase() + cred := mustNewMockCredentialStore() + s := New(tn, db, cred) + if s == nil { + t.Fatalf("failed to create cluster service") + } + + c := NewClient(mustNewDialer(1, false, false), 30*time.Second) + + if err := s.Open(); err != nil { + t.Fatalf("failed to open cluster service: %s", err.Error()) + } + + // Ready for Backup tests now. + testData := []byte("this is SQLite data") + db.backupFn = func(br *command.BackupRequest, dst io.Writer) error { + if br.Format != command.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY { + t.Fatalf("wrong backup format requested") + } + dst.Write(testData) + return nil + } + + buf := new(bytes.Buffer) + err := c.Backup(backupRequestBinary(true), s.Addr(), NO_CREDS, longWait, buf) + if err != nil { + t.Fatalf("failed to backup database: %s", err.Error()) + } + + if bytes.Compare(buf.Bytes(), testData) != 0 { + t.Fatalf("backup data is not as expected, exp: %s, got: %s", testData, buf.Bytes()) + } + + // Clean up resources. + if err := ln.Close(); err != nil { + t.Fatalf("failed to close Mux's listener: %s", err) + } + if err := s.Close(); err != nil { + t.Fatalf("failed to close cluster service") + } +} + // Test_BinaryEncoding_Backwards ensures that software earlier than v6.6.2 // can communicate with v6.6.2+ releases. v6.6.2 increased the maximum size // of cluster responses. @@ -314,6 +362,20 @@ func queryRequestFromStrings(s []string) *command.QueryRequest { } } +func backupRequestSQL(leader bool) *command.BackupRequest { + return &command.BackupRequest{ + Format: command.BackupRequest_BACKUP_REQUEST_FORMAT_SQL, + Leader: leader, + } +} + +func backupRequestBinary(leader bool) *command.BackupRequest { + return &command.BackupRequest{ + Format: command.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY, + Leader: leader, + } +} + func asJSON(v interface{}) string { b, err := encoding.JSONMarshal(v) if err != nil { diff --git a/cluster/service_test.go b/cluster/service_test.go index 1d19b21f..62fdc6b9 100644 --- a/cluster/service_test.go +++ b/cluster/service_test.go @@ -3,6 +3,7 @@ package cluster import ( "crypto/tls" "fmt" + "io" "net" "os" "testing" @@ -310,6 +311,7 @@ func mustNewMockTLSTransport() *mockTransport { type mockDatabase struct { executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error) + backupFn func(br *command.BackupRequest, dst io.Writer) error } func (m *mockDatabase) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { @@ -320,6 +322,13 @@ func (m *mockDatabase) Query(qr *command.QueryRequest) ([]*command.QueryRows, er return m.queryFn(qr) } +func (m *mockDatabase) Backup(br *command.BackupRequest, dst io.Writer) error { + if m.backupFn == nil { + return nil + } + return m.backupFn(br, dst) +} + func mustNewMockDatabase() *mockDatabase { e := func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { return []*command.ExecuteResult{}, nil From 1827c6eec5c5d163b052f755d4e5814a6369d5d8 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 19 Oct 2022 23:04:44 -0400 Subject: [PATCH 05/10] Compress backups before transmission between nodes --- cluster/client.go | 26 ++++++++++++++++++++++++++ cluster/service.go | 26 +++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/cluster/client.go b/cluster/client.go index 2515d65a..86b51d57 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -1,6 +1,8 @@ package cluster import ( + "bytes" + "compress/gzip" "encoding/binary" "errors" "fmt" @@ -360,6 +362,13 @@ func (c *Client) Backup(br *command.BackupRequest, nodeAddr string, creds *Crede return err } + // Decompress.... + p, err = gzUncompress(p) + if err != nil { + handleConnError(conn) + return err + } + resp := &CommandBackupResponse{} err = proto.Unmarshal(p, resp) if err != nil { @@ -447,3 +456,20 @@ func handleConnError(conn net.Conn) { pc.MarkUnusable() } } + +func gzUncompress(b []byte) ([]byte, error) { + gz, err := gzip.NewReader(bytes.NewReader(b)) + if err != nil { + return nil, fmt.Errorf("unmarshal gzip NewReader: %s", err) + } + + ub, err := io.ReadAll(gz) + if err != nil { + return nil, fmt.Errorf("unmarshal gzip ReadAll: %s", err) + } + + if err := gz.Close(); err != nil { + return nil, fmt.Errorf("unmarshal gzip Close: %s", err) + } + return ub, nil +} diff --git a/cluster/service.go b/cluster/service.go index 62e8fab1..fd5b0209 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -2,6 +2,7 @@ package cluster import ( "bytes" + "compress/gzip" "encoding/binary" "expvar" "fmt" @@ -317,7 +318,6 @@ func (s *Service) handleConn(conn net.Conn) { } else { resp.Data = buf.Bytes() } - } p, err = proto.Marshal(resp) if err != nil { @@ -325,6 +325,13 @@ func (s *Service) handleConn(conn net.Conn) { return } + // Compress the backup. + p, err = gzCompress(p) + if err != nil { + conn.Close() + return + } + // Write length of Protobuf first, then write the actual Protobuf. b = make([]byte, 4) binary.LittleEndian.PutUint32(b[0:], uint32(len(p))) @@ -333,3 +340,20 @@ func (s *Service) handleConn(conn net.Conn) { } } } + +// gzCompress compresses the given byte slice. +func gzCompress(b []byte) ([]byte, error) { + var buf bytes.Buffer + gzw, err := gzip.NewWriterLevel(&buf, gzip.BestCompression) + if err != nil { + return nil, fmt.Errorf("gzip new writer: %s", err) + } + + if _, err := gzw.Write(b); err != nil { + return nil, fmt.Errorf("gzip Write: %s", err) + } + if err := gzw.Close(); err != nil { + return nil, fmt.Errorf("gzip Close: %s", err) + } + return buf.Bytes(), nil +} From 16a0b41321ce18fe8df52f8dd5c05cb922484362 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 19 Oct 2022 23:07:57 -0400 Subject: [PATCH 06/10] Better errors --- cluster/client.go | 4 ++-- cluster/service.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cluster/client.go b/cluster/client.go index 86b51d57..c4338bd7 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -366,13 +366,13 @@ func (c *Client) Backup(br *command.BackupRequest, nodeAddr string, creds *Crede p, err = gzUncompress(p) if err != nil { handleConnError(conn) - return err + return fmt.Errorf("backup decompress: %s", err) } resp := &CommandBackupResponse{} err = proto.Unmarshal(p, resp) if err != nil { - return err + return fmt.Errorf("backup unmarshal: %s", err) } if resp.Error != "" { diff --git a/cluster/service.go b/cluster/service.go index fd5b0209..aba38267 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -325,7 +325,7 @@ func (s *Service) handleConn(conn net.Conn) { return } - // Compress the backup. + // Compress the backup for less space on the wire between nodes. p, err = gzCompress(p) if err != nil { conn.Close() From 8a69aa53aaaf79390be78dfabe0b9bb20d5125ed Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Wed, 19 Oct 2022 23:57:08 -0400 Subject: [PATCH 07/10] HTTP-level unit testing of remote backup --- cluster/client.go | 2 +- http/service.go | 56 ++++++++++++++++++++++++++++++++++++++----- http/service_test.go | 57 ++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 106 insertions(+), 9 deletions(-) diff --git a/cluster/client.go b/cluster/client.go index c4338bd7..9e7eb271 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -376,7 +376,7 @@ func (c *Client) Backup(br *command.BackupRequest, nodeAddr string, creds *Crede } if resp.Error != "" { - return fmt.Errorf("backup response: %s", resp.Error) + return errors.New(resp.Error) } if _, err := w.Write(resp.Data); err != nil { diff --git a/http/service.go b/http/service.go index a0ad0373..5e0156c4 100644 --- a/http/service.go +++ b/http/service.go @@ -94,6 +94,9 @@ type Cluster interface { // Query performs an Query Request on a remote node. Query(qr *command.QueryRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) ([]*command.QueryRows, error) + // Backup retrieves a backup from a remote node and writes to the io.Writer + Backup(br *command.BackupRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration, w io.Writer) error + // Stats returns stats on the Cluster. Stats() (map[string]interface{}, error) } @@ -168,6 +171,7 @@ const ( numQueries = "queries" numRemoteExecutions = "remote_executions" numRemoteQueries = "remote_queries" + numRemoteBackups = "remote_backups" numReadyz = "num_readyz" numStatus = "num_status" numBackups = "backups" @@ -200,6 +204,7 @@ func init() { stats.Add(numQueries, 0) stats.Add(numRemoteExecutions, 0) stats.Add(numRemoteQueries, 0) + stats.Add(numRemoteBackups, 0) stats.Add(numReadyz, 0) stats.Add(numStatus, 0) stats.Add(numBackups, 0) @@ -565,30 +570,69 @@ func (s *Service) handleBackup(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } + redirect, err := isRedirect(r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + format, err := backupFormat(w, r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - bf, err := backupFormat(w, r) + timeout, err := timeoutParam(r, defaultTimeout) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } br := &command.BackupRequest{ - Format: bf, + Format: format, Leader: !noLeader, } err = s.store.Backup(br, w) if err != nil { if err == store.ErrNotLeader { - leaderAPIAddr := s.LeaderAPIAddr() - if leaderAPIAddr == "" { + if redirect { + leaderAPIAddr := s.LeaderAPIAddr() + if leaderAPIAddr == "" { + stats.Add(numLeaderNotFound, 1) + http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) + return + } + + redirect := s.FormRedirect(r, leaderAPIAddr) + http.Redirect(w, r, redirect, http.StatusMovedPermanently) + return + } + + addr, err := s.store.LeaderAddr() + if err != nil { + http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), + http.StatusInternalServerError) + return + } + if addr == "" { stats.Add(numLeaderNotFound, 1) http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) return } - redirect := s.FormRedirect(r, leaderAPIAddr) - http.Redirect(w, r, redirect, http.StatusMovedPermanently) + username, password, ok := r.BasicAuth() + if !ok { + username = "" + } + + backupErr := s.cluster.Backup(br, addr, makeCredentials(username, password), timeout, w) + if backupErr != nil && backupErr.Error() == "unauthorized" { + http.Error(w, "remote backup not authorized", http.StatusUnauthorized) + return + } + stats.Add(numRemoteBackups, 1) + w.Header().Add(ServedByHTTPHeader, addr) return } http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/http/service_test.go b/http/service_test.go index 32909a6f..2a2f5a62 100644 --- a/http/service_test.go +++ b/http/service_test.go @@ -579,7 +579,7 @@ func Test_BackupOK(t *testing.T) { } } -func Test_BackupFlagsNoLeader(t *testing.T) { +func Test_BackupFlagsNoLeaderRedirect(t *testing.T) { m := &MockStore{} c := &mockClusterService{ apiAddr: "http://1.2.3.4:999", @@ -602,7 +602,7 @@ func Test_BackupFlagsNoLeader(t *testing.T) { } host := fmt.Sprintf("http://%s", s.Addr().String()) - resp, err := client.Get(host + "/db/backup") + resp, err := client.Get(host + "/db/backup?redirect") if err != nil { t.Fatalf("failed to make backup request: %s", err.Error()) } @@ -611,6 +611,51 @@ func Test_BackupFlagsNoLeader(t *testing.T) { } } +func Test_BackupFlagsNoLeaderRemoteFetch(t *testing.T) { + m := &MockStore{ + leaderAddr: "foo:1234", + } + c := &mockClusterService{ + apiAddr: "http://1.2.3.4:999", + } + + s := New("127.0.0.1:0", m, c, nil) + + if err := s.Start(); err != nil { + t.Fatalf("failed to start service") + } + defer s.Close() + + m.backupFn = func(br *command.BackupRequest, dst io.Writer) error { + return store.ErrNotLeader + } + + backupData := "this is SQLite data" + c.backupFn = func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error { + w.Write([]byte(backupData)) + return nil + } + + client := &http.Client{} + client.CheckRedirect = func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + } + + host := fmt.Sprintf("http://%s", s.Addr().String()) + resp, err := client.Get(host + "/db/backup") + if err != nil { + t.Fatalf("failed to make backup request: %s", err.Error()) + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("failed to get expected StatusOK for remote backup fetch, got %d", resp.StatusCode) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if exp, got := backupData, string(body); exp != got { + t.Fatalf("received incorrect backup data, exp: %s, got: %s", exp, got) + } +} + func Test_BackupFlagsNoLeaderOK(t *testing.T) { m := &MockStore{} c := &mockClusterService{ @@ -1057,6 +1102,7 @@ type mockClusterService struct { apiAddr string executeFn func(er *command.ExecuteRequest, addr string, t time.Duration) ([]*command.ExecuteResult, error) queryFn func(qr *command.QueryRequest, addr string, t time.Duration) ([]*command.QueryRows, error) + backupFn func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error } func (m *mockClusterService) GetNodeAPIAddr(a string, t time.Duration) (string, error) { @@ -1077,6 +1123,13 @@ func (m *mockClusterService) Query(qr *command.QueryRequest, addr string, creds return nil, nil } +func (m *mockClusterService) Backup(br *command.BackupRequest, addr string, creds *cluster.Credentials, t time.Duration, w io.Writer) error { + if m.backupFn != nil { + return m.backupFn(br, addr, t, w) + } + return nil +} + type mockCredentialStore struct { HasPermOK bool aaFunc func(username, password, perm string) bool From 37969ff6433621bb28f0e97573fc6ab63151d377 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 20 Oct 2022 00:03:05 -0400 Subject: [PATCH 08/10] Update Backup documentation --- DOC/BACKUPS.md | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/DOC/BACKUPS.md b/DOC/BACKUPS.md index 2f270ba0..8db2f18d 100644 --- a/DOC/BACKUPS.md +++ b/DOC/BACKUPS.md @@ -9,9 +9,11 @@ This command will write the SQLite database file to `bak.sqlite3`. You can also access the rqlite API directly, via a HTTP `GET` request to the endpoint `/db/backup`. For example, using `curl`, and assuming the node is listening on `localhost:4001`, you could retrieve a backup as follows: ```bash -curl -s -L -XGET localhost:4001/db/backup -o bak.sqlite3 +curl -s -XGET localhost:4001/db/backup -o bak.sqlite3 ``` -Note that if the node is not the Leader, a HTTP 301 response will be returned with the Leader's address. The `curl` command above will automatically follow the Leader, due to the presence of the option `-L`. +Note that if the node is not the Leader, the node will transparently forward the request to Leader, wait for the backup data from the Leader, and return it to the client. If, instead, you want a backup of SQLite database of the actual node that receives the request, add `noleader` to the URL as a query parameter. + +If you do not wish a Follower to transparently forward a backup request to a Leader, add `redirect` to the URL as a query parameter. In that case if a Follower receives a backup request the Follower will respond with [HTTP 301 Moved Permanently](https://en.wikipedia.org/wiki/HTTP_301) and include the address of the Leader as the `Location` header in the response. It is then up the clients to re-issue the command to the Leader. In either case the generated file can then be used to restore a node (or cluster) using the [restore API](https://github.com/rqlite/rqlite/blob/master/DOC/RESTORE_FROM_SQLITE.md). @@ -27,6 +29,6 @@ curl -s -L -XGET localhost:4001/db/backup?fmt=sql -o bak.sql ``` ## Backup isolation level -The isolation offered by backups is `READ COMMITTED`. This means that any changes due to transactions to the database, that take place during the backup, will be reflected immediately once the transaction is committed, but not before. +The isolation offered by binary backups is `READ COMMITTED`. This means that any changes due to transactions to the database, that take place during the backup, will be reflected immediately once the transaction is committed, but not before. See the [SQLite documentation](https://www.sqlite.org/isolation.html) for more details. From f14b08658eeaf8985cff4140ccd07360270867fb Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 20 Oct 2022 00:04:49 -0400 Subject: [PATCH 09/10] Update CHANGELOG --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 377fe549..aaf2d69a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,7 @@ ## 7.8.0 (unreleased) +### New features +- [PR #1081](https://github.com/rqlite/rqlite/pull/1081): Transparently forward Backup requests to Leaders. + ### Implementation changes and bug fixes - [PR #1079](https://github.com/rqlite/rqlite/pull/1079): Use a Protobuf model for Backup requests. - [PR #1078](https://github.com/rqlite/rqlite/pull/1078): Decrease bootstrap polling interval from 5 seconds to 2 seconds. From f652adc2e0473b97f3fe52f94b895032e0452108 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Thu, 20 Oct 2022 00:18:40 -0400 Subject: [PATCH 10/10] End-to-end testing of remote backup fetch --- system_test/full_system_test.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/system_test/full_system_test.py b/system_test/full_system_test.py index f679cece..5719b462 100644 --- a/system_test/full_system_test.py +++ b/system_test/full_system_test.py @@ -1357,13 +1357,18 @@ class TestEndToEndBackupRestore(unittest.TestCase): fd, self.db_file = tempfile.mkstemp() os.close(fd) + # Create a two-node cluster. self.node0 = Node(RQLITED_PATH, '0') self.node0.start() self.node0.wait_for_leader() self.node0.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.node0.execute('INSERT INTO foo(name) VALUES("fiona")') self.node0.wait_for_all_fsm() + self.node1 = Node(RQLITED_PATH, '1') + self.node1.start(join=self.node0.APIAddr()) + self.node1.wait_for_leader() + # Get a backup from the first node and check it. self.node0.backup(self.db_file) conn = sqlite3.connect(self.db_file) rows = conn.execute('SELECT * FROM foo').fetchall() @@ -1371,12 +1376,21 @@ class TestEndToEndBackupRestore(unittest.TestCase): self.assertEqual(rows[0], (1, 'fiona')) conn.close() - self.node1 = Node(RQLITED_PATH, '1') - self.node1.start() - self.node1.wait_for_leader() - j = self.node1.restore(self.db_file) + # Get a backup from the other node and check it too. + self.node1.backup(self.db_file) + conn = sqlite3.connect(self.db_file) + rows = conn.execute('SELECT * FROM foo').fetchall() + self.assertEqual(len(rows), 1) + self.assertEqual(rows[0], (1, 'fiona')) + conn.close() + + # Load file into a brand new node, check the data is right. + self.node2 = Node(RQLITED_PATH, '1') + self.node2.start() + self.node2.wait_for_leader() + j = self.node2.restore(self.db_file) self.assertEqual(j, d_("{'results': [{'last_insert_id': 1, 'rows_affected': 1}]}")) - j = self.node1.query('SELECT * FROM foo') + j = self.node2.query('SELECT * FROM foo') self.assertEqual(j, d_("{'results': [{'values': [[1, 'fiona']], 'types': ['integer', 'text'], 'columns': ['id', 'name']}]}")) def tearDown(self): @@ -1384,6 +1398,8 @@ class TestEndToEndBackupRestore(unittest.TestCase): deprovision_node(self.node0) if hasattr(self, 'node1'): deprovision_node(self.node1) + if hasattr(self, 'node2'): + deprovision_node(self.node2) os.remove(self.db_file) class TestEndToEndSnapRestoreSingle(unittest.TestCase):