diff --git a/command/chunking/chunker.go b/command/chunking/chunker.go index 750aa36b..3bef1c39 100644 --- a/command/chunking/chunker.go +++ b/command/chunking/chunker.go @@ -37,18 +37,22 @@ var gzipWriterPool = sync.Pool{ // Chunker is a reader that reads from an underlying io.Reader and returns // LoadChunkRequests of a given size. type Chunker struct { - r io.Reader + r *CountingReader + chunkSize int64 streamID string sequenceNum int64 finished bool + + statsMu sync.Mutex + nWritten int64 } // NewChunker returns a new Chunker that reads from r and returns // LoadChunkRequests of size chunkSize. func NewChunker(r io.Reader, chunkSize int64) *Chunker { return &Chunker{ - r: r, + r: NewCountingReader(r), chunkSize: chunkSize, streamID: generateStreamID(), } @@ -67,6 +71,9 @@ func generateStreamID() string { // Next returns the next LoadChunkRequest, or io.EOF if finished. func (c *Chunker) Next() (*command.LoadChunkRequest, error) { + c.statsMu.Lock() + defer c.statsMu.Unlock() + if c.finished { return nil, io.EOF } @@ -110,6 +117,7 @@ func (c *Chunker) Next() (*command.LoadChunkRequest, error) { if err := gw.Close(); err != nil { return nil, errors.New("failed to close gzip writer: " + err.Error()) } + c.nWritten += int64(buf.Len()) // If we didn't read any data, then we're finished if totalRead == 0 { @@ -135,6 +143,13 @@ func (c *Chunker) Next() (*command.LoadChunkRequest, error) { }, nil } +// Counts returns the number of chunks generated, bytes read, and bytes written. +func (c *Chunker) Counts() (int64, int64, int64) { + c.statsMu.Lock() + defer c.statsMu.Unlock() + return c.sequenceNum, c.r.Count(), c.nWritten +} + func min(a, b int64) int64 { if a < b { return a diff --git a/command/chunking/counting_io.go b/command/chunking/counting_io.go new file mode 100644 index 00000000..748e0503 --- /dev/null +++ b/command/chunking/counting_io.go @@ -0,0 +1,57 @@ +package chunking + +import ( + "io" +) + +// CountingReader is a wrapper around an io.Reader that counts the number of +// bytes read. +type CountingReader struct { + r io.Reader + n int64 +} + +// Read reads from the underlying io.Reader. +func (cr *CountingReader) Read(p []byte) (int, error) { + n, err := cr.r.Read(p) + cr.n += int64(n) + return n, err +} + +// Count returns the number of bytes read. +func (cr *CountingReader) Count() int64 { + return cr.n +} + +// NewCountingReader returns a new CountingReader that reads from r. +func NewCountingReader(r io.Reader) *CountingReader { + return &CountingReader{ + r: r, + } +} + +// CountingWriter is a wrapper around an io.Writer that counts the number of +// bytes written. +type CountingWriter struct { + w io.Writer + n int64 +} + +// Write writes to the underlying io.Writer. +func (cw *CountingWriter) Write(p []byte) (int, error) { + n, err := cw.w.Write(p) + cw.n += int64(n) + return n, err +} + +// Count returns the number of bytes written. +func (cw *CountingWriter) Count() int64 { + return cw.n +} + +// NewCountingWriter returns a new CountingWriter that writes to w. +func NewCountingWriter(w io.Writer) *CountingWriter { + return &CountingWriter{ + w: w, + } +} diff --git a/http/service.go b/http/service.go index 5d16192e..9fdd38c2 100644 --- a/http/service.go +++ b/http/service.go @@ -967,6 +967,9 @@ func (s *Service) handleLoad(w http.ResponseWriter, r *http.Request) { // forwarding was put in place. } if chunk.IsLast { + nChunks, nr, nw := chunker.Counts() + s.logger.Printf("%d bytes read, %d chunks generated, containing %d bytes of compressed data", + nr, nChunks, nw) break } }