From dd6f386450cb5598bdea798d21181df2500f93ff Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 9 Dec 2023 12:42:56 -0500 Subject: [PATCH] Refactor chunk loading --- http/service.go | 70 +++++++++++++++++++++++++++---------------------- store/store.go | 1 - 2 files changed, 39 insertions(+), 32 deletions(-) diff --git a/http/service.go b/http/service.go index e7996134..330b1c38 100644 --- a/http/service.go +++ b/http/service.go @@ -39,6 +39,9 @@ const ( var ( // ErrLeaderNotFound is returned when a node cannot locate a leader ErrLeaderNotFound = errors.New("leader not found") + + // ErrRemoteLoadNotAuthorized is returned when a remote node is not authorized to load a chunk + ErrRemoteLoadNotAuthorized = errors.New("remote load not authorized") ) type ResultsError interface { @@ -687,12 +690,6 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) { return } - timeout, err := timeoutParam(r, defaultTimeout) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - chunkSz, err := chunkSizeParam(r, defaultChunkSize) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) @@ -714,7 +711,6 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) { return } } - } if !validSQLite { @@ -758,35 +754,19 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) { return } - addr, err := s.store.LeaderAddr() + addr, err := s.loadClusterChunk(r, chunk) if err != nil { - http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), - http.StatusInternalServerError) - return - } - if addr == "" { - stats.Add(numLeaderNotFound, 1) - http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable) - return - } - - username, password, ok := r.BasicAuth() - if !ok { - username = "" - } - - w.Header().Add(ServedByHTTPHeader, addr) - loadErr := s.cluster.LoadChunk(chunk, addr, makeCredentials(username, password), timeout) - if loadErr != nil { - if loadErr.Error() == "unauthorized" { - http.Error(w, "remote load not authorized", http.StatusUnauthorized) + if err == ErrRemoteLoadNotAuthorized { + http.Error(w, err.Error(), http.StatusUnauthorized) + } else if err == ErrLeaderNotFound { + http.Error(w, err.Error(), http.StatusServiceUnavailable) } else { - http.Error(w, loadErr.Error(), http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusInternalServerError) } return } - stats.Add(numRemoteLoads, 1) - // Allow this if block to exit, so response remains as before request + w.Header().Add(ServedByHTTPHeader, addr) + // Allow this if block to exit without return, so response remains as before request // forwarding was put in place. } if chunk.IsLast { @@ -802,6 +782,34 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) { s.writeResponse(w, r, resp) } +func (s *Service) loadClusterChunk(r *http.Request, chunk *command.LoadChunkRequest) (string, error) { + addr, err := s.store.LeaderAddr() + if err != nil { + return "", err + } + if addr == "" { + stats.Add(numLeaderNotFound, 1) + return "", ErrLeaderNotFound + } + username, password, ok := r.BasicAuth() + if !ok { + username = "" + } + timeout, err := timeoutParam(r, defaultTimeout) + if err != nil { + return "", err + } + err = s.cluster.LoadChunk(chunk, addr, makeCredentials(username, password), timeout) + if err != nil { + if err.Error() == "unauthorized" { + return "", ErrRemoteLoadNotAuthorized + } + return "", err + } + stats.Add(numRemoteLoads, 1) + return addr, nil +} + // handleStatus returns status on the system. func (s *Service) handleStatus(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json; charset=utf-8") diff --git a/store/store.go b/store/store.go index 43e9f67f..3615794c 100644 --- a/store/store.go +++ b/store/store.go @@ -1676,7 +1676,6 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) { } else { s.loadsInProgress[lcr.StreamId] = struct{}{} } - } if snapshotNeeded {