1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

246 lines
6.4 KiB
Go

package cluster
import (
"errors"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/rqlite/rqlite/v8/cluster/proto"
command "github.com/rqlite/rqlite/v8/command/proto"
"github.com/rqlite/rqlite/v8/random"
)
var (
// ErrBootTimeout is returned by Boot when the boot operation does not
// complete within the timeout supplied by the caller.
ErrBootTimeout = errors.New("boot timeout")
)
// BootStatus is the reason the boot process completed. It is recorded by
// Boot and can be read back through Bootstrapper.Status.
type BootStatus int
const (
// BootUnknown is the initial state of the boot process. It is the zero
// value, so it is what Status reports until Boot records an outcome.
BootUnknown BootStatus = iota
// BootJoin means boot completed due to a successful join.
BootJoin
// BootDone means boot completed because the done callback passed to Boot
// returned true.
BootDone
// BootTimeout means the boot process timed out.
BootTimeout
)
// Suffrage is the type of suffrage -- voting or non-voting -- a node has.
type Suffrage int

const (
	// SuffrageUnknown is the zero value of Suffrage; it is neither Voter
	// nor NonVoter.
	SuffrageUnknown Suffrage = iota
	// Voter marks a node as a voting member.
	Voter
	// NonVoter marks a node as a non-voting member.
	NonVoter
)

// VoterSuffrage returns Voter if b is true, and NonVoter otherwise. It never
// returns SuffrageUnknown.
func VoterSuffrage(b bool) Suffrage {
	if b {
		return Voter
	}
	return NonVoter
}

// String returns a string representation of the Suffrage: "unknown",
// "voter" or "non-voter" for the declared values. It panics for any value
// outside the declared set, since such a value can only result from a
// programming error.
func (s Suffrage) String() string {
	switch s {
	case SuffrageUnknown:
		// The zero value is a declared, legitimate state, so it must be
		// printable rather than crash the caller.
		return "unknown"
	case Voter:
		return "voter"
	case NonVoter:
		return "non-voter"
	default:
		panic("unknown suffrage")
	}
}

// IsVoter returns whether the Suffrage is a Voter.
func (s Suffrage) IsVoter() bool {
	return s == Voter
}

// IsNonVoter returns whether the Suffrage is a NonVoter. Note that
// SuffrageUnknown is neither a Voter nor a NonVoter.
func (s Suffrage) IsNonVoter() bool {
	return s == NonVoter
}
const (
// requestTimeout is handed to the Joiner created by Boot and is used as
// the timeout for each Notify call.
requestTimeout = 5 * time.Second
// numJoinAttempts is passed to NewJoiner as its attempt count.
// NOTE(review): presumably the number of join attempts per Joiner.Do
// call -- confirm against the Joiner implementation.
numJoinAttempts = 1
// bootInterval is the default value of Bootstrapper.Interval, set by
// NewBootstrapper.
bootInterval = 2 * time.Second
)
// String returns a string representation of the BootStatus: "unknown",
// "join", "done" or "timeout". It panics if b is not one of the declared
// BootStatus values.
func (b BootStatus) String() string {
	// Table indexed by the BootStatus value itself.
	names := [...]string{
		BootUnknown: "unknown",
		BootJoin:    "join",
		BootDone:    "done",
		BootTimeout: "timeout",
	}
	if b < 0 || int(b) >= len(names) {
		panic("unknown boot status")
	}
	return names[b]
}
// AddressProvider is the interface types must implement to provide
// addresses to a Bootstrapper.
type AddressProvider interface {
// Lookup returns the addresses of the nodes the Bootstrapper should try
// to join or notify. Boot calls it on every polling iteration; a returned
// error is logged, and the iteration is skipped when no addresses are
// returned.
Lookup() ([]string, error)
}
// Bootstrapper performs a bootstrap of this node.
type Bootstrapper struct {
// provider supplies the candidate node addresses on each polling
// iteration of Boot.
provider AddressProvider
// client is handed to the Joiner created by Boot and is used directly
// for Notify requests.
client *Client
// creds are passed to the Joiner and to every Notify call. They are nil
// unless SetCredentials has been called.
creds *proto.Credentials
// logger receives progress and failure messages from the boot process.
logger *log.Logger
// Interval is the base period, passed through random.Jitter, between
// polling iterations in Boot after the first one. NewBootstrapper sets
// it to bootInterval.
Interval time.Duration
// bootStatusMu guards bootStatus.
bootStatusMu sync.RWMutex
// bootStatus records why Boot finished; it is BootUnknown until then.
bootStatus BootStatus
}
// NewBootstrapper creates a Bootstrapper that obtains target addresses from p
// and communicates with other nodes using client. The polling interval
// defaults to bootInterval, and log output is written to stderr with the
// "[cluster-bootstrap] " prefix.
func NewBootstrapper(p AddressProvider, client *Client) *Bootstrapper {
	return &Bootstrapper{
		provider: p,
		client:   client,
		logger:   log.New(os.Stderr, "[cluster-bootstrap] ", log.LstdFlags),
		Interval: bootInterval,
	}
}
// SetCredentials installs the credentials that Boot passes to the Joiner and
// to every Notify call. The field is written without any locking, so call
// this before starting Boot.
func (b *Bootstrapper) SetCredentials(creds *proto.Credentials) {
	b.creds = creds
}
// Boot performs the bootstrapping process for this node, i.e. it keeps
// trying to make this node part of a cluster. On every polling iteration it
// asks the AddressProvider for targets and then either:
//   - joins an existing cluster by explicitly joining it through one of the
//     returned nodes, or
//   - if that fails and this is a Voting node, notifies all returned nodes
//     that it exists, potentially allowing a cluster-wide bootstrap, which
//     will include this node, to take place.
//
// Returns nil if a join succeeds, or if done() ever returns true; done() is
// polled at the start of every iteration. Returns ErrBootTimeout if neither
// happens within the given timeout. The outcome is recorded and can be read
// back via Status.
//
// id and raftAddr are those of the node calling Boot. suf is whether this node
// is a Voter or NonVoter.
func (b *Bootstrapper) Boot(id, raftAddr string, suf Suffrage, done func() bool, timeout time.Duration) error {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	// The first poll is scheduled on a millisecond scale; later polls are
	// re-armed with b.Interval below.
	poll := time.NewTimer(random.Jitter(time.Millisecond))
	defer poll.Stop()

	jnr := NewJoiner(b.client, numJoinAttempts, requestTimeout)
	jnr.SetCredentials(b.creds)

	for {
		select {
		case <-deadline.C:
			b.setBootStatus(BootTimeout)
			return ErrBootTimeout

		case <-poll.C:
			if done() {
				b.logger.Printf("boot operation marked done")
				b.setBootStatus(BootDone)
				return nil
			}

			// Re-arm before any lookup or network work, switching to the
			// longer polling period.
			poll.Reset(random.Jitter(b.Interval))

			addrs, lookupErr := b.provider.Lookup()
			if lookupErr != nil {
				b.logger.Printf("provider lookup failed %s", lookupErr.Error())
			}
			if len(addrs) == 0 {
				continue
			}

			// Joining an existing cluster always takes priority over
			// trying to form a new one, so attempt an explicit join first.
			via, joinErr := jnr.Do(addrs, id, raftAddr, suf)
			if joinErr == nil {
				b.logger.Printf("succeeded directly joining cluster via node at %s as %s", via, suf)
				b.setBootStatus(BootJoin)
				return nil
			}

			if !suf.IsVoter() {
				continue
			}

			// Care is needed here. The join failed against every target,
			// either because none of them is contactable or because none
			// is in a functioning cluster with a leader. This node may
			// therefore be one of a set of nodes bootstrapping a cluster
			// de novo, and so must announce itself to the others. In a new
			// cluster some node will reach the bootstrap-expect value
			// first and form the cluster, beating all other nodes to it.
			if notifyErr := b.notify(addrs, id, raftAddr); notifyErr != nil {
				b.logger.Printf("failed to notify all targets: %s (%s, will retry)", addrs,
					notifyErr.Error())
				continue
			}
			b.logger.Printf("succeeded notifying all targets: %s", addrs)
		}
	}
}
// Status reports why the boot process completed, or BootUnknown if no outcome
// has been recorded yet. The value is read under the read lock, so Status may
// be called while Boot is running.
func (b *Bootstrapper) Status() BootStatus {
	b.bootStatusMu.RLock()
	current := b.bootStatus
	b.bootStatusMu.RUnlock()
	return current
}
// notify sends a NotifyRequest carrying this node's id and Raft address to
// each target in order, using the Bootstrapper's credentials and
// requestTimeout. It stops at the first failure, so later targets are not
// contacted on that pass, and returns an error that names the failing target
// and wraps the underlying cause. It returns nil once every target has been
// notified.
func (b *Bootstrapper) notify(targets []string, id, raftAddr string) error {
	nr := &command.NotifyRequest{
		Address: raftAddr,
		Id:      id,
	}
	for _, t := range targets {
		if err := b.client.Notify(nr, t, b.creds, requestTimeout); err != nil {
			// %w renders the same text as %s but keeps the cause
			// reachable through errors.Is and errors.As.
			return fmt.Errorf("failed to notify node at %s: %w", t, err)
		}
	}
	return nil
}
// setBootStatus records status as the outcome of the boot process, holding
// the write lock while the field is updated.
func (b *Bootstrapper) setBootStatus(status BootStatus) {
	b.bootStatusMu.Lock()
	b.bootStatus = status
	b.bootStatusMu.Unlock()
}
// stringAddressProvider serves a fixed list of addresses supplied at
// construction time.
type stringAddressProvider struct {
	ss []string
}

// Lookup returns the wrapped slice as-is (no copy is made) and never returns
// an error.
func (s *stringAddressProvider) Lookup() ([]string, error) {
	addrs := s.ss
	return addrs, nil
}
// NewAddressProviderString returns an AddressProvider whose Lookup always
// yields ss. The slice is stored as given, not copied.
func NewAddressProviderString(ss []string) AddressProvider {
	provider := stringAddressProvider{ss: ss}
	return &provider
}