Add new chunker and chunked load proto
parent
b7ea9ea096
commit
dcc29ed76f
@ -0,0 +1,142 @@
|
|||||||
|
package chunking
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"crypto/rand"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/rqlite/rqlite/command"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
internalChunkSize = 512 * 1024
|
||||||
|
)
|
||||||
|
|
||||||
|
// Define a sync.Pool to pool the buffers.
|
||||||
|
var bufferPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return bytes.NewBuffer(nil)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Define a sync.Pool to pool the gzip writers.
|
||||||
|
var gzipWriterPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return gzip.NewWriter(nil)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Chunker is a reader that reads from an underlying io.Reader and returns
|
||||||
|
// LoadChunkRequests of a given size.
|
||||||
|
type Chunker struct {
|
||||||
|
r io.Reader
|
||||||
|
chunkSize int64
|
||||||
|
streamID string
|
||||||
|
sequenceNum int64
|
||||||
|
finished bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewChunker returns a new Chunker that reads from r and returns
|
||||||
|
// LoadChunkRequests of size chunkSize.
|
||||||
|
func NewChunker(r io.Reader, chunkSize int64) *Chunker {
|
||||||
|
return &Chunker{
|
||||||
|
r: r,
|
||||||
|
chunkSize: chunkSize,
|
||||||
|
streamID: generateStreamID(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateStreamID() string {
|
||||||
|
b := make([]byte, 16)
|
||||||
|
_, err := io.ReadFull(rand.Reader, b)
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
// Convert the random bytes into the format of a UUID.
|
||||||
|
return fmt.Sprintf("%x-%x-%x-%x-%x",
|
||||||
|
b[0:4], b[4:6], b[6:8], b[8:10], b[10:])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Next returns the next LoadChunkRequest, or io.EOF if finished.
|
||||||
|
func (c *Chunker) Next() (*command.LoadChunkRequest, error) {
|
||||||
|
if c.finished {
|
||||||
|
return nil, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get a buffer from the pool.
|
||||||
|
buf := bufferPool.Get().(*bytes.Buffer)
|
||||||
|
buf.Reset()
|
||||||
|
defer bufferPool.Put(buf)
|
||||||
|
|
||||||
|
// Get a gzip.Writer from the pool.
|
||||||
|
gw := gzipWriterPool.Get().(*gzip.Writer)
|
||||||
|
defer gzipWriterPool.Put(gw)
|
||||||
|
|
||||||
|
// Reset the gzip.Writer to use the buffer.
|
||||||
|
gw.Reset(buf)
|
||||||
|
|
||||||
|
// Create an intermediate buffer to read into
|
||||||
|
intermediateBuffer := make([]byte, min(internalChunkSize, c.chunkSize))
|
||||||
|
|
||||||
|
// Read data into intermediate buffer and write to gzip.Writer
|
||||||
|
var totalRead int64 = 0
|
||||||
|
for totalRead < c.chunkSize {
|
||||||
|
n, err := c.r.Read(intermediateBuffer)
|
||||||
|
totalRead += int64(n)
|
||||||
|
if n > 0 {
|
||||||
|
if _, err = gw.Write(intermediateBuffer[:n]); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the gzip.Writer to ensure all data is written to the buffer
|
||||||
|
if err := gw.Close(); err != nil {
|
||||||
|
return nil, errors.New("failed to close gzip writer: " + err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
if totalRead < c.chunkSize {
|
||||||
|
c.finished = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we didn't write any data, then we're finished
|
||||||
|
if totalRead == 0 {
|
||||||
|
// If no previous chunks were sent at all signal that.
|
||||||
|
if c.sequenceNum == 0 {
|
||||||
|
return nil, io.EOF
|
||||||
|
}
|
||||||
|
// If previous chunks were sent, return a final empty chunk with IsLast = true
|
||||||
|
return &command.LoadChunkRequest{
|
||||||
|
StreamId: c.streamID,
|
||||||
|
SequenceNum: c.sequenceNum + 1,
|
||||||
|
IsLast: true,
|
||||||
|
Data: nil,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
c.sequenceNum++
|
||||||
|
return &command.LoadChunkRequest{
|
||||||
|
StreamId: c.streamID,
|
||||||
|
SequenceNum: c.sequenceNum,
|
||||||
|
IsLast: totalRead < c.chunkSize,
|
||||||
|
Data: buf.Bytes(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func min(a, b int64) int64 {
|
||||||
|
if a < b {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
@ -0,0 +1,258 @@
|
|||||||
|
package chunking
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"compress/gzip"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Test_ChunkerEmptyReader tests that a Chunker created with an empty reader
|
||||||
|
// returns io.EOF on the first call to Next.
|
||||||
|
func Test_ChunkerEmptyReader(t *testing.T) {
|
||||||
|
// Create an empty reader that immediately returns io.EOF
|
||||||
|
r := bytes.NewReader([]byte{})
|
||||||
|
chunker := NewChunker(r, 1024)
|
||||||
|
|
||||||
|
// Expect the first call to Next to return io.EOF
|
||||||
|
_, err := chunker.Next()
|
||||||
|
if err != io.EOF {
|
||||||
|
t.Fatalf("expected io.EOF, got %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Further calls to Next should also return io.EOF
|
||||||
|
_, err = chunker.Next()
|
||||||
|
if err != io.EOF {
|
||||||
|
t.Fatalf("expected io.EOF, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test_ChunkerSingleChunk tests that a Chunker created with a reader that
|
||||||
|
// contains a single chunk returns the expected chunk, when the chunk size is
|
||||||
|
// smaller than the amount of data in the reader.
|
||||||
|
func Test_ChunkerSingleChunk(t *testing.T) {
|
||||||
|
data := []byte("Hello, world!")
|
||||||
|
chunker := NewChunker(bytes.NewReader(data), 32)
|
||||||
|
|
||||||
|
chunk, err := chunker.Next()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The expected sequence number should be 1
|
||||||
|
if chunk.SequenceNum != 1 {
|
||||||
|
t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be the last chunk
|
||||||
|
if chunk.IsLast != true {
|
||||||
|
t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decompress the gzip data and compare it with expected
|
||||||
|
gzipReader, err := gzip.NewReader(bytes.NewReader(chunk.Data))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create gzip reader: %v", err)
|
||||||
|
}
|
||||||
|
defer gzipReader.Close()
|
||||||
|
|
||||||
|
decompressed := new(bytes.Buffer)
|
||||||
|
if _, err = io.Copy(decompressed, gzipReader); err != nil {
|
||||||
|
t.Fatalf("failed to decompress data: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if decompressed.String() != string(data) {
|
||||||
|
t.Errorf("unexpected chunk data: got %s, want %s", decompressed.String(), string(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
// After all chunks are read, Next should return nil, io.EOF
|
||||||
|
chunk, err = chunker.Next()
|
||||||
|
if chunk != nil || err != io.EOF {
|
||||||
|
t.Errorf("expected (nil, io.EOF), got (%v, %v)", chunk, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test_ChunkerSingleChunkLarge tests that a Chunker created with a reader that
|
||||||
|
// contains a single chunk returns the expected chunk, when the chunk size is
|
||||||
|
// much larger than the amount of data in the reader, and is larger than the
|
||||||
|
// internal chunk size.
|
||||||
|
func Test_ChunkerSingleChunkLarge(t *testing.T) {
|
||||||
|
data := []byte("Hello, world!")
|
||||||
|
chunker := NewChunker(bytes.NewReader(data), internalChunkSize*3)
|
||||||
|
|
||||||
|
chunk, err := chunker.Next()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The expected sequence number should be 1
|
||||||
|
if chunk.SequenceNum != 1 {
|
||||||
|
t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be the last chunk
|
||||||
|
if chunk.IsLast != true {
|
||||||
|
t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decompress the gzip data and compare it with expected
|
||||||
|
gzipReader, err := gzip.NewReader(bytes.NewReader(chunk.Data))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create gzip reader: %v", err)
|
||||||
|
}
|
||||||
|
defer gzipReader.Close()
|
||||||
|
|
||||||
|
decompressed := new(bytes.Buffer)
|
||||||
|
if _, err = io.Copy(decompressed, gzipReader); err != nil {
|
||||||
|
t.Fatalf("failed to decompress data: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if decompressed.String() != string(data) {
|
||||||
|
t.Errorf("unexpected chunk data: got %s, want %s", decompressed.String(), string(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
// After all chunks are read, Next should return nil, io.EOF
|
||||||
|
chunk, err = chunker.Next()
|
||||||
|
if chunk != nil || err != io.EOF {
|
||||||
|
t.Errorf("expected (nil, io.EOF), got (%v, %v)", chunk, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test_ChunkerSingleChunkExact tests that a Chunker created with a reader that
|
||||||
|
// contains a single chunk returns the expected chunk, when the chunk size is
|
||||||
|
// exactly the amount of data in the reader.
|
||||||
|
func Test_ChunkerSingleChunkExact(t *testing.T) {
|
||||||
|
data := []byte("Hello")
|
||||||
|
chunker := NewChunker(bytes.NewReader(data), 5)
|
||||||
|
|
||||||
|
chunk, err := chunker.Next()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The expected sequence number should be 1
|
||||||
|
if chunk.SequenceNum != 1 {
|
||||||
|
t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Won't be last since chunker doesn't know the size of the data
|
||||||
|
if chunk.IsLast {
|
||||||
|
t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decompress the gzip data and compare it with expected
|
||||||
|
gzipReader, err := gzip.NewReader(bytes.NewReader(chunk.Data))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create gzip reader: %v", err)
|
||||||
|
}
|
||||||
|
defer gzipReader.Close()
|
||||||
|
|
||||||
|
decompressed := new(bytes.Buffer)
|
||||||
|
if _, err = io.Copy(decompressed, gzipReader); err != nil {
|
||||||
|
t.Fatalf("failed to decompress data: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if decompressed.String() != string(data) {
|
||||||
|
t.Errorf("unexpected chunk data: got %s, want %s", decompressed.String(), string(data))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call Next again, get a second chunk, which should be the last chunk
|
||||||
|
chunk, err = chunker.Next()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The expected sequence number should be 2
|
||||||
|
if chunk.SequenceNum != 2 {
|
||||||
|
t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Should be the last chunk
|
||||||
|
if !chunk.IsLast {
|
||||||
|
t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, true)
|
||||||
|
}
|
||||||
|
if chunk.Data != nil {
|
||||||
|
t.Errorf("unexpected chunk data: got %v, want %v", chunk.Data, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// After all chunks are read, Next should return nil, io.EOF
|
||||||
|
chunk, err = chunker.Next()
|
||||||
|
if chunk != nil || err != io.EOF {
|
||||||
|
t.Errorf("expected (nil, io.EOF), got (%v, %v)", chunk, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test_ChunkerMultiChunks tests that a Chunker created with a reader that
|
||||||
|
// that contains enough data that multiple chunks should be returned, returns
|
||||||
|
// the expected chunks.
|
||||||
|
func Test_ChunkerMultiChunks(t *testing.T) {
|
||||||
|
data := []byte("Hello, world!")
|
||||||
|
chunkSize := int64(5)
|
||||||
|
|
||||||
|
chunker := NewChunker(bytes.NewReader(data), chunkSize)
|
||||||
|
|
||||||
|
// Expected chunks: "Hello", ", wor", "ld!"
|
||||||
|
expectedChunks := []string{
|
||||||
|
"Hello",
|
||||||
|
", wor",
|
||||||
|
"ld!",
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, expected := range expectedChunks {
|
||||||
|
chunk, err := chunker.Next()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The expected sequence number should be i+1
|
||||||
|
if chunk.SequenceNum != int64(i+1) {
|
||||||
|
t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, i+1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The expected IsLast value should be true only for the last chunk
|
||||||
|
expectedIsLast := i == len(expectedChunks)-1
|
||||||
|
if chunk.IsLast != expectedIsLast {
|
||||||
|
t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, expectedIsLast)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Decompress the gzip data and compare it with expected
|
||||||
|
gzipReader, err := gzip.NewReader(bytes.NewReader(chunk.Data))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create gzip reader: %v", err)
|
||||||
|
}
|
||||||
|
defer gzipReader.Close()
|
||||||
|
|
||||||
|
decompressed := new(bytes.Buffer)
|
||||||
|
if _, err = io.Copy(decompressed, gzipReader); err != nil {
|
||||||
|
t.Fatalf("failed to decompress data: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if decompressed.String() != expected {
|
||||||
|
t.Errorf("unexpected chunk data: got %s, want %s", decompressed.String(), expected)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// After all chunks are read, Next should return nil, io.EOF
|
||||||
|
chunk, err := chunker.Next()
|
||||||
|
if chunk != nil || err != io.EOF {
|
||||||
|
t.Errorf("expected (nil, io.EOF), got (%v, %v)", chunk, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type errorReader struct{}
|
||||||
|
|
||||||
|
func (r *errorReader) Read([]byte) (int, error) {
|
||||||
|
return 0, errors.New("test error")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test_ChunkerReaderError tests that a Chunker created with a reader that
|
||||||
|
// returns an error other than io.EOF returns that error.
|
||||||
|
func Test_ChunkerReaderError(t *testing.T) {
|
||||||
|
chunker := NewChunker(&errorReader{}, 1024)
|
||||||
|
|
||||||
|
_, err := chunker.Next()
|
||||||
|
if err == nil || err.Error() != "test error" {
|
||||||
|
t.Errorf("expected test error, got %v", err)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue