1
0
Fork 0

Allow HTTP client to control number of retries

master
Philip O'Toole 8 months ago
parent 2562d654e8
commit 77ce0867bb

@ -22,9 +22,9 @@ import (
) )
const ( const (
initialPoolSize = 4 initialPoolSize = 4
maxPoolCapacity = 64 maxPoolCapacity = 64
maxRetries = 0 defaultMaxRetries = 8
protoBufferLengthSize = 8 protoBufferLengthSize = 8
) )
@ -112,7 +112,7 @@ func (c *Client) GetNodeAPIAddr(nodeAddr string, timeout time.Duration) (string,
command := &proto.Command{ command := &proto.Command{
Type: proto.Command_COMMAND_TYPE_GET_NODE_API_URL, Type: proto.Command_COMMAND_TYPE_GET_NODE_API_URL,
} }
p, nr, err := c.retry(command, nodeAddr, timeout) p, nr, err := c.retry(command, nodeAddr, timeout, defaultMaxRetries)
stats.Add(numGetNodeAPIRequestRetries, int64(nr)) stats.Add(numGetNodeAPIRequestRetries, int64(nr))
if err != nil { if err != nil {
return "", err return "", err
@ -130,7 +130,7 @@ func (c *Client) GetNodeAPIAddr(nodeAddr string, timeout time.Duration) (string,
// Execute performs an Execute on a remote node. If username is an empty string // Execute performs an Execute on a remote node. If username is an empty string
// no credential information will be included in the Execute request to the // no credential information will be included in the Execute request to the
// remote node. // remote node.
func (c *Client) Execute(er *command.ExecuteRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration) ([]*command.ExecuteResult, error) { func (c *Client) Execute(er *command.ExecuteRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration, retries int) ([]*command.ExecuteResult, error) {
command := &proto.Command{ command := &proto.Command{
Type: proto.Command_COMMAND_TYPE_EXECUTE, Type: proto.Command_COMMAND_TYPE_EXECUTE,
Request: &proto.Command_ExecuteRequest{ Request: &proto.Command_ExecuteRequest{
@ -138,7 +138,7 @@ func (c *Client) Execute(er *command.ExecuteRequest, nodeAddr string, creds *pro
}, },
Credentials: creds, Credentials: creds,
} }
p, nr, err := c.retry(command, nodeAddr, timeout) p, nr, err := c.retry(command, nodeAddr, timeout, retries)
stats.Add(numClientExecuteRetries, int64(nr)) stats.Add(numClientExecuteRetries, int64(nr))
if err != nil { if err != nil {
return nil, err return nil, err
@ -165,7 +165,7 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, creds *proto.C
}, },
Credentials: creds, Credentials: creds,
} }
p, nr, err := c.retry(command, nodeAddr, timeout) p, nr, err := c.retry(command, nodeAddr, timeout, defaultMaxRetries)
stats.Add(numClientQueryRetries, int64(nr)) stats.Add(numClientQueryRetries, int64(nr))
if err != nil { if err != nil {
return nil, err return nil, err
@ -184,7 +184,7 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, creds *proto.C
} }
// Request performs an ExecuteQuery on a remote node. // Request performs an ExecuteQuery on a remote node.
func (c *Client) Request(r *command.ExecuteQueryRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration) ([]*command.ExecuteQueryResponse, error) { func (c *Client) Request(r *command.ExecuteQueryRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration, retries int) ([]*command.ExecuteQueryResponse, error) {
command := &proto.Command{ command := &proto.Command{
Type: proto.Command_COMMAND_TYPE_REQUEST, Type: proto.Command_COMMAND_TYPE_REQUEST,
Request: &proto.Command_ExecuteQueryRequest{ Request: &proto.Command_ExecuteQueryRequest{
@ -192,7 +192,7 @@ func (c *Client) Request(r *command.ExecuteQueryRequest, nodeAddr string, creds
}, },
Credentials: creds, Credentials: creds,
} }
p, nr, err := c.retry(command, nodeAddr, timeout) p, nr, err := c.retry(command, nodeAddr, timeout, retries)
stats.Add(numClientRequestRetries, int64(nr)) stats.Add(numClientRequestRetries, int64(nr))
if err != nil { if err != nil {
return nil, err return nil, err
@ -266,7 +266,7 @@ func (c *Client) Backup(br *command.BackupRequest, nodeAddr string, creds *proto
} }
// Load loads a SQLite file into the database. // Load loads a SQLite file into the database.
func (c *Client) Load(lr *command.LoadRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration) error { func (c *Client) Load(lr *command.LoadRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration, retries int) error {
command := &proto.Command{ command := &proto.Command{
Type: proto.Command_COMMAND_TYPE_LOAD, Type: proto.Command_COMMAND_TYPE_LOAD,
Request: &proto.Command_LoadRequest{ Request: &proto.Command_LoadRequest{
@ -274,7 +274,7 @@ func (c *Client) Load(lr *command.LoadRequest, nodeAddr string, creds *proto.Cre
}, },
Credentials: creds, Credentials: creds,
} }
p, nr, err := c.retry(command, nodeAddr, timeout) p, nr, err := c.retry(command, nodeAddr, timeout, retries)
stats.Add(numClientLoadRetries, int64(nr)) stats.Add(numClientLoadRetries, int64(nr))
if err != nil { if err != nil {
return err return err
@ -488,7 +488,7 @@ func (c *Client) dial(nodeAddr string, timeout time.Duration) (net.Conn, error)
// retry retries a command on a remote node. It does this so we churn through connections // retry retries a command on a remote node. It does this so we churn through connections
// in the pool if we hit an error, as the remote node may have restarted and the pool's // in the pool if we hit an error, as the remote node may have restarted and the pool's
// connections are now stale. // connections are now stale.
func (c *Client) retry(command *proto.Command, nodeAddr string, timeout time.Duration) ([]byte, int, error) { func (c *Client) retry(command *proto.Command, nodeAddr string, timeout time.Duration, maxRetries int) ([]byte, int, error) {
var p []byte var p []byte
var errOuter error var errOuter error
var nRetries int var nRetries int

@ -90,7 +90,7 @@ func Test_ClientExecute(t *testing.T) {
c := NewClient(&simpleDialer{}, 0) c := NewClient(&simpleDialer{}, 0)
_, err := c.Execute(executeRequestFromString("INSERT INTO foo (id) VALUES (1)"), _, err := c.Execute(executeRequestFromString("INSERT INTO foo (id) VALUES (1)"),
srv.Addr(), nil, time.Second) srv.Addr(), nil, time.Second, defaultMaxRetries)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -168,7 +168,7 @@ func Test_ClientRequest(t *testing.T) {
c := NewClient(&simpleDialer{}, 0) c := NewClient(&simpleDialer{}, 0)
_, err := c.Request(executeQueryRequestFromString("SELECT * FROM foo"), _, err := c.Request(executeQueryRequestFromString("SELECT * FROM foo"),
srv.Addr(), nil, time.Second) srv.Addr(), nil, time.Second, defaultMaxRetries)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

@ -48,7 +48,7 @@ func Test_ServiceExecute(t *testing.T) {
} }
return nil, errors.New("execute failed") return nil, errors.New("execute failed")
} }
_, err := c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait) _, err := c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait, defaultMaxRetries)
if err == nil { if err == nil {
t.Fatalf("client failed to report error") t.Fatalf("client failed to report error")
} }
@ -66,7 +66,7 @@ func Test_ServiceExecute(t *testing.T) {
} }
return []*command.ExecuteResult{result}, nil return []*command.ExecuteResult{result}, nil
} }
res, err := c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait) res, err := c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait, defaultMaxRetries)
if err != nil { if err != nil {
t.Fatalf("failed to execute query: %s", err.Error()) t.Fatalf("failed to execute query: %s", err.Error())
} }
@ -83,7 +83,7 @@ func Test_ServiceExecute(t *testing.T) {
} }
return []*command.ExecuteResult{result}, nil return []*command.ExecuteResult{result}, nil
} }
res, err = c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait) res, err = c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait, defaultMaxRetries)
if err != nil { if err != nil {
t.Fatalf("failed to execute: %s", err.Error()) t.Fatalf("failed to execute: %s", err.Error())
} }
@ -95,7 +95,7 @@ func Test_ServiceExecute(t *testing.T) {
time.Sleep(longWait) time.Sleep(longWait)
return nil, nil return nil, nil
} }
_, err = c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, shortWait) _, err = c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, shortWait, defaultMaxRetries)
if err == nil { if err == nil {
t.Fatalf("failed to receive expected error") t.Fatalf("failed to receive expected error")
} }
@ -340,7 +340,7 @@ func Test_ServiceLoad(t *testing.T) {
return nil return nil
} }
err := c.Load(loadRequest(testData), s.Addr(), NO_CREDS, longWait) err := c.Load(loadRequest(testData), s.Addr(), NO_CREDS, longWait, defaultMaxRetries)
if err != nil { if err != nil {
t.Fatalf("failed to load database: %s", err.Error()) t.Fatalf("failed to load database: %s", err.Error())
} }

@ -197,7 +197,7 @@ func Test_NewServiceTestExecuteQueryAuthNoCredentials(t *testing.T) {
t.Fatalf("failed to set cluster client local parameters: %s", err) t.Fatalf("failed to set cluster client local parameters: %s", err)
} }
er := &command.ExecuteRequest{} er := &command.ExecuteRequest{}
_, err := cl.Execute(er, s.Addr(), nil, 5*time.Second) _, err := cl.Execute(er, s.Addr(), nil, 5*time.Second, defaultMaxRetries)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -240,11 +240,11 @@ func Test_NewServiceTestExecuteQueryAuth(t *testing.T) {
t.Fatalf("failed to set cluster client local parameters: %s", err) t.Fatalf("failed to set cluster client local parameters: %s", err)
} }
er := &command.ExecuteRequest{} er := &command.ExecuteRequest{}
_, err := cl.Execute(er, s.Addr(), makeCredentials("alice", "secret1"), 5*time.Second) _, err := cl.Execute(er, s.Addr(), makeCredentials("alice", "secret1"), 5*time.Second, defaultMaxRetries)
if err != nil { if err != nil {
t.Fatal("alice improperly unauthorized to execute") t.Fatal("alice improperly unauthorized to execute")
} }
_, err = cl.Execute(er, s.Addr(), makeCredentials("bob", "secret1"), 5*time.Second) _, err = cl.Execute(er, s.Addr(), makeCredentials("bob", "secret1"), 5*time.Second, defaultMaxRetries)
if err == nil { if err == nil {
t.Fatal("bob improperly authorized to execute") t.Fatal("bob improperly authorized to execute")
} }

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"strings" "strings"
"time" "time"
@ -34,6 +35,15 @@ func NewQueryParams(r *http.Request) (QueryParams, error) {
} }
} }
} }
for _, k := range []string{"retries"} {
r, ok := qp[k]
if ok {
_, err := strconv.Atoi(r)
if err != nil {
return nil, fmt.Errorf("%s is not a valid integer", k)
}
}
}
q, ok := qp["q"] q, ok := qp["q"]
if ok { if ok {
if q == "" { if q == "" {
@ -161,6 +171,16 @@ func (qp QueryParams) Timeout(def time.Duration) time.Duration {
return d return d
} }
// Retries returns the number of retries requested through the "retries"
// entry of qp. If that entry is absent, or its value cannot be parsed as an
// integer, def is returned instead.
func (qp QueryParams) Retries(def int) int {
	v, ok := qp["retries"]
	if !ok {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		// Fall back to the caller's default rather than silently reporting
		// zero retries for a value that is not a valid integer.
		return def
	}
	return n
}
// Version returns the requested version. // Version returns the requested version.
func (qp QueryParams) Version() string { func (qp QueryParams) Version() string {
return qp["ver"] return qp["ver"]

@ -18,6 +18,8 @@ func Test_NewQueryParams(t *testing.T) {
{"Empty Query", "", QueryParams{}, false}, {"Empty Query", "", QueryParams{}, false},
{"Valid Query", "timeout=10s&q=test", QueryParams{"timeout": "10s", "q": "test"}, false}, {"Valid Query", "timeout=10s&q=test", QueryParams{"timeout": "10s", "q": "test"}, false},
{"Invalid Timeout", "timeout=invalid", nil, true}, {"Invalid Timeout", "timeout=invalid", nil, true},
{"Invalid Retry", "retries=invalid", nil, true},
{"Valid Retry", "retries=4", QueryParams{"retries": "4"}, false},
{"Empty Q", "q=", nil, true}, {"Empty Q", "q=", nil, true},
{"Invalid Q", "q", nil, true}, {"Invalid Q", "q", nil, true},
{"Valid Q, no case changes", "q=SELeCT", QueryParams{"q": "SELeCT"}, false}, {"Valid Q, no case changes", "q=SELeCT", QueryParams{"q": "SELeCT"}, false},

@ -102,19 +102,19 @@ type Cluster interface {
GetAddresser GetAddresser
// Execute performs an Execute Request on a remote node. // Execute performs an Execute Request on a remote node.
Execute(er *proto.ExecuteRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) ([]*proto.ExecuteResult, error) Execute(er *proto.ExecuteRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, retries int) ([]*proto.ExecuteResult, error)
// Query performs an Query Request on a remote node. // Query performs an Query Request on a remote node.
Query(qr *proto.QueryRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) ([]*proto.QueryRows, error) Query(qr *proto.QueryRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) ([]*proto.QueryRows, error)
// Request performs an ExecuteQuery Request on a remote node. // Request performs an ExecuteQuery Request on a remote node.
Request(eqr *proto.ExecuteQueryRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) ([]*proto.ExecuteQueryResponse, error) Request(eqr *proto.ExecuteQueryRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, retries int) ([]*proto.ExecuteQueryResponse, error)
// Backup retrieves a backup from a remote node and writes to the io.Writer. // Backup retrieves a backup from a remote node and writes to the io.Writer.
Backup(br *proto.BackupRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, w io.Writer) error Backup(br *proto.BackupRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, w io.Writer) error
// Load loads a SQLite database into the node. // Load loads a SQLite database into the node.
Load(lr *proto.LoadRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) error Load(lr *proto.LoadRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, retries int) error
// RemoveNode removes a node from the cluster. // RemoveNode removes a node from the cluster.
RemoveNode(rn *proto.RemoveNodeRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) error RemoveNode(rn *proto.RemoveNodeRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) error
@ -707,7 +707,8 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request, qp QueryPar
} }
w.Header().Add(ServedByHTTPHeader, addr) w.Header().Add(ServedByHTTPHeader, addr)
loadErr := s.cluster.Load(lr, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout)) loadErr := s.cluster.Load(lr, addr, makeCredentials(username, password),
qp.Timeout(defaultTimeout), qp.Retries(0))
if loadErr != nil { if loadErr != nil {
if loadErr.Error() == "unauthorized" { if loadErr.Error() == "unauthorized" {
http.Error(w, "remote load not authorized", http.StatusUnauthorized) http.Error(w, "remote load not authorized", http.StatusUnauthorized)
@ -1142,7 +1143,8 @@ func (s *Service) execute(w http.ResponseWriter, r *http.Request, qp QueryParams
} }
w.Header().Add(ServedByHTTPHeader, addr) w.Header().Add(ServedByHTTPHeader, addr)
results, resultsErr = s.cluster.Execute(er, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout)) results, resultsErr = s.cluster.Execute(er, addr, makeCredentials(username, password),
qp.Timeout(defaultTimeout), qp.Retries(0))
if resultsErr != nil { if resultsErr != nil {
stats.Add(numRemoteExecutionsFailed, 1) stats.Add(numRemoteExecutionsFailed, 1)
if resultsErr.Error() == "unauthorized" { if resultsErr.Error() == "unauthorized" {
@ -1315,7 +1317,8 @@ func (s *Service) handleRequest(w http.ResponseWriter, r *http.Request, qp Query
} }
w.Header().Add(ServedByHTTPHeader, addr) w.Header().Add(ServedByHTTPHeader, addr)
results, resultErr = s.cluster.Request(eqr, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout)) results, resultErr = s.cluster.Request(eqr, addr, makeCredentials(username, password),
qp.Timeout(defaultTimeout), qp.Retries(0))
if resultErr != nil { if resultErr != nil {
stats.Add(numRemoteRequestsFailed, 1) stats.Add(numRemoteRequestsFailed, 1)
if resultErr.Error() == "unauthorized" { if resultErr.Error() == "unauthorized" {
@ -1516,7 +1519,7 @@ func (s *Service) runQueue() {
req.SequenceNumber, s.Addr().String()) req.SequenceNumber, s.Addr().String())
stats.Add(numQueuedExecutionsNoLeader, 1) stats.Add(numQueuedExecutionsNoLeader, 1)
} else { } else {
_, err = s.cluster.Execute(er, addr, nil, defaultTimeout) _, err = s.cluster.Execute(er, addr, nil, defaultTimeout, 0)
if err != nil { if err != nil {
s.logger.Printf("execute queue write failed for sequence number %d on node %s: %s", s.logger.Printf("execute queue write failed for sequence number %d on node %s: %s",
req.SequenceNumber, s.Addr().String(), err.Error()) req.SequenceNumber, s.Addr().String(), err.Error())

@ -1411,7 +1411,7 @@ func (m *mockClusterService) GetNodeAPIAddr(a string, t time.Duration) (string,
return m.apiAddr, nil return m.apiAddr, nil
} }
func (m *mockClusterService) Execute(er *command.ExecuteRequest, addr string, creds *cluster.Credentials, t time.Duration) ([]*command.ExecuteResult, error) { func (m *mockClusterService) Execute(er *command.ExecuteRequest, addr string, creds *cluster.Credentials, t time.Duration, r int) ([]*command.ExecuteResult, error) {
if m.executeFn != nil { if m.executeFn != nil {
return m.executeFn(er, addr, t) return m.executeFn(er, addr, t)
} }
@ -1425,7 +1425,7 @@ func (m *mockClusterService) Query(qr *command.QueryRequest, addr string, creds
return nil, nil return nil, nil
} }
func (m *mockClusterService) Request(eqr *command.ExecuteQueryRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) ([]*command.ExecuteQueryResponse, error) { func (m *mockClusterService) Request(eqr *command.ExecuteQueryRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration, r int) ([]*command.ExecuteQueryResponse, error) {
if m.requestFn != nil { if m.requestFn != nil {
return m.requestFn(eqr, nodeAddr, timeout) return m.requestFn(eqr, nodeAddr, timeout)
} }
@ -1439,7 +1439,7 @@ func (m *mockClusterService) Backup(br *command.BackupRequest, addr string, cred
return nil return nil
} }
func (m *mockClusterService) Load(lr *command.LoadRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error { func (m *mockClusterService) Load(lr *command.LoadRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration, r int) error {
if m.loadFn != nil { if m.loadFn != nil {
return m.loadFn(lr, nodeAddr, timeout) return m.loadFn(lr, nodeAddr, timeout)
} }

Loading…
Cancel
Save