1
0
Fork 0

Long-lived connections in cluster service

Needed now that we're using connection pooling. Unit tests pass.
master
Philip O'Toole 3 years ago
parent c8483d2ec5
commit 3a70db5150

@ -4,7 +4,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
"io"
"net"
"sync"
"time"
@ -69,14 +69,22 @@ func (c *Client) GetNodeAPIAddr(nodeAddr string) (string, error) {
return "", fmt.Errorf("write protobuf: %s", err)
}
b, err = ioutil.ReadAll(conn)
// Read length of response.
_, err = io.ReadFull(conn, b)
if err != nil {
handleConnError(conn)
return "", fmt.Errorf("read protobuf bytes: %s", err)
return "", err
}
sz := binary.LittleEndian.Uint16(b[0:])
// Read in the actual response.
p = make([]byte, sz)
_, err = io.ReadFull(conn, p)
if err != nil {
return "", err
}
a := &Address{}
err = proto.Unmarshal(b, a)
err = proto.Unmarshal(p, a)
if err != nil {
return "", fmt.Errorf("protobuf unmarshal: %s", err)
}
@ -131,14 +139,23 @@ func (c *Client) Execute(er *command.ExecuteRequest, nodeAddr string, timeout ti
handleConnError(conn)
return nil, err
}
b, err = ioutil.ReadAll(conn)
// Read length of response.
_, err = io.ReadFull(conn, b)
if err != nil {
return nil, err
}
sz := binary.LittleEndian.Uint16(b[0:])
// Read in the actual response.
p = make([]byte, sz)
_, err = io.ReadFull(conn, p)
if err != nil {
handleConnError(conn)
return nil, err
}
a := &CommandExecuteResponse{}
err = proto.Unmarshal(b, a)
err = proto.Unmarshal(p, a)
if err != nil {
return nil, err
}
@ -169,7 +186,7 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, timeout time.D
return nil, fmt.Errorf("command marshal: %s", err)
}
// Write length of Protobuf, the Protobuf
// Write length of Protobuf, then the Protobuf
b := make([]byte, 4)
binary.LittleEndian.PutUint16(b[0:], uint16(len(p)))
@ -196,14 +213,23 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, timeout time.D
handleConnError(conn)
return nil, err
}
b, err = ioutil.ReadAll(conn)
// Read length of response.
_, err = io.ReadFull(conn, b)
if err != nil {
return nil, err
}
sz := binary.LittleEndian.Uint16(b[0:])
// Read in the actual response.
p = make([]byte, sz)
_, err = io.ReadFull(conn, p)
if err != nil {
handleConnError(conn)
return nil, err
}
a := &CommandQueryResponse{}
err = proto.Unmarshal(b, a)
err = proto.Unmarshal(p, a)
if err != nil {
return nil, err
}
@ -216,8 +242,20 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, timeout time.D
// Stats returns stats on the Client instance
func (c *Client) Stats() (map[string]interface{}, error) {
c.mu.RLock()
defer c.mu.RUnlock()
poolStats := make(map[string]interface{}, len(c.pools))
for k, v := range c.pools {
s, err := v.Stats()
if err != nil {
return nil, err
}
poolStats[k] = s
}
return map[string]interface{}{
"timeout": c.timeout,
"pool": poolStats,
}, nil
}

@ -153,95 +153,109 @@ func (s *Service) serve() error {
func (s *Service) handleConn(conn net.Conn) {
defer conn.Close()
b := make([]byte, 4)
_, err := io.ReadFull(conn, b)
if err != nil {
return
}
sz := binary.LittleEndian.Uint16(b[0:])
b = make([]byte, sz)
_, err = io.ReadFull(conn, b)
if err != nil {
return
}
c := &Command{}
err = proto.Unmarshal(b, c)
if err != nil {
conn.Close()
}
switch c.Type {
case Command_COMMAND_TYPE_GET_NODE_API_URL:
stats.Add(numGetNodeAPIRequest, 1)
s.mu.RLock()
defer s.mu.RUnlock()
for {
b := make([]byte, 4)
_, err := io.ReadFull(conn, b)
if err != nil {
return
}
sz := binary.LittleEndian.Uint16(b[0:])
a := &Address{}
scheme := "http"
if s.https {
scheme = "https"
p := make([]byte, sz)
_, err = io.ReadFull(conn, p)
if err != nil {
return
}
a.Url = fmt.Sprintf("%s://%s", scheme, s.apiAddr)
b, err = proto.Marshal(a)
c := &Command{}
err = proto.Unmarshal(p, c)
if err != nil {
conn.Close()
}
conn.Write(b)
stats.Add(numGetNodeAPIResponse, 1)
case Command_COMMAND_TYPE_EXECUTE:
stats.Add(numExecuteRequest, 1)
switch c.Type {
case Command_COMMAND_TYPE_GET_NODE_API_URL:
stats.Add(numGetNodeAPIRequest, 1)
resp := &CommandExecuteResponse{}
s.mu.RLock()
a := &Address{}
scheme := "http"
if s.https {
scheme = "https"
}
a.Url = fmt.Sprintf("%s://%s", scheme, s.apiAddr)
s.mu.RUnlock()
er := c.GetExecuteRequest()
if er == nil {
resp.Error = "ExecuteRequest is nil"
} else {
res, err := s.db.Execute(er)
p, err = proto.Marshal(a)
if err != nil {
resp.Error = err.Error()
conn.Close()
}
// Write length of Protobuf first, then write the actual Protobuf.
b = make([]byte, 4)
binary.LittleEndian.PutUint16(b[0:], uint16(len(p)))
conn.Write(b)
conn.Write(p)
stats.Add(numGetNodeAPIResponse, 1)
case Command_COMMAND_TYPE_EXECUTE:
stats.Add(numExecuteRequest, 1)
resp := &CommandExecuteResponse{}
er := c.GetExecuteRequest()
if er == nil {
resp.Error = "ExecuteRequest is nil"
} else {
resp.Results = make([]*command.ExecuteResult, len(res))
for i := range res {
resp.Results[i] = res[i]
res, err := s.db.Execute(er)
if err != nil {
resp.Error = err.Error()
} else {
resp.Results = make([]*command.ExecuteResult, len(res))
for i := range res {
resp.Results[i] = res[i]
}
}
}
}
b, err = proto.Marshal(resp)
if err != nil {
return
}
conn.Write(b)
p, err := proto.Marshal(resp)
if err != nil {
return
}
// Write length of Protobuf first, then write the actual Protobuf.
b = make([]byte, 4)
binary.LittleEndian.PutUint16(b[0:], uint16(len(p)))
conn.Write(b)
conn.Write(p)
case Command_COMMAND_TYPE_QUERY:
stats.Add(numQueryRequest, 1)
case Command_COMMAND_TYPE_QUERY:
stats.Add(numQueryRequest, 1)
resp := &CommandQueryResponse{}
resp := &CommandQueryResponse{}
qr := c.GetQueryRequest()
if qr == nil {
resp.Error = "QueryRequest is nil"
} else {
res, err := s.db.Query(qr)
if err != nil {
resp.Error = err.Error()
qr := c.GetQueryRequest()
if qr == nil {
resp.Error = "QueryRequest is nil"
} else {
resp.Rows = make([]*command.QueryRows, len(res))
for i := range res {
resp.Rows[i] = res[i]
res, err := s.db.Query(qr)
if err != nil {
resp.Error = err.Error()
} else {
resp.Rows = make([]*command.QueryRows, len(res))
for i := range res {
resp.Rows[i] = res[i]
}
}
}
}
b, err = proto.Marshal(resp)
if err != nil {
return
p, err = proto.Marshal(resp)
if err != nil {
return
}
// Write length of Protobuf first, then write the actual Protobuf.
b = make([]byte, 4)
binary.LittleEndian.PutUint16(b[0:], uint16(len(p)))
conn.Write(b)
conn.Write(p)
}
conn.Write(b)
}
}

Loading…
Cancel
Save