1
0
Fork 0

Start adding remote remove node

More testing, including unit testing, required.
master
Philip O'Toole 2 years ago
parent e3cc4eb5d0
commit 98575d727b

@ -265,6 +265,45 @@ func (c *Client) Load(lr *command.LoadRequest, nodeAddr string, creds *Credentia
return nil
}
// RemoveNode removes a node from the cluster
func (c *Client) RemoveNode(rn *command.RemoveNodeRequest, nodeAddr string, creds *Credentials, timeout time.Duration) error {
conn, err := c.dial(nodeAddr, c.timeout)
if err != nil {
return err
}
defer conn.Close()
// Create the request.
command := &Command{
Type: Command_COMMAND_TYPE_REMOVE_NODE,
Request: &Command_RemoveNodeRequest{
RemoveNodeRequest: rn,
},
Credentials: creds,
}
if err := writeCommand(conn, command, timeout); err != nil {
handleConnError(conn)
return err
}
p, err := readResponse(conn, timeout)
if err != nil {
handleConnError(conn)
return err
}
a := &CommandRemoveNodeResponse{}
err = proto.Unmarshal(p, a)
if err != nil {
return err
}
if a.Error != "" {
return errors.New(a.Error)
}
return nil
}
// Stats returns stats on the Client instance
func (c *Client) Stats() (map[string]interface{}, error) {
c.mu.RLock()

@ -30,6 +30,7 @@ const (
Command_COMMAND_TYPE_QUERY Command_Type = 3
Command_COMMAND_TYPE_BACKUP Command_Type = 4
Command_COMMAND_TYPE_LOAD Command_Type = 5
Command_COMMAND_TYPE_REMOVE_NODE Command_Type = 6
)
// Enum value maps for Command_Type.
@ -41,6 +42,7 @@ var (
3: "COMMAND_TYPE_QUERY",
4: "COMMAND_TYPE_BACKUP",
5: "COMMAND_TYPE_LOAD",
6: "COMMAND_TYPE_REMOVE_NODE",
}
Command_Type_value = map[string]int32{
"COMMAND_TYPE_UNKNOWN": 0,
@ -49,6 +51,7 @@ var (
"COMMAND_TYPE_QUERY": 3,
"COMMAND_TYPE_BACKUP": 4,
"COMMAND_TYPE_LOAD": 5,
"COMMAND_TYPE_REMOVE_NODE": 6,
}
)
@ -192,6 +195,7 @@ type Command struct {
// *Command_QueryRequest
// *Command_BackupRequest
// *Command_LoadRequest
// *Command_RemoveNodeRequest
Request isCommand_Request `protobuf_oneof:"request"`
Credentials *Credentials `protobuf:"bytes,4,opt,name=credentials,proto3" json:"credentials,omitempty"`
}
@ -270,6 +274,13 @@ func (x *Command) GetLoadRequest() *command.LoadRequest {
return nil
}
func (x *Command) GetRemoveNodeRequest() *command.RemoveNodeRequest {
if x, ok := x.GetRequest().(*Command_RemoveNodeRequest); ok {
return x.RemoveNodeRequest
}
return nil
}
func (x *Command) GetCredentials() *Credentials {
if x != nil {
return x.Credentials
@ -297,6 +308,10 @@ type Command_LoadRequest struct {
LoadRequest *command.LoadRequest `protobuf:"bytes,6,opt,name=load_request,json=loadRequest,proto3,oneof"`
}
type Command_RemoveNodeRequest struct {
RemoveNodeRequest *command.RemoveNodeRequest `protobuf:"bytes,7,opt,name=remove_node_request,json=removeNodeRequest,proto3,oneof"`
}
func (*Command_ExecuteRequest) isCommand_Request() {}
func (*Command_QueryRequest) isCommand_Request() {}
@ -305,6 +320,8 @@ func (*Command_BackupRequest) isCommand_Request() {}
func (*Command_LoadRequest) isCommand_Request() {}
func (*Command_RemoveNodeRequest) isCommand_Request() {}
type CommandExecuteResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -517,6 +534,53 @@ func (x *CommandLoadResponse) GetError() string {
return ""
}
type CommandRemoveNodeResponse struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"`
}
func (x *CommandRemoveNodeResponse) Reset() {
*x = CommandRemoveNodeResponse{}
if protoimpl.UnsafeEnabled {
mi := &file_message_proto_msgTypes[7]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *CommandRemoveNodeResponse) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*CommandRemoveNodeResponse) ProtoMessage() {}
func (x *CommandRemoveNodeResponse) ProtoReflect() protoreflect.Message {
mi := &file_message_proto_msgTypes[7]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use CommandRemoveNodeResponse.ProtoReflect.Descriptor instead.
func (*CommandRemoveNodeResponse) Descriptor() ([]byte, []int) {
return file_message_proto_rawDescGZIP(), []int{7}
}
func (x *CommandRemoveNodeResponse) GetError() string {
if x != nil {
return x.Error
}
return ""
}
var File_message_proto protoreflect.FileDescriptor
var file_message_proto_rawDesc = []byte{
@ -529,7 +593,7 @@ var file_message_proto_rawDesc = []byte{
0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70, 0x61,
0x73, 0x73, 0x77, 0x6f, 0x72, 0x64, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03,
0x75, 0x72, 0x6c, 0x22, 0x9d, 0x04, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12,
0x75, 0x72, 0x6c, 0x22, 0x89, 0x05, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12,
0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e,
0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e,
0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x42, 0x0a, 0x0f, 0x65, 0x78,
@ -548,43 +612,53 @@ var file_message_proto_rawDesc = []byte{
0x0c, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x06, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x4c, 0x6f,
0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x0b, 0x6c, 0x6f, 0x61,
0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x64,
0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e,
0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69,
0x61, 0x6c, 0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73,
0x22, 0xa5, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d,
0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57,
0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54,
0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x49,
0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e,
0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02,
0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45,
0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d,
0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10,
0x04, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50,
0x45, 0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x05, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x22, 0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78,
0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02,
0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45,
0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65,
0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72,
0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28,
0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72,
0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x41, 0x0a, 0x15, 0x43,
0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70,
0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x4c, 0x0a, 0x13, 0x72, 0x65, 0x6d, 0x6f,
0x76, 0x65, 0x5f, 0x6e, 0x6f, 0x64, 0x65, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18,
0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e,
0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x48, 0x00, 0x52, 0x11, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x36, 0x0a, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e,
0x74, 0x69, 0x61, 0x6c, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x63, 0x6c,
0x75, 0x73, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c,
0x73, 0x52, 0x0b, 0x63, 0x72, 0x65, 0x64, 0x65, 0x6e, 0x74, 0x69, 0x61, 0x6c, 0x73, 0x22, 0xc3,
0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41,
0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10,
0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50,
0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55,
0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f,
0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x16,
0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51,
0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x12, 0x17, 0x0a, 0x13, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e,
0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x42, 0x41, 0x43, 0x4b, 0x55, 0x50, 0x10, 0x04, 0x12,
0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f,
0x4c, 0x4f, 0x41, 0x44, 0x10, 0x05, 0x12, 0x1c, 0x0a, 0x18, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e,
0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x52, 0x45, 0x4d, 0x4f, 0x56, 0x45, 0x5f, 0x4e, 0x4f,
0x44, 0x45, 0x10, 0x06, 0x42, 0x09, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22,
0x60, 0x0a, 0x16, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74,
0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12,
0x30, 0x0a, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b,
0x32, 0x16, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75,
0x74, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74,
0x73, 0x22, 0x54, 0x0a, 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72,
0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72,
0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12,
0x26, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e,
0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77,
0x73, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x41, 0x0a, 0x15, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
0x6e, 0x64, 0x42, 0x61, 0x63, 0x6b, 0x75, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52,
0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2b, 0x0a, 0x13, 0x43, 0x6f,
0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x22, 0x31, 0x0a, 0x19, 0x43, 0x6f, 0x6d, 0x6d, 0x61,
0x6e, 0x64, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, 0x73, 0x70,
0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61,
0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x2b,
0x0a, 0x13, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67,
0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65,
0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62,
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69,
0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f,
0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -600,37 +674,40 @@ func file_message_proto_rawDescGZIP() []byte {
}
var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_message_proto_goTypes = []interface{}{
(Command_Type)(0), // 0: cluster.Command.Type
(*Credentials)(nil), // 1: cluster.Credentials
(*Address)(nil), // 2: cluster.Address
(*Command)(nil), // 3: cluster.Command
(*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse
(*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse
(*CommandBackupResponse)(nil), // 6: cluster.CommandBackupResponse
(*CommandLoadResponse)(nil), // 7: cluster.CommandLoadResponse
(*command.ExecuteRequest)(nil), // 8: command.ExecuteRequest
(*command.QueryRequest)(nil), // 9: command.QueryRequest
(*command.BackupRequest)(nil), // 10: command.BackupRequest
(*command.LoadRequest)(nil), // 11: command.LoadRequest
(*command.ExecuteResult)(nil), // 12: command.ExecuteResult
(*command.QueryRows)(nil), // 13: command.QueryRows
(Command_Type)(0), // 0: cluster.Command.Type
(*Credentials)(nil), // 1: cluster.Credentials
(*Address)(nil), // 2: cluster.Address
(*Command)(nil), // 3: cluster.Command
(*CommandExecuteResponse)(nil), // 4: cluster.CommandExecuteResponse
(*CommandQueryResponse)(nil), // 5: cluster.CommandQueryResponse
(*CommandBackupResponse)(nil), // 6: cluster.CommandBackupResponse
(*CommandLoadResponse)(nil), // 7: cluster.CommandLoadResponse
(*CommandRemoveNodeResponse)(nil), // 8: cluster.CommandRemoveNodeResponse
(*command.ExecuteRequest)(nil), // 9: command.ExecuteRequest
(*command.QueryRequest)(nil), // 10: command.QueryRequest
(*command.BackupRequest)(nil), // 11: command.BackupRequest
(*command.LoadRequest)(nil), // 12: command.LoadRequest
(*command.RemoveNodeRequest)(nil), // 13: command.RemoveNodeRequest
(*command.ExecuteResult)(nil), // 14: command.ExecuteResult
(*command.QueryRows)(nil), // 15: command.QueryRows
}
var file_message_proto_depIdxs = []int32{
0, // 0: cluster.Command.type:type_name -> cluster.Command.Type
8, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest
9, // 2: cluster.Command.query_request:type_name -> command.QueryRequest
10, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest
11, // 4: cluster.Command.load_request:type_name -> command.LoadRequest
1, // 5: cluster.Command.credentials:type_name -> cluster.Credentials
12, // 6: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult
13, // 7: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows
8, // [8:8] is the sub-list for method output_type
8, // [8:8] is the sub-list for method input_type
8, // [8:8] is the sub-list for extension type_name
8, // [8:8] is the sub-list for extension extendee
0, // [0:8] is the sub-list for field type_name
9, // 1: cluster.Command.execute_request:type_name -> command.ExecuteRequest
10, // 2: cluster.Command.query_request:type_name -> command.QueryRequest
11, // 3: cluster.Command.backup_request:type_name -> command.BackupRequest
12, // 4: cluster.Command.load_request:type_name -> command.LoadRequest
13, // 5: cluster.Command.remove_node_request:type_name -> command.RemoveNodeRequest
1, // 6: cluster.Command.credentials:type_name -> cluster.Credentials
14, // 7: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult
15, // 8: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows
9, // [9:9] is the sub-list for method output_type
9, // [9:9] is the sub-list for method input_type
9, // [9:9] is the sub-list for extension type_name
9, // [9:9] is the sub-list for extension extendee
0, // [0:9] is the sub-list for field type_name
}
func init() { file_message_proto_init() }
@ -723,12 +800,25 @@ func file_message_proto_init() {
return nil
}
}
file_message_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*CommandRemoveNodeResponse); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_message_proto_msgTypes[2].OneofWrappers = []interface{}{
(*Command_ExecuteRequest)(nil),
(*Command_QueryRequest)(nil),
(*Command_BackupRequest)(nil),
(*Command_LoadRequest)(nil),
(*Command_RemoveNodeRequest)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{
@ -736,7 +826,7 @@ func file_message_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_message_proto_rawDesc,
NumEnums: 1,
NumMessages: 7,
NumMessages: 8,
NumExtensions: 0,
NumServices: 0,
},

@ -22,6 +22,7 @@ message Command {
COMMAND_TYPE_QUERY = 3;
COMMAND_TYPE_BACKUP = 4;
COMMAND_TYPE_LOAD = 5;
COMMAND_TYPE_REMOVE_NODE = 6;
}
Type type = 1;
@ -30,6 +31,7 @@ message Command {
command.QueryRequest query_request = 3;
command.BackupRequest backup_request = 5;
command.LoadRequest load_request = 6;
command.RemoveNodeRequest remove_node_request = 7;
}
Credentials credentials = 4;
@ -53,3 +55,7 @@ message CommandBackupResponse {
message CommandLoadResponse {
string error = 1;
}
message CommandRemoveNodeResponse {
string error = 1;
}

@ -29,6 +29,7 @@ const (
numQueryRequest = "num_query_req"
numBackupRequest = "num_backup_req"
numLoadRequest = "num_load_req"
numRemoveNodeRequest = "num_remove_node_req"
// Client stats for this package.
numGetNodeAPIRequestLocal = "num_get_node_api_req_local"
@ -50,6 +51,7 @@ func init() {
stats.Add(numQueryRequest, 0)
stats.Add(numBackupRequest, 0)
stats.Add(numLoadRequest, 0)
stats.Add(numRemoveNodeRequest, 0)
stats.Add(numGetNodeAPIRequestLocal, 0)
}
@ -76,6 +78,11 @@ type Database interface {
Load(lr *command.LoadRequest) error
}
type Manager interface {
// Remove removes the node, given by id, from the cluster
Remove(rn *command.RemoveNodeRequest) error
}
// CredentialStore is the interface credential stores must support.
type CredentialStore interface {
// AA authenticates and checks authorization for the given perm.
@ -93,7 +100,8 @@ type Service struct {
tn Transport // Network layer this service uses
addr net.Addr // Address on which this service is listening
db Database // The queryable system.
db Database // The queryable system.
mgr Manager // The cluster management system.
credentialStore CredentialStore
@ -105,11 +113,12 @@ type Service struct {
}
// New returns a new instance of the cluster service
func New(tn Transport, db Database, credentialStore CredentialStore) *Service {
func New(tn Transport, db Database, m Manager, credentialStore CredentialStore) *Service {
return &Service{
tn: tn,
addr: tn.Addr(),
db: db,
mgr: m,
logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags),
credentialStore: credentialStore,
}
@ -346,6 +355,27 @@ func (s *Service) handleConn(conn net.Conn) {
return
}
writeBytesWithLength(conn, p)
case Command_COMMAND_TYPE_REMOVE_NODE:
stats.Add(numRemoveNodeRequest, 1)
resp := &CommandRemoveNodeResponse{}
rn := c.GetRemoveNodeRequest()
if rn == nil {
resp.Error = "LoadRequest is nil"
} else if !s.checkCommandPerm(c, auth.PermRemove) {
resp.Error = "unauthorized"
} else {
if err := s.mgr.Remove(rn); err != nil {
resp.Error = err.Error()
}
}
p, err = proto.Marshal(resp)
if err != nil {
conn.Close()
}
writeBytesWithLength(conn, p)
}
}
}

@ -26,8 +26,9 @@ func Test_ServiceExecute(t *testing.T) {
go mux.Serve()
tn := mux.Listen(1) // Could be any byte value.
db := mustNewMockDatabase()
mgr := mustNewMockManager()
cred := mustNewMockCredentialStore()
s := New(tn, db, cred)
s := New(tn, db, mgr, cred)
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -115,7 +116,8 @@ func Test_ServiceQuery(t *testing.T) {
tn := mux.Listen(1) // Could be any byte value.
db := mustNewMockDatabase()
cred := mustNewMockCredentialStore()
s := New(tn, db, cred)
mgr := mustNewMockManager()
s := New(tn, db, mgr, cred)
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -204,8 +206,9 @@ func Test_ServiceQueryLarge(t *testing.T) {
go mux.Serve()
tn := mux.Listen(1) // Could be any byte value.
db := mustNewMockDatabase()
mgr := mustNewMockManager()
cred := mustNewMockCredentialStore()
s := New(tn, db, cred)
s := New(tn, db, mgr, cred)
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -264,8 +267,9 @@ func Test_ServiceBackup(t *testing.T) {
go mux.Serve()
tn := mux.Listen(1) // Could be any byte value.
db := mustNewMockDatabase()
mgr := mustNewMockManager()
cred := mustNewMockCredentialStore()
s := New(tn, db, cred)
s := New(tn, db, mgr, cred)
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -310,8 +314,9 @@ func Test_ServiceLoad(t *testing.T) {
go mux.Serve()
tn := mux.Listen(1) // Could be any byte value.
db := mustNewMockDatabase()
mgr := mustNewMockManager()
cred := mustNewMockCredentialStore()
s := New(tn, db, cred)
s := New(tn, db, mgr, cred)
if s == nil {
t.Fatalf("failed to create cluster service")
}

@ -16,7 +16,7 @@ func Test_NewServiceSetGetNodeAPIAddrMuxed(t *testing.T) {
go mux.Serve()
tn := mux.Listen(1) // Could be any byte value.
s := New(tn, mustNewMockDatabase(), mustNewMockCredentialStore())
s := New(tn, mustNewMockDatabase(), mustNewMockManager(), mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -50,7 +50,7 @@ func Test_NewServiceSetGetNodeAPIAddrMuxedTLS(t *testing.T) {
go mux.Serve()
tn := mux.Listen(1) // Could be any byte value.
s := New(tn, mustNewMockDatabase(), mustNewMockCredentialStore())
s := New(tn, mustNewMockDatabase(), mustNewMockManager(), mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}

@ -15,7 +15,7 @@ import (
func Test_NewServiceOpenClose(t *testing.T) {
ml := mustNewMockTransport()
s := New(ml, mustNewMockDatabase(), mustNewMockCredentialStore())
s := New(ml, mustNewMockDatabase(), mustNewMockManager(), mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -33,7 +33,7 @@ func Test_NewServiceOpenClose(t *testing.T) {
func Test_NewServiceSetGetAPIAddr(t *testing.T) {
ml := mustNewMockTransport()
s := New(ml, mustNewMockDatabase(), mustNewMockCredentialStore())
s := New(ml, mustNewMockDatabase(), mustNewMockManager(), mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -54,7 +54,7 @@ func Test_NewServiceSetGetAPIAddr(t *testing.T) {
func Test_NewServiceSetGetNodeAPIAddr(t *testing.T) {
ml := mustNewMockTransport()
s := New(ml, mustNewMockDatabase(), mustNewMockCredentialStore())
s := New(ml, mustNewMockDatabase(), mustNewMockManager(), mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -99,7 +99,7 @@ func Test_NewServiceSetGetNodeAPIAddr(t *testing.T) {
func Test_NewServiceSetGetNodeAPIAddrLocal(t *testing.T) {
ml := mustNewMockTransport()
s := New(ml, mustNewMockDatabase(), mustNewMockCredentialStore())
s := New(ml, mustNewMockDatabase(), mustNewMockManager(), mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -136,7 +136,7 @@ func Test_NewServiceSetGetNodeAPIAddrLocal(t *testing.T) {
func Test_NewServiceSetGetNodeAPIAddrTLS(t *testing.T) {
ml := mustNewMockTLSTransport()
s := New(ml, mustNewMockDatabase(), mustNewMockCredentialStore())
s := New(ml, mustNewMockDatabase(), mustNewMockManager(), mustNewMockCredentialStore())
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -175,12 +175,13 @@ func Test_NewServiceSetGetNodeAPIAddrTLS(t *testing.T) {
func Test_NewServiceTestExecuteQueryAuthNoCredentials(t *testing.T) {
ml := mustNewMockTransport()
db := mustNewMockDatabase()
clstr := mustNewMockManager()
// Test that for a cluster with no credential store configed
// all users are authed for both operations
var c CredentialStore = nil
c = nil
s := New(ml, db, c)
s := New(ml, db, clstr, c)
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -211,6 +212,7 @@ func Test_NewServiceTestExecuteQueryAuthNoCredentials(t *testing.T) {
func Test_NewServiceTestExecuteQueryAuth(t *testing.T) {
ml := mustNewMockTransport()
db := mustNewMockDatabase()
clstr := mustNewMockManager()
f := func(username string, password string, perm string) bool {
if username == "alice" && password == "secret1" && perm == "execute" {
@ -222,7 +224,7 @@ func Test_NewServiceTestExecuteQueryAuth(t *testing.T) {
}
c := &mockCredentialStore{aaFunc: f}
s := New(ml, db, c)
s := New(ml, db, clstr, c)
if s == nil {
t.Fatalf("failed to create cluster service")
}
@ -347,6 +349,21 @@ func mustNewMockDatabase() *mockDatabase {
return &mockDatabase{executeFn: e, queryFn: q}
}
type MockManager struct {
removeNodeFn func(rn *command.RemoveNodeRequest) error
}
func (m *MockManager) Remove(rn *command.RemoveNodeRequest) error {
if m.removeNodeFn == nil {
return nil
}
return m.removeNodeFn(rn)
}
func mustNewMockManager() *MockManager {
return &MockManager{}
}
func mustCreateTLSConfig() *tls.Config {
var err error

@ -101,7 +101,7 @@ func main() {
}
// Create cluster service now, so nodes will be able to learn information about each other.
clstr, err := clusterService(cfg, mux.Listen(cluster.MuxClusterHeader), str, credStr)
clstr, err := clusterService(cfg, mux.Listen(cluster.MuxClusterHeader), str, str, credStr)
if err != nil {
log.Fatalf("failed to create cluster service: %s", err.Error())
}
@ -319,8 +319,8 @@ func credentialStore(cfg *Config) (*auth.CredentialsStore, error) {
return cs, nil
}
func clusterService(cfg *Config, tn cluster.Transport, db cluster.Database, credStr *auth.CredentialsStore) (*cluster.Service, error) {
c := cluster.New(tn, db, credStr)
func clusterService(cfg *Config, tn cluster.Transport, db cluster.Database, mgr cluster.Manager, credStr *auth.CredentialsStore) (*cluster.Service, error) {
c := cluster.New(tn, db, mgr, credStr)
c.SetAPIAddr(cfg.HTTPAdv)
c.EnableHTTPS(cfg.X509Cert != "" && cfg.X509Key != "") // Conditions met for an HTTPS API

@ -170,7 +170,7 @@ func (x Command_Type) Number() protoreflect.EnumNumber {
// Deprecated: Use Command_Type.Descriptor instead.
func (Command_Type) EnumDescriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{11, 0}
return file_command_proto_rawDescGZIP(), []int{12, 0}
}
type Parameter struct {
@ -838,6 +838,53 @@ func (x *LoadRequest) GetData() []byte {
return nil
}
type RemoveNodeRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
}
func (x *RemoveNodeRequest) Reset() {
*x = RemoveNodeRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_command_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *RemoveNodeRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*RemoveNodeRequest) ProtoMessage() {}
func (x *RemoveNodeRequest) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[10]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use RemoveNodeRequest.ProtoReflect.Descriptor instead.
func (*RemoveNodeRequest) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{10}
}
func (x *RemoveNodeRequest) GetId() string {
if x != nil {
return x.Id
}
return ""
}
type Noop struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -849,7 +896,7 @@ type Noop struct {
func (x *Noop) Reset() {
*x = Noop{}
if protoimpl.UnsafeEnabled {
mi := &file_command_proto_msgTypes[10]
mi := &file_command_proto_msgTypes[11]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -862,7 +909,7 @@ func (x *Noop) String() string {
func (*Noop) ProtoMessage() {}
func (x *Noop) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[10]
mi := &file_command_proto_msgTypes[11]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -875,7 +922,7 @@ func (x *Noop) ProtoReflect() protoreflect.Message {
// Deprecated: Use Noop.ProtoReflect.Descriptor instead.
func (*Noop) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{10}
return file_command_proto_rawDescGZIP(), []int{11}
}
func (x *Noop) GetId() string {
@ -898,7 +945,7 @@ type Command struct {
func (x *Command) Reset() {
*x = Command{}
if protoimpl.UnsafeEnabled {
mi := &file_command_proto_msgTypes[11]
mi := &file_command_proto_msgTypes[12]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@ -911,7 +958,7 @@ func (x *Command) String() string {
func (*Command) ProtoMessage() {}
func (x *Command) ProtoReflect() protoreflect.Message {
mi := &file_command_proto_msgTypes[11]
mi := &file_command_proto_msgTypes[12]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@ -924,7 +971,7 @@ func (x *Command) ProtoReflect() protoreflect.Message {
// Deprecated: Use Command.ProtoReflect.Descriptor instead.
func (*Command) Descriptor() ([]byte, []int) {
return file_command_proto_rawDescGZIP(), []int{11}
return file_command_proto_rawDescGZIP(), []int{12}
}
func (x *Command) GetType() Command_Type {
@ -1030,26 +1077,29 @@ var file_command_proto_rawDesc = []byte{
0x52, 0x4d, 0x41, 0x54, 0x5f, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x02, 0x22, 0x21, 0x0a,
0x0b, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04,
0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61,
0x22, 0x16, 0x0a, 0x04, 0x4e, 0x6f, 0x6f, 0x70, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01,
0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xf8, 0x01, 0x0a, 0x07, 0x43, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01,
0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x43, 0x6f, 0x6d,
0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12,
0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x02,
0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x65, 0x64, 0x18, 0x03,
0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x65, 0x64,
0x22, 0x80, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d,
0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57,
0x4e, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54,
0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43,
0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43,
0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44,
0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x4f, 0x50, 0x10, 0x03, 0x12, 0x15, 0x0a, 0x11,
0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4c, 0x4f, 0x41,
0x44, 0x10, 0x04, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f,
0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x22, 0x23, 0x0a, 0x11, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x16, 0x0a, 0x04, 0x4e, 0x6f, 0x6f, 0x70, 0x12, 0x0e, 0x0a,
0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xf8, 0x01,
0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70,
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e,
0x64, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04,
0x74, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d,
0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f,
0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73,
0x73, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72,
0x65, 0x73, 0x73, 0x65, 0x64, 0x22, 0x80, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18,
0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55,
0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d,
0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x01,
0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45,
0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f,
0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x4f, 0x50, 0x10,
0x03, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50,
0x45, 0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x04, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68,
0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71,
0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x62, 0x06, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -1065,7 +1115,7 @@ func file_command_proto_rawDescGZIP() []byte {
}
var file_command_proto_enumTypes = make([]protoimpl.EnumInfo, 3)
var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 12)
var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 13)
var file_command_proto_goTypes = []interface{}{
(QueryRequest_Level)(0), // 0: command.QueryRequest.Level
(BackupRequest_Format)(0), // 1: command.BackupRequest.Format
@ -1080,8 +1130,9 @@ var file_command_proto_goTypes = []interface{}{
(*ExecuteResult)(nil), // 10: command.ExecuteResult
(*BackupRequest)(nil), // 11: command.BackupRequest
(*LoadRequest)(nil), // 12: command.LoadRequest
(*Noop)(nil), // 13: command.Noop
(*Command)(nil), // 14: command.Command
(*RemoveNodeRequest)(nil), // 13: command.RemoveNodeRequest
(*Noop)(nil), // 14: command.Noop
(*Command)(nil), // 15: command.Command
}
var file_command_proto_depIdxs = []int32{
3, // 0: command.Statement.parameters:type_name -> command.Parameter
@ -1227,7 +1278,7 @@ func file_command_proto_init() {
}
}
file_command_proto_msgTypes[10].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Noop); i {
switch v := v.(*RemoveNodeRequest); i {
case 0:
return &v.state
case 1:
@ -1239,6 +1290,18 @@ func file_command_proto_init() {
}
}
file_command_proto_msgTypes[11].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Noop); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_command_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Command); i {
case 0:
return &v.state
@ -1264,7 +1327,7 @@ func file_command_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_command_proto_rawDesc,
NumEnums: 3,
NumMessages: 12,
NumMessages: 13,
NumExtensions: 0,
NumServices: 0,
},

@ -74,6 +74,10 @@ message LoadRequest {
bytes data = 1;
}
message RemoveNodeRequest {
string id = 1;
}
message Noop {
string id = 1;
}

@ -67,8 +67,8 @@ type Store interface {
// Notify notifies this node that a node is available at addr.
Notify(id, addr string) error
// Remove removes the node, specified by id, from the cluster.
Remove(id string) error
// RemoveNode removes the node from the cluster.
Remove(rn *command.RemoveNodeRequest) error
// LeaderAddr returns the Raft address of the leader of the cluster.
LeaderAddr() (string, error)
@ -100,6 +100,9 @@ type Cluster interface {
// Load loads a SQLite database into the node.
Load(lr *command.LoadRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error
// RemoveNode removes a node from the cluster.
RemoveNode(rn *command.RemoveNodeRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error
// Stats returns stats on the Cluster.
Stats() (map[string]interface{}, error)
}
@ -182,6 +185,7 @@ const (
numRemoteQueries = "remote_queries"
numRemoteBackups = "remote_backups"
numRemoteLoads = "remote_loads"
numRemoteRemoveNode = "remte_remove_node"
numReadyz = "num_readyz"
numStatus = "num_status"
numBackups = "backups"
@ -216,6 +220,7 @@ func init() {
stats.Add(numRemoteQueries, 0)
stats.Add(numRemoteBackups, 0)
stats.Add(numRemoteLoads, 0)
stats.Add(numRemoteRemoveNode, 0)
stats.Add(numReadyz, 0)
stats.Add(numStatus, 0)
stats.Add(numBackups, 0)
@ -552,6 +557,12 @@ func (s *Service) handleRemove(w http.ResponseWriter, r *http.Request) {
return
}
redirect, err := isRedirect(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
@ -574,20 +585,62 @@ func (s *Service) handleRemove(w http.ResponseWriter, r *http.Request) {
return
}
if err := s.store.Remove(remoteID); err != nil {
timeout, err := timeoutParam(r, defaultTimeout)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
rn := &command.RemoveNodeRequest{
Id: remoteID,
}
err = s.store.Remove(rn)
if err != nil {
if err == store.ErrNotLeader {
leaderAPIAddr := s.LeaderAPIAddr()
if leaderAPIAddr == "" {
if redirect {
leaderAPIAddr := s.LeaderAPIAddr()
if leaderAPIAddr == "" {
stats.Add(numLeaderNotFound, 1)
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
return
}
redirect := s.FormRedirect(r, leaderAPIAddr)
http.Redirect(w, r, redirect, http.StatusMovedPermanently)
return
}
addr, err := s.store.LeaderAddr()
if err != nil {
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
http.StatusInternalServerError)
return
}
if addr == "" {
stats.Add(numLeaderNotFound, 1)
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
return
}
redirect := s.FormRedirect(r, leaderAPIAddr)
http.Redirect(w, r, redirect, http.StatusMovedPermanently)
username, password, ok := r.BasicAuth()
if !ok {
username = ""
}
w.Header().Add(ServedByHTTPHeader, addr)
removeErr := s.cluster.RemoveNode(rn, addr, makeCredentials(username, password), timeout)
if removeErr != nil {
if removeErr.Error() == "unauthorized" {
http.Error(w, "remote remove node not authorized", http.StatusUnauthorized)
} else {
http.Error(w, removeErr.Error(), http.StatusInternalServerError)
}
return
}
stats.Add(numRemoteRemoveNode, 1)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

@ -1265,7 +1265,7 @@ func (m *MockStore) Notify(id, addr string) error {
return nil
}
func (m *MockStore) Remove(id string) error {
func (m *MockStore) Remove(rn *command.RemoveNodeRequest) error {
return nil
}
@ -1296,11 +1296,12 @@ func (m *MockStore) Load(lr *command.LoadRequest) error {
}
type mockClusterService struct {
apiAddr string
executeFn func(er *command.ExecuteRequest, addr string, t time.Duration) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest, addr string, t time.Duration) ([]*command.QueryRows, error)
backupFn func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error
loadFn func(lr *command.LoadRequest, addr string, t time.Duration) error
apiAddr string
executeFn func(er *command.ExecuteRequest, addr string, t time.Duration) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest, addr string, t time.Duration) ([]*command.QueryRows, error)
backupFn func(br *command.BackupRequest, addr string, t time.Duration, w io.Writer) error
loadFn func(lr *command.LoadRequest, addr string, t time.Duration) error
removeNodeFn func(rn *command.RemoveNodeRequest, nodeAddr string, t time.Duration) error
}
func (m *mockClusterService) GetNodeAPIAddr(a string, t time.Duration) (string, error) {
@ -1335,6 +1336,13 @@ func (m *mockClusterService) Load(lr *command.LoadRequest, addr string, creds *c
return nil
}
func (m *mockClusterService) RemoveNode(rn *command.RemoveNodeRequest, addr string, creds *cluster.Credentials, t time.Duration) error {
if m.removeNodeFn != nil {
return m.removeNodeFn(rn, addr, t)
}
return nil
}
type mockCredentialStore struct {
HasPermOK bool
aaFunc func(username, password, perm string) bool

@ -1048,11 +1048,12 @@ func (s *Store) Join(id, addr string, voter bool) error {
return nil
}
// Remove removes a node from the store, specified by ID.
func (s *Store) Remove(id string) error {
// Remove removes a node from the store.
func (s *Store) Remove(rn *command.RemoveNodeRequest) error {
if !s.open {
return ErrNotOpen
}
id := rn.Id
s.logger.Printf("received request to remove node %s", id)
if err := s.remove(id); err != nil {

@ -48,7 +48,7 @@ func Test_StoreSingleNodeNotOpen(t *testing.T) {
if err := s.Notify("id", "localhost"); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if err := s.Remove("id"); err != ErrNotOpen {
if err := s.Remove(nil); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if _, err := s.Nodes(); err != ErrNotOpen {
@ -1192,7 +1192,7 @@ func Test_MultiNodeJoinRemove(t *testing.T) {
}
// Remove a node.
if err := s0.Remove(s1.ID()); err != nil {
if err := s0.Remove(removeNodeRequest(s1.ID())); err != nil {
t.Fatalf("failed to remove %s from cluster: %s", s1.ID(), err.Error())
}
@ -1353,7 +1353,7 @@ func Test_MultiNodeJoinNonVoterRemove(t *testing.T) {
}
// Remove the non-voter.
if err := s0.Remove(s1.ID()); err != nil {
if err := s0.Remove(removeNodeRequest(s1.ID())); err != nil {
t.Fatalf("failed to remove %s from cluster: %s", s1.ID(), err.Error())
}
@ -2156,6 +2156,12 @@ func loadRequestFromFile(path string) *command.LoadRequest {
}
}
func removeNodeRequest(id string) *command.RemoveNodeRequest {
return &command.RemoveNodeRequest{
Id: id,
}
}
// waitForLeaderID waits until the Store's LeaderID is set, or the timeout
// expires. Because setting Leader ID requires Raft to set the cluster
// configuration, it's not entirely deterministic when it will be set.

Loading…
Cancel
Save