diff --git a/.gitignore b/.gitignore index 2767c1c0..a741043b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,9 @@ rqbench **/rqbench !**/rqbench/ +walr +**/walr + # Compiled Object files, Static and Dynamic libs (Shared Objects) *.o *.a diff --git a/db/wal/compactor.go b/db/wal/compactor.go new file mode 100644 index 00000000..d5a4c97f --- /dev/null +++ b/db/wal/compactor.go @@ -0,0 +1,89 @@ +package wal + +import "io" + +// Frame points to a single WAL frame in a WAL file. +type Frame struct { + Pgno uint32 + Commit uint32 + Offset int64 +} + +// Compactor is used to compact a WAL file. +type Compactor struct { + r io.ReadSeeker + wr *Reader +} + +// NewCompactor returns a new Compactor. +func NewCompactor(r io.ReadSeeker) *Compactor { + return &Compactor{ + r: r, + wr: NewReader(r), + } +} + +// WriteTo compacts the WAL file to the given writer. +func (c *Compactor) WriteTo(w io.Writer) (n int64, err error) { + frames, err := c.getFrames() + if err != nil { + return 0, err + } + if len(frames) == 0 { + return 0, nil + } + + // Copy the WAL header. + if _, err := c.r.Seek(0, io.SeekStart); err != nil { + return 0, err + } + if _, err := io.CopyN(w, c.r, WALHeaderSize); err != nil { + return 0, err + } + + // Write each new WAL Frame header, and the associated page data. + for _, f := range frames { + _ = f + } + return 0, nil +} + +func (c *Compactor) getFrames() (map[uint32]*Frame, error) { + if err := c.wr.ReadHeader(); err != io.EOF { + return nil, nil + } + + // Read the offset of the last version of each page in the WAL. + frames := make(map[uint32]*Frame) + txFrames := make(map[uint32]*Frame) + buf := make([]byte, c.wr.PageSize()) + for { + pgno, commit, err := c.wr.ReadFrame(buf) + if err == io.EOF { + break + } else if err != nil { + return nil, err + } + frame := &Frame{ + Pgno: pgno, + Commit: commit, + Offset: c.wr.Offset(), + } + + // Save latest frame information for each page. + txFrames[pgno] = frame + + // If this is not a committing frame, continue to next frame. + if commit == 0 { + continue + } + + // At the end of each transaction, copy frame information to main map. + for k, v := range txFrames { + frames[k] = v + } + txFrames = make(map[uint32]*Frame) + } + + return frames, nil +} diff --git a/db/wal/reader.go b/db/wal/reader.go index 07ffd313..466a9ea8 100644 --- a/db/wal/reader.go +++ b/db/wal/reader.go @@ -12,16 +12,16 @@ const ( WALFrameHeaderSize = 24 ) -// WALReader wraps an io.Reader and parses SQLite WAL frames. +// Reader wraps an io.Reader and parses SQLite WAL frames. // // This reader verifies the salt & checksum integrity while it reads. It does // not enforce transaction boundaries (i.e. it may return uncommitted frames). // It is the responsibility of the caller to handle this. // -// WALReader has been copied from the litefs source code. Many thanks to the +// Reader has been copied from the litefs source code. Many thanks to the // authors of that software. See https://github.com/superfly/litefs for more // details. -type WALReader struct { +type Reader struct { r io.Reader frameN int @@ -33,17 +33,17 @@ type WALReader struct { chksum1, chksum2 uint32 } -// NewWALReader returns a new instance of WALReader. -func NewWALReader(r io.Reader) *WALReader { - return &WALReader{r: r} +// NewReader returns a new instance of Reader. +func NewReader(r io.Reader) *Reader { + return &Reader{r: r} } // PageSize returns the page size from the header. Must call ReadHeader() first. -func (r *WALReader) PageSize() uint32 { return r.pageSize } +func (r *Reader) PageSize() uint32 { return r.pageSize } // Offset returns the file offset of the last read frame. // Returns zero if no frames have been read. -func (r *WALReader) Offset() int64 { +func (r *Reader) Offset() int64 { if r.frameN == 0 { return 0 } @@ -51,7 +51,7 @@ func (r *WALReader) Offset() int64 { } // ReadHeader reads the WAL header into the reader. Returns io.EOF if WAL is invalid. -func (r *WALReader) ReadHeader() error { +func (r *Reader) ReadHeader() error { // If we have a partial WAL, then mark WAL as done. hdr := make([]byte, WALHeaderSize) if _, err := io.ReadFull(r.r, hdr); err == io.ErrUnexpectedEOF { @@ -94,7 +94,7 @@ func (r *WALReader) ReadHeader() error { // ReadFrame reads the next frame from the WAL and returns the page number. // Returns io.EOF at the end of the valid WAL. -func (r *WALReader) ReadFrame(data []byte) (pgno, commit uint32, err error) { +func (r *Reader) ReadFrame(data []byte) (pgno, commit uint32, err error) { if len(data) != int(r.pageSize) { return 0, 0, fmt.Errorf("WALReader.ReadFrame(): buffer size (%d) must match page size (%d)", len(data), r.pageSize) } diff --git a/db/wal/reader_test.go b/db/wal/reader_test.go index aaf49345..f262d4c3 100644 --- a/db/wal/reader_test.go +++ b/db/wal/reader_test.go @@ -8,7 +8,7 @@ import ( "testing" ) -func TestWALReader(t *testing.T) { +func TestReader(t *testing.T) { t.Run("OK", func(t *testing.T) { buf := make([]byte, 4096) b, err := os.ReadFile("testdata/wal-reader/ok/wal") @@ -17,7 +17,7 @@ func TestWALReader(t *testing.T) { } // Initialize reader with header info. - r := NewWALReader(bytes.NewReader(b)) + r := NewReader(bytes.NewReader(b)) if err := r.ReadHeader(); err != nil { t.Fatal(err) } else if got, want := r.PageSize(), uint32(4096); got != want { @@ -78,7 +78,7 @@ func TestWALReader(t *testing.T) { } // Initialize reader with header info. - r := NewWALReader(bytes.NewReader(b)) + r := NewReader(bytes.NewReader(b)) if err := r.ReadHeader(); err != nil { t.Fatal(err) } else if got, want := r.PageSize(), uint32(4096); got != want { @@ -112,7 +112,7 @@ func TestWALReader(t *testing.T) { } // Initialize reader with header info. - r := NewWALReader(bytes.NewReader(b)) + r := NewReader(bytes.NewReader(b)) if err := r.ReadHeader(); err != nil { t.Fatal(err) } else if got, want := r.PageSize(), uint32(4096); got != want { @@ -139,21 +139,21 @@ func TestWALReader(t *testing.T) { }) t.Run("ZeroLength", func(t *testing.T) { - r := NewWALReader(bytes.NewReader(nil)) + r := NewReader(bytes.NewReader(nil)) if err := r.ReadHeader(); err != io.EOF { t.Fatalf("unexpected error: %#v", err) } }) t.Run("PartialHeader", func(t *testing.T) { - r := NewWALReader(bytes.NewReader(make([]byte, 10))) + r := NewReader(bytes.NewReader(make([]byte, 10))) if err := r.ReadHeader(); err != io.EOF { t.Fatalf("unexpected error: %#v", err) } }) t.Run("BadMagic", func(t *testing.T) { - r := NewWALReader(bytes.NewReader(make([]byte, 32))) + r := NewReader(bytes.NewReader(make([]byte, 32))) if err := r.ReadHeader(); err == nil || err.Error() != `invalid wal header magic: 0` { t.Fatalf("unexpected error: %#v", err) } @@ -165,7 +165,7 @@ func TestWALReader(t *testing.T) { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00} - r := NewWALReader(bytes.NewReader(data)) + r := NewReader(bytes.NewReader(data)) if err := r.ReadHeader(); err != io.EOF { t.Fatalf("unexpected error: %#v", err) } @@ -177,7 +177,7 @@ func TestWALReader(t *testing.T) { 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x15, 0x7b, 0x20, 0x92, 0xbb, 0xf8, 0x34, 0x1d} - r := NewWALReader(bytes.NewReader(data)) + r := NewReader(bytes.NewReader(data)) if err := r.ReadHeader(); err == nil || err.Error() != `unsupported wal version: 1` { t.Fatalf("unexpected error: %#v", err) } @@ -190,7 +190,7 @@ func TestWALReader(t *testing.T) { } // Initialize reader with header info. - r := NewWALReader(bytes.NewReader(b)) + r := NewReader(bytes.NewReader(b)) if err := r.ReadHeader(); err != nil { t.Fatal(err) } @@ -205,7 +205,7 @@ func TestWALReader(t *testing.T) { t.Fatal(err) } - r := NewWALReader(bytes.NewReader(b[:40])) + r := NewReader(bytes.NewReader(b[:40])) if err := r.ReadHeader(); err != nil { t.Fatal(err) } else if _, _, err := r.ReadFrame(make([]byte, 4096)); err != io.EOF { @@ -219,7 +219,7 @@ func TestWALReader(t *testing.T) { t.Fatal(err) } - r := NewWALReader(bytes.NewReader(b[:56])) + r := NewReader(bytes.NewReader(b[:56])) if err := r.ReadHeader(); err != nil { t.Fatal(err) } else if _, _, err := r.ReadFrame(make([]byte, 4096)); err != io.EOF { @@ -233,7 +233,7 @@ func TestWALReader(t *testing.T) { t.Fatal(err) } - r := NewWALReader(bytes.NewReader(b[:1000])) + r := NewReader(bytes.NewReader(b[:1000])) if err := r.ReadHeader(); err != nil { t.Fatal(err) } else if _, _, err := r.ReadFrame(make([]byte, 4096)); err != io.EOF { diff --git a/db/wal/walr/main.go b/db/wal/walr/main.go index 880a0de5..755e2acf 100644 --- a/db/wal/walr/main.go +++ b/db/wal/walr/main.go @@ -35,7 +35,7 @@ func main() { os.Exit(1) } - r := wal.NewWALReader(walFD) + r := wal.NewReader(walFD) if err := r.ReadHeader(); err != nil { fmt.Println("failed to read WAL header:", err) os.Exit(1)