1
0
Fork 0

Merge pull request #1644 from rqlite/nuke-snap-memcpy

Remove an unnecessary memcpy during Snapshotting
master
Philip O'Toole 8 months ago committed by GitHub
commit abdce55ed4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,3 +1,7 @@
## 8.18.4 (unreleased)
### Implementation changes and bug fixes
- [PR #1644](https://github.com/rqlite/rqlite/pull/1644): Remove an unnecessary memcpy during Snapshotting.
## 8.18.3 (January 29th 2024)
### Implementation changes and bug fixes
- [PR #1638](https://github.com/rqlite/rqlite/pull/1638): More Snapshotting metrics.

@ -1,7 +1,9 @@
package wal
import (
"encoding/binary"
"errors"
"fmt"
"io"
"sort"
)
@ -110,6 +112,56 @@ func (c *CompactingScanner) Next() (*Frame, error) {
return frame, nil
}
// Bytes returns a byte slice containing the entire contents of the compacted WAL file.
// The byte slice is suitable for writing to a new WAL file.
func (c *CompactingScanner) Bytes() ([]byte, error) {
pageSz := int(c.header.PageSize)
buf := make([]byte, WALHeaderSize+(len(c.frames)*WALFrameHeaderSize)+len(c.frames)*pageSz)
c.header.Copy(buf)
var bo binary.ByteOrder
switch magic := c.header.Magic; magic {
case 0x377f0682:
bo = binary.LittleEndian
case 0x377f0683:
bo = binary.BigEndian
default:
return nil, fmt.Errorf("invalid wal header magic: %x", magic)
}
frmHdr := WALHeaderSize
chksum1, chksum2 := c.header.Checksum1, c.header.Checksum2
for _, frame := range c.frames {
frmData := frmHdr + WALFrameHeaderSize
binary.BigEndian.PutUint32(buf[frmHdr:], frame.Pgno)
binary.BigEndian.PutUint32(buf[frmHdr+4:], frame.Commit)
binary.BigEndian.PutUint32(buf[frmHdr+8:], c.header.Salt1)
binary.BigEndian.PutUint32(buf[frmHdr+12:], c.header.Salt2)
// Checksum of frame header: "...the first 8 bytes..."
chksum1, chksum2 = WALChecksum(bo, chksum1, chksum2, buf[frmHdr:frmHdr+8])
// Read the frame data.
if _, err := c.readSeeker.Seek(frame.Offset+WALFrameHeaderSize, io.SeekStart); err != nil {
fmt.Println("error seeking to frame offset:", err)
return nil, err
}
if _, err := io.ReadFull(c.readSeeker, buf[frmData:frmData+pageSz]); err != nil {
fmt.Println("error reading frame data:", err)
return nil, err
}
// Update checksum using frame data: "..the content of all frames up to and including the current frame."
chksum1, chksum2 = WALChecksum(bo, chksum1, chksum2, buf[frmData:frmData+pageSz])
binary.BigEndian.PutUint32(buf[frmHdr+16:], chksum1)
binary.BigEndian.PutUint32(buf[frmHdr+20:], chksum2)
frmHdr += WALFrameHeaderSize + pageSz
}
return buf, nil
}
func (c *CompactingScanner) scan() error {
waitingForCommit := false
txFrames := make(map[uint32]*cFrame)

@ -101,3 +101,35 @@ func Test_CompactingScanner_Scan_Commit0(t *testing.T) {
t.Fatalf("expected EOF, got %v", err)
}
}
func Test_CompactingScanner_Bytes(t *testing.T) {
conn, path := mustCreateWAL(t, 128*1024)
defer conn.Close()
b, err := os.ReadFile(path)
if err != nil {
t.Fatal(err)
}
s, err := NewCompactingScanner(bytes.NewReader(b), false)
if err != nil {
t.Fatal(err)
}
var ramWriter bytes.Buffer
w, err := NewWriter(s)
if err != nil {
t.Fatal(err)
}
_, err = w.WriteTo(&ramWriter)
if err != nil {
t.Fatal(err)
}
buf, err := s.Bytes()
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, ramWriter.Bytes()) {
t.Fatal("bytes mismatch")
}
}

@ -18,6 +18,22 @@ type WALHeader struct {
Checksum2 uint32
}
// Copy copies the WALHeader to the given byte slice. If the byte slice
// is too small, a panic occurs.
func (h *WALHeader) Copy(b []byte) {
if len(b) < WALHeaderSize {
panic("byte slice too small")
}
binary.BigEndian.PutUint32(b[0:], h.Magic)
binary.BigEndian.PutUint32(b[4:], h.Version)
binary.BigEndian.PutUint32(b[8:], h.PageSize)
binary.BigEndian.PutUint32(b[12:], h.Seq)
binary.BigEndian.PutUint32(b[16:], h.Salt1)
binary.BigEndian.PutUint32(b[20:], h.Salt2)
binary.BigEndian.PutUint32(b[24:], h.Checksum1)
binary.BigEndian.PutUint32(b[28:], h.Checksum2)
}
// Frame points to a single WAL frame in a WAL file.
type Frame struct {
Pgno uint32

@ -1949,15 +1949,14 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
if err != nil {
return nil, err
}
walWr, err := wal.NewWriter(scanner)
compactedBytes, err := scanner.Bytes()
if err != nil {
return nil, err
}
if _, err := walWr.WriteTo(compactedBuf); err != nil {
return nil, err
}
walFD.Close() // We need it closed for the next step.
walFD.Close() // We need it closed before truncating it.
stats.Get(snapshotCreateWALCompactDuration).(*expvar.Int).Set(time.Since(compactStartTime).Milliseconds())
compactedBuf = bytes.NewBuffer(compactedBytes)
// Now that we're written a (compacted) copy of the WAL to the Snapshot,
// we can truncate the WAL. We use truncate mode so that the next WAL

Loading…
Cancel
Save