1
0
Fork 0

Support snaphotting on WAL size

master
Philip O'Toole 9 months ago
parent 685721e082
commit 7c97fc4347

@ -1,6 +1,7 @@
## 8.13.6 (unreleased)
### Implementation changes and bug fixes
- [PR #1528](https://github.com/rqlite/rqlite/pull/1528): Support setting trailing logs for user-requested snapshot.
- [PR #1529](https://github.com/rqlite/rqlite/pull/1529): Remove obsolete code related to user-triggered snapshots.
## 8.13.5 (December 26th 2023)
### Implementation changes and bug fixes

@ -144,6 +144,9 @@ type Config struct {
// RaftSnapThreshold is the number of outstanding log entries that trigger snapshot.
RaftSnapThreshold uint64
// RaftSnapThreshold is the size of a SQLite WAL file which will trigger a snapshot.
RaftSnapThresholdWALSize uint64
// RaftSnapInterval sets the threshold check interval.
RaftSnapInterval time.Duration
@ -462,6 +465,7 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
flag.DurationVar(&config.RaftElectionTimeout, "raft-election-timeout", time.Second, "Raft election timeout")
flag.DurationVar(&config.RaftApplyTimeout, "raft-apply-timeout", 10*time.Second, "Raft apply timeout")
flag.Uint64Var(&config.RaftSnapThreshold, "raft-snap", 8192, "Number of outstanding log entries that trigger snapshot and Raft log compaction")
flag.Uint64Var(&config.RaftSnapThresholdWALSize, "raft-snap-wal-size", 16*1024*1024, "Size of a SQLite WAL file in bytes which will trigger a snapshot and Raft log compaction")
flag.DurationVar(&config.RaftSnapInterval, "raft-snap-int", 30*time.Second, "Snapshot threshold check interval")
flag.DurationVar(&config.RaftLeaderLeaseTimeout, "raft-leader-lease-timeout", 0, "Raft leader lease timeout. Use 0s for Raft default")
flag.BoolVar(&config.RaftStepdownOnShutdown, "raft-shutdown-stepdown", true, "If leader, stepdown before shutting down. Enabled by default")

@ -268,6 +268,7 @@ func createStore(cfg *Config, ln *tcp.Layer) (*store.Store, error) {
str.RaftLogLevel = cfg.RaftLogLevel
str.ShutdownOnRemove = cfg.RaftShutdownOnRemove
str.SnapshotThreshold = cfg.RaftSnapThreshold
str.SnapshotThresholdWALSize = cfg.RaftSnapThresholdWALSize
str.SnapshotInterval = cfg.RaftSnapInterval
str.LeaderLeaseTimeout = cfg.RaftLeaderLeaseTimeout
str.HeartbeatTimeout = cfg.RaftHeartbeatTimeout

@ -102,6 +102,8 @@ const (
numSnapshotsFailed = "num_snapshots_failed"
numUserSnapshots = "num_user_snapshots"
numUserSnapshotsFailed = "num_user_snapshots_failed"
numWALSnapshots = "num_wal_snapshots"
numWALSnapshotsFailed = "num_wal_snapshots_failed"
numSnapshotsFull = "num_snapshots_full"
numSnapshotsIncremental = "num_snapshots_incremental"
numProvides = "num_provides"
@ -146,6 +148,8 @@ func ResetStats() {
stats.Add(numSnapshotsFailed, 0)
stats.Add(numUserSnapshots, 0)
stats.Add(numUserSnapshotsFailed, 0)
stats.Add(numWALSnapshots, 0)
stats.Add(numWALSnapshotsFailed, 0)
stats.Add(numSnapshotsFull, 0)
stats.Add(numSnapshotsIncremental, 0)
stats.Add(numBoots, 0)
@ -233,6 +237,10 @@ type Store struct {
numClosedReadyChannels int
readyChansMu sync.Mutex
// Channels for WAL-size triggered snapshotting
snapshotWClose chan struct{}
snapshotWDone chan struct{}
// Latest log entry index actually reflected by the FSM. Due to Raft code
// this value is not updated after a Snapshot-restore.
fsmIndex uint64
@ -269,7 +277,7 @@ type Store struct {
ShutdownOnRemove bool
SnapshotThreshold uint64
SnapshotWALSizeThreshold uint64
SnapshotThresholdWALSize uint64
SnapshotInterval time.Duration
LeaderLeaseTimeout time.Duration
HeartbeatTimeout time.Duration
@ -508,6 +516,9 @@ func (s *Store) Open() (retErr error) {
s.raft.RegisterObserver(s.observer)
s.observerClose, s.observerDone = s.observe()
// WAL-size triggered snapshotting.
s.snapshotWClose, s.snapshotWDone = s.runWALSnapshotting()
// Periodically update the applied index for faster startup.
s.appliedIdxUpdateDone = s.updateAppliedIndex()
@ -595,6 +606,9 @@ func (s *Store) Close(wait bool) (retErr error) {
close(s.observerClose)
<-s.observerDone
close(s.snapshotWClose)
<-s.snapshotWDone
f := s.raft.Shutdown()
if wait {
if f.Error() != nil {
@ -1943,27 +1957,35 @@ func (s *Store) Snapshot(n uint64) (retError error) {
return nil
}
// runSnapshotting runs the user-requested snapshotting, and returns the
// trigger channel, the close channel, and the done channel. Unless Raft
// triggered Snapshotting the log is also compacted after each snapshot.
func (s *Store) runSnapshotting() (triggerT, closeCh, doneCh chan struct{}) {
// runWALSnapshotting runs the periodic check to see if a snapshot should be
// triggered due to WAL size.
func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
closeCh = make(chan struct{})
doneCh = make(chan struct{})
triggerT = make(chan struct{})
ticker := time.NewTicker(s.SnapshotInterval)
go func() {
defer close(doneCh)
for {
select {
case <-triggerT:
if err := s.Snapshot(1); err != nil {
s.logger.Printf("failed to snapshot: %s", err.Error())
case <-ticker.C:
sz, err := s.db.WALSize()
if err != nil {
s.logger.Printf("failed to get WAL size: %s", err.Error())
continue
}
if uint64(sz) >= s.SnapshotThresholdWALSize {
if err := s.Snapshot(0); err != nil {
stats.Add(numWALSnapshotsFailed, 1)
s.logger.Printf("failed to snapshot due to WAL threshold: %s", err.Error())
}
stats.Add(numWALSnapshots, 1)
}
case <-closeCh:
return
}
}
}()
return triggerT, closeCh, doneCh
return closeCh, doneCh
}
// selfLeaderChange is called when this node detects that its leadership

Loading…
Cancel
Save