diff --git a/command/command.pb.go b/command/command.pb.go index 7d5b7cf6..53a7fac3 100644 --- a/command/command.pb.go +++ b/command/command.pb.go @@ -170,12 +170,13 @@ func (BackupRequest_Format) EnumDescriptor() ([]byte, []int) { type Command_Type int32 const ( - Command_COMMAND_TYPE_UNKNOWN Command_Type = 0 - Command_COMMAND_TYPE_QUERY Command_Type = 1 - Command_COMMAND_TYPE_EXECUTE Command_Type = 2 - Command_COMMAND_TYPE_NOOP Command_Type = 3 - Command_COMMAND_TYPE_LOAD Command_Type = 4 - Command_COMMAND_TYPE_JOIN Command_Type = 5 + Command_COMMAND_TYPE_UNKNOWN Command_Type = 0 + Command_COMMAND_TYPE_QUERY Command_Type = 1 + Command_COMMAND_TYPE_EXECUTE Command_Type = 2 + Command_COMMAND_TYPE_NOOP Command_Type = 3 + Command_COMMAND_TYPE_LOAD Command_Type = 4 + Command_COMMAND_TYPE_JOIN Command_Type = 5 + Command_COMMAND_TYPE_EXECUTE_QUERY Command_Type = 6 ) // Enum value maps for Command_Type. @@ -187,14 +188,16 @@ var ( 3: "COMMAND_TYPE_NOOP", 4: "COMMAND_TYPE_LOAD", 5: "COMMAND_TYPE_JOIN", + 6: "COMMAND_TYPE_EXECUTE_QUERY", } Command_Type_value = map[string]int32{ - "COMMAND_TYPE_UNKNOWN": 0, - "COMMAND_TYPE_QUERY": 1, - "COMMAND_TYPE_EXECUTE": 2, - "COMMAND_TYPE_NOOP": 3, - "COMMAND_TYPE_LOAD": 4, - "COMMAND_TYPE_JOIN": 5, + "COMMAND_TYPE_UNKNOWN": 0, + "COMMAND_TYPE_QUERY": 1, + "COMMAND_TYPE_EXECUTE": 2, + "COMMAND_TYPE_NOOP": 3, + "COMMAND_TYPE_LOAD": 4, + "COMMAND_TYPE_JOIN": 5, + "COMMAND_TYPE_EXECUTE_QUERY": 6, } ) @@ -1437,14 +1440,14 @@ var file_command_proto_rawDesc = []byte{ 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x16, 0x0a, 0x04, 0x4e, 0x6f, 0x6f, 0x70, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, - 0x22, 0x8f, 0x02, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, + 0x22, 0xaf, 0x02, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x63, 0x6f, - 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x65, 0x64, 0x22, 0x97, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, + 0x6d, 0x70, 0x72, 0x65, 0x73, 0x73, 0x65, 0x64, 0x22, 0xb7, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, @@ -1454,7 +1457,9 @@ var file_command_proto_rawDesc = []byte{ 0x4f, 0x50, 0x10, 0x03, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x04, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4a, 0x4f, 0x49, 0x4e, - 0x10, 0x05, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x10, 0x05, 0x12, 0x1e, 0x0a, 0x1a, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, + 0x10, 0x06, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } diff --git a/command/command.proto b/command/command.proto index 632dd471..2df33066 100644 --- a/command/command.proto +++ b/command/command.proto @@ -120,6 +120,7 @@ message Command { COMMAND_TYPE_NOOP = 3; COMMAND_TYPE_LOAD = 4; COMMAND_TYPE_JOIN = 5; + COMMAND_TYPE_EXECUTE_QUERY = 6; } Type type = 1; bytes sub_command = 2; diff --git a/store/store.go b/store/store.go index b942bc45..a6e612c1 100644 --- a/store/store.go +++ b/store/store.go @@ -970,6 +970,60 @@ func (s *Store) Query(qr *command.QueryRequest) ([]*command.QueryRows, error) { return s.db.Query(qr.Request, qr.Timings) } +// Request processes a request that may contain both Executes and Queries. +func (s *Store) Request(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error) { + if !s.open { + return nil, ErrNotOpen + } + + if s.RequiresLeader(eqr) && s.raft.State() != raft.Leader { + return nil, ErrNotLeader + } + + if eqr.Level == command.ExecuteQueryRequest_QUERY_REQUEST_LEVEL_STRONG { + if !s.Ready() { + return nil, ErrNotReady + } + + b, compressed, err := s.reqMarshaller.Marshal(eqr) + if err != nil { + return nil, err + } + if compressed { + stats.Add(numCompressedCommands, 1) + } else { + stats.Add(numUncompressedCommands, 1) + } + + c := &command.Command{ + Type: command.Command_COMMAND_TYPE_EXECUTE_QUERY, + SubCommand: b, + Compressed: compressed, + } + + b, err = command.Marshal(c) + if err != nil { + return nil, err + } + + af := s.raft.Apply(b, s.ApplyTimeout) + if af.Error() != nil { + if af.Error() == raft.ErrNotLeader { + return nil, ErrNotLeader + } + return nil, af.Error() + } + + s.dbAppliedIndexMu.Lock() + s.dbAppliedIndex = af.Index() + s.dbAppliedIndexMu.Unlock() + r := af.Response().(*fsmExecuteQueryResponse) + return r.results, r.error + } + + return nil, nil +} + // Backup writes a snapshot of the underlying database to dst // // If Leader is true for the request, this operation is performed with a read consistency @@ -1244,6 +1298,26 @@ func (s *Store) Noop(id string) error { return nil } +// RequiresLeader returns whether the given ExecuteQueryRequest must be +// processed on the cluster Leader. +func (s *Store) RequiresLeader(eqr *command.ExecuteQueryRequest) bool { + if eqr.Level != command.ExecuteQueryRequest_QUERY_REQUEST_LEVEL_NONE { + return true + } + + for _, stmt := range eqr.Request.Statements { + sql := stmt.Sql + if sql == "" { + continue + } + ro, err := s.db.StmtReadOnly(sql) + if !ro || err != nil { + return true + } + } + return false +} + // setLogInfo records some key indexs about the log. func (s *Store) setLogInfo() error { var err error @@ -1305,6 +1379,11 @@ type fsmQueryResponse struct { error error } +type fsmExecuteQueryResponse struct { + results []*command.ExecuteQueryResponse + error error +} + type fsmGenericResponse struct { error error }