|
|
|
@ -141,6 +141,9 @@ const (
|
|
|
|
|
numRemovedBeforeJoins = "num_removed_before_joins"
|
|
|
|
|
numDBStatsErrors = "num_db_stats_errors"
|
|
|
|
|
snapshotCreateDuration = "snapshot_create_duration"
|
|
|
|
|
snapshotCreateChkRestartDuration = "snapshot_create_chk_restart_duration"
|
|
|
|
|
snapshotCreateChkTruncateDuration = "snapshot_create_chk_truncate_duration"
|
|
|
|
|
snapshotCreateWALCompactDuration = "snapshot_create_wal_compact_duration"
|
|
|
|
|
snapshotPersistDuration = "snapshot_persist_duration"
|
|
|
|
|
snapshotPrecompactWALSize = "snapshot_precompact_wal_size"
|
|
|
|
|
snapshotWALSize = "snapshot_wal_size"
|
|
|
|
@ -195,6 +198,9 @@ func ResetStats() {
|
|
|
|
|
stats.Add(numRemovedBeforeJoins, 0)
|
|
|
|
|
stats.Add(numDBStatsErrors, 0)
|
|
|
|
|
stats.Add(snapshotCreateDuration, 0)
|
|
|
|
|
stats.Add(snapshotCreateChkRestartDuration, 0)
|
|
|
|
|
stats.Add(snapshotCreateChkTruncateDuration, 0)
|
|
|
|
|
stats.Add(snapshotCreateWALCompactDuration, 0)
|
|
|
|
|
stats.Add(snapshotPersistDuration, 0)
|
|
|
|
|
stats.Add(snapshotPrecompactWALSize, 0)
|
|
|
|
|
stats.Add(snapshotWALSize, 0)
|
|
|
|
@ -1919,10 +1925,12 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
|
|
|
|
|
|
|
|
|
|
var fsmSnapshot raft.FSMSnapshot
|
|
|
|
|
if fullNeeded {
|
|
|
|
|
chkStartTime := time.Now()
|
|
|
|
|
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
|
|
|
|
|
stats.Add(numFullCheckpointFailed, 1)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkStartTime).Milliseconds())
|
|
|
|
|
dbFD, err := os.Open(s.db.Path())
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
@ -1938,11 +1946,14 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
|
|
|
|
|
// it fails if some query is in progress. If it fails, return an error
|
|
|
|
|
// and Raft will retry later. But if it succeeds it means that all readers
|
|
|
|
|
// are reading from the main database file.
|
|
|
|
|
chkRStartTime := time.Now()
|
|
|
|
|
if err := s.db.Checkpoint(sql.CheckpointRestart); err != nil {
|
|
|
|
|
stats.Add(numWALCheckpointRestartFailed, 1)
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
stats.Get(snapshotCreateChkRestartDuration).(*expvar.Int).Set(time.Since(chkRStartTime).Milliseconds())
|
|
|
|
|
|
|
|
|
|
compactStartTime := time.Now()
|
|
|
|
|
// Read a compacted version of the WAL into memory, and write it
|
|
|
|
|
// to the Snapshot store.
|
|
|
|
|
walFD, err := os.Open(s.walPath)
|
|
|
|
@ -1962,6 +1973,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
walFD.Close() // We need it closed for the next step.
|
|
|
|
|
stats.Get(snapshotCreateWALCompactDuration).(*expvar.Int).Set(time.Since(compactStartTime).Milliseconds())
|
|
|
|
|
|
|
|
|
|
// Clean-up by truncating the WAL. This should be fast because all the pages
|
|
|
|
|
// have been checkpointed into the main database file, and writes are
|
|
|
|
@ -1971,10 +1983,12 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
chkTStartTime := time.Now()
|
|
|
|
|
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
|
|
|
|
|
stats.Add(numWALCheckpointTruncateFailed, 1)
|
|
|
|
|
return nil, fmt.Errorf("failed to truncate WAL: %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
stats.Get(snapshotCreateChkTruncateDuration).(*expvar.Int).Set(time.Since(chkTStartTime).Milliseconds())
|
|
|
|
|
stats.Get(snapshotWALSize).(*expvar.Int).Set(int64(compactedBuf.Len()))
|
|
|
|
|
stats.Get(snapshotPrecompactWALSize).(*expvar.Int).Set(walSz)
|
|
|
|
|
}
|
|
|
|
|