|
|
@ -707,20 +707,28 @@ func (s *Store) Nodes() ([]*Server, error) {
|
|
|
|
// WaitForRemoval blocks until a node with the given ID is removed from the
|
|
|
|
// WaitForRemoval blocks until a node with the given ID is removed from the
|
|
|
|
// cluster or the timeout expires.
|
|
|
|
// cluster or the timeout expires.
|
|
|
|
func (s *Store) WaitForRemoval(id string, timeout time.Duration) error {
|
|
|
|
func (s *Store) WaitForRemoval(id string, timeout time.Duration) error {
|
|
|
|
|
|
|
|
check := func() bool {
|
|
|
|
|
|
|
|
nodes, err := s.Nodes()
|
|
|
|
|
|
|
|
if err == nil && !Servers(nodes).Contains(id) {
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// try the fast path
|
|
|
|
|
|
|
|
if check() {
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tck := time.NewTicker(appliedWaitDelay)
|
|
|
|
tck := time.NewTicker(appliedWaitDelay)
|
|
|
|
defer tck.Stop()
|
|
|
|
defer tck.Stop()
|
|
|
|
tmr := time.NewTimer(timeout)
|
|
|
|
tmr := time.NewTimer(timeout)
|
|
|
|
defer tmr.Stop()
|
|
|
|
defer tmr.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
select {
|
|
|
|
case <-tck.C:
|
|
|
|
case <-tck.C:
|
|
|
|
nodes, err := s.Nodes()
|
|
|
|
if check() {
|
|
|
|
if err == nil {
|
|
|
|
return nil
|
|
|
|
servers := Servers(nodes)
|
|
|
|
|
|
|
|
if !servers.Contains(id) {
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
case <-tmr.C:
|
|
|
|
case <-tmr.C:
|
|
|
|
return ErrWaitForRemovalTimeout
|
|
|
|
return ErrWaitForRemovalTimeout
|
|
|
@ -730,19 +738,31 @@ func (s *Store) WaitForRemoval(id string, timeout time.Duration) error {
|
|
|
|
|
|
|
|
|
|
|
|
// WaitForLeader blocks until a leader is detected, or the timeout expires.
|
|
|
|
// WaitForLeader blocks until a leader is detected, or the timeout expires.
|
|
|
|
func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
|
|
|
|
func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
var leaderAddr string
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
check := func() bool {
|
|
|
|
|
|
|
|
leaderAddr, err = s.LeaderAddr()
|
|
|
|
|
|
|
|
if err == nil && leaderAddr != "" {
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// try the fast path
|
|
|
|
|
|
|
|
if check() {
|
|
|
|
|
|
|
|
return leaderAddr, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
tck := time.NewTicker(leaderWaitDelay)
|
|
|
|
tck := time.NewTicker(leaderWaitDelay)
|
|
|
|
defer tck.Stop()
|
|
|
|
defer tck.Stop()
|
|
|
|
tmr := time.NewTimer(timeout)
|
|
|
|
tmr := time.NewTimer(timeout)
|
|
|
|
defer tmr.Stop()
|
|
|
|
defer tmr.Stop()
|
|
|
|
|
|
|
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
var l string
|
|
|
|
|
|
|
|
for {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
select {
|
|
|
|
case <-tck.C:
|
|
|
|
case <-tck.C:
|
|
|
|
l, err = s.LeaderAddr()
|
|
|
|
if check() {
|
|
|
|
if err == nil && l != "" {
|
|
|
|
return leaderAddr, nil
|
|
|
|
return l, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
case <-tmr.C:
|
|
|
|
case <-tmr.C:
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|