|
|
|
@ -71,6 +71,8 @@ const (
|
|
|
|
|
numRemovedBeforeJoins = "num_removed_before_joins"
|
|
|
|
|
snapshotCreateDuration = "snapshot_create_duration"
|
|
|
|
|
snapshotPersistDuration = "snapshot_persist_duration"
|
|
|
|
|
snapshotDBSerializedSize = "snapshot_db_serialized_size"
|
|
|
|
|
snapshotDBOnDiskSize = "snapshot_db_ondisk_size"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// BackupFormat represents the format of database backup.
|
|
|
|
@ -99,6 +101,8 @@ func init() {
|
|
|
|
|
stats.Add(numRemovedBeforeJoins, 0)
|
|
|
|
|
stats.Add(snapshotCreateDuration, 0)
|
|
|
|
|
stats.Add(snapshotPersistDuration, 0)
|
|
|
|
|
stats.Add(snapshotDBSerializedSize, 0)
|
|
|
|
|
stats.Add(snapshotDBOnDiskSize, 0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ClusterState defines the possible Raft states the current node can be in
|
|
|
|
@ -1044,6 +1048,7 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
|
|
|
|
|
dur := time.Since(fsm.startT)
|
|
|
|
|
stats.Add(numSnaphots, 1)
|
|
|
|
|
stats.Get(snapshotCreateDuration).(*expvar.Int).Set(dur.Milliseconds())
|
|
|
|
|
stats.Get(snapshotDBSerializedSize).(*expvar.Int).Set(int64(len(fsm.database)))
|
|
|
|
|
s.logger.Printf("node snapshot created in %s", dur)
|
|
|
|
|
return fsm, nil
|
|
|
|
|
}
|
|
|
|
@ -1213,6 +1218,7 @@ func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
|
|
|
|
|
if _, err := sink.Write(cdb); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
stats.Get(snapshotDBOnDiskSize).(*expvar.Int).Set(int64(len(cdb)))
|
|
|
|
|
} else {
|
|
|
|
|
f.logger.Println("no database data available for snapshot")
|
|
|
|
|
err = writeUint64(b, uint64(0))
|
|
|
|
@ -1222,6 +1228,7 @@ func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
|
|
|
|
|
if _, err := sink.Write(b.Bytes()); err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
stats.Get(snapshotDBOnDiskSize).(*expvar.Int).Set(0)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close the sink.
|
|
|
|
|