diff --git a/cluster/message.pb.go b/cluster/message.pb.go index aeb27f96..bd9e8722 100644 --- a/cluster/message.pb.go +++ b/cluster/message.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.31.0 // protoc v3.6.1 // source: message.proto diff --git a/command/chunking/chunker.go b/command/chunking/chunker.go new file mode 100644 index 00000000..368e6f5e --- /dev/null +++ b/command/chunking/chunker.go @@ -0,0 +1,142 @@ +package chunking + +import ( + "bytes" + "compress/gzip" + "crypto/rand" + "errors" + "fmt" + "io" + "sync" + + "github.com/rqlite/rqlite/command" +) + +const ( + internalChunkSize = 512 * 1024 +) + +// Define a sync.Pool to pool the buffers. +var bufferPool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(nil) + }, +} + +// Define a sync.Pool to pool the gzip writers. +var gzipWriterPool = sync.Pool{ + New: func() interface{} { + return gzip.NewWriter(nil) + }, +} + +// Chunker is a reader that reads from an underlying io.Reader and returns +// LoadChunkRequests of a given size. +type Chunker struct { + r io.Reader + chunkSize int64 + streamID string + sequenceNum int64 + finished bool +} + +// NewChunker returns a new Chunker that reads from r and returns +// LoadChunkRequests of size chunkSize. +func NewChunker(r io.Reader, chunkSize int64) *Chunker { + return &Chunker{ + r: r, + chunkSize: chunkSize, + streamID: generateStreamID(), + } +} + +func generateStreamID() string { + b := make([]byte, 16) + _, err := io.ReadFull(rand.Reader, b) + if err != nil { + return "" + } + // Convert the random bytes into the format of a UUID. + return fmt.Sprintf("%x-%x-%x-%x-%x", + b[0:4], b[4:6], b[6:8], b[8:10], b[10:]) +} + +// Next returns the next LoadChunkRequest, or io.EOF if finished. +func (c *Chunker) Next() (*command.LoadChunkRequest, error) { + if c.finished { + return nil, io.EOF + } + + // Get a buffer from the pool. + buf := bufferPool.Get().(*bytes.Buffer) + buf.Reset() + defer bufferPool.Put(buf) + + // Get a gzip.Writer from the pool. + gw := gzipWriterPool.Get().(*gzip.Writer) + defer gzipWriterPool.Put(gw) + + // Reset the gzip.Writer to use the buffer. + gw.Reset(buf) + + // Create an intermediate buffer to read into + intermediateBuffer := make([]byte, min(internalChunkSize, c.chunkSize)) + + // Read data into intermediate buffer and write to gzip.Writer + var totalRead int64 = 0 + for totalRead < c.chunkSize { + n, err := c.r.Read(intermediateBuffer) + totalRead += int64(n) + if n > 0 { + if _, err = gw.Write(intermediateBuffer[:n]); err != nil { + return nil, err + } + } + if err != nil { + if err == io.EOF { + break + } else { + return nil, err + } + } + } + + // Close the gzip.Writer to ensure all data is written to the buffer + if err := gw.Close(); err != nil { + return nil, errors.New("failed to close gzip writer: " + err.Error()) + } + + if totalRead < c.chunkSize { + c.finished = true + } + + // If we didn't write any data, then we're finished + if totalRead == 0 { + // If no previous chunks were sent at all signal that. + if c.sequenceNum == 0 { + return nil, io.EOF + } + // If previous chunks were sent, return a final empty chunk with IsLast = true + return &command.LoadChunkRequest{ + StreamId: c.streamID, + SequenceNum: c.sequenceNum + 1, + IsLast: true, + Data: nil, + }, nil + } + + c.sequenceNum++ + return &command.LoadChunkRequest{ + StreamId: c.streamID, + SequenceNum: c.sequenceNum, + IsLast: totalRead < c.chunkSize, + Data: buf.Bytes(), + }, nil +} + +func min(a, b int64) int64 { + if a < b { + return a + } + return b +} diff --git a/command/chunking/chunker_test.go b/command/chunking/chunker_test.go new file mode 100644 index 00000000..e836c3b8 --- /dev/null +++ b/command/chunking/chunker_test.go @@ -0,0 +1,258 @@ +package chunking + +import ( + "bytes" + "compress/gzip" + "errors" + "io" + "testing" +) + +// Test_ChunkerEmptyReader tests that a Chunker created with an empty reader +// returns io.EOF on the first call to Next. +func Test_ChunkerEmptyReader(t *testing.T) { + // Create an empty reader that immediately returns io.EOF + r := bytes.NewReader([]byte{}) + chunker := NewChunker(r, 1024) + + // Expect the first call to Next to return io.EOF + _, err := chunker.Next() + if err != io.EOF { + t.Fatalf("expected io.EOF, got %v", err) + } + + // Further calls to Next should also return io.EOF + _, err = chunker.Next() + if err != io.EOF { + t.Fatalf("expected io.EOF, got %v", err) + } +} + +// Test_ChunkerSingleChunk tests that a Chunker created with a reader that +// contains a single chunk returns the expected chunk, when the chunk size is +// smaller than the amount of data in the reader. +func Test_ChunkerSingleChunk(t *testing.T) { + data := []byte("Hello, world!") + chunker := NewChunker(bytes.NewReader(data), 32) + + chunk, err := chunker.Next() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // The expected sequence number should be 1 + if chunk.SequenceNum != 1 { + t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, 1) + } + + // Should be the last chunk + if chunk.IsLast != true { + t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, true) + } + + // Decompress the gzip data and compare it with expected + gzipReader, err := gzip.NewReader(bytes.NewReader(chunk.Data)) + if err != nil { + t.Fatalf("failed to create gzip reader: %v", err) + } + defer gzipReader.Close() + + decompressed := new(bytes.Buffer) + if _, err = io.Copy(decompressed, gzipReader); err != nil { + t.Fatalf("failed to decompress data: %v", err) + } + + if decompressed.String() != string(data) { + t.Errorf("unexpected chunk data: got %s, want %s", decompressed.String(), string(data)) + } + + // After all chunks are read, Next should return nil, io.EOF + chunk, err = chunker.Next() + if chunk != nil || err != io.EOF { + t.Errorf("expected (nil, io.EOF), got (%v, %v)", chunk, err) + } +} + +// Test_ChunkerSingleChunkLarge tests that a Chunker created with a reader that +// contains a single chunk returns the expected chunk, when the chunk size is +// much larger than the amount of data in the reader, and is larger than the +// internal chunk size. +func Test_ChunkerSingleChunkLarge(t *testing.T) { + data := []byte("Hello, world!") + chunker := NewChunker(bytes.NewReader(data), internalChunkSize*3) + + chunk, err := chunker.Next() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // The expected sequence number should be 1 + if chunk.SequenceNum != 1 { + t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, 1) + } + + // Should be the last chunk + if chunk.IsLast != true { + t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, true) + } + + // Decompress the gzip data and compare it with expected + gzipReader, err := gzip.NewReader(bytes.NewReader(chunk.Data)) + if err != nil { + t.Fatalf("failed to create gzip reader: %v", err) + } + defer gzipReader.Close() + + decompressed := new(bytes.Buffer) + if _, err = io.Copy(decompressed, gzipReader); err != nil { + t.Fatalf("failed to decompress data: %v", err) + } + + if decompressed.String() != string(data) { + t.Errorf("unexpected chunk data: got %s, want %s", decompressed.String(), string(data)) + } + + // After all chunks are read, Next should return nil, io.EOF + chunk, err = chunker.Next() + if chunk != nil || err != io.EOF { + t.Errorf("expected (nil, io.EOF), got (%v, %v)", chunk, err) + } +} + +// Test_ChunkerSingleChunkExact tests that a Chunker created with a reader that +// contains a single chunk returns the expected chunk, when the chunk size is +// exactly the amount of data in the reader. +func Test_ChunkerSingleChunkExact(t *testing.T) { + data := []byte("Hello") + chunker := NewChunker(bytes.NewReader(data), 5) + + chunk, err := chunker.Next() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // The expected sequence number should be 1 + if chunk.SequenceNum != 1 { + t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, 1) + } + + // Won't be last since chunker doesn't know the size of the data + if chunk.IsLast { + t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, true) + } + + // Decompress the gzip data and compare it with expected + gzipReader, err := gzip.NewReader(bytes.NewReader(chunk.Data)) + if err != nil { + t.Fatalf("failed to create gzip reader: %v", err) + } + defer gzipReader.Close() + + decompressed := new(bytes.Buffer) + if _, err = io.Copy(decompressed, gzipReader); err != nil { + t.Fatalf("failed to decompress data: %v", err) + } + + if decompressed.String() != string(data) { + t.Errorf("unexpected chunk data: got %s, want %s", decompressed.String(), string(data)) + } + + // Call Next again, get a second chunk, which should be the last chunk + chunk, err = chunker.Next() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // The expected sequence number should be 2 + if chunk.SequenceNum != 2 { + t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, 2) + } + + // Should be the last chunk + if !chunk.IsLast { + t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, true) + } + if chunk.Data != nil { + t.Errorf("unexpected chunk data: got %v, want %v", chunk.Data, nil) + } + + // After all chunks are read, Next should return nil, io.EOF + chunk, err = chunker.Next() + if chunk != nil || err != io.EOF { + t.Errorf("expected (nil, io.EOF), got (%v, %v)", chunk, err) + } +} + +// Test_ChunkerMultiChunks tests that a Chunker created with a reader that +// that contains enough data that multiple chunks should be returned, returns +// the expected chunks. +func Test_ChunkerMultiChunks(t *testing.T) { + data := []byte("Hello, world!") + chunkSize := int64(5) + + chunker := NewChunker(bytes.NewReader(data), chunkSize) + + // Expected chunks: "Hello", ", wor", "ld!" + expectedChunks := []string{ + "Hello", + ", wor", + "ld!", + } + + for i, expected := range expectedChunks { + chunk, err := chunker.Next() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // The expected sequence number should be i+1 + if chunk.SequenceNum != int64(i+1) { + t.Errorf("unexpected sequence number: got %d, want %d", chunk.SequenceNum, i+1) + } + + // The expected IsLast value should be true only for the last chunk + expectedIsLast := i == len(expectedChunks)-1 + if chunk.IsLast != expectedIsLast { + t.Errorf("unexpected IsLast value: got %v, want %v", chunk.IsLast, expectedIsLast) + } + + // Decompress the gzip data and compare it with expected + gzipReader, err := gzip.NewReader(bytes.NewReader(chunk.Data)) + if err != nil { + t.Fatalf("failed to create gzip reader: %v", err) + } + defer gzipReader.Close() + + decompressed := new(bytes.Buffer) + if _, err = io.Copy(decompressed, gzipReader); err != nil { + t.Fatalf("failed to decompress data: %v", err) + } + + if decompressed.String() != expected { + t.Errorf("unexpected chunk data: got %s, want %s", decompressed.String(), expected) + } + } + + // After all chunks are read, Next should return nil, io.EOF + chunk, err := chunker.Next() + if chunk != nil || err != io.EOF { + t.Errorf("expected (nil, io.EOF), got (%v, %v)", chunk, err) + } +} + +type errorReader struct{} + +func (r *errorReader) Read([]byte) (int, error) { + return 0, errors.New("test error") +} + +// Test_ChunkerReaderError tests that a Chunker created with a reader that +// returns an error other than io.EOF returns that error. +func Test_ChunkerReaderError(t *testing.T) { + chunker := NewChunker(&errorReader{}, 1024) + + _, err := chunker.Next() + if err == nil || err.Error() != "test error" { + t.Errorf("expected test error, got %v", err) + } +} diff --git a/command/command.pb.go b/command/command.pb.go index c6ca9c87..5d889247 100644 --- a/command/command.pb.go +++ b/command/command.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 +// protoc-gen-go v1.31.0 // protoc v3.6.1 // source: command.proto @@ -176,7 +176,7 @@ func (x Command_Type) Number() protoreflect.EnumNumber { // Deprecated: Use Command_Type.Descriptor instead. func (Command_Type) EnumDescriptor() ([]byte, []int) { - return file_command_proto_rawDescGZIP(), []int{16, 0} + return file_command_proto_rawDescGZIP(), []int{17, 0} } type Parameter struct { @@ -1011,6 +1011,77 @@ func (x *LoadRequest) GetData() []byte { return nil } +type LoadChunkRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StreamId string `protobuf:"bytes,1,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"` + SequenceNum int64 `protobuf:"varint,2,opt,name=sequence_num,json=sequenceNum,proto3" json:"sequence_num,omitempty"` + IsLast bool `protobuf:"varint,3,opt,name=is_last,json=isLast,proto3" json:"is_last,omitempty"` + Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *LoadChunkRequest) Reset() { + *x = LoadChunkRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_command_proto_msgTypes[12] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LoadChunkRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LoadChunkRequest) ProtoMessage() {} + +func (x *LoadChunkRequest) ProtoReflect() protoreflect.Message { + mi := &file_command_proto_msgTypes[12] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LoadChunkRequest.ProtoReflect.Descriptor instead. +func (*LoadChunkRequest) Descriptor() ([]byte, []int) { + return file_command_proto_rawDescGZIP(), []int{12} +} + +func (x *LoadChunkRequest) GetStreamId() string { + if x != nil { + return x.StreamId + } + return "" +} + +func (x *LoadChunkRequest) GetSequenceNum() int64 { + if x != nil { + return x.SequenceNum + } + return 0 +} + +func (x *LoadChunkRequest) GetIsLast() bool { + if x != nil { + return x.IsLast + } + return false +} + +func (x *LoadChunkRequest) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + type JoinRequest struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1024,7 +1095,7 @@ type JoinRequest struct { func (x *JoinRequest) Reset() { *x = JoinRequest{} if protoimpl.UnsafeEnabled { - mi := &file_command_proto_msgTypes[12] + mi := &file_command_proto_msgTypes[13] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1037,7 +1108,7 @@ func (x *JoinRequest) String() string { func (*JoinRequest) ProtoMessage() {} func (x *JoinRequest) ProtoReflect() protoreflect.Message { - mi := &file_command_proto_msgTypes[12] + mi := &file_command_proto_msgTypes[13] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1050,7 +1121,7 @@ func (x *JoinRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use JoinRequest.ProtoReflect.Descriptor instead. func (*JoinRequest) Descriptor() ([]byte, []int) { - return file_command_proto_rawDescGZIP(), []int{12} + return file_command_proto_rawDescGZIP(), []int{13} } func (x *JoinRequest) GetId() string { @@ -1086,7 +1157,7 @@ type NotifyRequest struct { func (x *NotifyRequest) Reset() { *x = NotifyRequest{} if protoimpl.UnsafeEnabled { - mi := &file_command_proto_msgTypes[13] + mi := &file_command_proto_msgTypes[14] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1099,7 +1170,7 @@ func (x *NotifyRequest) String() string { func (*NotifyRequest) ProtoMessage() {} func (x *NotifyRequest) ProtoReflect() protoreflect.Message { - mi := &file_command_proto_msgTypes[13] + mi := &file_command_proto_msgTypes[14] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1112,7 +1183,7 @@ func (x *NotifyRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use NotifyRequest.ProtoReflect.Descriptor instead. func (*NotifyRequest) Descriptor() ([]byte, []int) { - return file_command_proto_rawDescGZIP(), []int{13} + return file_command_proto_rawDescGZIP(), []int{14} } func (x *NotifyRequest) GetId() string { @@ -1140,7 +1211,7 @@ type RemoveNodeRequest struct { func (x *RemoveNodeRequest) Reset() { *x = RemoveNodeRequest{} if protoimpl.UnsafeEnabled { - mi := &file_command_proto_msgTypes[14] + mi := &file_command_proto_msgTypes[15] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1153,7 +1224,7 @@ func (x *RemoveNodeRequest) String() string { func (*RemoveNodeRequest) ProtoMessage() {} func (x *RemoveNodeRequest) ProtoReflect() protoreflect.Message { - mi := &file_command_proto_msgTypes[14] + mi := &file_command_proto_msgTypes[15] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1166,7 +1237,7 @@ func (x *RemoveNodeRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use RemoveNodeRequest.ProtoReflect.Descriptor instead. func (*RemoveNodeRequest) Descriptor() ([]byte, []int) { - return file_command_proto_rawDescGZIP(), []int{14} + return file_command_proto_rawDescGZIP(), []int{15} } func (x *RemoveNodeRequest) GetId() string { @@ -1187,7 +1258,7 @@ type Noop struct { func (x *Noop) Reset() { *x = Noop{} if protoimpl.UnsafeEnabled { - mi := &file_command_proto_msgTypes[15] + mi := &file_command_proto_msgTypes[16] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1200,7 +1271,7 @@ func (x *Noop) String() string { func (*Noop) ProtoMessage() {} func (x *Noop) ProtoReflect() protoreflect.Message { - mi := &file_command_proto_msgTypes[15] + mi := &file_command_proto_msgTypes[16] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1213,7 +1284,7 @@ func (x *Noop) ProtoReflect() protoreflect.Message { // Deprecated: Use Noop.ProtoReflect.Descriptor instead. func (*Noop) Descriptor() ([]byte, []int) { - return file_command_proto_rawDescGZIP(), []int{15} + return file_command_proto_rawDescGZIP(), []int{16} } func (x *Noop) GetId() string { @@ -1236,7 +1307,7 @@ type Command struct { func (x *Command) Reset() { *x = Command{} if protoimpl.UnsafeEnabled { - mi := &file_command_proto_msgTypes[16] + mi := &file_command_proto_msgTypes[17] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -1249,7 +1320,7 @@ func (x *Command) String() string { func (*Command) ProtoMessage() {} func (x *Command) ProtoReflect() protoreflect.Message { - mi := &file_command_proto_msgTypes[16] + mi := &file_command_proto_msgTypes[17] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -1262,7 +1333,7 @@ func (x *Command) ProtoReflect() protoreflect.Message { // Deprecated: Use Command.ProtoReflect.Descriptor instead. func (*Command) Descriptor() ([]byte, []int) { - return file_command_proto_rawDescGZIP(), []int{16} + return file_command_proto_rawDescGZIP(), []int{17} } func (x *Command) GetType() Command_Type { @@ -1387,41 +1458,49 @@ var file_command_proto_rawDesc = []byte{ 0x45, 0x53, 0x54, 0x5f, 0x46, 0x4f, 0x52, 0x4d, 0x41, 0x54, 0x5f, 0x42, 0x49, 0x4e, 0x41, 0x52, 0x59, 0x10, 0x02, 0x22, 0x21, 0x0a, 0x0b, 0x4c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, - 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x4d, 0x0a, 0x0b, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, - 0x14, 0x0a, 0x05, 0x76, 0x6f, 0x74, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, - 0x76, 0x6f, 0x74, 0x65, 0x72, 0x22, 0x39, 0x0a, 0x0d, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, 0x52, + 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x7f, 0x0a, 0x10, 0x4c, 0x6f, 0x61, 0x64, 0x43, 0x68, + 0x75, 0x6e, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x49, 0x64, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x65, 0x71, 0x75, 0x65, + 0x6e, 0x63, 0x65, 0x5f, 0x6e, 0x75, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x73, + 0x65, 0x71, 0x75, 0x65, 0x6e, 0x63, 0x65, 0x4e, 0x75, 0x6d, 0x12, 0x17, 0x0a, 0x07, 0x69, 0x73, + 0x5f, 0x6c, 0x61, 0x73, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x06, 0x69, 0x73, 0x4c, + 0x61, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, 0x4d, 0x0a, 0x0b, 0x4a, 0x6f, 0x69, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, - 0x22, 0x23, 0x0a, 0x11, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x16, 0x0a, 0x04, 0x4e, 0x6f, 0x6f, 0x70, 0x12, 0x0e, 0x0a, - 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xaf, 0x02, - 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, 0x70, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, - 0x64, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, - 0x74, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, - 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, 0x6f, - 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, 0x73, - 0x73, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, - 0x65, 0x73, 0x73, 0x65, 0x64, 0x22, 0xb7, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, - 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, - 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, 0x4d, - 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x01, - 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, - 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, - 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x4f, 0x50, 0x10, - 0x03, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, - 0x45, 0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x04, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, - 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4a, 0x4f, 0x49, 0x4e, 0x10, 0x05, 0x12, - 0x1e, 0x0a, 0x1a, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, - 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x06, 0x42, - 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, 0x71, - 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, - 0x61, 0x6e, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x14, 0x0a, 0x05, 0x76, 0x6f, 0x74, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, + 0x05, 0x76, 0x6f, 0x74, 0x65, 0x72, 0x22, 0x39, 0x0a, 0x0d, 0x4e, 0x6f, 0x74, 0x69, 0x66, 0x79, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, + 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x61, 0x64, 0x64, 0x72, 0x65, 0x73, + 0x73, 0x22, 0x23, 0x0a, 0x11, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x4e, 0x6f, 0x64, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0x16, 0x0a, 0x04, 0x4e, 0x6f, 0x6f, 0x70, 0x12, 0x0e, + 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x22, 0xaf, + 0x02, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x29, 0x0a, 0x04, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x15, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, + 0x6e, 0x64, 0x2e, 0x43, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, + 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x75, 0x62, 0x5f, 0x63, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x73, 0x75, 0x62, 0x43, + 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x6f, 0x6d, 0x70, 0x72, 0x65, + 0x73, 0x73, 0x65, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x63, 0x6f, 0x6d, 0x70, + 0x72, 0x65, 0x73, 0x73, 0x65, 0x64, 0x22, 0xb7, 0x01, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, + 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x16, 0x0a, 0x12, 0x43, 0x4f, 0x4d, + 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, + 0x01, 0x12, 0x18, 0x0a, 0x14, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, + 0x45, 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x10, 0x02, 0x12, 0x15, 0x0a, 0x11, 0x43, + 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4e, 0x4f, 0x4f, 0x50, + 0x10, 0x03, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x4c, 0x4f, 0x41, 0x44, 0x10, 0x04, 0x12, 0x15, 0x0a, 0x11, 0x43, 0x4f, 0x4d, + 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x4a, 0x4f, 0x49, 0x4e, 0x10, 0x05, + 0x12, 0x1e, 0x0a, 0x1a, 0x43, 0x4f, 0x4d, 0x4d, 0x41, 0x4e, 0x44, 0x5f, 0x54, 0x59, 0x50, 0x45, + 0x5f, 0x45, 0x58, 0x45, 0x43, 0x55, 0x54, 0x45, 0x5f, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x06, + 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x72, + 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x72, 0x71, 0x6c, 0x69, 0x74, 0x65, 0x2f, 0x63, 0x6f, 0x6d, + 0x6d, 0x61, 0x6e, 0x64, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1437,7 +1516,7 @@ func file_command_proto_rawDescGZIP() []byte { } var file_command_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 17) +var file_command_proto_msgTypes = make([]protoimpl.MessageInfo, 18) var file_command_proto_goTypes = []interface{}{ (QueryRequest_Level)(0), // 0: command.QueryRequest.Level (BackupRequest_Format)(0), // 1: command.BackupRequest.Format @@ -1454,11 +1533,12 @@ var file_command_proto_goTypes = []interface{}{ (*ExecuteQueryResponse)(nil), // 12: command.ExecuteQueryResponse (*BackupRequest)(nil), // 13: command.BackupRequest (*LoadRequest)(nil), // 14: command.LoadRequest - (*JoinRequest)(nil), // 15: command.JoinRequest - (*NotifyRequest)(nil), // 16: command.NotifyRequest - (*RemoveNodeRequest)(nil), // 17: command.RemoveNodeRequest - (*Noop)(nil), // 18: command.Noop - (*Command)(nil), // 19: command.Command + (*LoadChunkRequest)(nil), // 15: command.LoadChunkRequest + (*JoinRequest)(nil), // 16: command.JoinRequest + (*NotifyRequest)(nil), // 17: command.NotifyRequest + (*RemoveNodeRequest)(nil), // 18: command.RemoveNodeRequest + (*Noop)(nil), // 19: command.Noop + (*Command)(nil), // 20: command.Command } var file_command_proto_depIdxs = []int32{ 3, // 0: command.Statement.parameters:type_name -> command.Parameter @@ -1632,7 +1712,7 @@ func file_command_proto_init() { } } file_command_proto_msgTypes[12].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*JoinRequest); i { + switch v := v.(*LoadChunkRequest); i { case 0: return &v.state case 1: @@ -1644,7 +1724,7 @@ func file_command_proto_init() { } } file_command_proto_msgTypes[13].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*NotifyRequest); i { + switch v := v.(*JoinRequest); i { case 0: return &v.state case 1: @@ -1656,7 +1736,7 @@ func file_command_proto_init() { } } file_command_proto_msgTypes[14].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*RemoveNodeRequest); i { + switch v := v.(*NotifyRequest); i { case 0: return &v.state case 1: @@ -1668,7 +1748,7 @@ func file_command_proto_init() { } } file_command_proto_msgTypes[15].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*Noop); i { + switch v := v.(*RemoveNodeRequest); i { case 0: return &v.state case 1: @@ -1680,6 +1760,18 @@ func file_command_proto_init() { } } file_command_proto_msgTypes[16].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Noop); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_command_proto_msgTypes[17].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*Command); i { case 0: return &v.state @@ -1710,7 +1802,7 @@ func file_command_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_command_proto_rawDesc, NumEnums: 3, - NumMessages: 17, + NumMessages: 18, NumExtensions: 0, NumServices: 0, }, diff --git a/command/command.proto b/command/command.proto index fb9f98c6..1e5caaff 100644 --- a/command/command.proto +++ b/command/command.proto @@ -89,6 +89,13 @@ message LoadRequest { bytes data = 1; } +message LoadChunkRequest { + string stream_id = 1; + int64 sequence_num = 2; + bool is_last = 3; + bytes data = 4; +} + message JoinRequest { string id = 1; string address = 2;