@ -102,6 +102,8 @@ const (
numSnapshotsFailed = "num_snapshots_failed"
numUserSnapshots = "num_user_snapshots"
numUserSnapshotsFailed = "num_user_snapshots_failed"
numWALSnapshots = "num_wal_snapshots"
numWALSnapshotsFailed = "num_wal_snapshots_failed"
numSnapshotsFull = "num_snapshots_full"
numSnapshotsIncremental = "num_snapshots_incremental"
numProvides = "num_provides"
@ -146,6 +148,8 @@ func ResetStats() {
stats . Add ( numSnapshotsFailed , 0 )
stats . Add ( numUserSnapshots , 0 )
stats . Add ( numUserSnapshotsFailed , 0 )
stats . Add ( numWALSnapshots , 0 )
stats . Add ( numWALSnapshotsFailed , 0 )
stats . Add ( numSnapshotsFull , 0 )
stats . Add ( numSnapshotsIncremental , 0 )
stats . Add ( numBoots , 0 )
@ -233,6 +237,10 @@ type Store struct {
numClosedReadyChannels int
readyChansMu sync . Mutex
// Channels for WAL-size triggered snapshotting
snapshotWClose chan struct { }
snapshotWDone chan struct { }
// Latest log entry index actually reflected by the FSM. Due to Raft code
// this value is not updated after a Snapshot-restore.
fsmIndex uint64
@ -269,7 +277,7 @@ type Store struct {
ShutdownOnRemove bool
SnapshotThreshold uint64
Snapshot WALSize Threshold uint64
Snapshot ThresholdWALSize uint64
SnapshotInterval time . Duration
LeaderLeaseTimeout time . Duration
HeartbeatTimeout time . Duration
@ -508,6 +516,9 @@ func (s *Store) Open() (retErr error) {
s . raft . RegisterObserver ( s . observer )
s . observerClose , s . observerDone = s . observe ( )
// WAL-size triggered snapshotting.
s . snapshotWClose , s . snapshotWDone = s . runWALSnapshotting ( )
// Periodically update the applied index for faster startup.
s . appliedIdxUpdateDone = s . updateAppliedIndex ( )
@ -595,6 +606,11 @@ func (s *Store) Close(wait bool) (retErr error) {
close ( s . observerClose )
<- s . observerDone
if s . snapshotWClose != nil {
close ( s . snapshotWClose )
<- s . snapshotWDone
}
f := s . raft . Shutdown ( )
if wait {
if f . Error ( ) != nil {
@ -1943,27 +1959,38 @@ func (s *Store) Snapshot(n uint64) (retError error) {
return nil
}
// runSnapshotting runs the user-requested snapshotting, and returns the
// trigger channel, the close channel, and the done channel. Unless Raft
// triggered Snapshotting the log is also compacted after each snapshot.
func ( s * Store ) runSnapshotting ( ) ( triggerT , closeCh , doneCh chan struct { } ) {
// runWALSnapshotting runs the periodic check to see if a snapshot should be
// triggered due to WAL size.
func ( s * Store ) runWALSnapshotting ( ) ( closeCh , doneCh chan struct { } ) {
if s . SnapshotInterval <= 0 {
return nil , nil
}
closeCh = make ( chan struct { } )
doneCh = make ( chan struct { } )
t riggerT = make ( chan struct { } )
t icker := time . NewTicker ( s . SnapshotInterval )
go func ( ) {
defer close ( doneCh )
for {
select {
case <- triggerT :
if err := s . Snapshot ( 1 ) ; err != nil {
s . logger . Printf ( "failed to snapshot: %s" , err . Error ( ) )
case <- ticker . C :
sz , err := s . db . WALSize ( )
if err != nil {
s . logger . Printf ( "failed to get WAL size: %s" , err . Error ( ) )
continue
}
if uint64 ( sz ) >= s . SnapshotThresholdWALSize {
if err := s . Snapshot ( 0 ) ; err != nil {
stats . Add ( numWALSnapshotsFailed , 1 )
s . logger . Printf ( "failed to snapshot due to WAL threshold: %s" , err . Error ( ) )
}
stats . Add ( numWALSnapshots , 1 )
}
case <- closeCh :
return
}
}
} ( )
return triggerT , closeCh , doneCh
return closeCh, doneCh
}
// selfLeaderChange is called when this node detects that its leadership