1
0
Fork 0

Add Bytes() to Compacting Scanner

master
Philip O'Toole 8 months ago
parent 39fdd65a03
commit b84a9e228c

@ -1,7 +1,9 @@
package wal
import (
"encoding/binary"
"errors"
"fmt"
"io"
"sort"
)
@ -110,6 +112,56 @@ func (c *CompactingScanner) Next() (*Frame, error) {
return frame, nil
}
// Bytes returns a byte slice containing the entire contents of the compacted WAL file.
// The byte slice is suitable for writing to a new WAL file.
func (c *CompactingScanner) Bytes() ([]byte, error) {
pageSz := int(c.header.PageSize)
buf := make([]byte, WALHeaderSize+(len(c.frames)*WALFrameHeaderSize)+len(c.frames)*pageSz)
c.header.Copy(buf)
var bo binary.ByteOrder
switch magic := c.header.Magic; magic {
case 0x377f0682:
bo = binary.LittleEndian
case 0x377f0683:
bo = binary.BigEndian
default:
return nil, fmt.Errorf("invalid wal header magic: %x", magic)
}
frmHdr := WALHeaderSize
chksum1, chksum2 := c.header.Checksum1, c.header.Checksum2
for _, frame := range c.frames {
frmData := frmHdr + WALFrameHeaderSize
binary.BigEndian.PutUint32(buf[frmHdr:], frame.Pgno)
binary.BigEndian.PutUint32(buf[frmHdr+4:], frame.Commit)
binary.BigEndian.PutUint32(buf[frmHdr+8:], c.header.Salt1)
binary.BigEndian.PutUint32(buf[frmHdr+12:], c.header.Salt2)
// Checksum of frame header: "...the first 8 bytes..."
chksum1, chksum2 = WALChecksum(bo, chksum1, chksum2, buf[frmHdr:frmHdr+8])
// Read the frame data.
if _, err := c.readSeeker.Seek(frame.Offset+WALFrameHeaderSize, io.SeekStart); err != nil {
fmt.Println("error seeking to frame offset:", err)
return nil, err
}
if _, err := io.ReadFull(c.readSeeker, buf[frmData:frmData+pageSz]); err != nil {
fmt.Println("error reading frame data:", err)
return nil, err
}
// Update checksum using frame data: "..the content of all frames up to and including the current frame."
chksum1, chksum2 = WALChecksum(bo, chksum1, chksum2, buf[frmData:frmData+pageSz])
binary.BigEndian.PutUint32(buf[frmHdr+16:], chksum1)
binary.BigEndian.PutUint32(buf[frmHdr+20:], chksum2)
frmHdr += WALFrameHeaderSize + pageSz
}
return buf, nil
}
func (c *CompactingScanner) scan() error {
waitingForCommit := false
txFrames := make(map[uint32]*cFrame)

@ -101,3 +101,35 @@ func Test_CompactingScanner_Scan_Commit0(t *testing.T) {
t.Fatalf("expected EOF, got %v", err)
}
}
func Test_CompactingScanner_Bytes(t *testing.T) {
conn, path := mustCreateWAL(t, 128*1024)
defer conn.Close()
b, err := os.ReadFile(path)
if err != nil {
t.Fatal(err)
}
s, err := NewCompactingScanner(bytes.NewReader(b), false)
if err != nil {
t.Fatal(err)
}
var ramWriter bytes.Buffer
w, err := NewWriter(s)
if err != nil {
t.Fatal(err)
}
_, err = w.WriteTo(&ramWriter)
if err != nil {
t.Fatal(err)
}
buf, err := s.Bytes()
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, ramWriter.Bytes()) {
t.Fatal("bytes mismatch")
}
}

@ -18,6 +18,22 @@ type WALHeader struct {
Checksum2 uint32
}
// Copy copies the WALHeader to the given byte slice. If the byte slice
// is too small, a panic occurs.
func (h *WALHeader) Copy(b []byte) {
if len(b) < WALHeaderSize {
panic("byte slice too small")
}
binary.BigEndian.PutUint32(b[0:], h.Magic)
binary.BigEndian.PutUint32(b[4:], h.Version)
binary.BigEndian.PutUint32(b[8:], h.PageSize)
binary.BigEndian.PutUint32(b[12:], h.Seq)
binary.BigEndian.PutUint32(b[16:], h.Salt1)
binary.BigEndian.PutUint32(b[20:], h.Salt2)
binary.BigEndian.PutUint32(b[24:], h.Checksum1)
binary.BigEndian.PutUint32(b[28:], h.Checksum2)
}
// Frame points to a single WAL frame in a WAL file.
type Frame struct {
Pgno uint32

Loading…
Cancel
Save