1
0
Fork 0

Support Joining over Raft connection

master
Philip O'Toole 1 year ago
parent 94e5d5e788
commit c07401c778

@ -2,6 +2,7 @@
## Implementation changes and bug fixes
- [PR #1218](https://github.com/rqlite/rqlite/pull/1218): Check for more possible errors in peers.json. Thanks @Tjstretchalot
- [PR #1220](https://github.com/rqlite/rqlite/pull/1220): Support Notify over Raft connection.
- [PR #1221](https://github.com/rqlite/rqlite/pull/1221): Support Join over Raft connection.
## 7.14.2 (April 7th 2023)
This release is the first to includes various bug fixes and optimizations thanks to running much of the code through [Chat GPT-4](https://openai.com/product/gpt-4), most of which are not explicitly listed in the [CHANGELOG](https://github.com/rqlite/rqlite/edit/master/CHANGELOG.md), but you can check the commit history for details. Future releases of rqlite will probably include more such changes.

@ -344,6 +344,45 @@ func (c *Client) Notify(nr *command.NotifyRequest, nodeAddr string, timeout time
return nil
}
// Join joins a this node to a cluster
func (c *Client) Join(jr *command.JoinRequest, nodeAddr string, timeout time.Duration) error {
conn, err := c.dial(nodeAddr, c.timeout)
if err != nil {
return err
}
defer conn.Close()
// Create the request.
command := &Command{
Type: Command_COMMAND_TYPE_JOIN,
Request: &Command_JoinRequest{
JoinRequest: jr,
},
}
if err := writeCommand(conn, command, timeout); err != nil {
handleConnError(conn)
return err
}
p, err := readResponse(conn, timeout)
if err != nil {
handleConnError(conn)
return err
}
a := &CommandJoinResponse{}
err = proto.Unmarshal(p, a)
if err != nil {
return err
}
if a.Error != "" {
return errors.New(a.Error)
}
return nil
}
// Stats returns stats on the Client instance
func (c *Client) Stats() (map[string]interface{}, error) {
c.mu.RLock()

@ -32,6 +32,7 @@ const (
Command_COMMAND_TYPE_LOAD Command_Type = 5
Command_COMMAND_TYPE_REMOVE_NODE Command_Type = 6
Command_COMMAND_TYPE_NOTIFY Command_Type = 7
Command_COMMAND_TYPE_JOIN Command_Type = 8
)
// Enum value maps for Command_Type.
@ -45,6 +46,7 @@ var (
5: "COMMAND_TYPE_LOAD",
6: "COMMAND_TYPE_REMOVE_NODE",
7: "COMMAND_TYPE_NOTIFY",
8: "COMMAND_TYPE_JOIN",
}
Command_Type_value = map[string]int32{
"COMMAND_TYPE_UNKNOWN": 0,
@ -55,6 +57,7 @@ var (
"COMMAND_TYPE_LOAD": 5,
"COMMAND_TYPE_REMOVE_NODE": 6,
"COMMAND_TYPE_NOTIFY": 7,
"COMMAND_TYPE_JOIN": 8,
}
)
@ -201,6 +204,7 @@ type Command struct {
// *Command_LoadRequest
// *Command_RemoveNodeRequest
// *Command_NotifyRequest
// *Command_JoinRequest
Request isCommand_Request `protobuf_oneof:"request"`
Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"`
}
@ -293,6 +297,13 @@ func (x *Command) GetNotifyRequest() *command.NotifyRequest {
return nil
}
func (x *Command) GetJoinRequest() *command.JoinRequest {
if x, ok := x.GetRequest().(*Command_JoinRequest); ok {
return x.JoinRequest
}
return nil
}
func (x *Command) GetCredentials() *Credentials {
if x != nil {
return x.Credentials
@ -328,6 +339,10 @@ type Command_NotifyRequest struct {
NotifyRequest *command.NotifyRequest `protobuf:"bytes,8,opt,name=notify_request,json=notifyRequest,proto3,oneof"`
}
type Command_JoinRequest struct {
JoinRequest *command.JoinRequest `protobuf:"bytes,9,opt,name=join_request,json=joinRequest,proto3,oneof"`
}
func (*Command_ExecuteRequest) isCommand_Request() {}
func (*Command_QueryRequest) isCommand_Request() {}
@ -340,6 +355,8 @@ func (*Command_RemoveNodeRequest) isCommand_Request() {}
func (*Command_NotifyRequest) isCommand_Request() {}
func (*Command_JoinRequest) isCommand_Request() {}
type CommandExecuteResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -646,6 +663,53 @@ func (x *CommandNotifyResponse) GetError() string {
return ""
}
type CommandJoinResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
}
func (x *CommandJoinResponse) Reset() {
*x = CommandJoinResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_message_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *CommandJoinResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CommandJoinResponse) ProtoMessage() {}
func (x *CommandJoinResponse) ProtoReflect() protoreflect.Message {
mi := &file_message_proto_msgTypes[9]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CommandJoinResponse.ProtoReflect.Descriptor instead.
func (*CommandJoinResponse) Descriptor() ([]byte, []int) {
return file_message_proto_rawDescGZIP(), []int{9}
}
func (x *CommandJoinResponse) GetError() string {
if x != nil {
return x.Error
}
return ""
}
var File_message_proto protoreflect.FileDescriptor
var file_message_proto_rawDesc = []byte{
@ -658,7 +722,7 @@ var file_message_proto_rawDesc = []byte{
0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61,
0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
0x75, 0x72, 0x6c, 0x22, 0xe3, 0x05, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12,
0x75, 0x72, 0x6c, 0x22, 0xb5, 0x06, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12,
0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e,
0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e,
0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x78,
@ -686,52 +750,60 @@ var file_message_proto_rawDesc = []byte{
0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16,
0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0d, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x79,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65,
0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63,
0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61,
0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22,
0xdc, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d,
0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e,
0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59,
0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f,
0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44,
0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12,
0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f,
0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41,
0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04,
0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45,
0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x05, 0x12, 0x1c, 0x0a, 0x18, 0x43, 0x4f, 0x4d, 0x4d, 0x41,
0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x5f, 0x4e,
0x4f, 0x44, 0x45, 0x10, 0x06, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44,
0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x54, 0x49, 0x46, 0x59, 0x10, 0x07, 0x42, 0x09,
0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73,
0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75,
0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43,
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77,
0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77,
0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b,
0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72,
0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04,
0x64, 0x61, 0x74, 0x61, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4c,
0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65,
0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f,
0x72, 0x22, 0x31, 0x0a, 0x19, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x6d, 0x6f,
0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14,
0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65,
0x72, 0x72, 0x6f, 0x72, 0x22, 0x2d, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4e,
0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x39, 0x0a, 0x0c, 0x6a, 0x6f, 0x69, 0x6e, 0x5f,
0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e,
0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0b, 0x6a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c,
0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65,
0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63,
0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22, 0xf3, 0x01, 0x0a, 0x04, 0x54,
0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54,
0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a,
0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45,
0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01,
0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45,
0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f,
0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59,
0x10, 0x03, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59,
0x50, 0x45, 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, 0x12, 0x15, 0x0a, 0x11, 0x43,
0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4c, 0x4f, 0x41, 0x44,
0x10, 0x05, 0x12, 0x1c, 0x0a, 0x18, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59,
0x50, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x10, 0x06,
0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45,
0x5f, 0x4e, 0x4f, 0x54, 0x49, 0x46, 0x59, 0x10, 0x07, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d,
0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4a, 0x4f, 0x49, 0x4e, 0x10, 0x08,
0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43,
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72,
0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63,
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65,
0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a,
0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72,
0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d,
0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72,
0x6f, 0x77, 0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61,
0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05,
0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
0x64, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f,
0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x72, 0x6f, 0x72, 0x22, 0x31, 0x0a, 0x19, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65,
0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x2d, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
0x64, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12,
0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05,
0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05,
0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63,
0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -747,7 +819,7 @@ func file_message_proto_rawDescGZIP() []byte {
}
var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
var file_message_proto_goTypes = []interface{}{
(Command_Type)(0), // 0: cluster.Command.Type
(*Credentials)(nil), // 1: cluster.Credentials
@ -759,31 +831,34 @@ var file_message_proto_goTypes = []interface{}{
(*CommandLoadResponse)(nil), // 7: cluster.CommandLoadResponse
(*CommandRemoveNodeResponse)(nil), // 8: cluster.CommandRemoveNodeResponse
(*CommandNotifyResponse)(nil), // 9: cluster.CommandNotifyResponse
(*command.ExecuteRequest)(nil), // 10: command.ExecuteRequest
(*command.QueryRequest)(nil), // 11: command.QueryRequest
(*command.BackupRequest)(nil), // 12: command.BackupRequest
(*command.LoadRequest)(nil), // 13: command.LoadRequest
(*command.RemoveNodeRequest)(nil), // 14: command.RemoveNodeRequest
(*command.NotifyRequest)(nil), // 15: command.NotifyRequest
(*command.ExecuteResult)(nil), // 16: command.ExecuteResult
(*command.QueryRows)(nil), // 17: command.QueryRows
(*CommandJoinResponse)(nil), // 10: cluster.CommandJoinResponse
(*command.ExecuteRequest)(nil), // 11: command.ExecuteRequest
(*command.QueryRequest)(nil), // 12: command.QueryRequest
(*command.BackupRequest)(nil), // 13: command.BackupRequest
(*command.LoadRequest)(nil), // 14: command.LoadRequest
(*command.RemoveNodeRequest)(nil), // 15: command.RemoveNodeRequest
(*command.NotifyRequest)(nil), // 16: command.NotifyRequest
(*command.JoinRequest)(nil), // 17: command.JoinRequest
(*command.ExecuteResult)(nil), // 18: command.ExecuteResult
(*command.QueryRows)(nil), // 19: command.QueryRows
}
var file_message_proto_depIdxs = []int32{
0, // 0: cluster.Command.type:type_name -> cluster.Command.Type
10, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest
11, // 2: cluster.Command.query_request:type_name -> command.QueryRequest
12, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest
13, // 4: cluster.Command.load_request:type_name -> command.LoadRequest
14, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest
15, // 6: cluster.Command.notify_request:type_name -> command.NotifyRequest
1, // 7: cluster.Command.credentials:type_name -> cluster.Credentials
16, // 8: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult
17, // 9: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows
10, // [10:10] is the sub-list for method output_type
10, // [10:10] is the sub-list for method input_type
10, // [10:10] is the sub-list for extension type_name
10, // [10:10] is the sub-list for extension extendee
0, // [0:10] is the sub-list for field type_name
11, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest
12, // 2: cluster.Command.query_request:type_name -> command.QueryRequest
13, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest
14, // 4: cluster.Command.load_request:type_name -> command.LoadRequest
15, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest
16, // 6: cluster.Command.notify_request:type_name -> command.NotifyRequest
17, // 7: cluster.Command.join_request:type_name -> command.JoinRequest
1, // 8: cluster.Command.credentials:type_name -> cluster.Credentials
18, // 9: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult
19, // 10: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows
11, // [11:11] is the sub-list for method output_type
11, // [11:11] is the sub-list for method input_type
11, // [11:11] is the sub-list for extension type_name
11, // [11:11] is the sub-list for extension extendee
0, // [0:11] is the sub-list for field type_name
}
func init() { file_message_proto_init() }
@ -900,6 +975,18 @@ func file_message_proto_init() {
return nil
}
}
file_message_proto_msgTypes[9].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CommandJoinResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_message_proto_msgTypes[2].OneofWrappers = []interface{}{
(*Command_ExecuteRequest)(nil),
@ -908,6 +995,7 @@ func file_message_proto_init() {
(*Command_LoadRequest)(nil),
(*Command_RemoveNodeRequest)(nil),
(*Command_NotifyRequest)(nil),
(*Command_JoinRequest)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -915,7 +1003,7 @@ func file_message_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_message_proto_rawDesc,
NumEnums: 1,
NumMessages: 9,
NumMessages: 10,
NumExtensions: 0,
NumServices: 0,
},

@ -24,6 +24,7 @@ message Command {
COMMAND_TYPE_LOAD = 5;
COMMAND_TYPE_REMOVE_NODE = 6;
COMMAND_TYPE_NOTIFY = 7;
COMMAND_TYPE_JOIN = 8;
}
Type type = 1;
@ -34,6 +35,7 @@ message Command {
command.LoadRequest load_request = 6;
command.RemoveNodeRequest remove_node_request = 7;
command.NotifyRequest notify_request = 8;
command.JoinRequest join_request = 9;
}
Credentials credentials = 4;
@ -65,3 +67,7 @@ message CommandRemoveNodeResponse {
message CommandNotifyResponse {
string error = 1;
}
message CommandJoinResponse {
string error = 1;
}

@ -31,6 +31,7 @@ const (
numLoadRequest = "num_load_req"
numRemoveNodeRequest = "num_remove_node_req"
numNotifyRequest = "num_notify_req"
numJoinRequest = "num_join_req"
// Client stats for this package.
numGetNodeAPIRequestLocal = "num_get_node_api_req_local"
@ -55,6 +56,7 @@ func init() {
stats.Add(numRemoveNodeRequest, 0)
stats.Add(numGetNodeAPIRequestLocal, 0)
stats.Add(numNotifyRequest, 0)
stats.Add(numJoinRequest, 0)
}
// Dialer is the interface dialers must implement.
@ -88,6 +90,10 @@ type Manager interface {
// Notify notifies this node that a remote node is ready
// for bootstrapping.
Notify(n *command.NotifyRequest) error
// Join notifies this node that a remote node is ready
// to join the cluster.
Join(n *command.JoinRequest) error
}
// CredentialStore is the interface credential stores must support.
@ -393,6 +399,25 @@ func (s *Service) handleConn(conn net.Conn) {
}
}
p, err = proto.Marshal(resp)
if err != nil {
conn.Close()
}
writeBytesWithLength(conn, p)
case Command_COMMAND_TYPE_JOIN:
stats.Add(numJoinRequest, 1)
resp := &CommandJoinResponse{}
jr := c.GetJoinRequest()
if jr == nil {
resp.Error = "NotifyRequest is nil"
} else {
if err := s.mgr.Join(jr); err != nil {
resp.Error = err.Error()
}
}
p, err = proto.Marshal(resp)
if err != nil {
conn.Close()

@ -298,6 +298,50 @@ func Test_NewServiceNotify(t *testing.T) {
}
}
func Test_NewServiceJoin(t *testing.T) {
ml := mustNewMockTransport()
mm := mustNewMockManager()
mm.joinFn = func(j *command.JoinRequest) error {
if j.Id != "foo" {
t.Fatalf("failed to get correct node ID, exp %s, got %s", "foo", j.Id)
}
if j.Address != "localhost" {
t.Fatalf("failed to get correct node address, exp %s, got %s", "localhost", j.Address)
}
if !j.Voter {
t.Fatalf("failed to get correct voter setting, exp %t, got %t", true, j.Voter)
}
return nil
}
s := New(ml, mustNewMockDatabase(), mm, mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
// Create a Join request.
jr := &command.JoinRequest{
Id: "foo",
Address: "localhost",
Voter: true,
}
// Test by connecting to itself.
c := NewClient(ml, 30*time.Second)
err := c.Join(jr, s.Addr(), 5*time.Second)
if err != nil {
t.Fatalf("failed to notify node: %s", err)
}
if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}
type mockTransport struct {
tn net.Listener
remoteEncrypted bool
@ -392,6 +436,7 @@ func mustNewMockDatabase() *mockDatabase {
type MockManager struct {
removeNodeFn func(rn *command.RemoveNodeRequest) error
notifyFn func(n *command.NotifyRequest) error
joinFn func(n *command.JoinRequest) error
}
func (m *MockManager) Remove(rn *command.RemoveNodeRequest) error {
@ -408,6 +453,13 @@ func (m *MockManager) Notify(n *command.NotifyRequest) error {
return m.notifyFn(n)
}
func (m *MockManager) Join(n *command.JoinRequest) error {
if m.joinFn == nil {
return nil
}
return m.joinFn(n)
}
func mustNewMockManager() *MockManager {
return &MockManager{}
}

Loading…
Cancel
Save