1
0
Fork 0

Raft snapshotting runs to 100%

Still failing later.
master
Philip O'Toole 1 year ago
parent cb384a9df2
commit 06a94e50bf

@ -16,6 +16,9 @@ type Compressor struct {
bufSz int bufSz int
buf *bytes.Buffer buf *bytes.Buffer
gzw *gzip.Writer gzw *gzip.Writer
nRx int64
nTx int64
} }
// NewCompressor returns an instantiated Compressor that reads from r and // NewCompressor returns an instantiated Compressor that reads from r and
@ -31,17 +34,21 @@ func NewCompressor(r io.Reader, bufSz int) *Compressor {
} }
// Read reads compressed data. // Read reads compressed data.
func (c *Compressor) Read(p []byte) (int, error) { func (c *Compressor) Read(p []byte) (n int, err error) {
if c.buf.Len() == 0 && c.gzw != nil { if c.buf.Len() == 0 && c.gzw != nil {
n, err := io.CopyN(c.gzw, c.r, int64(c.bufSz)) nn, err := io.CopyN(c.gzw, c.r, int64(c.bufSz))
c.nRx += nn
c.nTx += int64(len(p))
if err != nil { if err != nil {
c.gzw.Close() // Time to write the footer. // Time to write the footer.
if err := c.Close(); err != nil {
return 0, err
}
if err != io.EOF { if err != io.EOF {
// Actual error, let caller handle // Actual error, let caller handle
return 0, err return 0, err
} }
c.gzw = nil } else if nn > 0 {
} else if n > 0 {
// We read some data, but didn't hit any error. // We read some data, but didn't hit any error.
// Just flush the data in the buffer, ready // Just flush the data in the buffer, ready
// to be read. // to be read.
@ -58,5 +65,9 @@ func (c *Compressor) Close() error {
if c.gzw == nil { if c.gzw == nil {
return nil return nil
} }
return c.gzw.Close() if err := c.gzw.Close(); err != nil {
return err
}
c.gzw = nil
return nil
} }

@ -6,34 +6,39 @@ import (
) )
type Decompressor struct { type Decompressor struct {
r io.Reader cr *CountingReader
gzr *gzip.Reader gzr *gzip.Reader
n int64
nRx int64
nTx int64
} }
// NewDecompressor returns an instantied Decompressor that reads from r and // NewDecompressor returns an instantied Decompressor that reads from r and
// decompresses the data using gzip. // decompresses the data using gzip.
func NewDecompressor(r io.Reader) *Decompressor { func NewDecompressor(r io.Reader) *Decompressor {
return &Decompressor{ return &Decompressor{
r: r, cr: NewCountingReader(r),
} }
} }
// Read reads decompressed data. // Read reads decompressed data.
func (c *Decompressor) Read(p []byte) (n int, err error) { func (c *Decompressor) Read(p []byte) (nn int, err error) {
defer func() { defer func() {
c.n += int64(n) c.nTx += int64(nn)
}() }()
if c.gzr == nil { if c.gzr == nil {
var err error var err error
c.gzr, err = gzip.NewReader(c.r) c.gzr, err = gzip.NewReader(c.cr)
if err != nil { if err != nil {
return 0, err return 0, err
} }
c.gzr.Multistream(false)
} }
n, err = c.gzr.Read(p) n, err := c.gzr.Read(p)
c.nRx += int64(n)
if err == io.EOF { if err == io.EOF {
if err := c.gzr.Close(); err != nil { if err := c.gzr.Close(); err != nil {
return 0, err return 0, err
@ -42,3 +47,20 @@ func (c *Decompressor) Read(p []byte) (n int, err error) {
} }
return n, err return n, err
} }
type CountingReader struct {
r io.Reader
n int64
}
func NewCountingReader(r io.Reader) *CountingReader {
return &CountingReader{
r: r,
}
}
func (c *CountingReader) Read(p []byte) (n int, err error) {
n, err = c.r.Read(p)
c.n += int64(n)
return n, err
}

Loading…
Cancel
Save