1
0
Fork 0

Fine renames

master
Philip O'Toole 10 months ago
parent ee83091741
commit f1dc1acbf8

@ -0,0 +1,147 @@
package wal
import (
"errors"
"io"
"sort"
)
var (
// ErrOpenTransaction is returned when the final frame in the WAL file is not a committing frame.
ErrOpenTransaction = errors.New("open transaction at end of WAL file")
)
type cFrame struct {
Pgno uint32
Commit uint32
Offset int64
}
type cFrames []*cFrame
// make cFrames sortable by offset.
func (c cFrames) Len() int { return len(c) }
func (c cFrames) Less(i, j int) bool { return c[uint32(i)].Offset < c[uint32(j)].Offset }
func (c cFrames) Swap(i, j int) { c[uint32(i)], c[uint32(j)] = c[uint32(j)], c[uint32(i)] }
// CompactingWALScanner implements WALIterator to iterate over frames in a WAL file.
// It also compacts the WAL file, with Next() returning the last frame for each page. This Scanner
// requires that the final frame in the WAL file is a committing frame. It will return an
// error at creation time if this is not the case.
type CompactingWALScanner struct {
readSeeker io.ReadSeeker
walReader *Reader
header *WALHeader
cIdx int
frames cFrames
}
// NewCompactingWALScanner creates a new CompactingWALScanner with the given io.ReadSeeker.
func NewCompactingWALScanner(r io.ReadSeeker) (*CompactingWALScanner, error) {
walReader := NewReader(r)
err := walReader.ReadHeader()
if err != nil {
return nil, err
}
hdr := &WALHeader{
Magic: walReader.magic,
Version: WALSupportedVersion,
PageSize: walReader.PageSize(),
Seq: walReader.seq,
Salt1: walReader.salt1,
Salt2: walReader.salt2,
Checksum1: walReader.chksum1,
Checksum2: walReader.chksum2,
}
s := &CompactingWALScanner{
readSeeker: r,
walReader: walReader,
header: hdr,
}
if err := s.scan(); err != nil {
return nil, err
}
return s, nil
}
// Header returns the header of the WAL file.
func (c *CompactingWALScanner) Header() (*WALHeader, error) {
return c.header, nil
}
// Next reads the next frame from the WAL file.
func (c *CompactingWALScanner) Next() (*Frame, error) {
if c.cIdx >= len(c.frames) {
return nil, io.EOF
}
frame := &Frame{
Pgno: c.frames[c.cIdx].Pgno,
Commit: c.frames[c.cIdx].Commit,
Data: make([]byte, c.header.PageSize),
}
if _, err := c.readSeeker.Seek(c.frames[c.cIdx].Offset, io.SeekStart); err != nil {
return nil, err
}
if _, err := io.ReadFull(c.readSeeker, frame.Data); err != nil {
return nil, err
}
c.cIdx++
return frame, nil
}
func (c *CompactingWALScanner) scan() error {
waitingForCommit := false
txFrames := make(map[uint32]*cFrame)
frames := make(map[uint32]*cFrame)
buf := make([]byte, c.header.PageSize)
for {
pgno, commit, err := c.walReader.ReadFrame(buf)
if err == io.EOF {
break
} else if err != nil {
return err
}
frame := &cFrame{
Pgno: pgno,
Commit: commit,
Offset: c.walReader.Offset(),
}
// Save latest frame information for each page.
txFrames[pgno] = frame
// If this is not a committing frame, continue to next frame.
if commit == 0 {
waitingForCommit = true
continue
}
waitingForCommit = false
// At the end of each transaction, copy frame information to main map.
for k, v := range txFrames {
frames[k] = v
}
txFrames = make(map[uint32]*cFrame)
}
if waitingForCommit {
return ErrOpenTransaction
}
// Now we have the latest version of each frame. Next we need to sort
// them by offset so we return them in the correct order.
c.frames = make(cFrames, 0, len(frames))
for _, frame := range frames {
c.frames = append(c.frames, frame)
}
sort.Sort(c.frames)
return nil
}

@ -0,0 +1,57 @@
package wal
import (
"io"
)
// FullWALScanner implements WALIterator to iterate over all frames in a WAL file.
type FullWALScanner struct {
reader *Reader
header *WALHeader
}
// NewFullWALScanner creates a new FullWALScanner with the given io.Reader.
func NewFullWALScanner(r io.Reader) (*FullWALScanner, error) {
wr := NewReader(r)
err := wr.ReadHeader()
if err != nil {
return nil, err
}
hdr := &WALHeader{
Magic: wr.magic,
Version: WALSupportedVersion,
PageSize: wr.PageSize(),
Seq: wr.seq,
Salt1: wr.salt1,
Salt2: wr.salt2,
Checksum1: wr.chksum1,
Checksum2: wr.chksum2,
}
return &FullWALScanner{
reader: wr,
header: hdr,
}, nil
}
// Header returns the header of the WAL file.
func (f *FullWALScanner) Header() (*WALHeader, error) {
return f.header, nil
}
// Next reads the next frame from the WAL file.
func (f *FullWALScanner) Next() (*Frame, error) {
data := make([]byte, f.reader.PageSize())
pgno, commit, err := f.reader.ReadFrame(data)
if err != nil {
return nil, err
}
frame := &Frame{
Pgno: pgno,
Commit: commit,
Data: data,
}
return frame, nil
}

@ -0,0 +1,53 @@
package wal
import (
"bytes"
"io"
"os"
"testing"
)
func Test_FullScanner_Scan(t *testing.T) {
b, err := os.ReadFile("testdata/wal-reader/ok/wal")
if err != nil {
t.Fatal(err)
}
s, err := NewFullWALScanner(bytes.NewReader(b))
if err != nil {
t.Fatal(err)
}
for i, expF := range []struct {
pgno uint32
commit uint32
dataLowIdx int
dataHighIdx int
}{
{1, 0, 56, 4152},
{2, 2, 4176, 8272},
{2, 2, 8296, 12392},
} {
f, err := s.Next()
if err != nil {
t.Fatal(err)
}
if f.Pgno != expF.pgno {
t.Fatalf("expected pgno %d, got %d", expF.pgno, f.Pgno)
}
if f.Commit != expF.commit {
t.Fatalf("expected commit %d, got %d", expF.commit, f.Commit)
}
if len(f.Data) != 4096 {
t.Fatalf("expected data length 4096, got %d", len(f.Data))
}
if !bytes.Equal(f.Data, b[expF.dataLowIdx:expF.dataHighIdx]) {
t.Fatalf("page data mismatch on test %d", i)
}
}
_, err = s.Next()
if err != io.EOF {
t.Fatalf("expected EOF, got %v", err)
}
}
Loading…
Cancel
Save