diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 724888a7..831187a6 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -52,10 +52,10 @@ export PATH=$PATH:$GOBIN export DEST_DIR=$GOPATH/src export SRC_DIR=$GOPATH/src/github.com/rqlite/rqlite/command -protoc -I=$SRC_DIR --go_out=$DEST_DIR $SRC_DIR/command.proto +protoc -I=$SRC_DIR --proto_path=$GOPATHsrc/github.com/rqlite/rqlite --go_out=$DEST_DIR $SRC_DIR/command.proto export SRC_DIR=$GOPATH/src/github.com/rqlite/rqlite/cluster -protoc -I=$SRC_DIR --go_out=$DEST_DIR $SRC_DIR/message.proto +protoc -I=$SRC_DIR --proto_path=$GOPATHsrc/github.com/rqlite/rqlite --go_out=$DEST_DIR $SRC_DIR/message.proto ``` ### Speeding up the build process diff --git a/cluster/message.pb.go b/cluster/message.pb.go index ecb1d42b..4c1c255b 100644 --- a/cluster/message.pb.go +++ b/cluster/message.pb.go @@ -1,12 +1,13 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.26.0 +// protoc-gen-go v1.27.1 // protoc v3.13.0 // source: message.proto package cluster import ( + command "github.com/rqlite/rqlite/command" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" @@ -25,6 +26,8 @@ type Command_Type int32 const ( Command_COMMAND_TYPE_UNKNOWN Command_Type = 0 Command_COMMAND_TYPE_GET_NODE_API_URL Command_Type = 1 + Command_COMMAND_TYPE_EXECUTE Command_Type = 2 + Command_COMMAND_TYPE_QUERY Command_Type = 3 ) // Enum value maps for Command_Type. @@ -32,10 +35,14 @@ var ( Command_Type_name = map[int32]string{ 0: "COMMAND_TYPE_UNKNOWN", 1: "COMMAND_TYPE_GET_NODE_API_URL", + 2: "COMMAND_TYPE_EXECUTE", + 3: "COMMAND_TYPE_QUERY", } Command_Type_value = map[string]int32{ "COMMAND_TYPE_UNKNOWN": 0, "COMMAND_TYPE_GET_NODE_API_URL": 1, + "COMMAND_TYPE_EXECUTE": 2, + "COMMAND_TYPE_QUERY": 3, } ) @@ -118,7 +125,8 @@ type Command struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Type Command_Type `protobuf:"varint,1,opt,name=type,proto3,enum=Command_Type" json:"type,omitempty"` + Type Command_Type `protobuf:"varint,1,opt,name=type,proto3,enum=cluster.Command_Type" json:"type,omitempty"` + SubCommand []byte `protobuf:"bytes,2,opt,name=sub_command,json=subCommand,proto3" json:"sub_command,omitempty"` } func (x *Command) Reset() { @@ -160,22 +168,158 @@ func (x *Command) GetType() Command_Type { return Command_COMMAND_TYPE_UNKNOWN } +func (x *Command) GetSubCommand() []byte { + if x != nil { + return x.SubCommand + } + return nil +} + +type CommandExecuteResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + Results []*command.ExecuteResult `protobuf:"bytes,2,rep,name=results,proto3" json:"results,omitempty"` +} + +func (x *CommandExecuteResponse) Reset() { + *x = CommandExecuteResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CommandExecuteResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommandExecuteResponse) ProtoMessage() {} + +func (x *CommandExecuteResponse) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommandExecuteResponse.ProtoReflect.Descriptor instead. +func (*CommandExecuteResponse) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{2} +} + +func (x *CommandExecuteResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *CommandExecuteResponse) GetResults() []*command.ExecuteResult { + if x != nil { + return x.Results + } + return nil +} + +type CommandQueryResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` + Rows []*command.QueryRows `protobuf:"bytes,2,rep,name=rows,proto3" json:"rows,omitempty"` +} + +func (x *CommandQueryResponse) Reset() { + *x = CommandQueryResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_message_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CommandQueryResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CommandQueryResponse) ProtoMessage() {} + +func (x *CommandQueryResponse) ProtoReflect() protoreflect.Message { + mi := &file_message_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CommandQueryResponse.ProtoReflect.Descriptor instead. +func (*CommandQueryResponse) Descriptor() ([]byte, []int) { + return file_message_proto_rawDescGZIP(), []int{3} +} + +func (x *CommandQueryResponse) GetError() string { + if x != nil { + return x.Error + } + return "" +} + +func (x *CommandQueryResponse) GetRows() []*command.QueryRows { + if x != nil { + return x.Rows + } + return nil +} + var File_message_proto protoreflect.FileDescriptor var file_message_proto_rawDesc = []byte{ - 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, + 0x0a, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x1a, 0x15, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, + 0x64, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x1b, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x10, 0x0a, 0x03, 0x75, 0x72, - 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0x71, 0x0a, 0x07, - 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x21, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0d, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, - 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, - 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, - 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, - 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x42, - 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, - 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x75, 0x72, 0x6c, 0x22, 0xcc, 0x01, 0x0a, + 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, + 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, + 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x22, 0x75, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, + 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, + 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x21, 0x0a, 0x1d, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, + 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x47, 0x45, 0x54, 0x5f, 0x4e, 0x4f, 0x44, 0x45, 0x5f, + 0x41, 0x50, 0x49, 0x5f, 0x55, 0x52, 0x4c, 0x10, 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, + 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, + 0x45, 0x10, 0x02, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, + 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x03, 0x22, 0x60, 0x0a, 0x16, 0x43, + 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x30, 0x0a, 0x07, 0x72, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x63, + 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x65, 0x52, 0x65, + 0x73, 0x75, 0x6c, 0x74, 0x52, 0x07, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x73, 0x22, 0x54, 0x0a, + 0x14, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x26, 0x0a, 0x04, 0x72, + 0x6f, 0x77, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, + 0x61, 0x6e, 0x64, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x6f, 0x77, 0x73, 0x52, 0x04, 0x72, + 0x6f, 0x77, 0x73, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -191,19 +335,25 @@ func file_message_proto_rawDescGZIP() []byte { } var file_message_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_message_proto_msgTypes = make([]protoimpl.MessageInfo, 4) var file_message_proto_goTypes = []interface{}{ - (Command_Type)(0), // 0: Command.Type - (*Address)(nil), // 1: Address - (*Command)(nil), // 2: Command + (Command_Type)(0), // 0: cluster.Command.Type + (*Address)(nil), // 1: cluster.Address + (*Command)(nil), // 2: cluster.Command + (*CommandExecuteResponse)(nil), // 3: cluster.CommandExecuteResponse + (*CommandQueryResponse)(nil), // 4: cluster.CommandQueryResponse + (*command.ExecuteResult)(nil), // 5: command.ExecuteResult + (*command.QueryRows)(nil), // 6: command.QueryRows } var file_message_proto_depIdxs = []int32{ - 0, // 0: Command.type:type_name -> Command.Type - 1, // [1:1] is the sub-list for method output_type - 1, // [1:1] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name + 0, // 0: cluster.Command.type:type_name -> cluster.Command.Type + 5, // 1: cluster.CommandExecuteResponse.results:type_name -> command.ExecuteResult + 6, // 2: cluster.CommandQueryResponse.rows:type_name -> command.QueryRows + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_message_proto_init() } @@ -236,6 +386,30 @@ func file_message_proto_init() { return nil } } + file_message_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CommandExecuteResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_message_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CommandQueryResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -243,7 +417,7 @@ func file_message_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_message_proto_rawDesc, NumEnums: 1, - NumMessages: 2, + NumMessages: 4, NumExtensions: 0, NumServices: 0, }, diff --git a/cluster/message.proto b/cluster/message.proto index 270e837a..bd59373c 100644 --- a/cluster/message.proto +++ b/cluster/message.proto @@ -1,4 +1,7 @@ syntax = "proto3"; +package cluster; + +import "command/command.proto"; option go_package = "github.com/rqlite/rqlite/cluster"; @@ -10,6 +13,19 @@ message Command { enum Type { COMMAND_TYPE_UNKNOWN = 0; COMMAND_TYPE_GET_NODE_API_URL = 1; + COMMAND_TYPE_EXECUTE = 2; + COMMAND_TYPE_QUERY = 3; } Type type = 1; + bytes sub_command = 2; +} + +message CommandExecuteResponse { + string error = 1; + repeated command.ExecuteResult results = 2; +} + +message CommandQueryResponse { + string error = 1; + repeated command.QueryRows rows = 2; } diff --git a/cluster/service.go b/cluster/service.go index d96748df..e6c7fb56 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -13,6 +13,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/rqlite/rqlite/command" ) // stats captures stats for the Cluster service. @@ -21,6 +22,8 @@ var stats *expvar.Map const ( numGetNodeAPIRequest = "num_get_node_api_req" numGetNodeAPIResponse = "num_get_node_api_resp" + numExecuteRequest = "num_execute_req" + numQueryRequest = "num_query_req" ) const ( @@ -35,6 +38,8 @@ func init() { stats = expvar.NewMap("cluster") stats.Add(numGetNodeAPIRequest, 0) stats.Add(numGetNodeAPIResponse, 0) + stats.Add(numExecuteRequest, 0) + stats.Add(numQueryRequest, 0) } // Dialer is the interface dialers must implement. @@ -44,6 +49,21 @@ type Dialer interface { Dial(address string, timeout time.Duration) (net.Conn, error) } +// Database is the interface any queryable system must implement +type Database interface { + // Execute executes a slice of queries, each of which is not expected + // to return rows. If timings is true, then timing information will + // be return. If tx is true, then either all queries will be executed + // successfully or it will as though none executed. + Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) + + // Query executes a slice of queries, each of which returns rows. If + // timings is true, then timing information will be returned. If tx + // is true, then all queries will take place while a read transaction + // is held on the database. + Query(qr *command.QueryRequest) ([]*command.QueryRows, error) +} + // Transport is the interface the network layer must provide. type Transport interface { net.Listener @@ -55,6 +75,8 @@ type Service struct { tn Transport // Network layer this service uses addr net.Addr // Address on which this service is listening + db Database // The queryable system. + mu sync.RWMutex https bool // Serving HTTPS? apiAddr string // host:port this node serves the HTTP API. @@ -63,10 +85,11 @@ type Service struct { } // New returns a new instance of the cluster service -func New(tn Transport) *Service { +func New(tn Transport, db Database) *Service { return &Service{ tn: tn, addr: tn.Addr(), + db: db, logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags), } } @@ -173,5 +196,56 @@ func (s *Service) handleConn(conn net.Conn) { } conn.Write(b) stats.Add(numGetNodeAPIResponse, 1) + + case Command_COMMAND_TYPE_EXECUTE: + stats.Add(numExecuteRequest, 1) + + er := &command.ExecuteRequest{} + if err = proto.Unmarshal(c.SubCommand, er); err != nil { + // Write some error, then close? + return + } + + resp := &CommandExecuteResponse{} + res, err := s.db.Execute(er) + if err != nil { + resp.Error = err.Error() + } else { + resp.Results = make([]*command.ExecuteResult, len(res)) + for i := range res { + resp.Results[i] = res[i] + } + } + b, err = proto.Marshal(resp) + if err != nil { + return + } + conn.Write(b) + + case Command_COMMAND_TYPE_QUERY: + stats.Add(numQueryRequest, 1) + return + + qr := &command.QueryRequest{} + if err = proto.Unmarshal(c.SubCommand, qr); err != nil { + // Write some error, then close? + return + } + + resp := &CommandQueryResponse{} + res, err := s.db.Query(qr) + if err != nil { + resp.Error = err.Error() + } else { + resp.Rows = make([]*command.QueryRows, len(res)) + for i := range res { + resp.Rows[i] = res[i] + } + } + b, err = proto.Marshal(resp) + if err != nil { + return + } + conn.Write(b) } } diff --git a/cluster/service_mux_test.go b/cluster/service_mux_test.go index 409d3478..1555e2b0 100644 --- a/cluster/service_mux_test.go +++ b/cluster/service_mux_test.go @@ -15,7 +15,7 @@ func Test_NewServiceSetGetNodeAPIAddrMuxed(t *testing.T) { go mux.Serve() tn := mux.Listen(1) // Could be any byte value. - s := New(tn) + s := New(tn, mustNewMockDatabase()) if s == nil { t.Fatalf("failed to create cluster service") } @@ -49,7 +49,7 @@ func Test_NewServiceSetGetNodeAPIAddrMuxedTLS(t *testing.T) { go mux.Serve() tn := mux.Listen(1) // Could be any byte value. - s := New(tn) + s := New(tn, mustNewMockDatabase()) if s == nil { t.Fatalf("failed to create cluster service") } diff --git a/cluster/service_test.go b/cluster/service_test.go index 0591a6e3..2829f3bb 100644 --- a/cluster/service_test.go +++ b/cluster/service_test.go @@ -7,12 +7,13 @@ import ( "testing" "time" + "github.com/rqlite/rqlite/command" "github.com/rqlite/rqlite/testdata/x509" ) func Test_NewServiceOpenClose(t *testing.T) { ml := mustNewMockTransport() - s := New(ml) + s := New(ml, mustNewMockDatabase()) if s == nil { t.Fatalf("failed to create cluster service") } @@ -30,7 +31,7 @@ func Test_NewServiceOpenClose(t *testing.T) { func Test_NewServiceSetGetAPIAddr(t *testing.T) { ml := mustNewMockTransport() - s := New(ml) + s := New(ml, mustNewMockDatabase()) if s == nil { t.Fatalf("failed to create cluster service") } @@ -51,7 +52,7 @@ func Test_NewServiceSetGetAPIAddr(t *testing.T) { func Test_NewServiceSetGetNodeAPIAddr(t *testing.T) { ml := mustNewMockTransport() - s := New(ml) + s := New(ml, mustNewMockDatabase()) if s == nil { t.Fatalf("failed to create cluster service") } @@ -89,7 +90,7 @@ func Test_NewServiceSetGetNodeAPIAddr(t *testing.T) { func Test_NewServiceSetGetNodeAPIAddrTLS(t *testing.T) { ml := mustNewMockTLSTransport() - s := New(ml) + s := New(ml, mustNewMockDatabase()) if s == nil { t.Fatalf("failed to create cluster service") } @@ -130,24 +131,6 @@ type mockTransport struct { remoteEncrypted bool } -func mustNewMockTransport() *mockTransport { - tn, err := net.Listen("tcp", "localhost:0") - if err != nil { - panic("failed to create mock listener") - } - return &mockTransport{ - tn: tn, - } -} - -func mustNewMockTLSTransport() *mockTransport { - tn := mustNewMockTransport() - return &mockTransport{ - tn: tls.NewListener(tn, mustCreateTLSConfig()), - remoteEncrypted: true, - } -} - func (ml *mockTransport) Accept() (c net.Conn, err error) { return ml.tn.Accept() } @@ -177,6 +160,41 @@ func (ml *mockTransport) Dial(addr string, timeout time.Duration) (net.Conn, err return conn, err } +func mustNewMockTransport() *mockTransport { + tn, err := net.Listen("tcp", "localhost:0") + if err != nil { + panic("failed to create mock listener") + } + return &mockTransport{ + tn: tn, + } +} + +func mustNewMockTLSTransport() *mockTransport { + tn := mustNewMockTransport() + return &mockTransport{ + tn: tls.NewListener(tn, mustCreateTLSConfig()), + remoteEncrypted: true, + } +} + +type mockDatabase struct { + executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) + queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error) +} + +func (m *mockDatabase) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { + return m.executeFn(er) +} + +func (m *mockDatabase) Query(qr *command.QueryRequest) ([]*command.QueryRows, error) { + return m.queryFn(qr) +} + +func mustNewMockDatabase() *mockDatabase { + return &mockDatabase{} +} + func mustCreateTLSConfig() *tls.Config { var err error