From c07401c778fa8fed5939f0729badf9ab27f4bb4e Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 23 Apr 2023 10:36:16 -0400 Subject: [PATCH] Support Joining over Raft connection --- CHANGELOG.md | 1 + cluster/client.go | 39 +++++++ cluster/message.pb.go | 228 ++++++++++++++++++++++++++++------------ cluster/message.proto | 6 ++ cluster/service.go | 25 +++++ cluster/service_test.go | 52 +++++++++ 6 files changed, 281 insertions(+), 70 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bec567cc..5dadeab3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## Implementation changes and bug fixes - [PR #1218](https://github.com/rqlite/rqlite/pull/1218): Check for more possible errors in peers.json. Thanks @Tjstretchalot - [PR #1220](https://github.com/rqlite/rqlite/pull/1220): Support Notify over Raft connection. +- [PR #1221](https://github.com/rqlite/rqlite/pull/1221): Support Join over Raft connection. ## 7.14.2 (April 7th 2023) This release is the first to includes various bug fixes and optimizations thanks to running much of the code through [Chat GPT-4](https://openai.com/product/gpt-4), most of which are not explicitly listed in the [CHANGELOG](https://github.com/rqlite/rqlite/edit/master/CHANGELOG.md), but you can check the commit history for details. Future releases of rqlite will probably include more such changes. diff --git a/cluster/client.go b/cluster/client.go index 826329b4..dffc7feb 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -344,6 +344,45 @@ func (c *Client) Notify(nr *command.NotifyRequest, nodeAddr string, timeout time return nil } +// Join joins a this node to a cluster +func (c *Client) Join(jr *command.JoinRequest, nodeAddr string, timeout time.Duration) error { + conn, err := c.dial(nodeAddr, c.timeout) + if err != nil { + return err + } + defer conn.Close() + + // Create the request. + command := &Command{ + Type: Command_COMMAND_TYPE_JOIN, + Request: &Command_JoinRequest{ + JoinRequest: jr, + }, + } + + if err := writeCommand(conn, command, timeout); err != nil { + handleConnError(conn) + return err + } + + p, err := readResponse(conn, timeout) + if err != nil { + handleConnError(conn) + return err + } + + a := &CommandJoinResponse{} + err = proto.Unmarshal(p, a) + if err != nil { + return err + } + + if a.Error != "" { + return errors.New(a.Error) + } + return nil +} + // Stats returns stats on the Client instance func (c *Client) Stats() (map[string]interface{}, error) { c.mu.RLock() diff --git a/cluster/message.pb.go b/cluster/message.pb.go index a79dd047..5d7116d8 100644 --- a/cluster/message.pb.go +++ b/cluster/message.pb.go @@ -32,6 +32,7 @@ const ( Command_COMMAND_TYPE_LOAD Command_Type = 5 Command_COMMAND_TYPE_REMOVE_NODE Command_Type = 6 Command_COMMAND_TYPE_NOTIFY Command_Type = 7 + Command_COMMAND_TYPE_JOIN Command_Type = 8 ) // Enum value maps for Command_Type. @@ -45,6 +46,7 @@ var ( 5: "COMMAND_TYPE_LOAD", 6: "COMMAND_TYPE_REMOVE_NODE", 7: "COMMAND_TYPE_NOTIFY", + 8: "COMMAND_TYPE_JOIN", } Command_Type_value = map[string]int32{ "COMMAND_TYPE_UNKNOWN": 0, @@ -55,6 +57,7 @@ var ( "COMMAND_TYPE_LOAD": 5, "COMMAND_TYPE_REMOVE_NODE": 6, "COMMAND_TYPE_NOTIFY": 7, + "COMMAND_TYPE_JOIN": 8, } ) @@ -201,6 +204,7 @@ type Command struct { // *Command_LoadRequest // *Command_RemoveNodeRequest // *Command_NotifyRequest + // *Command_JoinRequest Request isCommand_Request `protobuf_oneof:"request"` Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"` } @@ -293,6 +297,13 @@ func (x *Command) GetNotifyRequest() *command.NotifyRequest { return nil } +func (x *Command) GetJoinRequest() *command.JoinRequest { + if x, ok := x.GetRequest().(*Command_JoinRequest); ok { + return x.JoinRequest + } + return nil +} + func (x *Command) GetCredentials() *Credentials { if x != nil { return x.Credentials @@ -328,6 +339,10 @@ type Command_NotifyRequest struct { NotifyRequest *command.NotifyRequest `protobuf:"bytes,8,opt,name=notify_request,json=notifyRequest,proto3,oneof"` } +type Command_JoinRequest struct { + JoinRequest *command.JoinRequest `protobuf:"bytes,9,opt,name=join_request,json=joinRequest,proto3,oneof"` +} + func (*Command_ExecuteRequest) isCommand_Request() {} func (*Command_QueryRequest) isCommand_Request() {} @@ -340,6 +355,8 @@ func (*Command_RemoveNodeRequest) isCommand_Request() {} func (*Command_NotifyRequest) isCommand_Request() {} +func (*Command_JoinRequest) isCommand_Request() {} + type CommandExecuteResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -646,6 +663,53 @@ func (x *CommandNotifyResponse) GetError() string { return "" } +type CommandJoinResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` +} + +func (x *CommandJoinResponse) Reset() { + *x = CommandJoinResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CommandJoinResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommandJoinResponse) ProtoMessage() {} + +func (x *CommandJoinResponse) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[9] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommandJoinResponse.ProtoReflect.Descriptor instead. +func (*CommandJoinResponse) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{9} +} + +func (x *CommandJoinResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + var File_message_proto protoreflect.FileDescriptor var file_message_proto_rawDesc = []byte{ @@ -658,7 +722,7 @@ var file_message_proto_rawDesc = []byte{ 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61, 0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, - 0x75, 0x72, 0x6c, 0x22, 0xe3, 0x05, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, + 0x75, 0x72, 0x6c, 0x22, 0xb5, 0x06, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x78, @@ -686,52 +750,60 @@ var file_message_proto_rawDesc = []byte{ 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0d, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x79, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, - 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, - 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, - 0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22, - 0xdc, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, - 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, - 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, - 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f, - 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, - 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, - 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, - 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, - 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, - 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, - 0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x05, 0x12, 0x1c, 0x0a, 0x18, 0x43, 0x4f, 0x4d, 0x4d, 0x41, - 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x5f, 0x4e, - 0x4f, 0x44, 0x45, 0x10, 0x06, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, - 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x54, 0x49, 0x46, 0x59, 0x10, 0x07, 0x42, 0x09, - 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, - 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, - 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, - 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, - 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, - 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, - 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, - 0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, - 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, - 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, - 0x64, 0x61, 0x74, 0x61, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4c, - 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, - 0x72, 0x22, 0x31, 0x0a, 0x19, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x6d, 0x6f, - 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, - 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x22, 0x2d, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4e, - 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x39, 0x0a, 0x0c, 0x6a, 0x6f, 0x69, 0x6e, 0x5f, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, + 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0b, 0x6a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, + 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63, + 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22, 0xf3, 0x01, 0x0a, 0x04, 0x54, + 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, + 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, + 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, + 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, + 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, + 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, + 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, + 0x10, 0x03, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, 0x12, 0x15, 0x0a, 0x11, 0x43, + 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4c, 0x4f, 0x41, 0x44, + 0x10, 0x05, 0x12, 0x1c, 0x0a, 0x18, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x10, 0x06, + 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, + 0x5f, 0x4e, 0x4f, 0x54, 0x49, 0x46, 0x59, 0x10, 0x07, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, + 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4a, 0x4f, 0x49, 0x4e, 0x10, 0x08, + 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, + 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, + 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, + 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, + 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, + 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, + 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, + 0x6f, 0x77, 0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, + 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, + 0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, - 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x72, 0x6f, 0x72, 0x22, 0x31, 0x0a, 0x19, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, + 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x2d, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, + 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, + 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, + 0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -747,7 +819,7 @@ func file_message_proto_rawDescGZIP() []byte { } var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 9) +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_message_proto_goTypes = []interface{}{ (Command_Type)(0), // 0: cluster.Command.Type (*Credentials)(nil), // 1: cluster.Credentials @@ -759,31 +831,34 @@ var file_message_proto_goTypes = []interface{}{ (*CommandLoadResponse)(nil), // 7: cluster.CommandLoadResponse (*CommandRemoveNodeResponse)(nil), // 8: cluster.CommandRemoveNodeResponse (*CommandNotifyResponse)(nil), // 9: cluster.CommandNotifyResponse - (*command.ExecuteRequest)(nil), // 10: command.ExecuteRequest - (*command.QueryRequest)(nil), // 11: command.QueryRequest - (*command.BackupRequest)(nil), // 12: command.BackupRequest - (*command.LoadRequest)(nil), // 13: command.LoadRequest - (*command.RemoveNodeRequest)(nil), // 14: command.RemoveNodeRequest - (*command.NotifyRequest)(nil), // 15: command.NotifyRequest - (*command.ExecuteResult)(nil), // 16: command.ExecuteResult - (*command.QueryRows)(nil), // 17: command.QueryRows + (*CommandJoinResponse)(nil), // 10: cluster.CommandJoinResponse + (*command.ExecuteRequest)(nil), // 11: command.ExecuteRequest + (*command.QueryRequest)(nil), // 12: command.QueryRequest + (*command.BackupRequest)(nil), // 13: command.BackupRequest + (*command.LoadRequest)(nil), // 14: command.LoadRequest + (*command.RemoveNodeRequest)(nil), // 15: command.RemoveNodeRequest + (*command.NotifyRequest)(nil), // 16: command.NotifyRequest + (*command.JoinRequest)(nil), // 17: command.JoinRequest + (*command.ExecuteResult)(nil), // 18: command.ExecuteResult + (*command.QueryRows)(nil), // 19: command.QueryRows } var file_message_proto_depIdxs = []int32{ 0, // 0: cluster.Command.type:type_name -> cluster.Command.Type - 10, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest - 11, // 2: cluster.Command.query_request:type_name -> command.QueryRequest - 12, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest - 13, // 4: cluster.Command.load_request:type_name -> command.LoadRequest - 14, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest - 15, // 6: cluster.Command.notify_request:type_name -> command.NotifyRequest - 1, // 7: cluster.Command.credentials:type_name -> cluster.Credentials - 16, // 8: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult - 17, // 9: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows - 10, // [10:10] is the sub-list for method output_type - 10, // [10:10] is the sub-list for method input_type - 10, // [10:10] is the sub-list for extension type_name - 10, // [10:10] is the sub-list for extension extendee - 0, // [0:10] is the sub-list for field type_name + 11, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest + 12, // 2: cluster.Command.query_request:type_name -> command.QueryRequest + 13, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest + 14, // 4: cluster.Command.load_request:type_name -> command.LoadRequest + 15, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest + 16, // 6: cluster.Command.notify_request:type_name -> command.NotifyRequest + 17, // 7: cluster.Command.join_request:type_name -> command.JoinRequest + 1, // 8: cluster.Command.credentials:type_name -> cluster.Credentials + 18, // 9: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult + 19, // 10: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows + 11, // [11:11] is the sub-list for method output_type + 11, // [11:11] is the sub-list for method input_type + 11, // [11:11] is the sub-list for extension type_name + 11, // [11:11] is the sub-list for extension extendee + 0, // [0:11] is the sub-list for field type_name } func init() { file_message_proto_init() } @@ -900,6 +975,18 @@ func file_message_proto_init() { return nil } } + file_message_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CommandJoinResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } file_message_proto_msgTypes[2].OneofWrappers = []interface{}{ (*Command_ExecuteRequest)(nil), @@ -908,6 +995,7 @@ func file_message_proto_init() { (*Command_LoadRequest)(nil), (*Command_RemoveNodeRequest)(nil), (*Command_NotifyRequest)(nil), + (*Command_JoinRequest)(nil), } type x struct{} out := protoimpl.TypeBuilder{ @@ -915,7 +1003,7 @@ func file_message_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_message_proto_rawDesc, NumEnums: 1, - NumMessages: 9, + NumMessages: 10, NumExtensions: 0, NumServices: 0, }, diff --git a/cluster/message.proto b/cluster/message.proto index f23eadbc..473f2cd8 100644 --- a/cluster/message.proto +++ b/cluster/message.proto @@ -24,6 +24,7 @@ message Command { COMMAND_TYPE_LOAD = 5; COMMAND_TYPE_REMOVE_NODE = 6; COMMAND_TYPE_NOTIFY = 7; + COMMAND_TYPE_JOIN = 8; } Type type = 1; @@ -34,6 +35,7 @@ message Command { command.LoadRequest load_request = 6; command.RemoveNodeRequest remove_node_request = 7; command.NotifyRequest notify_request = 8; + command.JoinRequest join_request = 9; } Credentials credentials = 4; @@ -65,3 +67,7 @@ message CommandRemoveNodeResponse { message CommandNotifyResponse { string error = 1; } + +message CommandJoinResponse { + string error = 1; +} diff --git a/cluster/service.go b/cluster/service.go index 39ec68c3..38df5941 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -31,6 +31,7 @@ const ( numLoadRequest = "num_load_req" numRemoveNodeRequest = "num_remove_node_req" numNotifyRequest = "num_notify_req" + numJoinRequest = "num_join_req" // Client stats for this package. numGetNodeAPIRequestLocal = "num_get_node_api_req_local" @@ -55,6 +56,7 @@ func init() { stats.Add(numRemoveNodeRequest, 0) stats.Add(numGetNodeAPIRequestLocal, 0) stats.Add(numNotifyRequest, 0) + stats.Add(numJoinRequest, 0) } // Dialer is the interface dialers must implement. @@ -88,6 +90,10 @@ type Manager interface { // Notify notifies this node that a remote node is ready // for bootstrapping. Notify(n *command.NotifyRequest) error + + // Join notifies this node that a remote node is ready + // to join the cluster. + Join(n *command.JoinRequest) error } // CredentialStore is the interface credential stores must support. @@ -393,6 +399,25 @@ func (s *Service) handleConn(conn net.Conn) { } } + p, err = proto.Marshal(resp) + if err != nil { + conn.Close() + } + writeBytesWithLength(conn, p) + + case Command_COMMAND_TYPE_JOIN: + stats.Add(numJoinRequest, 1) + resp := &CommandJoinResponse{} + + jr := c.GetJoinRequest() + if jr == nil { + resp.Error = "NotifyRequest is nil" + } else { + if err := s.mgr.Join(jr); err != nil { + resp.Error = err.Error() + } + } + p, err = proto.Marshal(resp) if err != nil { conn.Close() diff --git a/cluster/service_test.go b/cluster/service_test.go index 438eb42d..e5b70848 100644 --- a/cluster/service_test.go +++ b/cluster/service_test.go @@ -298,6 +298,50 @@ func Test_NewServiceNotify(t *testing.T) { } } +func Test_NewServiceJoin(t *testing.T) { + ml := mustNewMockTransport() + mm := mustNewMockManager() + mm.joinFn = func(j *command.JoinRequest) error { + if j.Id != "foo" { + t.Fatalf("failed to get correct node ID, exp %s, got %s", "foo", j.Id) + } + if j.Address != "localhost" { + t.Fatalf("failed to get correct node address, exp %s, got %s", "localhost", j.Address) + } + if !j.Voter { + t.Fatalf("failed to get correct voter setting, exp %t, got %t", true, j.Voter) + } + return nil + } + + s := New(ml, mustNewMockDatabase(), mm, mustNewMockCredentialStore()) + if s == nil { + t.Fatalf("failed to create cluster service") + } + + if err := s.Open(); err != nil { + t.Fatalf("failed to open cluster service") + } + + // Create a Join request. + jr := &command.JoinRequest{ + Id: "foo", + Address: "localhost", + Voter: true, + } + + // Test by connecting to itself. + c := NewClient(ml, 30*time.Second) + err := c.Join(jr, s.Addr(), 5*time.Second) + if err != nil { + t.Fatalf("failed to notify node: %s", err) + } + + if err := s.Close(); err != nil { + t.Fatalf("failed to close cluster service") + } +} + type mockTransport struct { tn net.Listener remoteEncrypted bool @@ -392,6 +436,7 @@ func mustNewMockDatabase() *mockDatabase { type MockManager struct { removeNodeFn func(rn *command.RemoveNodeRequest) error notifyFn func(n *command.NotifyRequest) error + joinFn func(n *command.JoinRequest) error } func (m *MockManager) Remove(rn *command.RemoveNodeRequest) error { @@ -408,6 +453,13 @@ func (m *MockManager) Notify(n *command.NotifyRequest) error { return m.notifyFn(n) } +func (m *MockManager) Join(n *command.JoinRequest) error { + if m.joinFn == nil { + return nil + } + return m.joinFn(n) +} + func mustNewMockManager() *MockManager { return &MockManager{} }