diff --git a/CHANGELOG.md b/CHANGELOG.md index 19bd51d5..a68f618d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,7 +51,7 @@ When officially released 8.0 will support (mostly) seamless upgrades from the 7. - [PR #1430](https://github.com/rqlite/rqlite/pull/1430): Check that any supplied Join addresses are not HTTP servers. - [PR #1437](https://github.com/rqlite/rqlite/pull/1437), [PR #1438](https://github.com/rqlite/rqlite/pull/1438), [PR #1439](https://github.com/rqlite/rqlite/pull/1439): Actually timeout if needed during `nodes/` access. Fixes [issue #1435](https://github.com/rqlite/rqlite/issues/1435). Thanks @dwco-z - [PR #1440](https://github.com/rqlite/rqlite/pull/1440): Add a Compacting WAL rewriter. Thanks @benbjohnson. -- [PR #1441](https://github.com/rqlite/rqlite/pull/1441): Integrate Compacting WAL writer +- [PR #1441](https://github.com/rqlite/rqlite/pull/1441), [PR #1443](https://github.com/rqlite/rqlite/pull/1443): Integrate Compacting WAL writer ## 7.21.4 (July 8th 2023) ### Implementation changes and bug fixes diff --git a/db/db.go b/db/db.go index c8b5e474..1f708764 100644 --- a/db/db.go +++ b/db/db.go @@ -40,6 +40,8 @@ const ( numETx = "execute_transactions" numQTx = "query_transactions" numRTx = "request_transactions" + + CheckpointQuery = "PRAGMA wal_checkpoint(TRUNCATE)" // rqlite WAL compaction requires truncation ) var ( @@ -485,7 +487,7 @@ func (db *DB) CheckpointWithTimeout(dur time.Duration) (err error) { var nMoved int f := func() error { - err := db.rwDB.QueryRow("PRAGMA wal_checkpoint(TRUNCATE)").Scan(&ok, &nPages, &nMoved) + err := db.rwDB.QueryRow(CheckpointQuery).Scan(&ok, &nPages, &nMoved) stats.Add(numCheckpointedPages, int64(nPages)) stats.Add(numCheckpointedMoves, int64(nMoved)) if err != nil { diff --git a/db/wal/compacting_scanner.go b/db/wal/compacting_scanner.go index 749a77b5..48c60b3c 100644 --- a/db/wal/compacting_scanner.go +++ b/db/wal/compacting_scanner.go @@ -32,13 +32,26 @@ type CompactingScanner struct { readSeeker io.ReadSeeker walReader *Reader header *WALHeader + fullScan bool cIdx int frames cFrames } +// NewFastCompactingScanner creates a new CompactingScanner with the given io.ReadSeeker. +// It performs a fast scan of the WAL file, assuming that the file is valid and does not +// need to be checked. +func NewFastCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) { + return NewCompactingScanner(r, false) +} + // NewCompactingScanner creates a new CompactingScanner with the given io.ReadSeeker. -func NewCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) { +// If fullScan is true, the scanner will perform a full scan of the WAL file, performing +// a checksum on each frame. If fullScan is false, the scanner will only scan the file +// sufficiently to find the last valid frame for each page. This is faster when the +// caller knows that the entire WAL file is valid, and will not contain pages from a +// previous checkpointing operation. +func NewCompactingScanner(r io.ReadSeeker, fullScan bool) (*CompactingScanner, error) { walReader := NewReader(r) err := walReader.ReadHeader() if err != nil { @@ -60,6 +73,7 @@ func NewCompactingScanner(r io.ReadSeeker) (*CompactingScanner, error) { readSeeker: r, walReader: walReader, header: hdr, + fullScan: fullScan, } if err := s.scan(); err != nil { return nil, err @@ -100,7 +114,10 @@ func (c *CompactingScanner) scan() error { waitingForCommit := false txFrames := make(map[uint32]*cFrame) frames := make(map[uint32]*cFrame) - buf := make([]byte, c.header.PageSize) + var buf []byte + if c.fullScan { + buf = make([]byte, c.header.PageSize) + } for { pgno, commit, err := c.walReader.ReadFrame(buf) diff --git a/db/wal/compacting_scanner_test.go b/db/wal/compacting_scanner_test.go index fc4b2c8d..dc091104 100644 --- a/db/wal/compacting_scanner_test.go +++ b/db/wal/compacting_scanner_test.go @@ -13,7 +13,7 @@ func Test_CompactingScanner_Scan(t *testing.T) { t.Fatal(err) } - s, err := NewCompactingScanner(bytes.NewReader(b)) + s, err := NewCompactingScanner(bytes.NewReader(b), true) if err != nil { t.Fatal(err) } @@ -58,7 +58,7 @@ func Test_CompactingScanner_Scan_Commit0(t *testing.T) { t.Fatal(err) } - s, err := NewCompactingScanner(bytes.NewReader(b)) + s, err := NewCompactingScanner(bytes.NewReader(b), false) if err != nil { t.Fatal(err) } diff --git a/db/wal/full_scanner.go b/db/wal/full_scanner.go index e0cd4d6c..12a38f8a 100644 --- a/db/wal/full_scanner.go +++ b/db/wal/full_scanner.go @@ -11,7 +11,7 @@ type FullScanner struct { } // NewFullScanner creates a new FullScanner with the given io.Reader. -func NewFullScanner(r io.Reader) (*FullScanner, error) { +func NewFullScanner(r io.ReadSeeker) (*FullScanner, error) { wr := NewReader(r) err := wr.ReadHeader() if err != nil { diff --git a/db/wal/reader.go b/db/wal/reader.go index 1e7255cb..1c8ce8a9 100644 --- a/db/wal/reader.go +++ b/db/wal/reader.go @@ -24,7 +24,7 @@ const ( // authors of that software. See https://github.com/superfly/litefs for more // details. type Reader struct { - r io.Reader + r io.ReadSeeker frameN int magic uint32 @@ -37,7 +37,7 @@ type Reader struct { } // NewReader returns a new instance of Reader. -func NewReader(r io.Reader) *Reader { +func NewReader(r io.ReadSeeker) *Reader { return &Reader{r: r} } @@ -96,10 +96,12 @@ func (r *Reader) ReadHeader() error { return nil } -// ReadFrame reads the next frame from the WAL and returns the page number. -// Returns io.EOF at the end of the valid WAL. +// ReadFrame returns the next page number and commit offset from the WAL. If +// data is not nil, then the page data is read into the buffer. If data is nil, +// then the page data is skipped. ReadFrame Returns io.EOF at the end of the valid +// WAL. func (r *Reader) ReadFrame(data []byte) (pgno, commit uint32, err error) { - if len(data) != int(r.pageSize) { + if data != nil && len(data) != int(r.pageSize) { return 0, 0, fmt.Errorf("WALReader.ReadFrame(): buffer size (%d) must match page size (%d)", len(data), r.pageSize) } @@ -118,20 +120,27 @@ func (r *Reader) ReadFrame(data []byte) (pgno, commit uint32, err error) { return 0, 0, io.EOF } - // Read WAL page data. - if _, err := io.ReadFull(r.r, data); err == io.ErrUnexpectedEOF { - return 0, 0, io.EOF - } else if err != nil { - return 0, 0, err - } - - // Verify the checksum is valid. - chksum1 := binary.BigEndian.Uint32(hdr[16:]) - chksum2 := binary.BigEndian.Uint32(hdr[20:]) - r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, hdr[:8]) // frame header - r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, data) // frame data - if r.chksum1 != chksum1 || r.chksum2 != chksum2 { - return 0, 0, io.EOF + if data != nil { + // Read WAL page data. + if _, err := io.ReadFull(r.r, data); err == io.ErrUnexpectedEOF { + return 0, 0, io.EOF + } else if err != nil { + return 0, 0, err + } + + // Verify the checksum is valid. + chksum1 := binary.BigEndian.Uint32(hdr[16:]) + chksum2 := binary.BigEndian.Uint32(hdr[20:]) + r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, hdr[:8]) // frame header + r.chksum1, r.chksum2 = WALChecksum(r.bo, r.chksum1, r.chksum2, data) // frame data + if r.chksum1 != chksum1 || r.chksum2 != chksum2 { + return 0, 0, io.EOF + } + } else { + // Skip WAL page data. + if _, err := r.r.Seek(int64(r.pageSize), io.SeekCurrent); err != nil { + return 0, 0, err + } } pgno = binary.BigEndian.Uint32(hdr[0:]) diff --git a/db/wal/writer_test.go b/db/wal/writer_test.go index 76527a2d..a62828bd 100644 --- a/db/wal/writer_test.go +++ b/db/wal/writer_test.go @@ -123,7 +123,7 @@ func Test_Writer_CompactingScanner(t *testing.T) { t.Fatal(err) } defer destF.Close() - s, err := NewCompactingScanner(srcF) + s, err := NewCompactingScanner(srcF, true) if err != nil { t.Fatal(err) } diff --git a/store/store.go b/store/store.go index 40d291e7..265ccbf5 100644 --- a/store/store.go +++ b/store/store.go @@ -1708,7 +1708,7 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) { } defer walFD.Close() // Make sure it closes. - scanner, err := wal.NewCompactingScanner(walFD) + scanner, err := wal.NewFastCompactingScanner(walFD) if err != nil { return nil, err }