package chunking

import (
	"bytes"
	"compress/gzip"
	"crypto/rand"
	"errors"
	"fmt"
	"io"
	"sync"

	"github.com/rqlite/rqlite/v8/command/proto"
	"github.com/rqlite/rqlite/v8/progress"
)

const (
	// internalChunkSize is the size of the intermediate buffer used when
	// copying data from the source reader into the gzip writer.
	internalChunkSize = 1024 * 1024
)

// Define a sync.Pool to pool the buffers.
var bufferPool = sync.Pool{
	New: func() interface{} {
		return bytes.NewBuffer(nil)
	},
}

// Define a sync.Pool to pool the gzip writers.
var gzipWriterPool = sync.Pool{
	New: func() interface{} {
		gw, err := gzip.NewWriterLevel(nil, gzip.BestSpeed)
		if err != nil {
			panic(fmt.Sprintf("failed to create gzip writer: %s", err.Error()))
		}
		return gw
	},
}

// Chunker is a reader that reads from an underlying io.Reader and returns
// LoadChunkRequests of a given size.
type Chunker struct {
	r           *progress.CountingReader
	chunkSize   int64
	streamID    string
	sequenceNum int64
	finished    bool

	statsMu  sync.Mutex // Serializes Next and Counts.
	nWritten int64      // Total compressed bytes returned by Next.
}

// NewChunker returns a new Chunker that reads from r and returns
// LoadChunkRequests of size chunkSize.
func NewChunker(r io.Reader, chunkSize int64) *Chunker {
	return &Chunker{
		r:         progress.NewCountingReader(r),
		chunkSize: chunkSize,
		streamID:  generateStreamID(),
	}
}

// generateStreamID returns a random, UUID-formatted stream identifier, or an
// empty string if no random bytes could be read.
func generateStreamID() string {
	b := make([]byte, 16)
	_, err := io.ReadFull(rand.Reader, b)
	if err != nil {
		return ""
	}
	// Convert the random bytes into the format of a UUID.
	return fmt.Sprintf("%x-%x-%x-%x-%x",
		b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
}

// Next returns the next LoadChunkRequest, or io.EOF if the underlying reader
// is exhausted. The Data field of each returned request contains
// gzip-compressed data.
func (c *Chunker) Next() (*proto.LoadChunkRequest, error) {
	c.statsMu.Lock()
	defer c.statsMu.Unlock()

	if c.finished {
		return nil, io.EOF
	}

	// Get a buffer from the pool.
	buf := bufferPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufferPool.Put(buf)

	// Get a gzip.Writer from the pool.
	gw := gzipWriterPool.Get().(*gzip.Writer)
	defer gzipWriterPool.Put(gw)

	// Reset the gzip.Writer to use the buffer.
	gw.Reset(buf)

	// Create an intermediate buffer to read into.
	intermediateBuffer := make([]byte, min(internalChunkSize, c.chunkSize))

	// Read data into the intermediate buffer and write it to the gzip.Writer.
	var totalRead int64
	for totalRead < c.chunkSize {
		n, err := c.r.Read(intermediateBuffer)
		totalRead += int64(n)
		if n > 0 {
			// Use a distinct variable for the write error so an io.EOF
			// returned by Read alongside data is not overwritten.
			if _, werr := gw.Write(intermediateBuffer[:n]); werr != nil {
				return nil, werr
			}
		}
		if err != nil {
			if err == io.EOF {
				c.finished = true
				break
			}
			return nil, err
		}
	}

	// Close the gzip.Writer to ensure all data is written to the buffer.
	if err := gw.Close(); err != nil {
		return nil, errors.New("failed to close gzip writer: " + err.Error())
	}
	c.nWritten += int64(buf.Len())

	// If we didn't read any data, then we're finished.
	if totalRead == 0 {
		// If no previous chunks were sent at all, then signal that.
		if c.sequenceNum == 0 {
			return nil, io.EOF
		}
		// If previous chunks were sent, return a final empty chunk with
		// IsLast set to true.
		return &proto.LoadChunkRequest{
			StreamId:    c.streamID,
			SequenceNum: c.sequenceNum + 1,
			IsLast:      true,
			Data:        nil,
		}, nil
	}

	c.sequenceNum++

	// Copy the compressed data out of the pooled buffer: the buffer is
	// returned to the pool when this function exits, so the request must not
	// alias its backing array.
	data := make([]byte, buf.Len())
	copy(data, buf.Bytes())
	return &proto.LoadChunkRequest{
		StreamId:    c.streamID,
		SequenceNum: c.sequenceNum,
		IsLast:      totalRead < c.chunkSize,
		Data:        data,
	}, nil
}
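
// decompressChunk is a minimal receiver-side sketch, not part of the original
// file. It assumes only what the code above shows: each chunk's Data field
// holds gzip-compressed bytes, or nil for a trailing empty chunk.
func decompressChunk(req *proto.LoadChunkRequest) ([]byte, error) {
	if req.Data == nil {
		// A final, empty chunk carries no data to inflate.
		return nil, nil
	}
	gr, err := gzip.NewReader(bytes.NewReader(req.Data))
	if err != nil {
		return nil, err
	}
	defer gr.Close()
	return io.ReadAll(gr)
}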

// Abort returns a LoadChunkRequest that signals the receiver to abort the
// given stream.
func (c *Chunker) Abort() *proto.LoadChunkRequest {
	return &proto.LoadChunkRequest{
		StreamId: c.streamID,
		Abort:    true,
	}
}
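
// sendChunks is a usage sketch, not part of the original file: it drives a
// Chunker until the final chunk, handing each request to the supplied send
// callback (a hypothetical stand-in for whatever transport delivers chunks
// to the receiver), and aborts the stream if anything goes wrong.
func sendChunks(c *Chunker, send func(*proto.LoadChunkRequest) error) error {
	for {
		req, err := c.Next()
		if err == io.EOF {
			// The source was empty; nothing was ever sent.
			return nil
		} else if err != nil {
			// Tell the receiver to discard any partially-delivered stream.
			_ = send(c.Abort())
			return err
		}
		if err := send(req); err != nil {
			return err
		}
		if req.IsLast {
			return nil
		}
	}
}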

// Counts returns the number of chunks generated, the number of bytes read
// from the source, and the number of compressed bytes written.
func (c *Chunker) Counts() (int64, int64, int64) {
	c.statsMu.Lock()
	defer c.statsMu.Unlock()
	return c.sequenceNum, c.r.Count(), c.nWritten
}
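
// logProgress is a small illustrative helper, not part of the original file,
// showing how Counts can be used to report the compression achieved so far.
func logProgress(c *Chunker) {
	chunks, read, written := c.Counts()
	fmt.Printf("generated %d chunks: %d bytes read, %d compressed bytes written\n",
		chunks, read, written)
}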

// min returns the smaller of two int64 values.
func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}