diff --git a/CHANGELOG.md b/CHANGELOG.md index 81f914c6..a2bf9fc5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ -## 8.13.6 (unreleased) +## 8.14.0 (unreleased) +### New features +- [PR #1530](https://github.com/rqlite/rqlite/pull/1530): Support automatically snapshotting when the WAL reaches a certain size + ### Implementation changes and bug fixes - [PR #1528](https://github.com/rqlite/rqlite/pull/1528): Support setting trailing logs for user-requested snapshot. +- [PR #1529](https://github.com/rqlite/rqlite/pull/1529): Remove obsolete code related to user-triggered snapshots. ## 8.13.5 (December 26th 2023) ### Implementation changes and bug fixes diff --git a/cmd/rqlited/flags.go b/cmd/rqlited/flags.go index 0a0e7061..e5e9ed69 100644 --- a/cmd/rqlited/flags.go +++ b/cmd/rqlited/flags.go @@ -144,6 +144,9 @@ type Config struct { // RaftSnapThreshold is the number of outstanding log entries that trigger snapshot. RaftSnapThreshold uint64 + // RaftSnapThresholdWALSize is the size of a SQLite WAL file which will trigger a snapshot. + RaftSnapThresholdWALSize uint64 + // RaftSnapInterval sets the threshold check interval. 
RaftSnapInterval time.Duration @@ -462,6 +465,7 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) { flag.DurationVar(&config.RaftElectionTimeout, "raft-election-timeout", time.Second, "Raft election timeout") flag.DurationVar(&config.RaftApplyTimeout, "raft-apply-timeout", 10*time.Second, "Raft apply timeout") flag.Uint64Var(&config.RaftSnapThreshold, "raft-snap", 8192, "Number of outstanding log entries that trigger snapshot and Raft log compaction") + flag.Uint64Var(&config.RaftSnapThresholdWALSize, "raft-snap-wal-size", 16*1024*1024, "Size of a SQLite WAL file in bytes which will trigger a snapshot and Raft log compaction") flag.DurationVar(&config.RaftSnapInterval, "raft-snap-int", 30*time.Second, "Snapshot threshold check interval") flag.DurationVar(&config.RaftLeaderLeaseTimeout, "raft-leader-lease-timeout", 0, "Raft leader lease timeout. Use 0s for Raft default") flag.BoolVar(&config.RaftStepdownOnShutdown, "raft-shutdown-stepdown", true, "If leader, stepdown before shutting down. 
Enabled by default") diff --git a/cmd/rqlited/main.go b/cmd/rqlited/main.go index b3ca8cf4..066397be 100644 --- a/cmd/rqlited/main.go +++ b/cmd/rqlited/main.go @@ -268,6 +268,7 @@ func createStore(cfg *Config, ln *tcp.Layer) (*store.Store, error) { str.RaftLogLevel = cfg.RaftLogLevel str.ShutdownOnRemove = cfg.RaftShutdownOnRemove str.SnapshotThreshold = cfg.RaftSnapThreshold + str.SnapshotThresholdWALSize = cfg.RaftSnapThresholdWALSize str.SnapshotInterval = cfg.RaftSnapInterval str.LeaderLeaseTimeout = cfg.RaftLeaderLeaseTimeout str.HeartbeatTimeout = cfg.RaftHeartbeatTimeout diff --git a/store/store.go b/store/store.go index f2db1e29..beefc1f3 100644 --- a/store/store.go +++ b/store/store.go @@ -102,6 +102,8 @@ const ( numSnapshotsFailed = "num_snapshots_failed" numUserSnapshots = "num_user_snapshots" numUserSnapshotsFailed = "num_user_snapshots_failed" + numWALSnapshots = "num_wal_snapshots" + numWALSnapshotsFailed = "num_wal_snapshots_failed" numSnapshotsFull = "num_snapshots_full" numSnapshotsIncremental = "num_snapshots_incremental" numProvides = "num_provides" @@ -146,6 +148,8 @@ func ResetStats() { stats.Add(numSnapshotsFailed, 0) stats.Add(numUserSnapshots, 0) stats.Add(numUserSnapshotsFailed, 0) + stats.Add(numWALSnapshots, 0) + stats.Add(numWALSnapshotsFailed, 0) stats.Add(numSnapshotsFull, 0) stats.Add(numSnapshotsIncremental, 0) stats.Add(numBoots, 0) @@ -233,6 +237,10 @@ type Store struct { numClosedReadyChannels int readyChansMu sync.Mutex + // Channels for WAL-size triggered snapshotting + snapshotWClose chan struct{} + snapshotWDone chan struct{} + // Latest log entry index actually reflected by the FSM. Due to Raft code // this value is not updated after a Snapshot-restore. 
fsmIndex uint64 @@ -269,7 +277,7 @@ type Store struct { ShutdownOnRemove bool SnapshotThreshold uint64 - SnapshotWALSizeThreshold uint64 + SnapshotThresholdWALSize uint64 SnapshotInterval time.Duration LeaderLeaseTimeout time.Duration HeartbeatTimeout time.Duration @@ -508,6 +516,9 @@ func (s *Store) Open() (retErr error) { s.raft.RegisterObserver(s.observer) s.observerClose, s.observerDone = s.observe() + // WAL-size triggered snapshotting. + s.snapshotWClose, s.snapshotWDone = s.runWALSnapshotting() + // Periodically update the applied index for faster startup. s.appliedIdxUpdateDone = s.updateAppliedIndex() @@ -595,6 +606,11 @@ func (s *Store) Close(wait bool) (retErr error) { close(s.observerClose) <-s.observerDone + if s.snapshotWClose != nil { + close(s.snapshotWClose) + <-s.snapshotWDone + } + f := s.raft.Shutdown() if wait { if f.Error() != nil { @@ -1943,27 +1959,38 @@ func (s *Store) Snapshot(n uint64) (retError error) { return nil } -// runSnapshotting runs the user-requested snapshotting, and returns the -// trigger channel, the close channel, and the done channel. Unless Raft -// triggered Snapshotting the log is also compacted after each snapshot. -func (s *Store) runSnapshotting() (triggerT, closeCh, doneCh chan struct{}) { +// runWALSnapshotting runs the periodic check to see if a snapshot should be +// triggered due to WAL size. 
+func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) { + if s.SnapshotInterval <= 0 { + return nil, nil + } closeCh = make(chan struct{}) doneCh = make(chan struct{}) - triggerT = make(chan struct{}) + ticker := time.NewTicker(s.SnapshotInterval) go func() { defer close(doneCh) for { select { - case <-triggerT: - if err := s.Snapshot(1); err != nil { - s.logger.Printf("failed to snapshot: %s", err.Error()) + case <-ticker.C: + sz, err := s.db.WALSize() + if err != nil { + s.logger.Printf("failed to get WAL size: %s", err.Error()) + continue + } + if uint64(sz) >= s.SnapshotThresholdWALSize { + if err := s.Snapshot(0); err != nil { + stats.Add(numWALSnapshotsFailed, 1) + s.logger.Printf("failed to snapshot due to WAL threshold: %s", err.Error()) + } + stats.Add(numWALSnapshots, 1) } case <-closeCh: return } } }() - return triggerT, closeCh, doneCh + return closeCh, doneCh } // selfLeaderChange is called when this node detects that its leadership