|
|
|
@ -39,6 +39,9 @@ const (
|
|
|
|
|
var (
|
|
|
|
|
// ErrLeaderNotFound is returned when a node cannot locate a leader
|
|
|
|
|
ErrLeaderNotFound = errors.New("leader not found")
|
|
|
|
|
|
|
|
|
|
// ErrRemoteLoadNotAuthorized is returned when a remote node is not authorized to load a chunk
|
|
|
|
|
ErrRemoteLoadNotAuthorized = errors.New("remote load not authorized")
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type ResultsError interface {
|
|
|
|
@ -687,12 +690,6 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
timeout, err := timeoutParam(r, defaultTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
chunkSz, err := chunkSizeParam(r, defaultChunkSize)
|
|
|
|
|
if err != nil {
|
|
|
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
|
@ -714,7 +711,6 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !validSQLite {
|
|
|
|
@ -758,35 +754,19 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
addr, err := s.store.LeaderAddr()
|
|
|
|
|
addr, err := s.loadClusterChunk(r, chunk)
|
|
|
|
|
if err != nil {
|
|
|
|
|
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
|
|
|
|
|
http.StatusInternalServerError)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if addr == "" {
|
|
|
|
|
stats.Add(numLeaderNotFound, 1)
|
|
|
|
|
http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
username, password, ok := r.BasicAuth()
|
|
|
|
|
if !ok {
|
|
|
|
|
username = ""
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
w.Header().Add(ServedByHTTPHeader, addr)
|
|
|
|
|
loadErr := s.cluster.LoadChunk(chunk, addr, makeCredentials(username, password), timeout)
|
|
|
|
|
if loadErr != nil {
|
|
|
|
|
if loadErr.Error() == "unauthorized" {
|
|
|
|
|
http.Error(w, "remote load not authorized", http.StatusUnauthorized)
|
|
|
|
|
if err == ErrRemoteLoadNotAuthorized {
|
|
|
|
|
http.Error(w, err.Error(), http.StatusUnauthorized)
|
|
|
|
|
} else if err == ErrLeaderNotFound {
|
|
|
|
|
http.Error(w, err.Error(), http.StatusServiceUnavailable)
|
|
|
|
|
} else {
|
|
|
|
|
http.Error(w, loadErr.Error(), http.StatusInternalServerError)
|
|
|
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
|
|
}
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
stats.Add(numRemoteLoads, 1)
|
|
|
|
|
// Allow this if block to exit, so response remains as before request
|
|
|
|
|
w.Header().Add(ServedByHTTPHeader, addr)
|
|
|
|
|
// Allow this if block to exit without return, so response remains as before request
|
|
|
|
|
// forwarding was put in place.
|
|
|
|
|
}
|
|
|
|
|
if chunk.IsLast {
|
|
|
|
@ -802,6 +782,34 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
s.writeResponse(w, r, resp)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) loadClusterChunk(r *http.Request, chunk *command.LoadChunkRequest) (string, error) {
|
|
|
|
|
addr, err := s.store.LeaderAddr()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
if addr == "" {
|
|
|
|
|
stats.Add(numLeaderNotFound, 1)
|
|
|
|
|
return "", ErrLeaderNotFound
|
|
|
|
|
}
|
|
|
|
|
username, password, ok := r.BasicAuth()
|
|
|
|
|
if !ok {
|
|
|
|
|
username = ""
|
|
|
|
|
}
|
|
|
|
|
timeout, err := timeoutParam(r, defaultTimeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
err = s.cluster.LoadChunk(chunk, addr, makeCredentials(username, password), timeout)
|
|
|
|
|
if err != nil {
|
|
|
|
|
if err.Error() == "unauthorized" {
|
|
|
|
|
return "", ErrRemoteLoadNotAuthorized
|
|
|
|
|
}
|
|
|
|
|
return "", err
|
|
|
|
|
}
|
|
|
|
|
stats.Add(numRemoteLoads, 1)
|
|
|
|
|
return addr, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// handleStatus returns status on the system.
|
|
|
|
|
func (s *Service) handleStatus(w http.ResponseWriter, r *http.Request) {
|
|
|
|
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
|
|
|
|