1
0
Fork 0

Bootstrapper explicitly supports Voting nodes

master
Philip O'Toole 9 months ago
parent 77be8883b8
commit 27710bab7e

@ -35,6 +35,13 @@ const (
BootTimeout
)
type Suffrage int
const (
Voter Suffrage = iota
NonVoter
)
const (
requestTimeout = 5 * time.Second
numJoinAttempts = 1
@ -94,19 +101,20 @@ func (b *Bootstrapper) SetCredentials(creds *Credentials) {
}
// Boot performs the bootstrapping process for this node. This means it will
// ensure this node becomes part of a cluster. It does this by either joining
// an existing cluster by explicitly joining it through one of these nodes,
// or by notifying those nodes that it exists, allowing a cluster-wide bootstap
// take place.
// ensure this node becomes part of a cluster. It does this by either:
// - joining an existing cluster by explicitly joining it through a node returned
// by the AddressProvider, or
// - if it's a Voting node, notifying all nodes returned by the AddressProvider
// that it exists, allowing a cluster-wide bootstrap to take place.
//
// Returns nil if the boot operation was successful, or if done() ever returns
// true. done() is periodically polled by the boot process. Returns an error
// if the boot process encounters an unrecoverable error, or booting does not
// occur within the given timeout.
//
// id and raftAddr are those of the node calling Boot. All operations
// performed by this function are done as a voting node.
func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.Duration) error {
// id and raftAddr are those of the node calling Boot. suf is whether this node
// is a Voter or NonVoter.
func (b *Bootstrapper) Boot(id, raftAddr string, suf Suffrage, done func() bool, timeout time.Duration) error {
timeoutT := time.NewTimer(timeout)
defer timeoutT.Stop()
tickerT := time.NewTimer(random.Jitter(time.Millisecond))
@ -132,7 +140,6 @@ func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.
if err != nil {
b.logger.Printf("provider lookup failed %s", err.Error())
}
if len(targets) == 0 {
continue
}
@ -145,18 +152,20 @@ func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.
return nil
}
// This is where we have to be careful. This node failed to join with any node
// in the targets list. This could be because none of the nodes are contactable,
// or none of the nodes are in a functioning cluster with a leader. That means that
// this node could be part of a set nodes that are bootstrapping to form a cluster
// de novo. For that to happen it needs to now let the other nodes know it is here.
// If this is a new cluster, some node will then reach the bootstrap-expect value
// first, form the cluster, beating all other nodes to it.
if err := b.notify(targets, id, raftAddr); err != nil {
b.logger.Printf("failed to notify all targets: %s (%s, will retry)", targets,
err.Error())
} else {
b.logger.Printf("succeeded notifying all targets: %s", targets)
if suf == Voter {
// This is where we have to be careful. This node failed to join with any node
// in the targets list. This could be because none of the nodes are contactable,
// or none of the nodes are in a functioning cluster with a leader. That means that
// this node could be part of a set of nodes that are bootstrapping to form a cluster
// de novo. For that to happen it needs to now let the other nodes know it is here.
// If this is a new cluster, some node will then reach the bootstrap-expect value
// first, form the cluster, beating all other nodes to it.
if err := b.notify(targets, id, raftAddr); err != nil {
b.logger.Printf("failed to notify all targets: %s (%s, will retry)", targets,
err.Error())
} else {
b.logger.Printf("succeeded notifying all targets: %s", targets)
}
}
}
}

@ -48,7 +48,7 @@ func Test_BootstrapperBootDoneImmediately(t *testing.T) {
}
p := NewAddressProviderString([]string{srv.Addr()})
bs := NewBootstrapper(p, nil)
if err := bs.Boot("node1", "192.168.1.1:1234", done, 10*time.Second); err != nil {
if err := bs.Boot("node1", "192.168.1.1:1234", Voter, done, 10*time.Second); err != nil {
t.Fatalf("failed to boot: %s", err)
}
if exp, got := BootDone, bs.Status(); exp != got {
@ -69,7 +69,7 @@ func Test_BootstrapperBootTimeout(t *testing.T) {
p := NewAddressProviderString([]string{srv.Addr()})
bs := NewBootstrapper(p, NewClient(&simpleDialer{}, 0))
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 5*time.Second)
err := bs.Boot("node1", "192.168.1.1:1234", Voter, done, 5*time.Second)
if err == nil {
t.Fatalf("no error returned from timed-out boot")
}
@ -120,7 +120,7 @@ func Test_BootstrapperBootSingleJoin(t *testing.T) {
bs := NewBootstrapper(p, NewClient(&simpleDialer{}, 0))
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 5*time.Second)
err := bs.Boot("node1", "192.168.1.1:1234", Voter, done, 5*time.Second)
if err != nil {
t.Fatalf("failed to boot: %s", err)
}
@ -166,7 +166,7 @@ func Test_BootstrapperBootSingleNotify(t *testing.T) {
bs := NewBootstrapper(p, NewClient(&simpleDialer{}, 0))
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
err := bs.Boot("node1", "192.168.1.1:1234", Voter, done, 60*time.Second)
if err != nil {
t.Fatalf("failed to boot: %s", err)
}
@ -255,7 +255,7 @@ func Test_BootstrapperBootMultiJoinNotify(t *testing.T) {
bs := NewBootstrapper(p, NewClient(&simpleDialer{}, 0))
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
err := bs.Boot("node1", "192.168.1.1:1234", Voter, done, 60*time.Second)
if err != nil {
t.Fatalf("failed to boot: %s", err)
}

@ -459,7 +459,7 @@ func createCluster(cfg *Config, hasPeers bool, client *cluster.Client, str *stor
// Bootstrap with explicit join addresses requests.
bs := cluster.NewBootstrapper(cluster.NewAddressProviderString(joins), client)
bs.SetCredentials(cluster.CredentialsFor(credStr, cfg.JoinAs))
return bs.Boot(str.ID(), cfg.RaftAdv, isClustered, cfg.BootstrapExpectTimeout)
return bs.Boot(str.ID(), cfg.RaftAdv, cluster.Voter, isClustered, cfg.BootstrapExpectTimeout)
}
if cfg.DiscoMode == "" {
@ -503,7 +503,7 @@ func createCluster(cfg *Config, hasPeers bool, client *cluster.Client, str *stor
bs := cluster.NewBootstrapper(provider, client)
bs.SetCredentials(cluster.CredentialsFor(credStr, cfg.JoinAs))
httpServ.RegisterStatus("disco", provider)
return bs.Boot(str.ID(), cfg.RaftAdv, isClustered, cfg.BootstrapExpectTimeout)
return bs.Boot(str.ID(), cfg.RaftAdv, cluster.Voter, isClustered, cfg.BootstrapExpectTimeout)
case DiscoModeEtcdKV, DiscoModeConsulKV:
discoService, err := createDiscoService(cfg, str)

@ -256,7 +256,7 @@ func Test_MultiNodeClusterBootstrap(t *testing.T) {
addr, _ := node1.Store.LeaderAddr()
return addr != ""
}
node1Bs.Boot(node1.ID, node1.RaftAddr, done, 10*time.Second)
node1Bs.Boot(node1.ID, node1.RaftAddr, cluster.Voter, done, 10*time.Second)
wg.Done()
}()
go func() {
@ -264,7 +264,7 @@ func Test_MultiNodeClusterBootstrap(t *testing.T) {
addr, _ := node2.Store.LeaderAddr()
return addr != ""
}
node2Bs.Boot(node2.ID, node2.RaftAddr, done, 10*time.Second)
node2Bs.Boot(node2.ID, node2.RaftAddr, cluster.Voter, done, 10*time.Second)
wg.Done()
}()
go func() {
@ -272,7 +272,7 @@ func Test_MultiNodeClusterBootstrap(t *testing.T) {
addr, _ := node3.Store.LeaderAddr()
return addr != ""
}
node3Bs.Boot(node3.ID, node3.RaftAddr, done, 10*time.Second)
node3Bs.Boot(node3.ID, node3.RaftAddr, cluster.Voter, done, 10*time.Second)
wg.Done()
}()
wg.Wait()
@ -424,7 +424,7 @@ func Test_MultiNodeClusterBootstrapLaterJoin(t *testing.T) {
addr, _ := node1.Store.LeaderAddr()
return addr != ""
}
node1Bs.Boot(node1.ID, node1.RaftAddr, done, 10*time.Second)
node1Bs.Boot(node1.ID, node1.RaftAddr, cluster.Voter, done, 10*time.Second)
wg.Done()
}()
go func() {
@ -432,7 +432,7 @@ func Test_MultiNodeClusterBootstrapLaterJoin(t *testing.T) {
addr, _ := node2.Store.LeaderAddr()
return addr != ""
}
node2Bs.Boot(node2.ID, node2.RaftAddr, done, 10*time.Second)
node2Bs.Boot(node2.ID, node2.RaftAddr, cluster.Voter, done, 10*time.Second)
wg.Done()
}()
go func() {
@ -440,7 +440,7 @@ func Test_MultiNodeClusterBootstrapLaterJoin(t *testing.T) {
addr, _ := node3.Store.LeaderAddr()
return addr != ""
}
node3Bs.Boot(node3.ID, node3.RaftAddr, done, 10*time.Second)
node3Bs.Boot(node3.ID, node3.RaftAddr, cluster.Voter, done, 10*time.Second)
wg.Done()
}()
wg.Wait()
@ -477,7 +477,7 @@ func Test_MultiNodeClusterBootstrapLaterJoin(t *testing.T) {
addr, _ := node4.Store.LeaderAddr()
return addr != ""
}
if err := node4Bs.Boot(node4.ID, node4.RaftAddr, done, 10*time.Second); err != nil {
if err := node4Bs.Boot(node4.ID, node4.RaftAddr, cluster.Voter, done, 10*time.Second); err != nil {
t.Fatalf("node 4 failed to boot")
}
node4Leader, err := node4.WaitForLeader()
@ -525,7 +525,7 @@ func Test_MultiNodeClusterBootstrapLaterJoinTLS(t *testing.T) {
addr, _ := node1.Store.LeaderAddr()
return addr != ""
}
node1Bs.Boot(node1.ID, node1.RaftAddr, done, 10*time.Second)
node1Bs.Boot(node1.ID, node1.RaftAddr, cluster.Voter, done, 10*time.Second)
wg.Done()
}()
go func() {
@ -533,7 +533,7 @@ func Test_MultiNodeClusterBootstrapLaterJoinTLS(t *testing.T) {
addr, _ := node2.Store.LeaderAddr()
return addr != ""
}
node2Bs.Boot(node2.ID, node2.RaftAddr, done, 10*time.Second)
node2Bs.Boot(node2.ID, node2.RaftAddr, cluster.Voter, done, 10*time.Second)
wg.Done()
}()
go func() {
@ -541,7 +541,7 @@ func Test_MultiNodeClusterBootstrapLaterJoinTLS(t *testing.T) {
addr, _ := node3.Store.LeaderAddr()
return addr != ""
}
node3Bs.Boot(node3.ID, node3.RaftAddr, done, 10*time.Second)
node3Bs.Boot(node3.ID, node3.RaftAddr, cluster.Voter, done, 10*time.Second)
wg.Done()
}()
wg.Wait()
@ -579,7 +579,7 @@ func Test_MultiNodeClusterBootstrapLaterJoinTLS(t *testing.T) {
addr, _ := node4.Store.LeaderAddr()
return addr != ""
}
if err := node4Bs.Boot(node4.ID, node4.RaftAddr, done, 10*time.Second); err != nil {
if err := node4Bs.Boot(node4.ID, node4.RaftAddr, cluster.Voter, done, 10*time.Second); err != nil {
t.Fatalf("node 4 failed to boot")
}
node4Leader, err := node4.WaitForLeader()

Loading…
Cancel
Save