diff --git a/CHANGELOG.md b/CHANGELOG.md index e4bcd0c1..7af4cf76 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 8.18.4 (unreleased) +### Implementation changes and bug fixes +- [PR #1644](https://github.com/rqlite/rqlite/pull/1644): Remove an unnecessary memcpy during Snapshotting. + ## 8.18.3 (January 29th 2024) ### Implementation changes and bug fixes - [PR #1638](https://github.com/rqlite/rqlite/pull/1638): More Snapshotting metrics. diff --git a/db/wal/compacting_scanner.go b/db/wal/compacting_scanner.go index e0b89513..c5bde1ab 100644 --- a/db/wal/compacting_scanner.go +++ b/db/wal/compacting_scanner.go @@ -1,7 +1,9 @@ package wal import ( + "encoding/binary" "errors" + "fmt" "io" "sort" ) @@ -110,6 +112,56 @@ func (c *CompactingScanner) Next() (*Frame, error) { return frame, nil } +// Bytes returns a byte slice containing the entire contents of the compacted WAL file. +// The byte slice is suitable for writing to a new WAL file. +func (c *CompactingScanner) Bytes() ([]byte, error) { + pageSz := int(c.header.PageSize) + buf := make([]byte, WALHeaderSize+(len(c.frames)*WALFrameHeaderSize)+len(c.frames)*pageSz) + c.header.Copy(buf) + + var bo binary.ByteOrder + switch magic := c.header.Magic; magic { + case 0x377f0682: + bo = binary.LittleEndian + case 0x377f0683: + bo = binary.BigEndian + default: + return nil, fmt.Errorf("invalid wal header magic: %x", magic) + } + + frmHdr := WALHeaderSize + chksum1, chksum2 := c.header.Checksum1, c.header.Checksum2 + for _, frame := range c.frames { + frmData := frmHdr + WALFrameHeaderSize + + binary.BigEndian.PutUint32(buf[frmHdr:], frame.Pgno) + binary.BigEndian.PutUint32(buf[frmHdr+4:], frame.Commit) + binary.BigEndian.PutUint32(buf[frmHdr+8:], c.header.Salt1) + binary.BigEndian.PutUint32(buf[frmHdr+12:], c.header.Salt2) + + // Checksum of frame header: "...the first 8 bytes..." + chksum1, chksum2 = WALChecksum(bo, chksum1, chksum2, buf[frmHdr:frmHdr+8]) + + // Read the frame data. + if _, err := c.readSeeker.Seek(frame.Offset+WALFrameHeaderSize, io.SeekStart); err != nil { + fmt.Println("error seeking to frame offset:", err) + return nil, err + } + if _, err := io.ReadFull(c.readSeeker, buf[frmData:frmData+pageSz]); err != nil { + fmt.Println("error reading frame data:", err) + return nil, err + } + + // Update checksum using frame data: "..the content of all frames up to and including the current frame." + chksum1, chksum2 = WALChecksum(bo, chksum1, chksum2, buf[frmData:frmData+pageSz]) + binary.BigEndian.PutUint32(buf[frmHdr+16:], chksum1) + binary.BigEndian.PutUint32(buf[frmHdr+20:], chksum2) + + frmHdr += WALFrameHeaderSize + pageSz + } + return buf, nil +} + func (c *CompactingScanner) scan() error { waitingForCommit := false txFrames := make(map[uint32]*cFrame) diff --git a/db/wal/compacting_scanner_test.go b/db/wal/compacting_scanner_test.go index 77b65188..c4a9d057 100644 --- a/db/wal/compacting_scanner_test.go +++ b/db/wal/compacting_scanner_test.go @@ -101,3 +101,35 @@ func Test_CompactingScanner_Scan_Commit0(t *testing.T) { t.Fatalf("expected EOF, got %v", err) } } + +func Test_CompactingScanner_Bytes(t *testing.T) { + conn, path := mustCreateWAL(t, 128*1024) + defer conn.Close() + b, err := os.ReadFile(path) + if err != nil { + t.Fatal(err) + } + + s, err := NewCompactingScanner(bytes.NewReader(b), false) + if err != nil { + t.Fatal(err) + } + + var ramWriter bytes.Buffer + w, err := NewWriter(s) + if err != nil { + t.Fatal(err) + } + _, err = w.WriteTo(&ramWriter) + if err != nil { + t.Fatal(err) + } + + buf, err := s.Bytes() + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(buf, ramWriter.Bytes()) { + t.Fatal("bytes mismatch") + } +} diff --git a/db/wal/writer.go b/db/wal/writer.go index e66177f7..a29fc4f4 100644 --- a/db/wal/writer.go +++ b/db/wal/writer.go @@ -18,6 +18,22 @@ type WALHeader struct { Checksum2 uint32 } +// Copy copies the WALHeader to the given byte slice. If the byte slice +// is too small, a panic occurs. +func (h *WALHeader) Copy(b []byte) { + if len(b) < WALHeaderSize { + panic("byte slice too small") + } + binary.BigEndian.PutUint32(b[0:], h.Magic) + binary.BigEndian.PutUint32(b[4:], h.Version) + binary.BigEndian.PutUint32(b[8:], h.PageSize) + binary.BigEndian.PutUint32(b[12:], h.Seq) + binary.BigEndian.PutUint32(b[16:], h.Salt1) + binary.BigEndian.PutUint32(b[20:], h.Salt2) + binary.BigEndian.PutUint32(b[24:], h.Checksum1) + binary.BigEndian.PutUint32(b[28:], h.Checksum2) +} + // Frame points to a single WAL frame in a WAL file. type Frame struct { Pgno uint32 diff --git a/store/store.go b/store/store.go index 96ec3a3b..9aa1a777 100644 --- a/store/store.go +++ b/store/store.go @@ -1949,15 +1949,14 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) { if err != nil { return nil, err } - walWr, err := wal.NewWriter(scanner) + compactedBytes, err := scanner.Bytes() if err != nil { return nil, err } - if _, err := walWr.WriteTo(compactedBuf); err != nil { - return nil, err - } - walFD.Close() // We need it closed for the next step. + walFD.Close() // We need it closed before truncating it. + stats.Get(snapshotCreateWALCompactDuration).(*expvar.Int).Set(time.Since(compactStartTime).Milliseconds()) + compactedBuf = bytes.NewBuffer(compactedBytes) // Now that we're written a (compacted) copy of the WAL to the Snapshot, // we can truncate the WAL. We use truncate mode so that the next WAL