1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

180 lines
4.5 KiB
Go

package disco
import (
"fmt"
"log"
"os"
"sync"
"time"
"github.com/rqlite/rqlite/v8/random"
)
// leaderChanLen is the buffer size of the leadership-change notification
// channel. A small buffer lets several back-to-back leadership changes be
// queued rather than blocking the notifier.
const leaderChanLen = 5
// Client is the contract every discovery backend must satisfy. It exposes
// the Leader record kept in the backing KV store.
type Client interface {
	// GetLeader fetches the Leader record from the KV store. ok reports
	// whether a Leader is currently set: true if one is recorded, false
	// otherwise.
	GetLeader() (id, apiAddr, addr string, ok bool, e error)

	// InitializeLeader records the supplied details as the Leader, but only
	// when no Leader exists yet (a check-and-set operation). The boolean
	// result is true if this call performed the initialization, and false
	// otherwise.
	InitializeLeader(id, apiAddr, addr string) (bool, error)

	// SetLeader overwrites the Leader record with the supplied details,
	// regardless of what is currently stored.
	SetLeader(id, apiAddr, addr string) error

	// String provides a textual description of the client; Stats reports
	// it under the "mode" key.
	fmt.Stringer
}
// Store is the contract the consensus layer must satisfy so the discovery
// service can track leadership.
type Store interface {
	// IsLeader reports whether this node is currently the Leader.
	IsLeader() bool

	// RegisterLeaderChange hands the Store a channel on which it signals
	// whenever leadership changes.
	RegisterLeaderChange(c chan<- struct{})
}
// Suffrage describes a node's voting status: voting or non-voting.
type Suffrage int

const (
	// SuffrageUnknown is the zero value of Suffrage.
	SuffrageUnknown Suffrage = iota

	// Voter marks a voting node.
	Voter

	// NonVoter marks a non-voting node.
	NonVoter
)
// VoterSuffrage converts a boolean into a Suffrage: true maps to Voter and
// false maps to NonVoter.
func VoterSuffrage(b bool) Suffrage {
	suf := NonVoter
	if b {
		suf = Voter
	}
	return suf
}
// IsVoter reports whether s is exactly Voter. Every other value, including
// SuffrageUnknown, yields false.
func (s Suffrage) IsVoter() bool {
	switch s {
	case Voter:
		return true
	default:
		return false
	}
}
// Service is a single instance of the Discovery Service. It ties a discovery
// Client to the local consensus Store.
type Service struct {
	// RegisterInterval is the base delay, before jitter is applied, between
	// registration attempts made by Register.
	RegisterInterval time.Duration

	// ReportInterval is the period of the ticker that drives the periodic
	// reports started by StartReporting.
	ReportInterval time.Duration

	c   Client   // discovery backend holding the Leader record
	s   Store    // local consensus system
	suf Suffrage // voting status of this node

	logger *log.Logger

	mu          sync.Mutex // guards lastContact
	lastContact time.Time  // most recent time recorded via updateContact
}
// NewService builds a Discovery Service around the given client, store and
// suffrage. The returned Service uses a 3-second register interval and a
// 10-second report interval, and logs to standard error with the "[disco] "
// prefix.
func NewService(c Client, s Store, suf Suffrage) *Service {
	svc := new(Service)

	svc.c = c
	svc.s = s
	svc.suf = suf

	svc.RegisterInterval = 3 * time.Second
	svc.ReportInterval = 10 * time.Second
	svc.logger = log.New(os.Stderr, "[disco] ", log.LstdFlags)

	return svc
}
// Register registers this node with the discovery service, blocking until
// one of two things happens:
//
//   - a Leader is found in the discovery service, in which case it returns
//     false together with that Leader's address; or
//   - this node is a voter and succeeds in initializing itself as the Leader,
//     in which case it returns true together with its own addr.
//
// Errors from the client are logged and the attempt is retried after a
// jittered RegisterInterval; the returned error is always nil.
func (s *Service) Register(id, apiAddr, addr string) (bool, string, error) {
	// The post statement runs after every attempt that did not return, so
	// each retry is preceded by a jittered pause.
	for ; ; time.Sleep(random.Jitter(s.RegisterInterval)) {
		_, _, leaderAddr, found, getErr := s.c.GetLeader()
		if getErr != nil {
			s.logger.Printf("failed to get leader: %s", getErr.Error())
		}
		if found {
			return false, leaderAddr, nil
		}

		// Only voters may try to claim leadership; everyone else keeps
		// polling until a Leader shows up.
		if !s.suf.IsVoter() {
			continue
		}

		claimed, initErr := s.c.InitializeLeader(id, apiAddr, addr)
		if initErr != nil {
			s.logger.Printf("failed to initialize as Leader: %s", initErr.Error())
		}
		if claimed {
			s.updateContact(time.Now())
			return true, addr, nil
		}
	}
}
// StartReporting reports the details of this node to the discovery service,
// if, and only if, this node is the Leader. A report is made whenever a
// leadership change is signalled by the Store, and also every ReportInterval
// to deal with any intermittent issues that caused Leader information to go
// stale.
//
// Reporting runs in a background goroutine. Closing (or sending on) the
// returned channel stops that goroutine and releases its ticker.
//
// The last-contact time is only advanced when the discovery service was
// updated successfully; a failed update is logged and retried on the next
// tick or leadership change.
func (s *Service) StartReporting(id, apiAddr, addr string) chan struct{} {
	ticker := time.NewTicker(s.ReportInterval)
	obCh := make(chan struct{}, leaderChanLen)
	s.s.RegisterLeaderChange(obCh)

	update := func(changed bool) {
		if !s.s.IsLeader() {
			return
		}
		if err := s.c.SetLeader(id, apiAddr, addr); err != nil {
			s.logger.Printf("failed to update discovery service with Leader details: %s",
				err.Error())
			// Nothing was written, so neither claim success nor record
			// a contact.
			return
		}
		if changed {
			s.logger.Printf("updated Leader API address to %s due to leadership change",
				apiAddr)
		}
		s.updateContact(time.Now())
	}

	done := make(chan struct{})
	go func() {
		// Stop the ticker once reporting ends so it does not outlive the
		// goroutine.
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				update(false)
			case <-obCh:
				update(true)
			case <-done:
				return
			}
		}
	}()
	return done
}
// Stats reports diagnostic information about the disco service: the client's
// description ("mode"), the two configured intervals and the last recorded
// contact time. The returned error is always nil.
func (s *Service) Stats() (map[string]interface{}, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	stats := make(map[string]interface{}, 4)
	stats["mode"] = s.c.String()
	stats["register_interval"] = s.RegisterInterval
	stats["report_interval"] = s.ReportInterval
	stats["last_contact"] = s.lastContact
	return stats, nil
}
// updateContact records t as the last-contact time while holding the
// Service mutex.
func (s *Service) updateContact(t time.Time) {
	s.mu.Lock()
	s.lastContact = t
	s.mu.Unlock()
}