1
0
Fork 0

Merge pull request #1291 from rqlite/otoolep-bootstrap-join-ok

Bootstrap and join if requested
master
Philip O'Toole 1 year ago committed by GitHub
commit b44f6bf769
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,3 +1,7 @@
## 7.20.1 (unreleased)
### Implementation changes and bug fixes
- [PR #1291](https://github.com/rqlite/rqlite/pull/1291): Allow bootstrap-join even with preexisting state. Fixes [issue #1290](https://github.com/rqlite/rqlite/issues/1290)
## 7.20.0 (June 1st 2023)
### New features
- [PR #1288](https://github.com/rqlite/rqlite/pull/1288): Upgrade to SQLite 3.42.0.

@ -11,6 +11,7 @@ import (
"net/http"
"os"
"strings"
"sync"
"time"
rurl "github.com/rqlite/rqlite/http/url"
@ -26,6 +27,32 @@ var (
ErrBootTimeout = errors.New("boot timeout")
)
// BootStatus is status of the boot process, after it has completed.
type BootStatus int
const (
BootUnknown BootStatus = iota
BootJoin
BootDone
BootTimeout
)
// String returns a string representation of the BootStatus.
func (b BootStatus) String() string {
switch b {
case BootUnknown:
return "unknown"
case BootJoin:
return "join"
case BootDone:
return "done"
case BootTimeout:
return "timeout"
default:
panic("unknown boot status")
}
}
// AddressProvider is the interface types must implement to provide
// addresses to a Bootstrapper.
type AddressProvider interface {
@ -44,6 +71,9 @@ type Bootstrapper struct {
logger *log.Logger
Interval time.Duration
bootStatusMu sync.RWMutex
bootStatus BootStatus
}
// NewBootstrapper returns an instance of a Bootstrapper.
@ -85,11 +115,13 @@ func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.
for {
select {
case <-timeoutT.C:
b.setBootStatus(BootTimeout)
return ErrBootTimeout
case <-tickerT.C:
if done() {
b.logger.Printf("boot operation marked done")
b.setBootStatus(BootDone)
return nil
}
tickerT.Reset(jitter(b.Interval)) // Move to longer-period polling
@ -108,6 +140,7 @@ func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.
b.joiner.SetBasicAuth(b.username, b.password)
if j, err := b.joiner.Do(targets, id, raftAddr, true); err == nil {
b.logger.Printf("succeeded directly joining cluster via node at %s", j)
b.setBootStatus(BootJoin)
return nil
}
@ -128,6 +161,13 @@ func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.
}
}
// Status returns the reason for the boot process completing.
func (b *Bootstrapper) Status() BootStatus {
b.bootStatusMu.RLock()
defer b.bootStatusMu.RUnlock()
return b.bootStatus
}
func (b *Bootstrapper) notify(targets []string, id, raftAddr string) error {
// Create and configure the client to connect to the other node.
tr := &http.Transport{
@ -190,6 +230,12 @@ func (b *Bootstrapper) notify(targets []string, id, raftAddr string) error {
return nil
}
func (b *Bootstrapper) setBootStatus(status BootStatus) {
b.bootStatusMu.Lock()
defer b.bootStatusMu.Unlock()
b.bootStatus = status
}
type stringAddressProvider struct {
ss []string
}

@ -31,6 +31,9 @@ func Test_NewBootstrapper(t *testing.T) {
if bs == nil {
t.Fatalf("failed to create a simple Bootstrapper")
}
if exp, got := BootUnknown, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}
func Test_BootstrapperBootDoneImmediately(t *testing.T) {
@ -46,6 +49,9 @@ func Test_BootstrapperBootDoneImmediately(t *testing.T) {
if err := bs.Boot("node1", "192.168.1.1:1234", done, 10*time.Second); err != nil {
t.Fatalf("failed to boot: %s", err)
}
if exp, got := BootDone, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}
func Test_BootstrapperBootTimeout(t *testing.T) {
@ -66,6 +72,35 @@ func Test_BootstrapperBootTimeout(t *testing.T) {
if !errors.Is(err, ErrBootTimeout) {
t.Fatalf("wrong error returned")
}
if exp, got := BootTimeout, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}
func Test_BootstrapperBootSingleJoin(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/join" {
t.Fatalf("unexpected path: %s", r.URL.Path)
}
w.WriteHeader(http.StatusOK)
}))
done := func() bool {
return false
}
p := NewAddressProviderString([]string{ts.URL})
bs := NewBootstrapper(p, nil)
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
if err != nil {
t.Fatalf("failed to boot: %s", err)
}
if exp, got := BootJoin, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}
func Test_BootstrapperBootSingleNotify(t *testing.T) {
@ -115,6 +150,10 @@ func Test_BootstrapperBootSingleNotify(t *testing.T) {
if got, exp := body["addr"], "192.168.1.1:1234"; got != exp {
t.Fatalf("wrong address supplied, exp %s, got %s", exp, got)
}
if exp, got := BootDone, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}
func Test_BootstrapperBootSingleNotifyHTTPS(t *testing.T) {
@ -175,6 +214,10 @@ func Test_BootstrapperBootSingleNotifyHTTPS(t *testing.T) {
if got, exp := body["addr"], "192.168.1.1:1234"; got != exp {
t.Fatalf("wrong address supplied, exp %s, got %s", exp, got)
}
if exp, got := BootDone, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}
func Test_BootstrapperBootSingleNotifyAuth(t *testing.T) {
@ -214,6 +257,9 @@ func Test_BootstrapperBootSingleNotifyAuth(t *testing.T) {
if tsNotified != true {
t.Fatalf("notify target not contacted")
}
if exp, got := BootDone, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}
func Test_BootstrapperBootMultiNotify(t *testing.T) {
@ -257,8 +303,10 @@ func Test_BootstrapperBootMultiNotify(t *testing.T) {
if ts1Join != true || ts2Join != true {
t.Fatalf("all join targets not contacted")
}
if ts1Notified != true || ts2Notified != true {
t.Fatalf("all notify targets not contacted")
}
if exp, got := BootDone, bs.Status(); exp != got {
t.Fatalf("wrong status, exp %s, got %s", exp, got)
}
}

@ -515,11 +515,6 @@ func createCluster(cfg *Config, hasPeers bool, joiner *cluster.Joiner, str *stor
}
if joins != nil && cfg.BootstrapExpect > 0 {
if hasPeers {
log.Println("preexisting node configuration detected, ignoring bootstrap request")
return nil
}
// Bootstrap with explicit join addresses requests.
bs := cluster.NewBootstrapper(cluster.NewAddressProviderString(joins), tlsConfig)
if cfg.JoinAs != "" {

@ -40,6 +40,63 @@ class TestBootstrapping(unittest.TestCase):
deprovision_node(n2)
deprovision_node(n3)
class TestBootstrappingRestart(unittest.TestCase):
'''Test simple bootstrapping works via -bootstrap-expect'''
def test(self):
n0 = Node(RQLITED_PATH, '0', bootstrap_expect=3)
n1 = Node(RQLITED_PATH, '1', bootstrap_expect=3)
n2 = Node(RQLITED_PATH, '2', bootstrap_expect=3)
n0.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
n1.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
n2.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
self.assertEqual(n0.wait_for_leader(), n1.wait_for_leader())
self.assertEqual(n0.wait_for_leader(), n2.wait_for_leader())
# Restart all nodes, and ensure they still form a cluster using the same launch params.
n0.stop()
n1.stop()
n2.stop()
n0.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
n1.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
n2.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
self.assertEqual(n0.wait_for_leader(), n1.wait_for_leader())
self.assertEqual(n0.wait_for_leader(), n2.wait_for_leader())
deprovision_node(n0)
deprovision_node(n1)
deprovision_node(n2)
class TestBootstrappingRestartLeaveOnRemove(unittest.TestCase):
'''Test simple bootstrapping works via -bootstrap-expect'''
def test(self):
n0 = Node(RQLITED_PATH, '0', bootstrap_expect=3, raft_cluster_remove_shutdown=True)
n1 = Node(RQLITED_PATH, '1', bootstrap_expect=3, raft_cluster_remove_shutdown=True)
n2 = Node(RQLITED_PATH, '2', bootstrap_expect=3, raft_cluster_remove_shutdown=True)
n0.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
n1.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
n2.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
self.assertEqual(n0.wait_for_leader(), n1.wait_for_leader())
self.assertEqual(n0.wait_for_leader(), n2.wait_for_leader())
# Restart one node, and ensure it still forms a cluster using the same launch params,
# even though it was removed from the cluster on shutdown.
n2.stop(graceful=True)
self.assertEqual(len(n0.nodes()), 2)
n2.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr(), n2.APIProtoAddr()]))
self.assertEqual(n0.wait_for_leader(), n1.wait_for_leader())
self.assertEqual(n0.wait_for_leader(), n2.wait_for_leader())
self.assertEqual(len(n0.nodes()), 3)
deprovision_node(n0)
deprovision_node(n1)
deprovision_node(n2)
class TestAutoClustering(unittest.TestCase):
DiscoModeConsulKV = "consul-kv"
DiscoModeEtcdKV = "etcd-kv"

Loading…
Cancel
Save