You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
180 lines
4.5 KiB
Go
180 lines
4.5 KiB
Go
package disco
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rqlite/rqlite/v8/random"
|
|
)
|
|
|
|
const (
|
|
leaderChanLen = 5 // Support any fast back-to-back leadership changes.
|
|
)
|
|
|
|
// Client is the interface discovery clients must implement.
|
|
type Client interface {
|
|
// GetLeader returns the current Leader stored in the KV store. If the Leader
|
|
// is set, the returned ok flag will be true. If the Leader is not set, the
|
|
// returned ok flag will be false.
|
|
GetLeader() (id string, apiAddr string, addr string, ok bool, e error)
|
|
|
|
// InitializeLeader sets the leader to the given details, but only if no leader
|
|
// has already been set. This operation is a check-and-set type operation. If
|
|
// initialization succeeds, ok is set to true, otherwise false.
|
|
InitializeLeader(id, apiAddr, addr string) (bool, error)
|
|
|
|
// SetLeader unconditionally sets the leader to the given details.
|
|
SetLeader(id, apiAddr, addr string) error
|
|
|
|
fmt.Stringer
|
|
}
|
|
|
|
// Store is the interface the consensus system must implement.
|
|
type Store interface {
|
|
// IsLeader returns whether this node is the Leader.
|
|
IsLeader() bool
|
|
|
|
// RegisterLeaderChange registers a channel that will be notified when
|
|
// a leadership change occurs.
|
|
RegisterLeaderChange(c chan<- struct{})
|
|
}
|
|
|
|
// Suffrage is the type of suffrage -- voting or non-voting -- a node has.
|
|
type Suffrage int
|
|
|
|
const (
|
|
SuffrageUnknown Suffrage = iota
|
|
Voter
|
|
NonVoter
|
|
)
|
|
|
|
// VoterSuffrage returns a Suffrage based on the given boolean.
|
|
func VoterSuffrage(b bool) Suffrage {
|
|
if b {
|
|
return Voter
|
|
}
|
|
return NonVoter
|
|
}
|
|
|
|
// IsVoter returns whether the Suffrage indicates a Voter.
|
|
func (s Suffrage) IsVoter() bool {
|
|
return s == Voter
|
|
}
|
|
|
|
// Service represents a Discovery Service instance.
|
|
type Service struct {
|
|
RegisterInterval time.Duration
|
|
ReportInterval time.Duration
|
|
|
|
c Client
|
|
s Store
|
|
suf Suffrage
|
|
|
|
logger *log.Logger
|
|
|
|
mu sync.Mutex
|
|
lastContact time.Time
|
|
}
|
|
|
|
// NewService returns an instantiated Discovery Service.
|
|
func NewService(c Client, s Store, suf Suffrage) *Service {
|
|
return &Service{
|
|
c: c,
|
|
s: s,
|
|
suf: suf,
|
|
RegisterInterval: 3 * time.Second,
|
|
ReportInterval: 10 * time.Second,
|
|
logger: log.New(os.Stderr, "[disco] ", log.LstdFlags),
|
|
}
|
|
}
|
|
|
|
// Register registers this node with the discovery service. It will block
|
|
// until a) if the node is a voter, it registers itself, b) learns of another
|
|
// node it can use to join the cluster, or c) an unrecoverable error occurs.
|
|
func (s *Service) Register(id, apiAddr, addr string) (bool, string, error) {
|
|
for {
|
|
_, _, cRaftAddr, ok, err := s.c.GetLeader()
|
|
if err != nil {
|
|
s.logger.Printf("failed to get leader: %s", err.Error())
|
|
}
|
|
if ok {
|
|
return false, cRaftAddr, nil
|
|
}
|
|
|
|
if s.suf.IsVoter() {
|
|
ok, err = s.c.InitializeLeader(id, apiAddr, addr)
|
|
if err != nil {
|
|
s.logger.Printf("failed to initialize as Leader: %s", err.Error())
|
|
}
|
|
if ok {
|
|
s.updateContact(time.Now())
|
|
return true, addr, nil
|
|
}
|
|
}
|
|
|
|
time.Sleep(random.Jitter(s.RegisterInterval))
|
|
}
|
|
}
|
|
|
|
// StartReporting reports the details of this node to the discovery service,
|
|
// if, and only if, this node is the leader. The service will report
|
|
// anytime a leadership change is detected. It also does it periodically
|
|
// to deal with any intermittent issues that caused Leadership information
|
|
// to go stale.
|
|
func (s *Service) StartReporting(id, apiAddr, addr string) chan struct{} {
|
|
ticker := time.NewTicker(s.ReportInterval)
|
|
obCh := make(chan struct{}, leaderChanLen)
|
|
s.s.RegisterLeaderChange(obCh)
|
|
|
|
update := func(changed bool) {
|
|
if s.s.IsLeader() {
|
|
if err := s.c.SetLeader(id, apiAddr, addr); err != nil {
|
|
s.logger.Printf("failed to update discovery service with Leader details: %s",
|
|
err.Error())
|
|
}
|
|
if changed {
|
|
s.logger.Printf("updated Leader API address to %s due to leadership change",
|
|
apiAddr)
|
|
}
|
|
s.updateContact(time.Now())
|
|
}
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
update(false)
|
|
case <-obCh:
|
|
update(true)
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return done
|
|
}
|
|
|
|
// Stats returns diagnostic information on the disco service.
|
|
func (s *Service) Stats() (map[string]interface{}, error) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
return map[string]interface{}{
|
|
"mode": s.c.String(),
|
|
"register_interval": s.RegisterInterval,
|
|
"report_interval": s.ReportInterval,
|
|
"last_contact": s.lastContact,
|
|
}, nil
|
|
}
|
|
|
|
func (s *Service) updateContact(t time.Time) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.lastContact = t
|
|
}
|