|
|
|
@ -98,6 +98,7 @@ const (
|
|
|
|
|
sqliteFile = "db.sqlite"
|
|
|
|
|
leaderWaitDelay = 100 * time.Millisecond
|
|
|
|
|
appliedWaitDelay = 100 * time.Millisecond
|
|
|
|
|
commitEquivalenceDelay = 50 * time.Millisecond
|
|
|
|
|
appliedIndexUpdateInterval = 5 * time.Second
|
|
|
|
|
connectionPoolCount = 5
|
|
|
|
|
connectionTimeout = 10 * time.Second
|
|
|
|
@ -712,6 +713,31 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// WaitCommitIndex blocks until the local Raft commit index is equal to
|
|
|
|
|
// or greater than the Leader Commit Index at the time this function
|
|
|
|
|
// is called, or the timeout expires.
|
|
|
|
|
func (s *Store) WaitCommitIndex(timeout time.Duration) error {
|
|
|
|
|
tck := time.NewTicker(commitEquivalenceDelay)
|
|
|
|
|
defer tck.Stop()
|
|
|
|
|
tmr := time.NewTimer(timeout)
|
|
|
|
|
defer tmr.Stop()
|
|
|
|
|
|
|
|
|
|
lci, err := s.LeaderCommitIndex()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-tck.C:
|
|
|
|
|
if lci <= s.raft.CommitIndex() {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
case <-tmr.C:
|
|
|
|
|
return fmt.Errorf("timeout expired")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// DBAppliedIndex returns the index of the last Raft log that changed the
|
|
|
|
|
// underlying database. If the index is unknown then 0 is returned.
|
|
|
|
|
func (s *Store) DBAppliedIndex() uint64 {
|
|
|
|
|