|
|
|
@ -16,12 +16,14 @@ const (
|
|
|
|
|
streamVersion = 1
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// NewStreamHeader creates a new StreamHeader.
|
|
|
|
|
func NewStreamHeader() *StreamHeader {
|
|
|
|
|
return &StreamHeader{
|
|
|
|
|
Version: streamVersion,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewStreamHeaderFromReader reads a StreamHeader from the given reader.
|
|
|
|
|
func NewStreamHeaderFromReader(r io.Reader) (*StreamHeader, int64, error) {
|
|
|
|
|
var totalSizeRead int64
|
|
|
|
|
|
|
|
|
@ -51,6 +53,7 @@ func NewStreamHeaderFromReader(r io.Reader) (*StreamHeader, int64, error) {
|
|
|
|
|
return strHdr, totalSizeRead, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// FileSize returns the total size of the files in the snapshot.
|
|
|
|
|
func (s *StreamHeader) FileSize() int64 {
|
|
|
|
|
if fs := s.GetFullSnapshot(); fs != nil {
|
|
|
|
|
var size int64
|
|
|
|
@ -63,6 +66,7 @@ func (s *StreamHeader) FileSize() int64 {
|
|
|
|
|
return 0
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Stream is a stream of data that can be read from a snapshot.
|
|
|
|
|
type Stream struct {
|
|
|
|
|
headerLen int64
|
|
|
|
|
|
|
|
|
@ -71,6 +75,8 @@ type Stream struct {
|
|
|
|
|
totalFileSize int64
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewIncrementalStream creates a new stream from a byte slice, presumably
|
|
|
|
|
// representing WAL data.
|
|
|
|
|
func NewIncrementalStream(data []byte) (*Stream, error) {
|
|
|
|
|
strHdr := NewStreamHeader()
|
|
|
|
|
strHdr.Payload = &StreamHeader_IncrementalSnapshot{
|
|
|
|
@ -92,6 +98,8 @@ func NewIncrementalStream(data []byte) (*Stream, error) {
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewFullStream creates a new stream from a SQLite file and 0 or more
|
|
|
|
|
// WAL files.
|
|
|
|
|
func NewFullStream(files ...string) (*Stream, error) {
|
|
|
|
|
if len(files) == 0 {
|
|
|
|
|
return nil, errors.New("no files provided")
|
|
|
|
@ -155,10 +163,13 @@ func NewFullStream(files ...string) (*Stream, error) {
|
|
|
|
|
}, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Size returns the total number of bytes that will be read from the stream,
|
|
|
|
|
// if the stream is fully read.
|
|
|
|
|
func (s *Stream) Size() int64 {
|
|
|
|
|
return int64(strHeaderLenSize + int64(s.headerLen) + s.totalFileSize)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Read reads from the stream.
|
|
|
|
|
func (s *Stream) Read(p []byte) (n int, err error) {
|
|
|
|
|
if s.readClosersIdx >= len(s.readClosers) {
|
|
|
|
|
return 0, io.EOF
|
|
|
|
@ -176,6 +187,7 @@ func (s *Stream) Read(p []byte) (n int, err error) {
|
|
|
|
|
return n, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close closes the stream.
|
|
|
|
|
func (s *Stream) Close() error {
|
|
|
|
|
for _, r := range s.readClosers {
|
|
|
|
|
if err := r.Close(); err != nil {
|
|
|
|
|