// Package disco lets a node register itself with a discovery service and,
// while it is leader, keep that service updated with its details.
package disco
import (
"fmt"
"log"
"os"
"sync"
"time"
"github.com/rqlite/rqlite/random"
)
// leaderChanLen is the buffer size of the channel on which leadership-change
// notifications are received. A small buffer supports fast back-to-back
// leadership changes.
const leaderChanLen = 5
// Client is the interface discovery clients must implement.
type Client interface {
	// GetLeader returns the ID, API address, and address of the leader
	// recorded by the discovery service. ok reports whether a leader
	// record was found.
	GetLeader() (id, apiAddr, addr string, ok bool, e error)

	// InitializeLeader attempts to record the given node as leader. The
	// boolean result reports whether this node was accepted as leader.
	InitializeLeader(id string, apiAddr string, addr string) (bool, error)

	// SetLeader records the given node as leader with the discovery
	// service.
	SetLeader(id string, apiAddr string, addr string) error

	// String identifies the client; it is reported as the "mode" in the
	// service's diagnostic output.
	fmt.Stringer
}
// Store is the interface the consensus system must implement.
type Store interface {
	// IsLeader reports whether this node is currently the leader.
	IsLeader() bool

	// RegisterLeaderChange registers c as a channel to be signalled on
	// leadership changes.
	RegisterLeaderChange(c chan<- struct{})
}
// Service represents a Discovery Service instance.
type Service struct {
	// RegisterInterval is the base delay between registration attempts;
	// jitter is added on top of it before each retry.
	RegisterInterval time.Duration
	// ReportInterval is the period between periodic leader reports.
	ReportInterval time.Duration

	c Client // discovery client used to read and write leader details
	s Store  // consensus store consulted for leadership state

	logger *log.Logger

	mu          sync.Mutex // guards lastContact
	lastContact time.Time  // time recorded by the most recent updateContact call
}
// NewService returns an instantiated Discovery Service that uses c to talk to
// the discovery backend and s to query leadership state. The register and
// report intervals default to 3 and 10 seconds respectively, and log output
// goes to standard error with a "[disco] " prefix.
func NewService(c Client, s Store) *Service {
	svc := new(Service)
	svc.c = c
	svc.s = s
	svc.RegisterInterval = 3 * time.Second
	svc.ReportInterval = 10 * time.Second
	svc.logger = log.New(os.Stderr, "[disco] ", log.LstdFlags)
	return svc
}
// Register registers this node with the discovery service. It blocks until
// either the node registers itself as leader, or it learns of another node it
// can use to join the cluster.
//
// The first result is true if this node became leader, in which case the
// second result is this node's own apiAddr. Otherwise the second result is
// the API address of the existing leader. Errors from the discovery client
// are logged and the attempt is retried after a jittered RegisterInterval;
// every return path currently yields a nil error.
func (s *Service) Register(id, apiAddr, addr string) (bool, string, error) {
	for {
		// First, see whether somebody else is already leader.
		_, leaderAPIAddr, _, found, getErr := s.c.GetLeader()
		if getErr != nil {
			s.logger.Printf("failed to get leader: %s", getErr.Error())
		}
		if found {
			return false, leaderAPIAddr, nil
		}

		// No leader known, so try to claim leadership for this node.
		claimed, initErr := s.c.InitializeLeader(id, apiAddr, addr)
		if initErr != nil {
			s.logger.Printf("failed to initialize as Leader: %s", initErr.Error())
		}
		if claimed {
			s.updateContact(time.Now())
			return true, apiAddr, nil
		}

		// Neither worked; back off before trying again.
		time.Sleep(jitter(s.RegisterInterval))
	}
}
// StartReporting reports the details of this node to the discovery service,
// if, and only if, this node is the leader. The service will report
// anytime a leadership change is detected. It also does it periodically
// to deal with any intermittent issues that caused Leadership information
2 years ago
// to go stale.
func (s *Service) StartReporting(id, apiAddr, addr string) chan struct{} {
ticker := time.NewTicker(s.ReportInterval)
obCh := make(chan struct{}, leaderChanLen)
s.s.RegisterLeaderChange(obCh)
update := func(changed bool) {
if s.s.IsLeader() {
if err := s.c.SetLeader(id, apiAddr, addr); err != nil {
s.logger.Printf("failed to update discovery service with Leader details: %s",
err.Error())
}
if changed {
s.logger.Printf("updated Leader API address to %s due to leadership change",
apiAddr)
}
s.updateContact(time.Now())
}
}
done := make(chan struct{})
go func() {
for {
select {
case <-ticker.C:
update(false)
case <-obCh:
update(true)
case <-done:
return
}
}
}()
return done
}
// Stats returns diagnostic information on the disco service: the client's
// string form under "mode", the two configured intervals, and the last
// recorded contact time. The returned error is always nil.
func (s *Service) Stats() (map[string]interface{}, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	stats := make(map[string]interface{}, 4)
	stats["mode"] = s.c.String()
	stats["register_interval"] = s.RegisterInterval
	stats["report_interval"] = s.ReportInterval
	stats["last_contact"] = s.lastContact
	return stats, nil
}
// updateContact records t as the last-contact time, holding the service
// mutex while it does so.
func (s *Service) updateContact(t time.Time) {
	s.mu.Lock()
	s.lastContact = t
	s.mu.Unlock()
}
// jitter returns duration plus a random extra amount scaled by duration.
// NOTE(review): presumably random.Float64 yields a value in [0, 1), making
// the result fall in [duration, 2*duration) — confirm against the random
// package.
func jitter(duration time.Duration) time.Duration {
	extra := random.Float64() * float64(duration)
	return duration + time.Duration(extra)
}