diff --git a/cluster/client.go b/cluster/client.go index 639c2860..2eead814 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -4,7 +4,7 @@ import ( "encoding/binary" "errors" "fmt" - "io/ioutil" + "io" "net" "sync" "time" @@ -69,14 +69,22 @@ func (c *Client) GetNodeAPIAddr(nodeAddr string) (string, error) { return "", fmt.Errorf("write protobuf: %s", err) } - b, err = ioutil.ReadAll(conn) + // Read length of response. + _, err = io.ReadFull(conn, b) if err != nil { - handleConnError(conn) - return "", fmt.Errorf("read protobuf bytes: %s", err) + return "", err + } + sz := binary.LittleEndian.Uint16(b[0:]) + + // Read in the actual response. + p = make([]byte, sz) + _, err = io.ReadFull(conn, p) + if err != nil { + return "", err } a := &Address{} - err = proto.Unmarshal(b, a) + err = proto.Unmarshal(p, a) if err != nil { return "", fmt.Errorf("protobuf unmarshal: %s", err) } @@ -131,14 +139,23 @@ func (c *Client) Execute(er *command.ExecuteRequest, nodeAddr string, timeout ti handleConnError(conn) return nil, err } - b, err = ioutil.ReadAll(conn) + + // Read length of response. + _, err = io.ReadFull(conn, b) + if err != nil { + return nil, err + } + sz := binary.LittleEndian.Uint16(b[0:]) + + // Read in the actual response. + p = make([]byte, sz) + _, err = io.ReadFull(conn, p) if err != nil { - handleConnError(conn) return nil, err } a := &CommandExecuteResponse{} - err = proto.Unmarshal(b, a) + err = proto.Unmarshal(p, a) if err != nil { return nil, err } @@ -169,7 +186,7 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, timeout time.D return nil, fmt.Errorf("command marshal: %s", err) } - // Write length of Protobuf, the Protobuf + // Write length of Protobuf, then the Protobuf b := make([]byte, 4) binary.LittleEndian.PutUint16(b[0:], uint16(len(p))) @@ -196,14 +213,23 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, timeout time.D handleConnError(conn) return nil, err } - b, err = ioutil.ReadAll(conn) + + // Read length of response. + _, err = io.ReadFull(conn, b) + if err != nil { + return nil, err + } + sz := binary.LittleEndian.Uint16(b[0:]) + + // Read in the actual response. + p = make([]byte, sz) + _, err = io.ReadFull(conn, p) if err != nil { - handleConnError(conn) return nil, err } a := &CommandQueryResponse{} - err = proto.Unmarshal(b, a) + err = proto.Unmarshal(p, a) if err != nil { return nil, err } @@ -216,8 +242,20 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, timeout time.D // Stats returns stats on the Client instance func (c *Client) Stats() (map[string]interface{}, error) { + c.mu.RLock() + defer c.mu.RUnlock() + + poolStats := make(map[string]interface{}, len(c.pools)) + for k, v := range c.pools { + s, err := v.Stats() + if err != nil { + return nil, err + } + poolStats[k] = s + } return map[string]interface{}{ "timeout": c.timeout, + "pool": poolStats, }, nil } diff --git a/cluster/service.go b/cluster/service.go index 5f0d642f..c2071f3c 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -153,95 +153,109 @@ func (s *Service) serve() error { func (s *Service) handleConn(conn net.Conn) { defer conn.Close() - b := make([]byte, 4) - _, err := io.ReadFull(conn, b) - if err != nil { - return - } - sz := binary.LittleEndian.Uint16(b[0:]) - - b = make([]byte, sz) - _, err = io.ReadFull(conn, b) - if err != nil { - return - } - - c := &Command{} - err = proto.Unmarshal(b, c) - if err != nil { - conn.Close() - } - - switch c.Type { - case Command_COMMAND_TYPE_GET_NODE_API_URL: - stats.Add(numGetNodeAPIRequest, 1) - s.mu.RLock() - defer s.mu.RUnlock() + for { + b := make([]byte, 4) + _, err := io.ReadFull(conn, b) + if err != nil { + return + } + sz := binary.LittleEndian.Uint16(b[0:]) - a := &Address{} - scheme := "http" - if s.https { - scheme = "https" + p := make([]byte, sz) + _, err = io.ReadFull(conn, p) + if err != nil { + return } - a.Url = fmt.Sprintf("%s://%s", scheme, s.apiAddr) - b, err = proto.Marshal(a) + c := &Command{} + err = proto.Unmarshal(p, c) if err != nil { conn.Close() } - conn.Write(b) - stats.Add(numGetNodeAPIResponse, 1) - case Command_COMMAND_TYPE_EXECUTE: - stats.Add(numExecuteRequest, 1) + switch c.Type { + case Command_COMMAND_TYPE_GET_NODE_API_URL: + stats.Add(numGetNodeAPIRequest, 1) - resp := &CommandExecuteResponse{} + s.mu.RLock() + a := &Address{} + scheme := "http" + if s.https { + scheme = "https" + } + a.Url = fmt.Sprintf("%s://%s", scheme, s.apiAddr) + s.mu.RUnlock() - er := c.GetExecuteRequest() - if er == nil { - resp.Error = "ExecuteRequest is nil" - } else { - res, err := s.db.Execute(er) + p, err = proto.Marshal(a) if err != nil { - resp.Error = err.Error() + conn.Close() + } + // Write length of Protobuf first, then write the actual Protobuf. + b = make([]byte, 4) + binary.LittleEndian.PutUint16(b[0:], uint16(len(p))) + conn.Write(b) + conn.Write(p) + stats.Add(numGetNodeAPIResponse, 1) + + case Command_COMMAND_TYPE_EXECUTE: + stats.Add(numExecuteRequest, 1) + + resp := &CommandExecuteResponse{} + + er := c.GetExecuteRequest() + if er == nil { + resp.Error = "ExecuteRequest is nil" } else { - resp.Results = make([]*command.ExecuteResult, len(res)) - for i := range res { - resp.Results[i] = res[i] + res, err := s.db.Execute(er) + if err != nil { + resp.Error = err.Error() + } else { + resp.Results = make([]*command.ExecuteResult, len(res)) + for i := range res { + resp.Results[i] = res[i] + } } } - } - b, err = proto.Marshal(resp) - if err != nil { - return - } - conn.Write(b) + p, err := proto.Marshal(resp) + if err != nil { + return + } + // Write length of Protobuf first, then write the actual Protobuf. + b = make([]byte, 4) + binary.LittleEndian.PutUint16(b[0:], uint16(len(p))) + conn.Write(b) + conn.Write(p) - case Command_COMMAND_TYPE_QUERY: - stats.Add(numQueryRequest, 1) + case Command_COMMAND_TYPE_QUERY: + stats.Add(numQueryRequest, 1) - resp := &CommandQueryResponse{} + resp := &CommandQueryResponse{} - qr := c.GetQueryRequest() - if qr == nil { - resp.Error = "QueryRequest is nil" - } else { - res, err := s.db.Query(qr) - if err != nil { - resp.Error = err.Error() + qr := c.GetQueryRequest() + if qr == nil { + resp.Error = "QueryRequest is nil" } else { - resp.Rows = make([]*command.QueryRows, len(res)) - for i := range res { - resp.Rows[i] = res[i] + res, err := s.db.Query(qr) + if err != nil { + resp.Error = err.Error() + } else { + resp.Rows = make([]*command.QueryRows, len(res)) + for i := range res { + resp.Rows[i] = res[i] + } } } - } - b, err = proto.Marshal(resp) - if err != nil { - return + p, err = proto.Marshal(resp) + if err != nil { + return + } + // Write length of Protobuf first, then write the actual Protobuf. + b = make([]byte, 4) + binary.LittleEndian.PutUint16(b[0:], uint16(len(p))) + conn.Write(b) + conn.Write(p) } - conn.Write(b) } }