|
|
|
package wal
|
|
|
|
|
|
|
|
import (
|
|
|
|
"errors"
|
|
|
|
"io"
|
|
|
|
"sort"
|
|
|
|
)
|
|
|
|
|
|
|
|
var (
|
|
|
|
// ErrOpenTransaction is returned when the final frame in the WAL file is not a committing frame.
|
|
|
|
ErrOpenTransaction = errors.New("open transaction at end of WAL file")
|
|
|
|
)
|
|
|
|
|
|
|
|
type cFrame struct {
|
|
|
|
Pgno uint32
|
|
|
|
Commit uint32
|
|
|
|
Offset int64
|
|
|
|
}
|
|
|
|
|
|
|
|
type cFrames []*cFrame
|
|
|
|
|
|
|
|
func (c cFrames) Len() int { return len(c) }
|
|
|
|
func (c cFrames) Less(i, j int) bool { return c[i].Offset < c[j].Offset }
|
|
|
|
func (c cFrames) Swap(i, j int) { c[i], c[j] = c[j], c[i] }
|
|
|
|
|
|
|
|
// CompactingScanner implements WALIterator to iterate over frames in a WAL file.
|
|
|
|
// It also compacts the WAL file, with Next() returning the last valid frame for each
|
|
|
|
// page in the right order such that they can be written to a new WAL file. This Scanner
|
|
|
|
// requires that the final frame in the WAL file is a committing frame. It will return an
|
|
|
|
// error at creation time if this is not the case.
|
|
|
|
type CompactingScanner struct {
|
|
|
|
readSeeker io.ReadSeeker
|
|
|
|
walReader *Reader
|
|
|
|
header *WALHeader
|
|
|
|
fullScan bool
|
|
|
|
|
|
|
|
cIdx int
|
|
|
|
frames cFrames
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewFastCompactingScanner creates a new CompactingScanner with the given io.ReadSeeker.
|
|
|
|
// It performs a fast scan of the WAL file, assuming that the file is valid and does not
|
|
|
|
// need to be checked.
|
|
|
|
func NewFastCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) {
|
|
|
|
return NewCompactingScanner(r, false)
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewCompactingScanner creates a new CompactingScanner with the given io.ReadSeeker.
|
|
|
|
// If fullScan is true, the scanner will perform a full scan of the WAL file, performing
|
|
|
|
// a checksum on each frame. If fullScan is false, the scanner will only scan the file
|
|
|
|
// sufficiently to find the last valid frame for each page. This is faster when the
|
|
|
|
// caller knows that the entire WAL file is valid, and will not contain pages from a
|
|
|
|
// previous checkpointing operation.
|
|
|
|
func NewCompactingScanner(r io.ReadSeeker, fullScan bool) (*CompactingScanner, error) {
|
|
|
|
walReader := NewReader(r)
|
|
|
|
err := walReader.ReadHeader()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
hdr := &WALHeader{
|
|
|
|
Magic: walReader.magic,
|
|
|
|
Version: WALSupportedVersion,
|
|
|
|
PageSize: walReader.PageSize(),
|
|
|
|
Seq: walReader.seq,
|
|
|
|
Salt1: walReader.salt1,
|
|
|
|
Salt2: walReader.salt2,
|
|
|
|
Checksum1: walReader.chksum1,
|
|
|
|
Checksum2: walReader.chksum2,
|
|
|
|
}
|
|
|
|
|
|
|
|
s := &CompactingScanner{
|
|
|
|
readSeeker: r,
|
|
|
|
walReader: walReader,
|
|
|
|
header: hdr,
|
|
|
|
fullScan: fullScan,
|
|
|
|
}
|
|
|
|
if err := s.scan(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return s, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Header returns the header of the WAL file.
|
|
|
|
func (c *CompactingScanner) Header() (*WALHeader, error) {
|
|
|
|
return c.header, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Next return the next logical frame from the WAL file.
|
|
|
|
func (c *CompactingScanner) Next() (*Frame, error) {
|
|
|
|
if c.cIdx >= len(c.frames) {
|
|
|
|
return nil, io.EOF
|
|
|
|
}
|
|
|
|
|
|
|
|
frame := &Frame{
|
|
|
|
Pgno: c.frames[c.cIdx].Pgno,
|
|
|
|
Commit: c.frames[c.cIdx].Commit,
|
|
|
|
Data: make([]byte, c.header.PageSize),
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, err := c.readSeeker.Seek(c.frames[c.cIdx].Offset+WALFrameHeaderSize, io.SeekStart); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
if _, err := io.ReadFull(c.readSeeker, frame.Data); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
c.cIdx++
|
|
|
|
|
|
|
|
return frame, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (c *CompactingScanner) scan() error {
|
|
|
|
waitingForCommit := false
|
|
|
|
txFrames := make(map[uint32]*cFrame)
|
|
|
|
frames := make(map[uint32]*cFrame)
|
|
|
|
var buf []byte
|
|
|
|
if c.fullScan {
|
|
|
|
buf = make([]byte, c.header.PageSize)
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
pgno, commit, err := c.walReader.ReadFrame(buf)
|
|
|
|
if err == io.EOF {
|
|
|
|
break
|
|
|
|
} else if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
frame := &cFrame{
|
|
|
|
Pgno: pgno,
|
|
|
|
Commit: commit,
|
|
|
|
Offset: c.walReader.Offset(),
|
|
|
|
}
|
|
|
|
|
|
|
|
// Save latest frame information for each page.
|
|
|
|
txFrames[pgno] = frame
|
|
|
|
|
|
|
|
// If this is not a committing frame, continue to next frame.
|
|
|
|
if commit == 0 {
|
|
|
|
waitingForCommit = true
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
waitingForCommit = false
|
|
|
|
|
|
|
|
// At the end of each transaction, copy frame information to main map.
|
|
|
|
for k, v := range txFrames {
|
|
|
|
frames[k] = v
|
|
|
|
}
|
|
|
|
txFrames = make(map[uint32]*cFrame)
|
|
|
|
}
|
|
|
|
if waitingForCommit {
|
|
|
|
return ErrOpenTransaction
|
|
|
|
}
|
|
|
|
|
|
|
|
// Now we have the latest version of each frame. Next we need to sort
|
|
|
|
// them by offset so we return them in the correct order.
|
|
|
|
c.frames = make(cFrames, 0, len(frames))
|
|
|
|
for _, frame := range frames {
|
|
|
|
c.frames = append(c.frames, frame)
|
|
|
|
}
|
|
|
|
sort.Sort(c.frames)
|
|
|
|
return nil
|
|
|
|
}
|