1
0
Fork 0

Merge pull request #1443 from rqlite/wal-discard-read

Support skipping verifying frame data
master
Philip O'Toole 10 months ago committed by GitHub
commit 90b37f0756
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -51,7 +51,7 @@ When officially released 8.0 will support (mostly) seamless upgrades from the 7.
- [PR #1430](https://github.com/rqlite/rqlite/pull/1430): Check that any supplied Join addresses are not HTTP servers.
- [PR #1437](https://github.com/rqlite/rqlite/pull/1437), [PR #1438](https://github.com/rqlite/rqlite/pull/1438), [PR #1439](https://github.com/rqlite/rqlite/pull/1439): Actually timeout if needed during `nodes/` access. Fixes [issue #1435](https://github.com/rqlite/rqlite/issues/1435). Thanks @dwco-z
- [PR #1440](https://github.com/rqlite/rqlite/pull/1440): Add a Compacting WAL rewriter. Thanks @benbjohnson.
- [PR #1441](https://github.com/rqlite/rqlite/pull/1441): Integrate Compacting WAL writer
- [PR #1441](https://github.com/rqlite/rqlite/pull/1441), [PR #1443](https://github.com/rqlite/rqlite/pull/1443): Integrate Compacting WAL writer
## 7.21.4 (July 8th 2023)
### Implementation changes and bug fixes

@ -40,6 +40,8 @@ const (
numETx = "execute_transactions"
numQTx = "query_transactions"
numRTx = "request_transactions"
CheckpointQuery = "PRAGMA wal_checkpoint(TRUNCATE)" // rqlite WAL compaction requires truncation
)
var (
@ -485,7 +487,7 @@ func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) {
var nMoved int
f := func() error {
err := db.rwDB.QueryRow("PRAGMA wal_checkpoint(TRUNCATE)").Scan(&ok, &nPages, &nMoved)
err := db.rwDB.QueryRow(CheckpointQuery).Scan(&ok, &nPages, &nMoved)
stats.Add(numCheckpointedPages, int64(nPages))
stats.Add(numCheckpointedMoves, int64(nMoved))
if err != nil {

@ -32,13 +32,26 @@ type CompactingScanner struct {
readSeeker io.ReadSeeker
walReader *Reader
header *WALHeader
fullScan bool
cIdx int
frames cFrames
}
// NewFastCompactingScanner creates a new CompactingScanner with the given io.ReadSeeker.
// It performs a fast scan of the WAL file, assuming that the file is valid and does not
// need to be checked.
func NewFastCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) {
return NewCompactingScanner(r, false)
}
// NewCompactingScanner creates a new CompactingScanner with the given io.ReadSeeker.
func NewCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) {
// If fullScan is true, the scanner will perform a full scan of the WAL file, performing
// a checksum on each frame. If fullScan is false, the scanner will only scan the file
// sufficiently to find the last valid frame for each page. This is faster when the
// caller knows that the entire WAL file is valid, and will not contain pages from a
// previous checkpointing operation.
func NewCompactingScanner(r io.ReadSeeker, fullScan bool) (*CompactingScanner, error) {
walReader := NewReader(r)
err := walReader.ReadHeader()
if err != nil {
@ -60,6 +73,7 @@ func NewCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) {
readSeeker: r,
walReader: walReader,
header: hdr,
fullScan: fullScan,
}
if err := s.scan(); err != nil {
return nil, err
@ -100,7 +114,10 @@ func (c *CompactingScanner) scan() error {
waitingForCommit := false
txFrames := make(map[uint32]*cFrame)
frames := make(map[uint32]*cFrame)
buf := make([]byte, c.header.PageSize)
var buf []byte
if c.fullScan {
buf = make([]byte, c.header.PageSize)
}
for {
pgno, commit, err := c.walReader.ReadFrame(buf)

@ -13,7 +13,7 @@ func Test_CompactingScanner_Scan(t *testing.T) {
t.Fatal(err)
}
s, err := NewCompactingScanner(bytes.NewReader(b))
s, err := NewCompactingScanner(bytes.NewReader(b), true)
if err != nil {
t.Fatal(err)
}
@ -58,7 +58,7 @@ func Test_CompactingScanner_Scan_Commit0(t *testing.T) {
t.Fatal(err)
}
s, err := NewCompactingScanner(bytes.NewReader(b))
s, err := NewCompactingScanner(bytes.NewReader(b), false)
if err != nil {
t.Fatal(err)
}

@ -11,7 +11,7 @@ type FullScanner struct {
}
// NewFullScanner creates a new FullScanner with the given io.Reader.
func NewFullScanner(r io.Reader) (*FullScanner, error) {
func NewFullScanner(r io.ReadSeeker) (*FullScanner, error) {
wr := NewReader(r)
err := wr.ReadHeader()
if err != nil {

@ -24,7 +24,7 @@ const (
// authors of that software. See https://github.com/superfly/litefs for more
// details.
type Reader struct {
r io.Reader
r io.ReadSeeker
frameN int
magic uint32
@ -37,7 +37,7 @@ type Reader struct {
}
// NewReader returns a new instance of Reader.
func NewReader(r io.Reader) *Reader {
func NewReader(r io.ReadSeeker) *Reader {
return &Reader{r: r}
}
@ -96,10 +96,12 @@ func (r *Reader) ReadHeader() error {
return nil
}
// ReadFrame reads the next frame from the WAL and returns the page number.
// Returns io.EOF at the end of the valid WAL.
// ReadFrame returns the next page number and commit offset from the WAL. If
// data is not nil, then the page data is read into the buffer. If data is nil,
// then the page data is skipped. ReadFrame Returns io.EOF at the end of the valid
// WAL.
func (r *Reader) ReadFrame(data []byte) (pgno, commit uint32, err error) {
if len(data) != int(r.pageSize) {
if data != nil && len(data) != int(r.pageSize) {
return 0, 0, fmt.Errorf("WALReader.ReadFrame(): buffer size (%d) must match page size (%d)", len(data), r.pageSize)
}
@ -118,20 +120,27 @@ func (r *Reader) ReadFrame(data []byte) (pgno, commit uint32, err error) {
return 0, 0, io.EOF
}
// Read WAL page data.
if _, err := io.ReadFull(r.r, data); err == io.ErrUnexpectedEOF {
return 0, 0, io.EOF
} else if err != nil {
return 0, 0, err
}
// Verify the checksum is valid.
chksum1 := binary.BigEndian.Uint32(hdr[16:])
chksum2 := binary.BigEndian.Uint32(hdr[20:])
r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, hdr[:8]) // frame header
r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, data) // frame data
if r.chksum1 != chksum1 || r.chksum2 != chksum2 {
return 0, 0, io.EOF
if data != nil {
// Read WAL page data.
if _, err := io.ReadFull(r.r, data); err == io.ErrUnexpectedEOF {
return 0, 0, io.EOF
} else if err != nil {
return 0, 0, err
}
// Verify the checksum is valid.
chksum1 := binary.BigEndian.Uint32(hdr[16:])
chksum2 := binary.BigEndian.Uint32(hdr[20:])
r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, hdr[:8]) // frame header
r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, data) // frame data
if r.chksum1 != chksum1 || r.chksum2 != chksum2 {
return 0, 0, io.EOF
}
} else {
// Skip WAL page data.
if _, err := r.r.Seek(int64(r.pageSize), io.SeekCurrent); err != nil {
return 0, 0, err
}
}
pgno = binary.BigEndian.Uint32(hdr[0:])

@ -123,7 +123,7 @@ func Test_Writer_CompactingScanner(t *testing.T) {
t.Fatal(err)
}
defer destF.Close()
s, err := NewCompactingScanner(srcF)
s, err := NewCompactingScanner(srcF, true)
if err != nil {
t.Fatal(err)
}

@ -1708,7 +1708,7 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
}
defer walFD.Close() // Make sure it closes.
scanner, err := wal.NewCompactingScanner(walFD)
scanner, err := wal.NewFastCompactingScanner(walFD)
if err != nil {
return nil, err
}

Loading…
Cancel
Save