1
0
Fork 0

Send Abort load chunk requests as needed

master
Philip O'Toole 10 months ago
parent dd6f386450
commit 37fe324d6a

@ -143,6 +143,15 @@ func (c *Chunker) Next() (*command.LoadChunkRequest, error) {
}, nil
}
// Abort returns a LoadChunkRequest that signals the receiver to abort the
// given stream.
func (c *Chunker) Abort() *command.LoadChunkRequest {
return &command.LoadChunkRequest{
StreamId: c.streamID,
Abort: true,
}
}
// Counts returns the number of chunks generated, bytes read, and bytes written.
func (c *Chunker) Counts() (int64, int64, int64) {
c.statsMu.Lock()

@ -742,11 +742,11 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
for {
chunk, err := chunker.Next()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
chunk = chunker.Abort()
}
err = s.store.LoadChunk(chunk)
if err != nil && err != store.ErrNotLeader {
s.store.LoadChunk(chunker.Abort())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else if err != nil && err == store.ErrNotLeader {
@ -763,6 +763,7 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
s.loadClusterChunk(r, chunker.Abort())
return
}
w.Header().Add(ServedByHTTPHeader, addr)
@ -775,6 +776,10 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
nr, nChunks, nw, float64(nr)/float64(nw))
break
}
if chunk.Abort {
s.logger.Printf("load request aborted")
break
}
}
}

Loading…
Cancel
Save