
Merge pull request #1441 from rqlite/integrate-compacting-wal

Integrate Compacting WAL writer
Branch: master
Philip O'Toole committed 10 months ago (via GitHub)
Commit 9de61cb444

CHANGELOG.md
@@ -51,6 +51,7 @@ When officially released 8.0 will support (mostly) seamless upgrades from the 7.
- [PR #1430](https://github.com/rqlite/rqlite/pull/1430): Check that any supplied Join addresses are not HTTP servers.
- [PR #1437](https://github.com/rqlite/rqlite/pull/1437), [PR #1438](https://github.com/rqlite/rqlite/pull/1438), [PR #1439](https://github.com/rqlite/rqlite/pull/1439): Actually timeout if needed during `nodes/` access. Fixes [issue #1435](https://github.com/rqlite/rqlite/issues/1435). Thanks @dwco-z
- [PR #1440](https://github.com/rqlite/rqlite/pull/1440): Add a Compacting WAL rewriter. Thanks @benbjohnson.
+- [PR #1441](https://github.com/rqlite/rqlite/pull/1441): Integrate Compacting WAL writer
## 7.21.4 (July 8th 2023)
### Implementation changes and bug fixes

store/store.go
@@ -23,6 +23,7 @@ import (
"github.com/rqlite/rqlite/command"
"github.com/rqlite/rqlite/command/chunking"
sql "github.com/rqlite/rqlite/db"
wal "github.com/rqlite/rqlite/db/wal"
rlog "github.com/rqlite/rqlite/log"
"github.com/rqlite/rqlite/snapshot"
)
@@ -84,31 +85,32 @@
)
const (
-	numSnapshots = "num_snapshots"
-	numSnapshotsFull = "num_snapshots_full"
-	numSnapshotsIncremental = "num_snapshots_incremental"
-	numProvides = "num_provides"
-	numBackups = "num_backups"
-	numLoads = "num_loads"
-	numRestores = "num_restores"
-	numAutoRestores = "num_auto_restores"
-	numAutoRestoresSkipped = "num_auto_restores_skipped"
-	numAutoRestoresFailed = "num_auto_restores_failed"
-	numRecoveries = "num_recoveries"
-	numUncompressedCommands = "num_uncompressed_commands"
-	numCompressedCommands = "num_compressed_commands"
-	numJoins = "num_joins"
-	numIgnoredJoins = "num_ignored_joins"
-	numRemovedBeforeJoins = "num_removed_before_joins"
-	numDBStatsErrors = "num_db_stats_errors"
-	snapshotCreateDuration = "snapshot_create_duration"
-	snapshotPersistDuration = "snapshot_persist_duration"
-	snapshotWALSize = "snapshot_wal_size"
-	leaderChangesObserved = "leader_changes_observed"
-	leaderChangesDropped = "leader_changes_dropped"
-	failedHeartbeatObserved = "failed_heartbeat_observed"
-	nodesReapedOK = "nodes_reaped_ok"
-	nodesReapedFailed = "nodes_reaped_failed"
+	numSnapshots = "num_snapshots"
+	numSnapshotsFull = "num_snapshots_full"
+	numSnapshotsIncremental = "num_snapshots_incremental"
+	numProvides = "num_provides"
+	numBackups = "num_backups"
+	numLoads = "num_loads"
+	numRestores = "num_restores"
+	numAutoRestores = "num_auto_restores"
+	numAutoRestoresSkipped = "num_auto_restores_skipped"
+	numAutoRestoresFailed = "num_auto_restores_failed"
+	numRecoveries = "num_recoveries"
+	numUncompressedCommands = "num_uncompressed_commands"
+	numCompressedCommands = "num_compressed_commands"
+	numJoins = "num_joins"
+	numIgnoredJoins = "num_ignored_joins"
+	numRemovedBeforeJoins = "num_removed_before_joins"
+	numDBStatsErrors = "num_db_stats_errors"
+	snapshotCreateDuration = "snapshot_create_duration"
+	snapshotPersistDuration = "snapshot_persist_duration"
+	snapshotPrecompactWALSize = "snapshot_precompact_wal_size"
+	snapshotWALSize = "snapshot_wal_size"
+	leaderChangesObserved = "leader_changes_observed"
+	leaderChangesDropped = "leader_changes_dropped"
+	failedHeartbeatObserved = "failed_heartbeat_observed"
+	nodesReapedOK = "nodes_reaped_ok"
+	nodesReapedFailed = "nodes_reaped_failed"
)
// stats captures stats for the Store.
@@ -140,6 +142,7 @@ func ResetStats() {
	stats.Add(numDBStatsErrors, 0)
	stats.Add(snapshotCreateDuration, 0)
	stats.Add(snapshotPersistDuration, 0)
+	stats.Add(snapshotPrecompactWALSize, 0)
	stats.Add(snapshotWALSize, 0)
	stats.Add(leaderChangesObserved, 0)
	stats.Add(leaderChangesDropped, 0)
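For reference, a minimal sketch of how the two snapshot WAL gauges could be read at runtime, assuming only the standard-library `expvar` package. The `reportWALCompaction` helper and the `store_demo` map name are illustrative, not part of this change; the `*expvar.Int` type assertion mirrors the one used in `Snapshot()` below.

```go
package main

import (
	"expvar"
	"fmt"
)

// reportWALCompaction prints the WAL size recorded before and after
// compaction, assuming both gauges have been registered on the map.
func reportWALCompaction(stats *expvar.Map) {
	pre := stats.Get("snapshot_precompact_wal_size").(*expvar.Int).Value()
	post := stats.Get("snapshot_wal_size").(*expvar.Int).Value()
	fmt.Printf("WAL compacted from %d to %d bytes\n", pre, post)
}

func main() {
	m := expvar.NewMap("store_demo")
	m.Add("snapshot_precompact_wal_size", 4096) // stand-in values
	m.Add("snapshot_wal_size", 1024)
	reportWALCompaction(m)
}
```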
@@ -1696,20 +1699,42 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
		fsmSnapshot = snapshot.NewSnapshot(dbFD)
		stats.Add(numSnapshotsFull, 1)
	} else {
-		var b []byte
+		compactedBuf := bytes.NewBuffer(nil)
		var err error
		if pathExists(s.db.WALPath()) {
-			b, err = os.ReadFile(s.db.WALPath())
+			walFD, err := os.Open(s.db.WALPath())
			if err != nil {
				return nil, err
			}
-			stats.Get(snapshotWALSize).(*expvar.Int).Set(int64(len(b)))
-			s.logger.Printf("%s snapshot is %d bytes on node ID %s", fPLog, len(b), s.raftID)
+			defer walFD.Close() // Make sure it closes.
+			scanner, err := wal.NewCompactingScanner(walFD)
+			if err != nil {
+				return nil, err
+			}
+			walWr, err := wal.NewWriter(scanner)
+			if err != nil {
+				return nil, err
+			}
+			if _, err := walWr.WriteTo(compactedBuf); err != nil {
+				return nil, err
+			}
+			walFD.Close() // We need it closed for the next step.
+			walSz, err := fileSize(s.db.WALPath())
+			if err != nil {
+				return nil, err
+			}
+			stats.Get(snapshotWALSize).(*expvar.Int).Set(int64(compactedBuf.Len()))
+			stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSz)
+			s.logger.Printf("%s snapshot is %d bytes (WAL=%d bytes) on node ID %s", fPLog, compactedBuf.Len(),
+				walSz, s.raftID)
			if err := s.db.Checkpoint(); err != nil {
				return nil, err
			}
		}
-		fsmSnapshot = snapshot.NewSnapshot(io.NopCloser(bytes.NewBuffer(b)))
+		fsmSnapshot = snapshot.NewSnapshot(io.NopCloser(compactedBuf))
		if err != nil {
			return nil, err
		}
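The same pair of calls can be exercised outside the Store to compact a SQLite WAL file into memory. The sketch below reuses only the calls visible in the hunk above (`wal.NewCompactingScanner`, `wal.NewWriter`, `WriteTo`); the `compactWALToBuffer` helper name, the example path, and the error handling are illustrative assumptions, not part of this change.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"os"

	wal "github.com/rqlite/rqlite/db/wal"
)

// compactWALToBuffer opens a SQLite WAL file, runs it through the compacting
// scanner and writer, and returns the compacted frames in an in-memory buffer,
// the same sequence Store.Snapshot() performs above.
func compactWALToBuffer(path string) (*bytes.Buffer, error) {
	walFD, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer walFD.Close()

	scanner, err := wal.NewCompactingScanner(walFD)
	if err != nil {
		return nil, err
	}
	walWr, err := wal.NewWriter(scanner)
	if err != nil {
		return nil, err
	}

	buf := bytes.NewBuffer(nil)
	if _, err := walWr.WriteTo(buf); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	buf, err := compactWALToBuffer("db.sqlite-wal") // hypothetical path
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("compacted WAL is %d bytes\n", buf.Len())
}
```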
@@ -2231,6 +2256,14 @@ func dirExists(path string) bool {
	return err == nil && stat.IsDir()
}
+func fileSize(path string) (int64, error) {
+	stat, err := os.Stat(path)
+	if err != nil {
+		return 0, err
+	}
+	return stat.Size(), nil
+}
// dirSize returns the total size of all files in the given directory
func dirSize(path string) (int64, error) {
	var size int64
