@ -103,6 +103,7 @@ const (
trailingScale = 1.25
observerChanLen = 50
baseVacuumTimeKey = "rqlite_base_vacuum"
lastVacuumTimeKey = "rqlite_last_vacuum"
)
@ -545,8 +546,8 @@ func (s *Store) Open() (retErr error) {
// Periodically update the applied index for faster startup.
s . appliedIdxUpdateDone = s . updateAppliedIndex ( )
if err := s . init Last VacuumTime( ) ; err != nil {
return fmt . Errorf ( "failed to initialize last vacuum time : %s", err . Error ( ) )
if err := s . init VacuumTime( ) ; err != nil {
return fmt . Errorf ( "failed to initialize auto-vacuum times : %s", err . Error ( ) )
}
return nil
}
@ -746,12 +747,7 @@ func (s *Store) State() ClusterState {
// LastVacuumTime returns the time of the last automatic VACUUM.
func ( s * Store ) LastVacuumTime ( ) ( time . Time , error ) {
vt , err := s . boltStore . Get ( [ ] byte ( lastVacuumTimeKey ) )
if err != nil {
return time . Time { } , fmt . Errorf ( "failed to get last vacuum time: %s" , err )
}
n := int64 ( binary . LittleEndian . Uint64 ( vt ) )
return time . Unix ( 0 , n ) , nil
return s . getKeyTime ( lastVacuumTimeKey )
}
// Path returns the path to the store's storage directory.
@ -1019,8 +1015,20 @@ func (s *Store) Stats() (map[string]interface{}, error) {
"sqlite3" : dbStatus ,
"db_conf" : s . dbConf ,
}
if lVac , err := s . LastVacuumTime ( ) ; err == nil {
status [ "last_vacuum" ] = lVac . String ( )
if s . AutoVacInterval > 0 {
bt , err := s . getKeyTime ( baseVacuumTimeKey )
if err != nil {
return nil , err
}
avm := map [ string ] interface { } { }
if lvt , err := s . LastVacuumTime ( ) ; err == nil {
avm [ "last_vacuum" ] = lvt
bt = lvt
}
avm [ "next_vacuum_after" ] = bt . Add ( s . AutoVacInterval )
status [ "auto_vaccum" ] = avm
}
// Snapshot stats may be in flux if a snapshot is in progress. Only
@ -1430,9 +1438,6 @@ func (s *Store) Vacuum() error {
if err := s . snapshotStore . SetFullNeeded ( ) ; err != nil {
return err
}
if err := s . setLastVacuumTime ( time . Now ( ) ) ; err != nil {
return err
}
return nil
}
@ -1675,22 +1680,47 @@ func (s *Store) remove(id string) error {
return f . Error ( )
}
func ( s * Store ) initLastVacuumTime ( ) error {
// initVacuumTime initializes the last vacuum times in the Config store.
// If auto-vacuum is disabled, then all auto-vacuum related state is removed.
// If enabled, but no last vacuum time is set, then the auto-bac baseline
// time i.e. now is set. If a last vacuum time is set, then it is left as is.
func ( s * Store ) initVacuumTime ( ) error {
if s . AutoVacInterval == 0 {
if err := s . clearKeyTime ( baseVacuumTimeKey ) ; err != nil {
return fmt . Errorf ( "failed to clear base vacuum time: %s" , err )
}
if err := s . clearKeyTime ( lastVacuumTimeKey ) ; err != nil {
return fmt . Errorf ( "failed to clear last vacuum time: %s" , err )
}
return nil
}
if _ , err := s . LastVacuumTime ( ) ; err != nil {
return s . setLastVacuumTime ( time . Now ( ) )
return s . set KeyTime( baseVacuumTimeKey , time . Now ( ) )
}
return nil
}
func ( s * Store ) setLastVacuumTime ( t time . Time ) error {
func ( s * Store ) set KeyTime( key string , t time . Time ) error {
buf := bytes . NewBuffer ( make ( [ ] byte , 0 , 8 ) )
if err := binary . Write ( buf , binary . LittleEndian , t . UnixNano ( ) ) ; err != nil {
return fmt . Errorf ( "failed to encode last vacuum time: %s" , err )
return err
}
if err := s . boltStore . Set ( [ ] byte ( lastVacuumTimeKey ) , buf . Bytes ( ) ) ; err != nil {
return fmt . Errorf ( "failed to set last vacuum time: %s" , err )
return s . boltStore . Set ( [ ] byte ( key ) , buf . Bytes ( ) )
}
func ( s * Store ) getKeyTime ( key string ) ( time . Time , error ) {
kt , err := s . boltStore . Get ( [ ] byte ( key ) )
if err != nil {
return time . Time { } , fmt . Errorf ( "failed to get key %s: %s" , key , err )
} else if kt == nil {
return time . Time { } , fmt . Errorf ( "key %s is nil" , key )
}
return nil
n := int64 ( binary . LittleEndian . Uint64 ( kt ) )
return time . Unix ( 0 , n ) , nil
}
func ( s * Store ) clearKeyTime ( key string ) error {
return s . boltStore . Set ( [ ] byte ( key ) , nil )
}
// raftConfig returns a new Raft config for the store.
@ -1835,7 +1865,8 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
}
} ( )
// Automatic VACUUM needed?
// Automatic VACUUM needed? This is deliberately done in the context of a Snapshot
// as it guarantees that the database is not being written to.
if avn , err := s . autoVacNeeded ( time . Now ( ) ) ; err != nil {
return nil , err
} else if avn {
@ -1848,7 +1879,7 @@ func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
stats . Get ( autoVacuumDuration ) . ( * expvar . Int ) . Set ( time . Since ( vacStart ) . Milliseconds ( ) )
stats . Add ( numAutoVacuums , 1 )
s . numAutoVacuums ++
if err := s . set LastVacuumTime( time . Now ( ) ) ; err != nil {
if err := s . set KeyTime( lastVacuumTimeKey , time . Now ( ) ) ; err != nil {
return nil , err
}
}
@ -2219,11 +2250,16 @@ func (s *Store) autoVacNeeded(t time.Time) (bool, error) {
if s . AutoVacInterval == 0 {
return false , nil
}
lvt , err := s . LastVacuumTime ( )
vt , err := s . LastVacuumTime ( )
if err == nil {
return t . Sub ( vt ) > s . AutoVacInterval , nil
}
// OK, check if we have a base time from which we can start.
bt , err := s . getKeyTime ( baseVacuumTimeKey )
if err != nil {
return false , err
}
return t . Sub ( lvt ) > s . AutoVacInterval , nil
return t . Sub ( b t) > s . AutoVacInterval , nil
}
// createOnDisk opens an on-disk database file at the configured path. Any