1
0
Fork 0

Log performance of chunker in HTTP layer

master
Philip O'Toole 1 year ago
parent a55ee4836c
commit 43525010b0

@ -37,18 +37,22 @@ var gzipWriterPool = sync.Pool{
// Chunker is a reader that reads from an underlying io.Reader and returns
// LoadChunkRequests of a given size.
type Chunker struct {
r io.Reader
r *CountingReader
chunkSize int64
streamID string
sequenceNum int64
finished bool
statsMu sync.Mutex
nWritten int64
}
// NewChunker returns a new Chunker that reads from r and returns
// LoadChunkRequests of size chunkSize.
func NewChunker(r io.Reader, chunkSize int64) *Chunker {
return &Chunker{
r: r,
r: NewCountingReader(r),
chunkSize: chunkSize,
streamID: generateStreamID(),
}
@ -67,6 +71,9 @@ func generateStreamID() string {
// Next returns the next LoadChunkRequest, or io.EOF if finished.
func (c *Chunker) Next() (*command.LoadChunkRequest, error) {
c.statsMu.Lock()
defer c.statsMu.Unlock()
if c.finished {
return nil, io.EOF
}
@ -110,6 +117,7 @@ func (c *Chunker) Next() (*command.LoadChunkRequest, error) {
if err := gw.Close(); err != nil {
return nil, errors.New("failed to close gzip writer: " + err.Error())
}
c.nWritten += int64(buf.Len())
// If we didn't read any data, then we're finished
if totalRead == 0 {
@ -135,6 +143,13 @@ func (c *Chunker) Next() (*command.LoadChunkRequest, error) {
}, nil
}
// Counts returns the number of chunks generated, bytes read, and bytes written.
func (c *Chunker) Counts() (int64, int64, int64) {
c.statsMu.Lock()
defer c.statsMu.Unlock()
return c.sequenceNum, c.r.Count(), c.nWritten
}
func min(a, b int64) int64 {
if a < b {
return a

@ -0,0 +1,57 @@
package chunking
import (
"io"
)
// CountingReader is a wrapper around an io.Reader that counts the number of
// bytes read.
type CountingReader struct {
r io.Reader
n int64
}
// Read reads from the underlying io.Reader.
func (cr *CountingReader) Read(p []byte) (int, error) {
n, err := cr.r.Read(p)
cr.n += int64(n)
return n, err
}
// Count returns the number of bytes read.
func (cr *CountingReader) Count() int64 {
return cr.n
}
// NewCountingReader returns a new CountingReader that reads from r.
func NewCountingReader(r io.Reader) *CountingReader {
return &CountingReader{
r: r,
}
}
// CountingWriter is a wrapper around an io.Writer that counts the number of
// bytes written.
type CountingWriter struct {
w io.Writer
n int64
}
// Write writes to the underlying io.Writer.
func (cw *CountingWriter) Write(p []byte) (int, error) {
n, err := cw.w.Write(p)
cw.n += int64(n)
return n, err
}
// Count returns the number of bytes written.
func (cw *CountingWriter) Count() int64 {
return cw.n
}
// NewCountingWriter returns a new CountingWriter that writes to w.
func NewCountingWriter(w io.Writer) *CountingWriter {
return &CountingWriter{
w: w,
}
}

@ -967,6 +967,9 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) {
// forwarding was put in place.
}
if chunk.IsLast {
nChunks, nr, nw := chunker.Counts()
s.logger.Printf("%d bytes read, %d chunks generated, containing %d bytes of compressed data",
nr, nChunks, nw)
break
}
}

Loading…
Cancel
Save