1
0
Fork 0

Add initial skeleton of WAL Compactor

master
Philip O'Toole 1 year ago
parent 0c0c6cd757
commit 41c59ea531

3
.gitignore vendored

@ -11,6 +11,9 @@ rqbench
**/rqbench
!**/rqbench/
walr
**/walr
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a

@ -0,0 +1,89 @@
package wal
import "io"
// Frame points to a single WAL frame in a WAL file.
type Frame struct {
Pgno uint32
Commit uint32
Offset int64
}
// Compactor is used to compact a WAL file.
type Compactor struct {
r io.ReadSeeker
wr *Reader
}
// NewCompactor returns a new Compactor.
func NewCompactor(r io.ReadSeeker) *Compactor {
return &Compactor{
r: r,
wr: NewReader(r),
}
}
// WriteTo compacts the WAL file to the given writer.
func (c *Compactor) WriteTo(w io.Writer) (n int64, err error) {
frames, err := c.getFrames()
if err != nil {
return 0, err
}
if len(frames) == 0 {
return 0, nil
}
// Copy the WAL header.
if _, err := c.r.Seek(0, io.SeekStart); err != nil {
return 0, err
}
if _, err := io.CopyN(w, c.r, WALHeaderSize); err != nil {
return 0, err
}
// Write each new WAL Frame header, and the associated page data.
for _, f := range frames {
_ = f
}
return 0, nil
}
func (c *Compactor) getFrames() (map[uint32]*Frame, error) {
if err := c.wr.ReadHeader(); err != io.EOF {
return nil, nil
}
// Read the offset of the last version of each page in the WAL.
frames := make(map[uint32]*Frame)
txFrames := make(map[uint32]*Frame)
buf := make([]byte, c.wr.PageSize())
for {
pgno, commit, err := c.wr.ReadFrame(buf)
if err == io.EOF {
break
} else if err != nil {
return nil, err
}
frame := &Frame{
Pgno: pgno,
Commit: commit,
Offset: c.wr.Offset(),
}
// Save latest frame information for each page.
txFrames[pgno] = frame
// If this is not a committing frame, continue to next frame.
if commit == 0 {
continue
}
// At the end of each transaction, copy frame information to main map.
for k, v := range txFrames {
frames[k] = v
}
txFrames = make(map[uint32]*Frame)
}
return frames, nil
}

@ -12,16 +12,16 @@ const (
WALFrameHeaderSize = 24
)
// WALReader wraps an io.Reader and parses SQLite WAL frames.
// Reader wraps an io.Reader and parses SQLite WAL frames.
//
// This reader verifies the salt & checksum integrity while it reads. It does
// not enforce transaction boundaries (i.e. it may return uncommitted frames).
// It is the responsibility of the caller to handle this.
//
// WALReader has been copied from the litefs source code. Many thanks to the
// Reader has been copied from the litefs source code. Many thanks to the
// authors of that software. See https://github.com/superfly/litefs for more
// details.
type WALReader struct {
type Reader struct {
r io.Reader
frameN int
@ -33,17 +33,17 @@ type WALReader struct {
chksum1, chksum2 uint32
}
// NewWALReader returns a new instance of WALReader.
func NewWALReader(r io.Reader) *WALReader {
return &WALReader{r: r}
// NewReader returns a new instance of Reader.
func NewReader(r io.Reader) *Reader {
return &Reader{r: r}
}
// PageSize returns the page size from the header. Must call ReadHeader() first.
func (r *WALReader) PageSize() uint32 { return r.pageSize }
func (r *Reader) PageSize() uint32 { return r.pageSize }
// Offset returns the file offset of the last read frame.
// Returns zero if no frames have been read.
func (r *WALReader) Offset() int64 {
func (r *Reader) Offset() int64 {
if r.frameN == 0 {
return 0
}
@ -51,7 +51,7 @@ func (r *WALReader) Offset() int64 {
}
// ReadHeader reads the WAL header into the reader. Returns io.EOF if WAL is invalid.
func (r *WALReader) ReadHeader() error {
func (r *Reader) ReadHeader() error {
// If we have a partial WAL, then mark WAL as done.
hdr := make([]byte, WALHeaderSize)
if _, err := io.ReadFull(r.r, hdr); err == io.ErrUnexpectedEOF {
@ -94,7 +94,7 @@ func (r *WALReader) ReadHeader() error {
// ReadFrame reads the next frame from the WAL and returns the page number.
// Returns io.EOF at the end of the valid WAL.
func (r *WALReader) ReadFrame(data []byte) (pgno, commit uint32, err error) {
func (r *Reader) ReadFrame(data []byte) (pgno, commit uint32, err error) {
if len(data) != int(r.pageSize) {
return 0, 0, fmt.Errorf("WALReader.ReadFrame(): buffer size (%d) must match page size (%d)", len(data), r.pageSize)
}

@ -8,7 +8,7 @@ import (
"testing"
)
func TestWALReader(t *testing.T) {
func TestReader(t *testing.T) {
t.Run("OK", func(t *testing.T) {
buf := make([]byte, 4096)
b, err := os.ReadFile("testdata/wal-reader/ok/wal")
@ -17,7 +17,7 @@ func TestWALReader(t *testing.T) {
}
// Initialize reader with header info.
r := NewWALReader(bytes.NewReader(b))
r := NewReader(bytes.NewReader(b))
if err := r.ReadHeader(); err != nil {
t.Fatal(err)
} else if got, want := r.PageSize(), uint32(4096); got != want {
@ -78,7 +78,7 @@ func TestWALReader(t *testing.T) {
}
// Initialize reader with header info.
r := NewWALReader(bytes.NewReader(b))
r := NewReader(bytes.NewReader(b))
if err := r.ReadHeader(); err != nil {
t.Fatal(err)
} else if got, want := r.PageSize(), uint32(4096); got != want {
@ -112,7 +112,7 @@ func TestWALReader(t *testing.T) {
}
// Initialize reader with header info.
r := NewWALReader(bytes.NewReader(b))
r := NewReader(bytes.NewReader(b))
if err := r.ReadHeader(); err != nil {
t.Fatal(err)
} else if got, want := r.PageSize(), uint32(4096); got != want {
@ -139,21 +139,21 @@ func TestWALReader(t *testing.T) {
})
t.Run("ZeroLength", func(t *testing.T) {
r := NewWALReader(bytes.NewReader(nil))
r := NewReader(bytes.NewReader(nil))
if err := r.ReadHeader(); err != io.EOF {
t.Fatalf("unexpected error: %#v", err)
}
})
t.Run("PartialHeader", func(t *testing.T) {
r := NewWALReader(bytes.NewReader(make([]byte, 10)))
r := NewReader(bytes.NewReader(make([]byte, 10)))
if err := r.ReadHeader(); err != io.EOF {
t.Fatalf("unexpected error: %#v", err)
}
})
t.Run("BadMagic", func(t *testing.T) {
r := NewWALReader(bytes.NewReader(make([]byte, 32)))
r := NewReader(bytes.NewReader(make([]byte, 32)))
if err := r.ReadHeader(); err == nil || err.Error() != `invalid wal header magic: 0` {
t.Fatalf("unexpected error: %#v", err)
}
@ -165,7 +165,7 @@ func TestWALReader(t *testing.T) {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
r := NewWALReader(bytes.NewReader(data))
r := NewReader(bytes.NewReader(data))
if err := r.ReadHeader(); err != io.EOF {
t.Fatalf("unexpected error: %#v", err)
}
@ -177,7 +177,7 @@ func TestWALReader(t *testing.T) {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x15, 0x7b, 0x20, 0x92, 0xbb, 0xf8, 0x34, 0x1d}
r := NewWALReader(bytes.NewReader(data))
r := NewReader(bytes.NewReader(data))
if err := r.ReadHeader(); err == nil || err.Error() != `unsupported wal version: 1` {
t.Fatalf("unexpected error: %#v", err)
}
@ -190,7 +190,7 @@ func TestWALReader(t *testing.T) {
}
// Initialize reader with header info.
r := NewWALReader(bytes.NewReader(b))
r := NewReader(bytes.NewReader(b))
if err := r.ReadHeader(); err != nil {
t.Fatal(err)
}
@ -205,7 +205,7 @@ func TestWALReader(t *testing.T) {
t.Fatal(err)
}
r := NewWALReader(bytes.NewReader(b[:40]))
r := NewReader(bytes.NewReader(b[:40]))
if err := r.ReadHeader(); err != nil {
t.Fatal(err)
} else if _, _, err := r.ReadFrame(make([]byte, 4096)); err != io.EOF {
@ -219,7 +219,7 @@ func TestWALReader(t *testing.T) {
t.Fatal(err)
}
r := NewWALReader(bytes.NewReader(b[:56]))
r := NewReader(bytes.NewReader(b[:56]))
if err := r.ReadHeader(); err != nil {
t.Fatal(err)
} else if _, _, err := r.ReadFrame(make([]byte, 4096)); err != io.EOF {
@ -233,7 +233,7 @@ func TestWALReader(t *testing.T) {
t.Fatal(err)
}
r := NewWALReader(bytes.NewReader(b[:1000]))
r := NewReader(bytes.NewReader(b[:1000]))
if err := r.ReadHeader(); err != nil {
t.Fatal(err)
} else if _, _, err := r.ReadFrame(make([]byte, 4096)); err != io.EOF {

@ -35,7 +35,7 @@ func main() {
os.Exit(1)
}
r := wal.NewWALReader(walFD)
r := wal.NewReader(walFD)
if err := r.ReadHeader(); err != nil {
fmt.Println("failed to read WAL header:", err)
os.Exit(1)

Loading…
Cancel
Save