1
0
Fork 0

Support skipping verifying frame data

This halves the Compacting Scan time, and doesn't seem necessary since
rqlite is the only system writing the WAL files.
master
Philip O'Toole 10 months ago
parent 1b0dfe6942
commit 5a8a2744ff

@ -32,13 +32,18 @@ type CompactingScanner struct {
readSeeker io.ReadSeeker
walReader *Reader
header *WALHeader
fullScan bool
cIdx int
frames cFrames
}
// NewCompactingScanner creates a new CompactingScanner with the given io.ReadSeeker.
func NewCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) {
// If fullScan is true, the scanner will perform a full scan of the WAL file, performing
// a checksum on each frame. If fullScan is false, the scanner will only scan the file
// sufficiently to find the last valid frame for each page. This is faster when the
// caller knows that the WAL file is valid and does not need to be checked.
func NewCompactingScanner(r io.ReadSeeker, fullScan bool) (*CompactingScanner, error) {
walReader := NewReader(r)
err := walReader.ReadHeader()
if err != nil {
@ -60,6 +65,7 @@ func NewCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) {
readSeeker: r,
walReader: walReader,
header: hdr,
fullScan: fullScan,
}
if err := s.scan(); err != nil {
return nil, err
@ -100,7 +106,10 @@ func (c *CompactingScanner) scan() error {
waitingForCommit := false
txFrames := make(map[uint32]*cFrame)
frames := make(map[uint32]*cFrame)
buf := make([]byte, c.header.PageSize)
var buf []byte
if c.fullScan {
buf = make([]byte, c.header.PageSize)
}
for {
pgno, commit, err := c.walReader.ReadFrame(buf)

@ -13,7 +13,7 @@ func Test_CompactingScanner_Scan(t *testing.T) {
t.Fatal(err)
}
s, err := NewCompactingScanner(bytes.NewReader(b))
s, err := NewCompactingScanner(bytes.NewReader(b), true)
if err != nil {
t.Fatal(err)
}
@ -58,7 +58,7 @@ func Test_CompactingScanner_Scan_Commit0(t *testing.T) {
t.Fatal(err)
}
s, err := NewCompactingScanner(bytes.NewReader(b))
s, err := NewCompactingScanner(bytes.NewReader(b), false)
if err != nil {
t.Fatal(err)
}

@ -11,7 +11,7 @@ type FullScanner struct {
}
// NewFullScanner creates a new FullScanner with the given io.Reader.
func NewFullScanner(r io.Reader) (*FullScanner, error) {
func NewFullScanner(r io.ReadSeeker) (*FullScanner, error) {
wr := NewReader(r)
err := wr.ReadHeader()
if err != nil {

@ -24,7 +24,7 @@ const (
// authors of that software. See https://github.com/superfly/litefs for more
// details.
type Reader struct {
r io.Reader
r io.ReadSeeker
frameN int
magic uint32
@ -37,7 +37,7 @@ type Reader struct {
}
// NewReader returns a new instance of Reader.
func NewReader(r io.Reader) *Reader {
func NewReader(r io.ReadSeeker) *Reader {
return &Reader{r: r}
}
@ -96,10 +96,12 @@ func (r *Reader) ReadHeader() error {
return nil
}
// ReadFrame reads the next frame from the WAL and returns the page number.
// Returns io.EOF at the end of the valid WAL.
// ReadFrame returns the next page number and commit offset from the WAL. If
// data is not nil, then the page data is read into the buffer. If data is nil,
// then the page data is skipped. ReadFrame Returns io.EOF at the end of the valid
// WAL.
func (r *Reader) ReadFrame(data []byte) (pgno, commit uint32, err error) {
if len(data) != int(r.pageSize) {
if data != nil && len(data) != int(r.pageSize) {
return 0, 0, fmt.Errorf("WALReader.ReadFrame(): buffer size (%d) must match page size (%d)", len(data), r.pageSize)
}
@ -118,20 +120,27 @@ func (r *Reader) ReadFrame(data []byte) (pgno, commit uint32, err error) {
return 0, 0, io.EOF
}
// Read WAL page data.
if _, err := io.ReadFull(r.r, data); err == io.ErrUnexpectedEOF {
return 0, 0, io.EOF
} else if err != nil {
return 0, 0, err
}
// Verify the checksum is valid.
chksum1 := binary.BigEndian.Uint32(hdr[16:])
chksum2 := binary.BigEndian.Uint32(hdr[20:])
r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, hdr[:8]) // frame header
r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, data) // frame data
if r.chksum1 != chksum1 || r.chksum2 != chksum2 {
return 0, 0, io.EOF
if data != nil {
// Read WAL page data.
if _, err := io.ReadFull(r.r, data); err == io.ErrUnexpectedEOF {
return 0, 0, io.EOF
} else if err != nil {
return 0, 0, err
}
// Verify the checksum is valid.
chksum1 := binary.BigEndian.Uint32(hdr[16:])
chksum2 := binary.BigEndian.Uint32(hdr[20:])
r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, hdr[:8]) // frame header
r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, data) // frame data
if r.chksum1 != chksum1 || r.chksum2 != chksum2 {
return 0, 0, io.EOF
}
} else {
// Skip WAL page data.
if _, err := r.r.Seek(int64(r.pageSize), io.SeekCurrent); err != nil {
return 0, 0, err
}
}
pgno = binary.BigEndian.Uint32(hdr[0:])

@ -123,7 +123,7 @@ func Test_Writer_CompactingScanner(t *testing.T) {
t.Fatal(err)
}
defer destF.Close()
s, err := NewCompactingScanner(srcF)
s, err := NewCompactingScanner(srcF, true)
if err != nil {
t.Fatal(err)
}

@ -1708,7 +1708,7 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
}
defer walFD.Close() // Make sure it closes.
scanner, err := wal.NewCompactingScanner(walFD)
scanner, err := wal.NewCompactingScanner(walFD, false)
if err != nil {
return nil, err
}

Loading…
Cancel
Save