diff --git a/cluster/client.go b/cluster/client.go index 56e43f6a..09601e34 100644 --- a/cluster/client.go +++ b/cluster/client.go @@ -22,9 +22,9 @@ import ( ) const ( - initialPoolSize = 4 - maxPoolCapacity = 64 - maxRetries = 0 + initialPoolSize = 4 + maxPoolCapacity = 64 + defaultMaxRetries = 8 protoBufferLengthSize = 8 ) @@ -112,7 +112,7 @@ func (c *Client) GetNodeAPIAddr(nodeAddr string, timeout time.Duration) (string, command := &proto.Command{ Type: proto.Command_COMMAND_TYPE_GET_NODE_API_URL, } - p, nr, err := c.retry(command, nodeAddr, timeout) + p, nr, err := c.retry(command, nodeAddr, timeout, defaultMaxRetries) stats.Add(numGetNodeAPIRequestRetries, int64(nr)) if err != nil { return "", err @@ -130,7 +130,7 @@ func (c *Client) GetNodeAPIAddr(nodeAddr string, timeout time.Duration) (string, // Execute performs an Execute on a remote node. If username is an empty string // no credential information will be included in the Execute request to the // remote node. -func (c *Client) Execute(er *command.ExecuteRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration) ([]*command.ExecuteResult, error) { +func (c *Client) Execute(er *command.ExecuteRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration, retries int) ([]*command.ExecuteResult, error) { command := &proto.Command{ Type: proto.Command_COMMAND_TYPE_EXECUTE, Request: &proto.Command_ExecuteRequest{ @@ -138,7 +138,7 @@ func (c *Client) Execute(er *command.ExecuteRequest, nodeAddr string, creds *pro }, Credentials: creds, } - p, nr, err := c.retry(command, nodeAddr, timeout) + p, nr, err := c.retry(command, nodeAddr, timeout, retries) stats.Add(numClientExecuteRetries, int64(nr)) if err != nil { return nil, err @@ -165,7 +165,7 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, creds *proto.C }, Credentials: creds, } - p, nr, err := c.retry(command, nodeAddr, timeout) + p, nr, err := c.retry(command, nodeAddr, timeout, defaultMaxRetries) stats.Add(numClientQueryRetries, int64(nr)) if err != nil { return nil, err @@ -184,7 +184,7 @@ func (c *Client) Query(qr *command.QueryRequest, nodeAddr string, creds *proto.C } // Request performs an ExecuteQuery on a remote node. -func (c *Client) Request(r *command.ExecuteQueryRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration) ([]*command.ExecuteQueryResponse, error) { +func (c *Client) Request(r *command.ExecuteQueryRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration, retries int) ([]*command.ExecuteQueryResponse, error) { command := &proto.Command{ Type: proto.Command_COMMAND_TYPE_REQUEST, Request: &proto.Command_ExecuteQueryRequest{ @@ -192,7 +192,7 @@ func (c *Client) Request(r *command.ExecuteQueryRequest, nodeAddr string, creds }, Credentials: creds, } - p, nr, err := c.retry(command, nodeAddr, timeout) + p, nr, err := c.retry(command, nodeAddr, timeout, retries) stats.Add(numClientRequestRetries, int64(nr)) if err != nil { return nil, err @@ -266,7 +266,7 @@ func (c *Client) Backup(br *command.BackupRequest, nodeAddr string, creds *proto } // Load loads a SQLite file into the database. -func (c *Client) Load(lr *command.LoadRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration) error { +func (c *Client) Load(lr *command.LoadRequest, nodeAddr string, creds *proto.Credentials, timeout time.Duration, retries int) error { command := &proto.Command{ Type: proto.Command_COMMAND_TYPE_LOAD, Request: &proto.Command_LoadRequest{ @@ -274,7 +274,7 @@ func (c *Client) Load(lr *command.LoadRequest, nodeAddr string, creds *proto.Cre }, Credentials: creds, } - p, nr, err := c.retry(command, nodeAddr, timeout) + p, nr, err := c.retry(command, nodeAddr, timeout, retries) stats.Add(numClientLoadRetries, int64(nr)) if err != nil { return err @@ -488,7 +488,7 @@ func (c *Client) dial(nodeAddr string, timeout time.Duration) (net.Conn, error) // retry retries a command on a remote node. It does this so we churn through connections // in the pool if we hit an error, as the remote node may have restarted and the pool's // connections are now stale. -func (c *Client) retry(command *proto.Command, nodeAddr string, timeout time.Duration) ([]byte, int, error) { +func (c *Client) retry(command *proto.Command, nodeAddr string, timeout time.Duration, maxRetries int) ([]byte, int, error) { var p []byte var errOuter error var nRetries int diff --git a/cluster/client_test.go b/cluster/client_test.go index eaff6cbb..2e416de9 100644 --- a/cluster/client_test.go +++ b/cluster/client_test.go @@ -90,7 +90,7 @@ func Test_ClientExecute(t *testing.T) { c := NewClient(&simpleDialer{}, 0) _, err := c.Execute(executeRequestFromString("INSERT INTO foo (id) VALUES (1)"), - srv.Addr(), nil, time.Second) + srv.Addr(), nil, time.Second, defaultMaxRetries) if err != nil { t.Fatal(err) } @@ -168,7 +168,7 @@ func Test_ClientRequest(t *testing.T) { c := NewClient(&simpleDialer{}, 0) _, err := c.Request(executeQueryRequestFromString("SELECT * FROM foo"), - srv.Addr(), nil, time.Second) + srv.Addr(), nil, time.Second, defaultMaxRetries) if err != nil { t.Fatal(err) } diff --git a/cluster/service_db_clstr_test.go b/cluster/service_db_clstr_test.go index 38f80d33..d3326acf 100644 --- a/cluster/service_db_clstr_test.go +++ b/cluster/service_db_clstr_test.go @@ -48,7 +48,7 @@ func Test_ServiceExecute(t *testing.T) { } return nil, errors.New("execute failed") } - _, err := c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait) + _, err := c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait, defaultMaxRetries) if err == nil { t.Fatalf("client failed to report error") } @@ -66,7 +66,7 @@ func Test_ServiceExecute(t *testing.T) { } return []*command.ExecuteResult{result}, nil } - res, err := c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait) + res, err := c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait, defaultMaxRetries) if err != nil { t.Fatalf("failed to execute query: %s", err.Error()) } @@ -83,7 +83,7 @@ func Test_ServiceExecute(t *testing.T) { } return []*command.ExecuteResult{result}, nil } - res, err = c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait) + res, err = c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, longWait, defaultMaxRetries) if err != nil { t.Fatalf("failed to execute: %s", err.Error()) } @@ -95,7 +95,7 @@ func Test_ServiceExecute(t *testing.T) { time.Sleep(longWait) return nil, nil } - _, err = c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, shortWait) + _, err = c.Execute(executeRequestFromString("some SQL"), s.Addr(), NO_CREDS, shortWait, defaultMaxRetries) if err == nil { t.Fatalf("failed to receive expected error") } @@ -340,7 +340,7 @@ func Test_ServiceLoad(t *testing.T) { return nil } - err := c.Load(loadRequest(testData), s.Addr(), NO_CREDS, longWait) + err := c.Load(loadRequest(testData), s.Addr(), NO_CREDS, longWait, defaultMaxRetries) if err != nil { t.Fatalf("failed to load database: %s", err.Error()) } diff --git a/cluster/service_test.go b/cluster/service_test.go index 6cccf615..14d11f53 100644 --- a/cluster/service_test.go +++ b/cluster/service_test.go @@ -197,7 +197,7 @@ func Test_NewServiceTestExecuteQueryAuthNoCredentials(t *testing.T) { t.Fatalf("failed to set cluster client local parameters: %s", err) } er := &command.ExecuteRequest{} - _, err := cl.Execute(er, s.Addr(), nil, 5*time.Second) + _, err := cl.Execute(er, s.Addr(), nil, 5*time.Second, defaultMaxRetries) if err != nil { t.Fatal(err) } @@ -240,11 +240,11 @@ func Test_NewServiceTestExecuteQueryAuth(t *testing.T) { t.Fatalf("failed to set cluster client local parameters: %s", err) } er := &command.ExecuteRequest{} - _, err := cl.Execute(er, s.Addr(), makeCredentials("alice", "secret1"), 5*time.Second) + _, err := cl.Execute(er, s.Addr(), makeCredentials("alice", "secret1"), 5*time.Second, defaultMaxRetries) if err != nil { t.Fatal("alice improperly unauthorized to execute") } - _, err = cl.Execute(er, s.Addr(), makeCredentials("bob", "secret1"), 5*time.Second) + _, err = cl.Execute(er, s.Addr(), makeCredentials("bob", "secret1"), 5*time.Second, defaultMaxRetries) if err == nil { t.Fatal("bob improperly authorized to execute") } diff --git a/http/query_params.go b/http/query_params.go index 8f7e4f3d..4823c1b8 100644 --- a/http/query_params.go +++ b/http/query_params.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" "net/url" + "strconv" "strings" "time" @@ -34,6 +35,15 @@ func NewQueryParams(r *http.Request) (QueryParams, error) { } } } + for _, k := range []string{"retries"} { + r, ok := qp[k] + if ok { + _, err := strconv.Atoi(r) + if err != nil { + return nil, fmt.Errorf("%s is not a valid integer", k) + } + } + } q, ok := qp["q"] if ok { if q == "" { @@ -161,6 +171,16 @@ func (qp QueryParams) Timeout(def time.Duration) time.Duration { return d } +// Retries returns the requested number of retries. +func (qp QueryParams) Retries(def int) int { + i, ok := qp["retries"] + if !ok { + return def + } + r, _ := strconv.Atoi(i) + return r +} + // Version returns the requested version. func (qp QueryParams) Version() string { return qp["ver"] diff --git a/http/query_params_test.go b/http/query_params_test.go index 0f1c20a5..3fe3f798 100644 --- a/http/query_params_test.go +++ b/http/query_params_test.go @@ -18,6 +18,8 @@ func Test_NewQueryParams(t *testing.T) { {"Empty Query", "", QueryParams{}, false}, {"Valid Query", "timeout=10s&q=test", QueryParams{"timeout": "10s", "q": "test"}, false}, {"Invalid Timeout", "timeout=invalid", nil, true}, + {"Invalid Retry", "retries=invalid", nil, true}, + {"Valid Retry", "retries=4", QueryParams{"retries": "4"}, false}, {"Empty Q", "q=", nil, true}, {"Invalid Q", "q", nil, true}, {"Valid Q, no case changes", "q=SELeCT", QueryParams{"q": "SELeCT"}, false}, diff --git a/http/service.go b/http/service.go index a1db4df9..c65a77e8 100644 --- a/http/service.go +++ b/http/service.go @@ -102,19 +102,19 @@ type Cluster interface { GetAddresser // Execute performs an Execute Request on a remote node. - Execute(er *proto.ExecuteRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) ([]*proto.ExecuteResult, error) + Execute(er *proto.ExecuteRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, retries int) ([]*proto.ExecuteResult, error) // Query performs an Query Request on a remote node. Query(qr *proto.QueryRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) ([]*proto.QueryRows, error) // Request performs an ExecuteQuery Request on a remote node. - Request(eqr *proto.ExecuteQueryRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) ([]*proto.ExecuteQueryResponse, error) + Request(eqr *proto.ExecuteQueryRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, retries int) ([]*proto.ExecuteQueryResponse, error) // Backup retrieves a backup from a remote node and writes to the io.Writer. Backup(br *proto.BackupRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, w io.Writer) error // Load loads a SQLite database into the node. - Load(lr *proto.LoadRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) error + Load(lr *proto.LoadRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration, retries int) error // RemoveNode removes a node from the cluster. RemoveNode(rn *proto.RemoveNodeRequest, nodeAddr string, creds *clstrPB.Credentials, timeout time.Duration) error @@ -707,7 +707,8 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request, qp QueryPar } w.Header().Add(ServedByHTTPHeader, addr) - loadErr := s.cluster.Load(lr, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout)) + loadErr := s.cluster.Load(lr, addr, makeCredentials(username, password), + qp.Timeout(defaultTimeout), qp.Retries(0)) if loadErr != nil { if loadErr.Error() == "unauthorized" { http.Error(w, "remote load not authorized", http.StatusUnauthorized) @@ -1142,7 +1143,8 @@ func (s *Service) execute(w http.ResponseWriter, r *http.Request, qp QueryParams } w.Header().Add(ServedByHTTPHeader, addr) - results, resultsErr = s.cluster.Execute(er, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout)) + results, resultsErr = s.cluster.Execute(er, addr, makeCredentials(username, password), + qp.Timeout(defaultTimeout), qp.Retries(0)) if resultsErr != nil { stats.Add(numRemoteExecutionsFailed, 1) if resultsErr.Error() == "unauthorized" { @@ -1315,7 +1317,8 @@ func (s *Service) handleRequest(w http.ResponseWriter, r *http.Request, qp Query } w.Header().Add(ServedByHTTPHeader, addr) - results, resultErr = s.cluster.Request(eqr, addr, makeCredentials(username, password), qp.Timeout(defaultTimeout)) + results, resultErr = s.cluster.Request(eqr, addr, makeCredentials(username, password), + qp.Timeout(defaultTimeout), qp.Retries(0)) if resultErr != nil { stats.Add(numRemoteRequestsFailed, 1) if resultErr.Error() == "unauthorized" { @@ -1516,7 +1519,7 @@ func (s *Service) runQueue() { req.SequenceNumber, s.Addr().String()) stats.Add(numQueuedExecutionsNoLeader, 1) } else { - _, err = s.cluster.Execute(er, addr, nil, defaultTimeout) + _, err = s.cluster.Execute(er, addr, nil, defaultTimeout, 0) if err != nil { s.logger.Printf("execute queue write failed for sequence number %d on node %s: %s", req.SequenceNumber, s.Addr().String(), err.Error()) diff --git a/http/service_test.go b/http/service_test.go index 277ffc21..d5efbc1c 100644 --- a/http/service_test.go +++ b/http/service_test.go @@ -1411,7 +1411,7 @@ func (m *mockClusterService) GetNodeAPIAddr(a string, t time.Duration) (string, return m.apiAddr, nil } -func (m *mockClusterService) Execute(er *command.ExecuteRequest, addr string, creds *cluster.Credentials, t time.Duration) ([]*command.ExecuteResult, error) { +func (m *mockClusterService) Execute(er *command.ExecuteRequest, addr string, creds *cluster.Credentials, t time.Duration, r int) ([]*command.ExecuteResult, error) { if m.executeFn != nil { return m.executeFn(er, addr, t) } @@ -1425,7 +1425,7 @@ func (m *mockClusterService) Query(qr *command.QueryRequest, addr string, creds return nil, nil } -func (m *mockClusterService) Request(eqr *command.ExecuteQueryRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) ([]*command.ExecuteQueryResponse, error) { +func (m *mockClusterService) Request(eqr *command.ExecuteQueryRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration, r int) ([]*command.ExecuteQueryResponse, error) { if m.requestFn != nil { return m.requestFn(eqr, nodeAddr, timeout) } @@ -1439,7 +1439,7 @@ func (m *mockClusterService) Backup(br *command.BackupRequest, addr string, cred return nil } -func (m *mockClusterService) Load(lr *command.LoadRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) error { +func (m *mockClusterService) Load(lr *command.LoadRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration, r int) error { if m.loadFn != nil { return m.loadFn(lr, nodeAddr, timeout) }