1
0
Fork 0

Bootstrapper migrated to Raft requests

master
Philip O'Toole 10 months ago
parent a96a5fa571
commit 2b8a9713c8

@ -1,19 +1,14 @@
package cluster
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"strings"
"sync"
"time"
rurl "github.com/rqlite/rqlite/http/url"
"github.com/rqlite/rqlite/command"
"github.com/rqlite/rqlite/random"
)
@ -40,6 +35,12 @@ const (
BootTimeout
)
const (
requestTimeout = 5 * time.Second
numJoinAttempts = 1
bootInterval = 2 * time.Second
)
// String returns a string representation of the BootStatus.
func (b BootStatus) String() string {
switch b {
@ -64,10 +65,9 @@ type AddressProvider interface {
// Bootstrapper performs a bootstrap of this node.
type Bootstrapper struct {
provider AddressProvider
tlsConfig *tls.Config
provider AddressProvider
joiner *Joiner
client *Client
logger *log.Logger
Interval time.Duration
@ -77,13 +77,12 @@ type Bootstrapper struct {
}
// NewBootstrapper returns an instance of a Bootstrapper.
func NewBootstrapper(p AddressProvider, tlsConfig *tls.Config) *Bootstrapper {
func NewBootstrapper(p AddressProvider, client *Client) *Bootstrapper {
bs := &Bootstrapper{
provider: p,
tlsConfig: tlsConfig,
joiner: NewJoiner("", 1, 0, tlsConfig),
logger: log.New(os.Stderr, "[cluster-bootstrap] ", log.LstdFlags),
Interval: 2 * time.Second,
provider: p,
client: client,
logger: log.New(os.Stderr, "[cluster-bootstrap] ", log.LstdFlags),
Interval: bootInterval,
}
return bs
}
@ -107,6 +106,7 @@ func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.
tickerT := time.NewTimer(random.Jitter(time.Millisecond))
defer tickerT.Stop()
joiner := NewJoiner(b.client, numJoinAttempts, requestTimeout)
for {
select {
case <-timeoutT.C:
@ -132,7 +132,7 @@ func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.
// Try an explicit join first. Joining an existing cluster is always given priority
// over trying to form a new cluster.
if j, err := b.joiner.Do(targets, id, raftAddr, true); err == nil {
if j, err := joiner.Do(targets, id, raftAddr, true); err == nil {
b.logger.Printf("succeeded directly joining cluster via node at %s", j)
b.setBootStatus(BootJoin)
return nil
@ -143,8 +143,8 @@ func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.
// or none of the nodes are in a functioning cluster with a leader. That means that
// this node could be part of a set nodes that are bootstrapping to form a cluster
// de novo. For that to happen it needs to now let the other nodes know it is here.
// If this is a new cluster, some node will then reach the bootstrap-expect value,
// form the cluster, beating all other nodes to it.
// If this is a new cluster, some node will then reach the bootstrap-expect value
// first, form the cluster, beating all other nodes to it.
if err := b.notify(targets, id, raftAddr); err != nil {
b.logger.Printf("failed to notify all targets: %s (%s, will retry)", targets,
err.Error())
@ -163,59 +163,13 @@ func (b *Bootstrapper) Status() BootStatus {
}
func (b *Bootstrapper) notify(targets []string, id, raftAddr string) error {
// Create and configure the client to connect to the other node.
tr := &http.Transport{
TLSClientConfig: b.tlsConfig,
ForceAttemptHTTP2: true,
}
client := &http.Client{Transport: tr}
buf, err := json.Marshal(map[string]interface{}{
"id": id,
"addr": raftAddr,
})
if err != nil {
return err
nr := &command.NotifyRequest{
Address: raftAddr,
Id: id,
}
for _, t := range targets {
// Check for protocol scheme, and insert default if necessary.
fullTarget := rurl.NormalizeAddr(fmt.Sprintf("%s/notify", t))
TargetLoop:
for {
req, err := http.NewRequest("POST", fullTarget, bytes.NewReader(buf))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to post notification to node at %s: %s",
rurl.RemoveBasicAuth(fullTarget), err)
}
resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
b.logger.Printf("succeeded notifying target: %s", rurl.RemoveBasicAuth(fullTarget))
break TargetLoop
case http.StatusBadRequest:
// One possible cause is that the target server is listening for HTTPS, but
// an HTTP attempt was made. Switch the protocol to HTTPS, and try again.
// This can happen when using various disco approaches, since it doesn't
// record information about which protocol a registered node is actually using.
if strings.HasPrefix(fullTarget, "https://") {
// It's already HTTPS, give up.
return fmt.Errorf("failed to notify node at %s: %s", rurl.RemoveBasicAuth(fullTarget),
resp.Status)
}
fullTarget = rurl.EnsureHTTPS(fullTarget)
default:
return fmt.Errorf("failed to notify node at %s: %s",
rurl.RemoveBasicAuth(fullTarget), resp.Status)
}
if err := b.client.Notify(nr, t, requestTimeout); err != nil {
return fmt.Errorf("failed to notify node at %s: %s", t, err)
}
}
return nil

@ -1,17 +1,15 @@
package cluster
import (
"crypto/tls"
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
"net"
"reflect"
"testing"
"time"
"github.com/rqlite/rqlite/rtls"
"github.com/rqlite/rqlite/cluster/servicetest"
"github.com/rqlite/rqlite/command"
"google.golang.org/protobuf/proto"
)
func Test_AddressProviderString(t *testing.T) {
@ -37,14 +35,17 @@ func Test_NewBootstrapper(t *testing.T) {
}
func Test_BootstrapperBootDoneImmediately(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Fatalf("client made HTTP request")
}))
srv := servicetest.NewService()
srv.Handler = func(conn net.Conn) {
t.Fatalf("client made request")
}
srv.Start()
defer srv.Close()
done := func() bool {
return true
}
p := NewAddressProviderString([]string{ts.URL})
p := NewAddressProviderString([]string{srv.Addr()})
bs := NewBootstrapper(p, nil)
if err := bs.Boot("node1", "192.168.1.1:1234", done, 10*time.Second); err != nil {
t.Fatalf("failed to boot: %s", err)
@ -55,15 +56,17 @@ func Test_BootstrapperBootDoneImmediately(t *testing.T) {
}
func Test_BootstrapperBootTimeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
srv := servicetest.NewService()
srv.Handler = func(conn net.Conn) {
}
srv.Start()
defer srv.Close()
done := func() bool {
return false
}
p := NewAddressProviderString([]string{ts.URL})
bs := NewBootstrapper(p, nil)
p := NewAddressProviderString([]string{srv.Addr()})
bs := NewBootstrapper(p, NewClient(&simpleDialer{}, 0))
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 5*time.Second)
if err == nil {
@ -78,23 +81,45 @@ func Test_BootstrapperBootTimeout(t *testing.T) {
}
func Test_BootstrapperBootSingleJoin(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/join" {
t.Fatalf("unexpected path: %s", r.URL.Path)
srv := servicetest.NewService()
srv.Handler = func(conn net.Conn) {
var p []byte
var err error
c := readCommand(conn)
if c == nil {
// Connection error handling
return
}
if c.Type != Command_COMMAND_TYPE_JOIN {
t.Fatalf("unexpected command type: %d", c.Type)
}
jnr := c.GetJoinRequest()
if jnr == nil {
t.Fatal("expected join node request, got nil")
}
if jnr.Address != "192.168.1.1:1234" {
t.Fatalf("unexpected node address, got %s", jnr.Address)
}
w.WriteHeader(http.StatusOK)
}))
p, err = proto.Marshal(&CommandJoinResponse{})
if err != nil {
conn.Close()
return
}
writeBytesWithLength(conn, p)
}
srv.Start()
defer srv.Close()
done := func() bool {
return false
}
p := NewAddressProviderString([]string{ts.URL})
bs := NewBootstrapper(p, nil)
p := NewAddressProviderString([]string{srv.Addr()})
bs := NewBootstrapper(p, NewClient(&simpleDialer{}, 0))
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
err := bs.Boot("node1", "192.168.1.1:1234", done, 5*time.Second)
if err != nil {
t.Fatalf("failed to boot: %s", err)
}
@ -104,26 +129,31 @@ func Test_BootstrapperBootSingleJoin(t *testing.T) {
}
func Test_BootstrapperBootSingleNotify(t *testing.T) {
tsNotified := false
var body map[string]string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/join" {
w.WriteHeader(http.StatusServiceUnavailable)
var gotNR *command.NotifyRequest
srv := servicetest.NewService()
srv.Handler = func(conn net.Conn) {
var p []byte
var err error
c := readCommand(conn)
if c == nil {
// Connection error handling
return
}
tsNotified = true
b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if c.Type != Command_COMMAND_TYPE_NOTIFY {
return
}
gotNR = c.GetNotifyRequest()
if err := json.Unmarshal(b, &body); err != nil {
w.WriteHeader(http.StatusBadRequest)
p, err = proto.Marshal(&CommandNotifyResponse{})
if err != nil {
conn.Close()
return
}
}))
writeBytesWithLength(conn, p)
}
srv.Start()
defer srv.Close()
n := -1
done := func() bool {
@ -131,8 +161,8 @@ func Test_BootstrapperBootSingleNotify(t *testing.T) {
return n == 5
}
p := NewAddressProviderString([]string{ts.URL})
bs := NewBootstrapper(p, nil)
p := NewAddressProviderString([]string{srv.Addr()})
bs := NewBootstrapper(p, NewClient(&simpleDialer{}, 0))
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
@ -140,108 +170,79 @@ func Test_BootstrapperBootSingleNotify(t *testing.T) {
t.Fatalf("failed to boot: %s", err)
}
if tsNotified != true {
t.Fatalf("notify target not contacted")
}
if got, exp := body["id"], "node1"; got != exp {
if got, exp := gotNR.Id, "node1"; got != exp {
t.Fatalf("wrong node ID supplied, exp %s, got %s", exp, got)
}
if got, exp := body["addr"], "192.168.1.1:1234"; got != exp {
if got, exp := gotNR.Address, "192.168.1.1:1234"; got != exp {
t.Fatalf("wrong address supplied, exp %s, got %s", exp, got)
}
if exp, got := BootDone, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}
func Test_BootstrapperBootSingleNotifyHTTPS(t *testing.T) {
tsNotified := false
var body map[string]string
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/join" {
w.WriteHeader(http.StatusServiceUnavailable)
func Test_BootstrapperBootMultiJoinNotify(t *testing.T) {
srv1Join := false
srv1Notified := false
srv1 := servicetest.NewService()
srv1.Handler = func(conn net.Conn) {
var p []byte
var err error
c := readCommand(conn)
if c == nil {
// Connection error handling
return
}
if r.URL.Path != "/notify" {
t.Fatalf("unexpected path: %s", r.URL.Path)
if c.Type == Command_COMMAND_TYPE_JOIN {
srv1Join = true
}
tsNotified = true
b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
if c.Type != Command_COMMAND_TYPE_NOTIFY {
return
}
srv1Notified = true
if err := json.Unmarshal(b, &body); err != nil {
w.WriteHeader(http.StatusBadRequest)
p, err = proto.Marshal(&CommandNotifyResponse{})
if err != nil {
conn.Close()
return
}
writeBytesWithLength(conn, p)
}
srv1.Start()
defer srv1.Close()
srv2Join := false
srv2Notified := false
srv2 := servicetest.NewService()
srv2.Handler = func(conn net.Conn) {
var p []byte
var err error
c := readCommand(conn)
if c == nil {
// Connection error handling
return
}
}))
defer ts.Close()
ts.TLS = &tls.Config{NextProtos: []string{"h2", "http/1.1"}}
ts.StartTLS()
n := -1
done := func() bool {
n++
return n == 5
}
tlsConfig, err := rtls.CreateClientConfig("", "", "", true)
if err != nil {
t.Fatalf("failed to create TLS config: %s", err)
}
p := NewAddressProviderString([]string{ts.URL})
bs := NewBootstrapper(p, tlsConfig)
bs.Interval = time.Second
err = bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
if err != nil {
t.Fatalf("failed to boot: %s", err)
}
if tsNotified != true {
t.Fatalf("notify target not contacted")
}
if got, exp := body["id"], "node1"; got != exp {
t.Fatalf("wrong node ID supplied, exp %s, got %s", exp, got)
}
if got, exp := body["addr"], "192.168.1.1:1234"; got != exp {
t.Fatalf("wrong address supplied, exp %s, got %s", exp, got)
}
if exp, got := BootDone, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}
if c.Type == Command_COMMAND_TYPE_JOIN {
srv2Join = true
}
func Test_BootstrapperBootMultiNotify(t *testing.T) {
ts1Join := false
ts1Notified := false
ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/join" {
ts1Join = true
w.WriteHeader(http.StatusServiceUnavailable)
if c.Type != Command_COMMAND_TYPE_NOTIFY {
return
}
ts1Notified = true
}))
ts2Join := false
ts2Notified := false
ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/join" {
ts2Join = true
w.WriteHeader(http.StatusServiceUnavailable)
srv2Notified = true
p, err = proto.Marshal(&CommandNotifyResponse{})
if err != nil {
conn.Close()
return
}
ts2Notified = true
}))
writeBytesWithLength(conn, p)
}
srv2.Start()
defer srv2.Close()
n := -1
done := func() bool {
@ -249,8 +250,8 @@ func Test_BootstrapperBootMultiNotify(t *testing.T) {
return n == 5
}
p := NewAddressProviderString([]string{ts1.URL, ts2.URL})
bs := NewBootstrapper(p, nil)
p := NewAddressProviderString([]string{srv1.Addr(), srv2.Addr()})
bs := NewBootstrapper(p, NewClient(&simpleDialer{}, 0))
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
@ -258,10 +259,10 @@ func Test_BootstrapperBootMultiNotify(t *testing.T) {
t.Fatalf("failed to boot: %s", err)
}
if ts1Join != true || ts2Join != true {
if srv1Join != true || srv2Join != true {
t.Fatalf("all join targets not contacted")
}
if ts1Notified != true || ts2Notified != true {
if srv1Notified != true || srv2Notified != true {
t.Fatalf("all notify targets not contacted")
}
if exp, got := BootDone, bs.Status(); exp != got {

Loading…
Cancel
Save