1
0
Fork 0

Support Notify over Raft connection

master
Philip O'Toole 1 year ago
parent c837103928
commit 627c2c5588

@ -306,6 +306,44 @@ func (c *Client) RemoveNode(rn *command.RemoveNodeRequest, nodeAddr string, cred
return nil
}
// Notify notifies a remote node that this node is ready to bootstrap.
func (c *Client) Notify(nr *command.NotifyRequest, nodeAddr string, timeout time.Duration) error {
conn, err := c.dial(nodeAddr, c.timeout)
if err != nil {
return err
}
defer conn.Close()
// Create the request.
command := &Command{
Type: Command_COMMAND_TYPE_NOTIFY,
Request: &Command_NotifyRequest{
NotifyRequest: nr,
},
}
if err := writeCommand(conn, command, timeout); err != nil {
handleConnError(conn)
return err
}
p, err := readResponse(conn, timeout)
if err != nil {
handleConnError(conn)
return err
}
a := &CommandNotifyResponse{}
err = proto.Unmarshal(p, a)
if err != nil {
return err
}
if a.Error != "" {
return errors.New(a.Error)
}
return nil
}
// Stats returns stats on the Client instance
func (c *Client) Stats() (map[string]interface{}, error) {
c.mu.RLock()

@ -31,6 +31,7 @@ const (
Command_COMMAND_TYPE_BACKUP Command_Type = 4
Command_COMMAND_TYPE_LOAD Command_Type = 5
Command_COMMAND_TYPE_REMOVE_NODE Command_Type = 6
Command_COMMAND_TYPE_NOTIFY Command_Type = 7
)
// Enum value maps for Command_Type.
@ -43,6 +44,7 @@ var (
4: "COMMAND_TYPE_BACKUP",
5: "COMMAND_TYPE_LOAD",
6: "COMMAND_TYPE_REMOVE_NODE",
7: "COMMAND_TYPE_NOTIFY",
}
Command_Type_value = map[string]int32{
"COMMAND_TYPE_UNKNOWN": 0,
@ -52,6 +54,7 @@ var (
"COMMAND_TYPE_BACKUP": 4,
"COMMAND_TYPE_LOAD": 5,
"COMMAND_TYPE_REMOVE_NODE": 6,
"COMMAND_TYPE_NOTIFY": 7,
}
)
@ -197,6 +200,7 @@ type Command struct {
// *Command_BackupRequest
// *Command_LoadRequest
// *Command_RemoveNodeRequest
// *Command_NotifyRequest
Request isCommand_Request `protobuf_oneof:"request"`
Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"`
}
@ -282,6 +286,13 @@ func (x *Command) GetRemoveNodeRequest() *command.RemoveNodeRequest {
return nil
}
func (x *Command) GetNotifyRequest() *command.NotifyRequest {
if x, ok := x.GetRequest().(*Command_NotifyRequest); ok {
return x.NotifyRequest
}
return nil
}
func (x *Command) GetCredentials() *Credentials {
if x != nil {
return x.Credentials
@ -313,6 +324,10 @@ type Command_RemoveNodeRequest struct {
RemoveNodeRequest *command.RemoveNodeRequest `protobuf:"bytes,7,opt,name=remove_node_request,json=removeNodeRequest,proto3,oneof"`
}
type Command_NotifyRequest struct {
NotifyRequest *command.NotifyRequest `protobuf:"bytes,8,opt,name=notify_request,json=notifyRequest,proto3,oneof"`
}
func (*Command_ExecuteRequest) isCommand_Request() {}
func (*Command_QueryRequest) isCommand_Request() {}
@ -323,6 +338,8 @@ func (*Command_LoadRequest) isCommand_Request() {}
func (*Command_RemoveNodeRequest) isCommand_Request() {}
func (*Command_NotifyRequest) isCommand_Request() {}
type CommandExecuteResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -582,6 +599,53 @@ func (x *CommandRemoveNodeResponse) GetError() string {
return ""
}
type CommandNotifyResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
}
func (x *CommandNotifyResponse) Reset() {
*x = CommandNotifyResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_message_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *CommandNotifyResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CommandNotifyResponse) ProtoMessage() {}
func (x *CommandNotifyResponse) ProtoReflect() protoreflect.Message {
mi := &file_message_proto_msgTypes[8]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CommandNotifyResponse.ProtoReflect.Descriptor instead.
func (*CommandNotifyResponse) Descriptor() ([]byte, []int) {
return file_message_proto_rawDescGZIP(), []int{8}
}
func (x *CommandNotifyResponse) GetError() string {
if x != nil {
return x.Error
}
return ""
}
var File_message_proto protoreflect.FileDescriptor
var file_message_proto_rawDesc = []byte{
@ -594,7 +658,7 @@ var file_message_proto_rawDesc = []byte{
0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61,
0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
0x75, 0x72, 0x6c, 0x22, 0x89, 0x05, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12,
0x75, 0x72, 0x6c, 0x22, 0xe3, 0x05, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12,
0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e,
0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e,
0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x78,
@ -618,48 +682,56 @@ var file_message_proto_rawDesc = []byte{
0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e,
0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x48, 0x00, 0x52, 0x11, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e,
0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c,
0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c,
0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22, 0xc3,
0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41,
0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10,
0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50,
0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55,
0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f,
0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x16,
0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51,
0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e,
0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, 0x12,
0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f,
0x4c, 0x4f, 0x41, 0x44, 0x10, 0x05, 0x12, 0x1c, 0x0a, 0x18, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e,
0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x5f, 0x4e, 0x4f,
0x44, 0x45, 0x10, 0x06, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22,
0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74,
0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12,
0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75,
0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74,
0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72,
0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12,
0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e,
0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77,
0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f,
0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x31, 0x0a, 0x19, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
0x6e, 0x64, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f,
0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x3f, 0x0a, 0x0e, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x79,
0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16,
0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0d, 0x6e, 0x6f, 0x74, 0x69, 0x66, 0x79,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65,
0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63,
0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61,
0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22,
0xdc, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d,
0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e,
0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59,
0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f,
0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44,
0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12,
0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f,
0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41,
0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04,
0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45,
0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x05, 0x12, 0x1c, 0x0a, 0x18, 0x43, 0x4f, 0x4d, 0x4d, 0x41,
0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x5f, 0x4e,
0x4f, 0x44, 0x45, 0x10, 0x06, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44,
0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x54, 0x49, 0x46, 0x59, 0x10, 0x07, 0x42, 0x09,
0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73,
0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75,
0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43,
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f,
0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77,
0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77,
0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b,
0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72,
0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04,
0x64, 0x61, 0x74, 0x61, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4c,
0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65,
0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f,
0x72, 0x22, 0x31, 0x0a, 0x19, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x52, 0x65, 0x6d, 0x6f,
0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14,
0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65,
0x72, 0x72, 0x6f, 0x72, 0x22, 0x2d, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4e,
0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f,
0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -675,7 +747,7 @@ func file_message_proto_rawDescGZIP() []byte {
}
var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_message_proto_goTypes = []interface{}{
(Command_Type)(0), // 0: cluster.Command.Type
(*Credentials)(nil), // 1: cluster.Credentials
@ -686,29 +758,32 @@ var file_message_proto_goTypes = []interface{}{
(*CommandBackupResponse)(nil), // 6: cluster.CommandBackupResponse
(*CommandLoadResponse)(nil), // 7: cluster.CommandLoadResponse
(*CommandRemoveNodeResponse)(nil), // 8: cluster.CommandRemoveNodeResponse
(*command.ExecuteRequest)(nil), // 9: command.ExecuteRequest
(*command.QueryRequest)(nil), // 10: command.QueryRequest
(*command.BackupRequest)(nil), // 11: command.BackupRequest
(*command.LoadRequest)(nil), // 12: command.LoadRequest
(*command.RemoveNodeRequest)(nil), // 13: command.RemoveNodeRequest
(*command.ExecuteResult)(nil), // 14: command.ExecuteResult
(*command.QueryRows)(nil), // 15: command.QueryRows
(*CommandNotifyResponse)(nil), // 9: cluster.CommandNotifyResponse
(*command.ExecuteRequest)(nil), // 10: command.ExecuteRequest
(*command.QueryRequest)(nil), // 11: command.QueryRequest
(*command.BackupRequest)(nil), // 12: command.BackupRequest
(*command.LoadRequest)(nil), // 13: command.LoadRequest
(*command.RemoveNodeRequest)(nil), // 14: command.RemoveNodeRequest
(*command.NotifyRequest)(nil), // 15: command.NotifyRequest
(*command.ExecuteResult)(nil), // 16: command.ExecuteResult
(*command.QueryRows)(nil), // 17: command.QueryRows
}
var file_message_proto_depIdxs = []int32{
0, // 0: cluster.Command.type:type_name -> cluster.Command.Type
9, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest
10, // 2: cluster.Command.query_request:type_name -> command.QueryRequest
11, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest
12, // 4: cluster.Command.load_request:type_name -> command.LoadRequest
13, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest
1, // 6: cluster.Command.credentials:type_name -> cluster.Credentials
14, // 7: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult
15, // 8: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows
9, // [9:9] is the sub-list for method output_type
9, // [9:9] is the sub-list for method input_type
9, // [9:9] is the sub-list for extension type_name
9, // [9:9] is the sub-list for extension extendee
0, // [0:9] is the sub-list for field type_name
10, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest
11, // 2: cluster.Command.query_request:type_name -> command.QueryRequest
12, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest
13, // 4: cluster.Command.load_request:type_name -> command.LoadRequest
14, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest
15, // 6: cluster.Command.notify_request:type_name -> command.NotifyRequest
1, // 7: cluster.Command.credentials:type_name -> cluster.Credentials
16, // 8: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult
17, // 9: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows
10, // [10:10] is the sub-list for method output_type
10, // [10:10] is the sub-list for method input_type
10, // [10:10] is the sub-list for extension type_name
10, // [10:10] is the sub-list for extension extendee
0, // [0:10] is the sub-list for field type_name
}
func init() { file_message_proto_init() }
@ -813,6 +888,18 @@ func file_message_proto_init() {
return nil
}
}
file_message_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CommandNotifyResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_message_proto_msgTypes[2].OneofWrappers = []interface{}{
(*Command_ExecuteRequest)(nil),
@ -820,6 +907,7 @@ func file_message_proto_init() {
(*Command_BackupRequest)(nil),
(*Command_LoadRequest)(nil),
(*Command_RemoveNodeRequest)(nil),
(*Command_NotifyRequest)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -827,7 +915,7 @@ func file_message_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_message_proto_rawDesc,
NumEnums: 1,
NumMessages: 8,
NumMessages: 9,
NumExtensions: 0,
NumServices: 0,
},

@ -23,6 +23,7 @@ message Command {
COMMAND_TYPE_BACKUP = 4;
COMMAND_TYPE_LOAD = 5;
COMMAND_TYPE_REMOVE_NODE = 6;
COMMAND_TYPE_NOTIFY = 7;
}
Type type = 1;
@ -32,6 +33,7 @@ message Command {
command.BackupRequest backup_request = 5;
command.LoadRequest load_request = 6;
command.RemoveNodeRequest remove_node_request = 7;
command.NotifyRequest notify_request = 8;
}
Credentials credentials = 4;
@ -59,3 +61,7 @@ message CommandLoadResponse {
message CommandRemoveNodeResponse {
string error = 1;
}
message CommandNotifyResponse {
string error = 1;
}

@ -30,6 +30,7 @@ const (
numBackupRequest = "num_backup_req"
numLoadRequest = "num_load_req"
numRemoveNodeRequest = "num_remove_node_req"
numNotifyRequest = "num_notify_req"
// Client stats for this package.
numGetNodeAPIRequestLocal = "num_get_node_api_req_local"
@ -53,6 +54,7 @@ func init() {
stats.Add(numLoadRequest, 0)
stats.Add(numRemoveNodeRequest, 0)
stats.Add(numGetNodeAPIRequestLocal, 0)
stats.Add(numNotifyRequest, 0)
}
// Dialer is the interface dialers must implement.
@ -82,6 +84,10 @@ type Database interface {
type Manager interface {
// Remove removes the node, given by id, from the cluster
Remove(rn *command.RemoveNodeRequest) error
// Notify notifies this node that a remote node is ready
// for bootstrapping.
Notify(n *command.NotifyRequest) error
}
// CredentialStore is the interface credential stores must support.
@ -368,6 +374,25 @@ func (s *Service) handleConn(conn net.Conn) {
}
}
p, err = proto.Marshal(resp)
if err != nil {
conn.Close()
}
writeBytesWithLength(conn, p)
case Command_COMMAND_TYPE_NOTIFY:
stats.Add(numNotifyRequest, 1)
resp := &CommandNotifyResponse{}
nr := c.GetNotifyRequest()
if nr == nil {
resp.Error = "NotifyRequest is nil"
} else {
if err := s.mgr.Notify(nr); err != nil {
resp.Error = err.Error()
}
}
p, err = proto.Marshal(resp)
if err != nil {
conn.Close()

@ -258,6 +258,46 @@ func Test_NewServiceTestExecuteQueryAuth(t *testing.T) {
}
}
func Test_NewServiceNotify(t *testing.T) {
ml := mustNewMockTransport()
mm := mustNewMockManager()
mm.notifyFn = func(n *command.NotifyRequest) error {
if n.Id != "foo" {
t.Fatalf("failed to get correct node ID, exp %s, got %s", "foo", n.Id)
}
if n.Address != "localhost" {
t.Fatalf("failed to get correct node address, exp %s, got %s", "localhost", n.Address)
}
return nil
}
s := New(ml, mustNewMockDatabase(), mm, mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
// Create a notify request.
nr := &command.NotifyRequest{
Id: "foo",
Address: "localhost",
}
// Test by connecting to itself.
c := NewClient(ml, 30*time.Second)
err := c.Notify(nr, s.Addr(), 5*time.Second)
if err != nil {
t.Fatalf("failed to notify node: %s", err)
}
if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}
type mockTransport struct {
tn net.Listener
remoteEncrypted bool
@ -351,6 +391,7 @@ func mustNewMockDatabase() *mockDatabase {
type MockManager struct {
removeNodeFn func(rn *command.RemoveNodeRequest) error
notifyFn func(n *command.NotifyRequest) error
}
func (m *MockManager) Remove(rn *command.RemoveNodeRequest) error {
@ -360,6 +401,13 @@ func (m *MockManager) Remove(rn *command.RemoveNodeRequest) error {
return m.removeNodeFn(rn)
}
func (m *MockManager) Notify(n *command.NotifyRequest) error {
if m.notifyFn == nil {
return nil
}
return m.notifyFn(n)
}
func mustNewMockManager() *MockManager {
return &MockManager{}
}

Loading…
Cancel
Save