
Add bootstrap-expect (#974)

Add cluster-bootstrap
master
Philip O'Toole 3 years ago committed by GitHub
parent 6df797b145
commit e4fed0cee3

@ -1,3 +1,9 @@
## 7.1.0 (unreleased)
This release introduces a new automatic clustering approach, known as _Bootstrapping_, which allows rqlite clusters to form without assistance from an external system such as Consul. This can be very useful for certain deployment scenarios. See the [documentation](https://github.com/rqlite/rqlite/blob/master/DOC/AUTO_CLUSTERING.md) for full details on using the new Bootstrapping mode. Special thanks to [Nathan Ferch](https://github.com/nferch) for his advice regarding the design and development of this feature.
### New features
- [PR #974](https://github.com/rqlite/rqlite/pull/974): Add support for automatically bootstrapping clusters from just rqlite nodes.
## 7.0.1 (January 26th 2022)
### Implementation changes and bug fixes
- [PR #957](https://github.com/rqlite/rqlite/pull/971): Correct rqlite command line options in log message.

@ -1,19 +1,55 @@
# Automatic clustering via node-discovery
This document describes how to use [Consul](https://www.consul.io/) and [etcd](https://etcd.io/) to automatically form rqlite clusters.
# Automatic clustering
This document describes various ways to dynamically form rqlite clusters, which is particularly useful for automating your deployment of rqlite.
> :warning: **This functionality was introduced in version 7.x. It does not exist in earlier releases.**
## Contents
* [Quickstart](#quickstart)
* [Automatic Bootstrapping](#automatic-bootstrapping)
* [Consul](#consul)
* [etcd](#etcd)
* [More details](#more-details)
* [Controlling configuration](#controlling-configuration)
* [Controlling Consul and etcd configuration](#controlling-consul-and-etcd-configuration)
* [Running multiple different clusters](#running-multiple-different-clusters)
* [Design](#design)
## Quickstart
### Automatic Bootstrapping
While [manually creating a cluster](https://github.com/rqlite/rqlite/blob/master/DOC/CLUSTER_MGMT.md) is simple, it does suffer from one drawback -- you must start one node first, and with different options, so it can become the Leader. _Automatic Bootstrapping_, in contrast, allows you to start all the nodes at once, and in a very similar manner. You just need to know the network addresses of the nodes ahead of time.
For simplicity, let's assume you want to run a 3-node rqlite cluster. To bootstrap the cluster, use the `-bootstrap-expect` option like so:
Node 1:
```bash
rqlited -http-addr=$IP1:$HTTP_PORT -raft-addr=$IP1:$RAFT_PORT \
-bootstrap-expect 3 -join http://$IP1:$HTTP_PORT,http://$IP2:$HTTP_PORT,http://$IP3:$HTTP_PORT data
```
Node 2:
```bash
rqlited -http-addr=$IP2:$HTTP_PORT -raft-addr=$IP2:$RAFT_PORT \
-bootstrap-expect 3 -join http://$IP1:$HTTP_PORT,http://$IP2:$HTTP_PORT,http://$IP3:$HTTP_PORT data
```
Node 3:
```bash
rqlited -http-addr=$IP3:$HTTP_PORT -raft-addr=$IP3:$RAFT_PORT \
-bootstrap-expect 3 -join http://$IP1:$HTTP_PORT,http://$IP2:$HTTP_PORT,http://$IP3:$HTTP_PORT data
```
`-bootstrap-expect` should be set to the number of nodes that must be available before the bootstrapping process will commence, in this case 3. You also set `-join` to the HTTP URLs of all 3 nodes in the cluster. **It's also required that each launch command has the same values for `-bootstrap-expect` and `-join`.**
After the cluster has formed, you can launch more nodes with the same options. A node will always attempt to first perform a normal cluster-join using the given join addresses, before trying the bootstrap approach.
#### Docker
With Docker you can launch every node identically:
```bash
docker run rqlite/rqlite -bootstrap-expect 3 -join http://$IP1:$HTTP_PORT,http://$IP2:$HTTP_PORT,http://$IP3:$HTTP_PORT
```
where `$IP[1-3]` are the expected network addresses of the containers.
### Consul
Another approach uses [Consul](https://www.consul.io/) to coordinate clustering. The advantage of this approach is that you do not need to know the network addresses of the nodes ahead of time.
Let's assume your Consul cluster is running at `http://example.com:8500`. Let's also assume that you are going to run 3 rqlite nodes, each node on a different machine. Launch your rqlite nodes as follows:
Node 1:
@ -35,13 +71,15 @@ rqlited -http-addr=$IP3:$HTTP_PORT -raft-addr=$IP3:$RAFT_PORT \
These three nodes will automatically find each other, and cluster. You can start the nodes in any order and at any time. Furthermore, the cluster Leader will continually update Consul with its address. This means other nodes can be launched later and automatically join the cluster, even if the Leader changes.
#### Docker
It's even easier with Docker, as you can launch every node identically:
It's even easier with Docker, as you can launch every node almost identically:
```bash
docker run rqlite/rqlite -disco-mode=consul-kv -disco-config '{"address": "example.com:8500"}'
```
### etcd
Autoclustering with etcd is very similar. Let's assume etcd is available at `example.com:2379`.
A third approach uses [etcd](https://www.etcd.io/) to coordinate clustering. Autoclustering with etcd is very similar to Consul. As with Consul, the advantage of this approach is that you do not need to know the network addresses of the nodes ahead of time.
Let's assume etcd is available at `example.com:2379`.
Node 1:
```bash
@ -66,7 +104,7 @@ docker run rqlite/rqlite -disco-mode=etcd-kv -disco-config '{"endpoints": ["exam
```
## More Details
### Controlling configuration
### Controlling Consul and etcd configuration
For both Consul and etcd, `-disco-config` can either be an actual JSON string, or a path to a file containing a JSON-formatted configuration. The former option may be more convenient if the configuration you need to supply is very short, as in the example above.
The example above demonstrates a simple configuration, and most real deployments will require more configuration information for Consul and etcd. For example, your Consul system might be reachable over HTTPS. To more fully configure rqlite for Discovery, consult the relevant configuration specification below. You must create a JSON-formatted file which matches that described in the source code.
@ -74,8 +112,10 @@ The example above demonstrates a simple configuration, and most real deployments
- [Full Consul configuration description](https://github.com/rqlite/rqlite-disco-clients/blob/main/consul/config.go)
- [Full etcd configuration description](https://github.com/rqlite/rqlite-disco-clients/blob/main/etcd/config.go)
### Running multiple different clusters
#### Running multiple different clusters
If you wish a single Consul or etcd system to support multiple rqlite clusters, then set the `-disco-key` command line argument to a different value for each cluster.
### Design
When using either Consul or etcd for automatic clustering, rqlite uses the key-value store of each system. In each case the Leader atomically sets its HTTP URL, allowing other nodes to discover it. To prevent multiple nodes updating the Leader key at once, nodes use a check-and-set operation, only updating the Leader key if its value has not changed since it was last read by the node.
## Design
When using Automatic Bootstrapping, each node notifies all other nodes of its existence. The first node to have a record of enough nodes (set by `-bootstrap-expect`) forms the cluster. Only one node can ever form a cluster; any node that attempts to do so later will fail, and will instead become a Follower in the new cluster.
When using either Consul or etcd for automatic clustering, rqlite uses the key-value store of each system. In each case the Leader atomically sets its HTTP URL, allowing other nodes to discover it. To prevent multiple nodes updating the Leader key at once, nodes use a check-and-set operation, only updating the Leader key if its value has not changed since it was last read by the node. See [this blog post](https://www.philipotoole.com/rqlite-7-0-designing-node-discovery-and-automatic-clustering/) for more details on the design.
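To make the check-and-set approach concrete, here is a minimal sketch in Go against a hypothetical key-value client. The `KV` interface, `Get`, and `CompareAndSet` names below are illustrative only; the real Consul and etcd clients used by rqlite live in the [rqlite-disco-clients](https://github.com/rqlite/rqlite-disco-clients) repository and differ in detail.
```go
// KV is a hypothetical key-value client supporting an atomic
// check-and-set (compare-and-swap) operation.
type KV interface {
	// Get returns the current value of key, along with a version token.
	Get(key string) (value string, version uint64, err error)
	// CompareAndSet writes value only if key is still at the given version,
	// returning true if the write succeeded.
	CompareAndSet(key, value string, version uint64) (bool, error)
}

// publishLeader records this node's HTTP URL under the Leader key, but only
// if the key has not changed since it was last read. A false return means
// another node won the race and this node should re-read the key.
func publishLeader(kv KV, leaderKey, httpURL string) (bool, error) {
	current, version, err := kv.Get(leaderKey)
	if err != nil {
		return false, err
	}
	if current == httpURL {
		return true, nil // already recorded as Leader
	}
	return kv.CompareAndSet(leaderKey, httpURL, version)
}
```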

@ -1,75 +0,0 @@
# rqlite Cluster Discovery Service
_For full details on how the Discovery Service is implemented using AWS Lambda and DynamoDB check out [this blog post](http://www.philipotoole.com/building-a-cluster-discovery-service-with-aws-lambda-and-dynamodb/)._
> :warning: **rqlite 7.0 and later does not support this legacy Discovery service.** If you wish to use the legacy Discovery service, you must run rqlite 6.x or earlier. **The legacy Discovery service is also deprecated and may be removed in the future**. However, the [source code and design](https://github.com/rqlite/rqlite-disco) for the Discovery service has been published, which means you can run your own Discovery service if you wish.
To form a rqlite cluster, the joining node must be supplied with the network address of some other node in the cluster. This requirement -- that one must know the network address of other nodes to join a cluster -- can be inconvenient in various environments. For example, if you do not know which network addresses will be assigned ahead of time, creating a cluster for the first time requires the following steps:
* First start one node and specify its network address.
* Let it become the leader.
* Start the next node, passing the network address of the first node to the second.
* Repeat this previous step, until you have a cluster of the desired size.
To make all this easier, rqlite also supports _discovery_ mode. In this mode each node registers its network address with an external service, and learns the _join_ addresses of other nodes from the same service.
As a convenience, a free Discovery Service for rqlite is hosted at `discovery.rqlite.com`. Note that this service is provided on an _as-is_ basis, with no guarantees it will always be available (though it has been built in a highly-reliable manner). If you wish to run your own copy of the service, you can deploy the Discovery Service source code yourself.
## Creating a Discovery Service ID
To form a new cluster via discovery, you must first generate a unique Discovery ID for your cluster. This ID is then passed to each node on start-up, allowing the rqlite nodes to automatically connect to each other. To generate an ID using the rqlite discovery service, hosted at `discovery.rqlite.com`, execute the following command:
```
curl -XPOST -L -w "\n" 'http://discovery.rqlite.com'
```
The output of this command will look something like this:
```json
{
"created_at": "2017-02-20 01:25:45.589277",
"disco_id": "809d9ba6-f70b-11e6-9a5a-92819c00729a",
"nodes": []
}
```
In the example above `809d9ba6-f70b-11e6-9a5a-92819c00729a` was returned by the service.
This ID is then provided to each node on start-up.
```shell
rqlited -disco-id 809d9ba6-f70b-11e6-9a5a-92819c00729a
```
When any node registers using the ID, it is returned the current list of nodes that have registered using that ID. If the node is the first to access the service using the ID, it will receive a list that contains just itself -- and will subsequently elect itself leader. Subsequent nodes will then receive a list with more than 1 entry. These nodes will use one of the join addresses in the list to join the cluster.
### Controlling the registered join address
By default, each node registers the address passed in via the `-http-addr` option. However if you instead set `-http-adv-addr` when starting a node, the node will instead register that address. This can be useful when telling a node to listen on all interfaces, but that it should be contacted at a specific address. For example:
```shell
rqlited -disco-id 809d9ba6-f70b-11e6-9a5a-92819c00729a -http-addr 0.0.0.0:4001 -http-adv-addr host1:4001
```
In this example, other nodes will contact this node at `host1:4001`.
## Caveats
If a node is already part of a cluster, no attempt is made to contact the Discovery service, even if a Discovery ID is passed to a node at startup.
## Example
Create a Discovery Service ID:
```shell
$ curl -XPOST -L -w "\n" 'http://discovery.rqlite.com/'
{
"created_at":
"2017-02-20 01:25:45.589277",
"disco_id": "b3da7185-725f-461c-b7a4-13f185bd5007",
"nodes": []
}
```
To automatically form a 3-node cluster simply pass the ID to 3 nodes, all of which can be started simultaneously via the following commands:
```shell
$ rqlited -disco-id b3da7185-725f-461c-b7a4-13f185bd5007 ~/node.1
$ rqlited -http-addr localhost:4003 -raft-addr localhost:4004 -disco-id b3da7185-725f-461c-b7a4-13f185bd5007 ~/node.2
$ rqlited -http-addr localhost:4005 -raft-addr localhost:4006 -disco-id b3da7185-725f-461c-b7a4-13f185bd5007 ~/node.3
```
_This demonstration shows all 3 nodes running on the same host. In reality you probably wouldn't do this, and then you wouldn't need to select different -http-addr and -raft-addr ports for each rqlite node._
## Removing registered addresses
If you need to remove an address from the list of registered addresses, perhaps because a node has permanently left a cluster, you can do this via the following command (be sure to pass all the options shown to `curl`):
```shell
$ curl -XDELETE -L --post301 http://discovery.rqlite.com/<disco ID> -H "Content-Type: application/json" -d '{"addr": "<node address>"}'
```
For example:
```shell
$ curl -XDELETE -L --post301 http://discovery.rqlite.com/be0dd310-fe41-11e6-bb97-92e4c2da9b50 -H "Content-Type: application/json" -d '{"addr": "http://192.168.0.1:4001"}'
```

@ -21,7 +21,7 @@ rqlite uses [Raft](https://raft.github.io/) to achieve consensus across all the
- Fully replicated production-grade SQL database.
- [Production-grade](https://github.com/hashicorp/raft) distributed consensus system.
- An easy-to-use [HTTP(S) API](https://github.com/rqlite/rqlite/blob/master/DOC/DATA_API.md). A [command-line interface is also available](https://github.com/rqlite/rqlite/tree/master/cmd/rqlite), as are various [client libraries](https://github.com/rqlite).
- [Node-discovery and automatic clustering via Consul and etcd](https://github.com/rqlite/rqlite/blob/master/DOC/AUTO_CLUSTERING.md), allowing clusters to be dynamically created.
- [Node-discovery and automatic clustering, including integration with Consul and etcd](https://github.com/rqlite/rqlite/blob/master/DOC/AUTO_CLUSTERING.md), allowing clusters to be dynamically created.
- [Extensive security and encryption support](https://github.com/rqlite/rqlite/blob/master/DOC/SECURITY.md), including node-to-node encryption.
- Choice of [read consistency levels](https://github.com/rqlite/rqlite/blob/master/DOC/CONSISTENCY.md).
- Optional [read-only (non-voting) nodes](https://github.com/rqlite/rqlite/blob/master/DOC/READ_ONLY_NODES.md), which can add read scalability to the system.

@ -0,0 +1,191 @@
package cluster
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"strings"
"time"
httpd "github.com/rqlite/rqlite/http"
)
func init() {
rand.Seed(time.Now().UnixNano())
}
var (
// ErrBootTimeout is returned when a boot operation does not
// complete within the timeout.
ErrBootTimeout = errors.New("boot timeout")
)
// AddressProvider is the interface types must implement to provide
// addresses to a Bootstrapper.
type AddressProvider interface {
Lookup() ([]string, error)
}
// Bootstrapper performs a bootstrap of this node.
type Bootstrapper struct {
provider AddressProvider
expect int
tlsConfig *tls.Config
logger *log.Logger
Interval time.Duration
}
// NewBootstrapper returns an instance of a Bootstrapper.
func NewBootstrapper(p AddressProvider, expect int, tlsConfig *tls.Config) *Bootstrapper {
bs := &Bootstrapper{
provider: p,
expect: expect,
tlsConfig: &tls.Config{InsecureSkipVerify: true},
logger: log.New(os.Stderr, "[cluster-bootstrap] ", log.LstdFlags),
Interval: jitter(5 * time.Second),
}
if tlsConfig != nil {
bs.tlsConfig = tlsConfig
}
return bs
}
// Boot performs the bootstrapping process for this node. This means it will
// ensure this node becomes part of a cluster. It does this either by explicitly
// joining an existing cluster through one of the given nodes, or by notifying
// those nodes that it exists, allowing a cluster-wide bootstrap to take place.
//
// Returns nil if the boot operation was successful, or if done() ever returns
// true. done() is periodically polled by the boot process. Returns an error if
// the boot process encounters an unrecoverable error, or if booting does not
// occur within the given timeout.
//
// id and raftAddr are those of the node calling Boot. All operations
// performed by this function are done as a voting node.
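//
// A minimal usage sketch (the store variable, addresses, and timeout below are
// illustrative only; see how rqlited wires this up in its main package):
//
//	p := NewAddressProviderString([]string{"http://10.0.0.1:4001", "http://10.0.0.2:4001", "http://10.0.0.3:4001"})
//	bs := NewBootstrapper(p, 3, nil)
//	done := func() bool {
//		leader, _ := str.LeaderAddr() // str is this node's Store
//		return leader != ""
//	}
//	err := bs.Boot(str.ID(), raftAdvAddr, done, 2*time.Minute)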
func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.Duration) error {
timeoutT := time.NewTimer(timeout)
defer timeoutT.Stop()
tickerT := time.NewTimer(jitter(time.Millisecond))
defer tickerT.Stop()
notifySuccess := false
for {
select {
case <-timeoutT.C:
return ErrBootTimeout
case <-tickerT.C:
if done() {
b.logger.Printf("boot operation marked done")
return nil
}
tickerT.Reset(jitter(b.Interval)) // Move to longer-period polling
targets, err := b.provider.Lookup()
if err != nil {
b.logger.Printf("provider loopup failed %s", err.Error())
}
if len(targets) < b.expect {
continue
}
// Try an explicit join.
if j, err := Join("", targets, id, raftAddr, true, 1, 0, b.tlsConfig); err == nil {
b.logger.Printf("succeeded directly joining cluster via node at %s",
httpd.RemoveBasicAuth(j))
return nil
}
// Join didn't work, so perhaps perform a notify if we haven't done
// one yet.
if !notifySuccess {
if err := b.notify(targets, id, raftAddr); err != nil {
b.logger.Printf("failed to notify %s, retrying", targets)
} else {
b.logger.Printf("succeeded notifying %s", targets)
notifySuccess = true
}
}
}
}
}
func (b *Bootstrapper) notify(targets []string, id, raftAddr string) error {
// Create and configure the client to connect to the other node.
tr := &http.Transport{
TLSClientConfig: b.tlsConfig,
}
client := &http.Client{Transport: tr}
buf, err := json.Marshal(map[string]interface{}{
"id": id,
"addr": raftAddr,
})
if err != nil {
return err
}
for _, t := range targets {
// Check for protocol scheme, and insert default if necessary.
fullTarget := httpd.NormalizeAddr(fmt.Sprintf("%s/notify", t))
TargetLoop:
for {
resp, err := client.Post(fullTarget, "application/json", bytes.NewReader(buf))
if err != nil {
return err
}
resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK:
break TargetLoop
case http.StatusBadRequest:
// One possible cause is that the target server is listening for HTTPS, but
// an HTTP attempt was made. Switch the protocol to HTTPS, and try again.
// This can happen when using various disco approaches, since they don't
// record information about which protocol a registered node is actually using.
if strings.HasPrefix(fullTarget, "https://") {
// It's already HTTPS, give up.
return fmt.Errorf("failed to notify node at %s: %s",
httpd.RemoveBasicAuth(fullTarget), resp.Status)
}
fullTarget = httpd.EnsureHTTPS(fullTarget)
default:
return fmt.Errorf("failed to notify node at %s: %s",
httpd.RemoveBasicAuth(fullTarget), resp.Status)
}
}
}
return nil
}
type stringAddressProvider struct {
ss []string
}
func (s *stringAddressProvider) Lookup() ([]string, error) {
return s.ss, nil
}
// NewAddressProviderString wraps an AddressProvider around a string slice.
func NewAddressProviderString(ss []string) AddressProvider {
return &stringAddressProvider{ss}
}
// jitter adds a little bit of randomness to a given duration. This is
// useful to prevent nodes across the cluster performing certain operations
// all at the same time.
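// For example, jitter(5*time.Second) returns a duration in the range [5s, 10s).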
func jitter(duration time.Duration) time.Duration {
return duration + time.Duration(rand.Float64()*float64(duration))
}

@ -0,0 +1,169 @@
package cluster
import (
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
)
func Test_AddressProviderString(t *testing.T) {
a := []string{"a", "b", "c"}
p := NewAddressProviderString(a)
b, err := p.Lookup()
if err != nil {
t.Fatalf("failed to lookup addresses: %s", err.Error())
}
if !reflect.DeepEqual(a, b) {
t.Fatalf("failed to get correct addresses")
}
}
func Test_NewBootstrapper(t *testing.T) {
bs := NewBootstrapper(nil, 1, nil)
if bs == nil {
t.Fatalf("failed to create a simple Bootstrapper")
}
}
func Test_BootstrapperBootDoneImmediately(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Fatalf("client made HTTP request")
}))
done := func() bool {
return true
}
p := NewAddressProviderString([]string{ts.URL})
bs := NewBootstrapper(p, 1, nil)
if err := bs.Boot("node1", "192.168.1.1:1234", done, 10*time.Second); err != nil {
t.Fatalf("failed to boot: %s", err)
}
}
func Test_BootstrapperBootTimeout(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusServiceUnavailable)
}))
done := func() bool {
return false
}
p := NewAddressProviderString([]string{ts.URL})
bs := NewBootstrapper(p, 1, nil)
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 5*time.Second)
if err == nil {
t.Fatalf("no error returned from timed-out boot")
}
if !errors.Is(err, ErrBootTimeout) {
t.Fatalf("wrong error returned")
}
}
func Test_BootstrapperBootSingleNotify(t *testing.T) {
tsNotified := false
var body map[string]string
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/join" {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
tsNotified = true
b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
if err := json.Unmarshal(b, &body); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
}))
n := -1
done := func() bool {
n++
if n == 5 {
return true
}
return false
}
p := NewAddressProviderString([]string{ts.URL})
bs := NewBootstrapper(p, 1, nil)
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
if err != nil {
t.Fatalf("failed to boot: %s", err)
}
if tsNotified != true {
t.Fatalf("notify target not contacted")
}
if got, exp := body["id"], "node1"; got != exp {
t.Fatalf("wrong node ID supplied, exp %s, got %s", exp, got)
}
if got, exp := body["addr"], "192.168.1.1:1234"; got != exp {
t.Fatalf("wrong address supplied, exp %s, got %s", exp, got)
}
}
func Test_BootstrapperBootMultiNotify(t *testing.T) {
ts1Join := false
ts1Notified := false
ts1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/join" {
ts1Join = true
w.WriteHeader(http.StatusServiceUnavailable)
return
}
ts1Notified = true
}))
ts2Join := false
ts2Notified := false
ts2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/join" {
ts2Join = true
w.WriteHeader(http.StatusServiceUnavailable)
return
}
ts2Notified = true
}))
n := -1
done := func() bool {
n++
if n == 5 {
return true
}
return false
}
p := NewAddressProviderString([]string{ts1.URL, ts2.URL})
bs := NewBootstrapper(p, 2, nil)
bs.Interval = time.Second
err := bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
if err != nil {
t.Fatalf("failed to boot: %s", err)
}
if ts1Join != true || ts2Join != true {
t.Fatalf("all join targets not contacted")
}
if ts1Notified != true || ts2Notified != true {
t.Fatalf("all notify targets not contacted")
}
}

@ -10,7 +10,6 @@ import (
"log"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
@ -19,44 +18,12 @@ import (
)
var (
// ErrUserInfoExists is returned when a join address already contains
// a username and a password.
ErrUserInfoExists = errors.New("userinfo exists")
// ErrJoinFailed is returned when a node fails to join a cluster
ErrJoinFailed = errors.New("failed to join cluster")
)
// AddUserInfo adds username and password to the join address. If username is empty
// joinAddr is returned unchanged. If joinAddr already contains a username, ErrUserInfoExists
// is returned.
func AddUserInfo(joinAddr, username, password string) (string, error) {
if username == "" {
return joinAddr, nil
}
u, err := url.Parse(joinAddr)
if err != nil {
return "", err
}
if u.User != nil && u.User.Username() != "" {
return "", ErrUserInfoExists
}
u.User = url.UserPassword(username, password)
return u.String(), nil
}
// RemoveUserInfo returns the joinAddr with any username and password removed.
func RemoveUserInfo(joinAddr string) string {
u, err := url.Parse(joinAddr)
if err != nil {
return joinAddr
}
u.User = nil
return u.String()
}
// ErrNotifyFailed is returned when a node fails to notify another node
ErrNotifyFailed = errors.New("failed to notify node")
)
// Join attempts to join the cluster at one of the addresses given in joinAddr.
// It walks through joinAddr in order, and sets the node ID and Raft address of

@ -13,52 +13,6 @@ import (
const numAttempts int = 3
const attemptInterval = 5 * time.Second
func Test_AddUserInfo(t *testing.T) {
var u string
var err error
u, err = AddUserInfo("http://example.com", "user1", "pass1")
if err != nil {
t.Fatalf("failed to add user info: %s", err.Error())
}
if exp, got := "http://user1:pass1@example.com", u; exp != got {
t.Fatalf("wrong URL created, exp %s, got %s", exp, got)
}
u, err = AddUserInfo("http://example.com", "user1", "")
if err != nil {
t.Fatalf("failed to add user info: %s", err.Error())
}
if exp, got := "http://user1:@example.com", u; exp != got {
t.Fatalf("wrong URL created, exp %s, got %s", exp, got)
}
u, err = AddUserInfo("http://example.com", "", "pass1")
if err != nil {
t.Fatalf("failed to add user info: %s", err.Error())
}
if exp, got := "http://example.com", u; exp != got {
t.Fatalf("wrong URL created, exp %s, got %s", exp, got)
}
u, err = AddUserInfo("http://user1:pass1@example.com", "user2", "pass2")
if err == nil {
t.Fatalf("failed to get expected error when UserInfo exists")
}
}
func Test_RemoveUserInfo(t *testing.T) {
if exp, got := "http://example.com", RemoveUserInfo("http://user1:pass1@example.com"); exp != got {
t.Fatalf("expected %s, got %s", exp, got)
}
if exp, got := "http://example.com", RemoveUserInfo("http://example.com"); exp != got {
t.Fatalf("expected %s, got %s", exp, got)
}
if exp, got := "nonsense", RemoveUserInfo("nonsense"); exp != got {
t.Fatalf("expected %s, got %s", exp, got)
}
}
func Test_SingleJoinOK(t *testing.T) {
var body map[string]interface{}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

@ -86,6 +86,12 @@ type Config struct {
// JoinInterval is the time between retrying failed join operations.
JoinInterval time.Duration
// BootstrapExpect is the minimum number of nodes required for a bootstrap.
BootstrapExpect int
// BootstrapExpectTimeout is the maximum time a bootstrap operation can take.
BootstrapExpectTimeout time.Duration
// NoHTTPVerify disables checking other nodes' HTTP X509 certs for validity.
NoHTTPVerify bool
@ -143,9 +149,6 @@ type Config struct {
// RaftApplyTimeout sets the Log-apply timeout.
RaftApplyTimeout time.Duration
// RaftOpenTimeout sets the Raft store open timeout.
RaftOpenTimeout time.Duration
// RaftShutdownOnRemove sets whether Raft should be shutdown if the node is removed
RaftShutdownOnRemove bool
@ -215,6 +218,8 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
flag.StringVar(&config.JoinAs, "join-as", "", "Username in authentication file to join as. If not set, joins anonymously")
flag.IntVar(&config.JoinAttempts, "join-attempts", 5, "Number of join attempts to make")
flag.DurationVar(&config.JoinInterval, "join-interval", 5*time.Second, "Period between join attempts")
flag.IntVar(&config.BootstrapExpect, "bootstrap-expect", 0, "Minimum number of nodes required for a bootstrap")
flag.DurationVar(&config.BootstrapExpectTimeout, "bootstrap-expect-timeout", 60*time.Second, "Maximum time for bootstrap process")
flag.StringVar(&config.DiscoMode, "disco-mode", "", "Choose cluster discovery service. If not set, not used")
flag.StringVar(&config.DiscoKey, "disco-key", "rqlite", "Key prefix for cluster discovery service")
flag.StringVar(&config.DiscoConfig, "disco-config", "", "Set path to cluster discovery config file")
@ -229,7 +234,6 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
flag.DurationVar(&config.RaftHeartbeatTimeout, "raft-timeout", time.Second, "Raft heartbeat timeout")
flag.DurationVar(&config.RaftElectionTimeout, "raft-election-timeout", time.Second, "Raft election timeout")
flag.DurationVar(&config.RaftApplyTimeout, "raft-apply-timeout", 10*time.Second, "Raft apply timeout")
flag.DurationVar(&config.RaftOpenTimeout, "raft-open-timeout", 120*time.Second, "Time for initial Raft logs to be applied. Use 0s duration to skip wait")
flag.Uint64Var(&config.RaftSnapThreshold, "raft-snap", 8192, "Number of outstanding log entries that trigger snapshot")
flag.DurationVar(&config.RaftSnapInterval, "raft-snap-int", 30*time.Second, "Snapshot threshold check interval")
flag.DurationVar(&config.RaftLeaderLeaseTimeout, "raft-leader-lease-timeout", 0, "Raft leader lease timeout. Use 0s for Raft default")
@ -253,19 +257,19 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
errorExit(0, msg)
}
if config.OnDiskPath != "" && !config.OnDisk {
errorExit(1, "fatal: on-disk-path is set, but on-disk is not\n")
}
// Ensure the data path is set.
if flag.NArg() < 1 {
errorExit(1, "fatal: no data directory set\n")
errorExit(1, "no data directory set")
}
config.DataPath = flag.Arg(0)
// Ensure no args come after the data directory.
if flag.NArg() > 1 {
errorExit(1, "fatal: arguments after data directory are not accepted\n")
errorExit(1, "arguments after data directory are not accepted")
}
if config.OnDiskPath != "" && !config.OnDisk {
errorExit(1, "on-disk-path is set, but on-disk is not")
}
// Enforce policies regarding addresses
@ -279,7 +283,7 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
// Perform some address validity checks.
if strings.HasPrefix(strings.ToLower(config.HTTPAddr), "http") ||
strings.HasPrefix(strings.ToLower(config.HTTPAdv), "http") {
errorExit(1, "fatal: HTTP options should not include protocol (http:// or https://)\n")
errorExit(1, "HTTP options should not include protocol (http:// or https://)")
}
if _, _, err := net.SplitHostPort(config.HTTPAddr); err != nil {
errorExit(1, "HTTP bind address not valid")
@ -288,20 +292,27 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
errorExit(1, "HTTP advertised address not valid")
}
if _, _, err := net.SplitHostPort(config.RaftAddr); err != nil {
errorExit(1, "Raft bind address not valid")
errorExit(1, "raft bind address not valid")
}
if _, _, err := net.SplitHostPort(config.RaftAdv); err != nil {
errorExit(1, "Raft advertised address not valid")
errorExit(1, "raft advertised address not valid")
}
// Enforce bootstrapping policies
if config.BootstrapExpect > 0 && config.RaftNonVoter {
errorExit(1, "bootstrapping only applicable to voting nodes")
}
// Valid disco mode?
switch config.DiscoMode {
case "":
case DiscoModeConsulKV:
case DiscoModeEtcdKV:
break
case DiscoModeEtcdKV, DiscoModeConsulKV:
if config.BootstrapExpect > 0 {
errorExit(1, fmt.Sprintf("bootstrapping not applicable when using %s",
config.DiscoMode))
}
default:
errorExit(1, fmt.Sprintf("fatal: invalid disco mode, choose %s or %s\n",
errorExit(1, fmt.Sprintf("invalid disco mode, choose %s or %s",
DiscoModeConsulKV, DiscoModeEtcdKV))
}
@ -314,6 +325,9 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
}
func errorExit(code int, msg string) {
fmt.Fprintf(os.Stderr, msg)
if code != 0 {
fmt.Fprintf(os.Stderr, "fatal: ")
}
fmt.Fprintf(os.Stderr, fmt.Sprintf("%s\n", msg))
os.Exit(code)
}

@ -167,13 +167,15 @@ func main() {
log.Println("rqlite server stopped")
}
// determineJoinAddresses returns the join addresses supplied at the command-line.
// removing any occurrence of this node's HTTP address.
func determineJoinAddresses(cfg *Config) ([]string, error) {
var addrs []string
if cfg.JoinAddr != "" {
addrs = strings.Split(cfg.JoinAddr, ",")
}
// It won't work to attempt a self-join, so remove any such address.
// It won't work to attempt an explicit self-join, so remove any such address.
var validAddrs []string
for i := range addrs {
if addrs[i] == cfg.HTTPAdv || addrs[i] == cfg.HTTPAddr {
@ -213,6 +215,7 @@ func createStore(cfg *Config, ln *tcp.Layer) (*store.Store, error) {
str.HeartbeatTimeout = cfg.RaftHeartbeatTimeout
str.ElectionTimeout = cfg.RaftElectionTimeout
str.ApplyTimeout = cfg.RaftApplyTimeout
str.BootstrapExpect = cfg.BootstrapExpect
isNew := store.IsNewNode(dataPath)
if isNew {
@ -364,20 +367,41 @@ func createCluster(cfg *Config, joins []string, tlsConfig *tls.Config, hasPeers
}
if len(joins) > 0 {
// Explicit join addresses supplied, so use them.
log.Println("explicit join addresses are:", joins)
if cfg.BootstrapExpect == 0 {
// Explicit join operation requested, so do it.
log.Println("explicit join addresses are:", joins)
if err := addJoinCreds(joins, cfg.JoinAs, credStr); err != nil {
return fmt.Errorf("failed too add auth creds: %s", err.Error())
if err := addJoinCreds(joins, cfg.JoinAs, credStr); err != nil {
return fmt.Errorf("failed to add BasicAuth creds: %s", err.Error())
}
j, err := cluster.Join(cfg.JoinSrcIP, joins, str.ID(), cfg.RaftAdv, !cfg.RaftNonVoter,
cfg.JoinAttempts, cfg.JoinInterval, tlsConfig)
if err != nil {
return fmt.Errorf("failed to join cluster: %s", err.Error())
}
log.Println("successfully joined cluster at", httpd.RemoveBasicAuth(j))
return nil
}
j, err := cluster.Join(cfg.JoinSrcIP, joins, str.ID(), cfg.RaftAdv, !cfg.RaftNonVoter,
cfg.JoinAttempts, cfg.JoinInterval, tlsConfig)
if err != nil {
return fmt.Errorf("failed to join cluster at %s: %s", joins, err.Error())
if hasPeers {
log.Println("preexisting node configuration detected, ignoring bootstrap request")
return nil
}
log.Println("successfully joined cluster at", j)
return nil
// Must self-notify when bootstrapping
targets := append(joins, cfg.HTTPAdv)
log.Println("bootstrap addresses are:", targets)
if err := addJoinCreds(targets, cfg.JoinAs, credStr); err != nil {
return fmt.Errorf("failed to add BasicAuth creds: %s", err.Error())
}
bs := cluster.NewBootstrapper(cluster.NewAddressProviderString(targets),
cfg.BootstrapExpect, tlsConfig)
done := func() bool {
leader, _ := str.LeaderAddr()
return leader != ""
}
return bs.Boot(str.ID(), cfg.RaftAdv, done, cfg.BootstrapExpectTimeout)
}
if cfg.DiscoMode == "" {
@ -449,7 +473,7 @@ func addJoinCreds(joins []string, joinAs string, credStr *auth.CredentialsStore)
var err error
for i := range joins {
joins[i], err = cluster.AddUserInfo(joins[i], joinAs, pw)
joins[i], err = httpd.AddBasicAuth(joins[i], joinAs, pw)
if err != nil {
return fmt.Errorf("failed to use credential store join_as: %s", err.Error())
}

@ -15,6 +15,7 @@ import (
"net"
"net/http"
"net/http/pprof"
"net/url"
"os"
"runtime"
"strings"
@ -30,6 +31,10 @@ import (
var (
// ErrLeaderNotFound is returned when a node cannot locate a leader
ErrLeaderNotFound = errors.New("leader not found")
// ErrUserInfoExists is returned when a join address already contains
// a username and a password.
ErrUserInfoExists = errors.New("userinfo exists")
)
// Database is the interface any queryable system must implement
@ -54,6 +59,9 @@ type Store interface {
// Join joins the node with the given ID, reachable at addr, to this node.
Join(id, addr string, voter bool) error
// Notify notifies this node that a node is available at addr.
Notify(id, addr string) error
// Remove removes the node, specified by id, from the cluster.
Remove(id string) error
@ -140,6 +148,7 @@ const (
numBackups = "backups"
numLoad = "loads"
numJoins = "joins"
numNotifies = "notifies"
numAuthOK = "authOK"
numAuthFail = "authFail"
@ -184,6 +193,7 @@ func init() {
stats.Add(numBackups, 0)
stats.Add(numLoad, 0)
stats.Add(numJoins, 0)
stats.Add(numNotifies, 0)
stats.Add(numAuthOK, 0)
stats.Add(numAuthFail, 0)
}
@ -316,6 +326,9 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case strings.HasPrefix(r.URL.Path, "/join"):
stats.Add(numJoins, 1)
s.handleJoin(w, r)
case strings.HasPrefix(r.URL.Path, "/notify"):
stats.Add(numNotifies, 1)
s.handleNotify(w, r)
case strings.HasPrefix(r.URL.Path, "/remove"):
s.handleRemove(w, r)
case strings.HasPrefix(r.URL.Path, "/status"):
@ -405,6 +418,46 @@ func (s *Service) handleJoin(w http.ResponseWriter, r *http.Request) {
}
}
// handleNotify handles node-notify requests from other nodes.
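// The expected request body is JSON of the form {"id": "<node ID>", "addr": "<Raft address>"},
// as sent by a Bootstrapper during its notify phase.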
func (s *Service) handleNotify(w http.ResponseWriter, r *http.Request) {
if !s.CheckRequestPerm(r, PermJoin) {
w.WriteHeader(http.StatusUnauthorized)
return
}
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
b, err := ioutil.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
md := map[string]interface{}{}
if err := json.Unmarshal(b, &md); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
remoteID, ok := md["id"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
remoteAddr, ok := md["addr"]
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
if err := s.store.Notify(remoteID.(string), remoteAddr.(string)); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
// handleRemove handles cluster-remove requests.
func (s *Service) handleRemove(w http.ResponseWriter, r *http.Request) {
if !s.CheckRequestPerm(r, PermRemove) {
@ -1378,6 +1431,38 @@ func CheckHTTPS(addr string) bool {
return strings.HasPrefix(addr, "https://")
}
// AddBasicAuth adds username and password to the join address. If username is empty
// joinAddr is returned unchanged. If joinAddr already contains a username, ErrUserInfoExists
// is returned.
func AddBasicAuth(joinAddr, username, password string) (string, error) {
if username == "" {
return joinAddr, nil
}
u, err := url.Parse(joinAddr)
if err != nil {
return "", err
}
if u.User != nil && u.User.Username() != "" {
return "", ErrUserInfoExists
}
u.User = url.UserPassword(username, password)
return u.String(), nil
}
// RemoveBasicAuth returns a copy of the given URL, with any basic auth information
// removed.
func RemoveBasicAuth(u string) string {
uu, err := url.Parse(u)
if err != nil {
return u
}
uu.User = nil
return uu.String()
}
// queryRequestFromStrings converts a slice of strings into a command.QueryRequest
func executeRequestFromStrings(s []string, timings, tx bool) *command.ExecuteRequest {
stmts := make([]*command.Statement, len(s))

@ -154,6 +154,74 @@ func Test_EnsureHTTPS(t *testing.T) {
}
}
func Test_AddBasicAuth(t *testing.T) {
var u string
var err error
u, err = AddBasicAuth("http://example.com", "user1", "pass1")
if err != nil {
t.Fatalf("failed to add user info: %s", err.Error())
}
if exp, got := "http://user1:pass1@example.com", u; exp != got {
t.Fatalf("wrong URL created, exp %s, got %s", exp, got)
}
u, err = AddBasicAuth("http://example.com", "user1", "")
if err != nil {
t.Fatalf("failed to add user info: %s", err.Error())
}
if exp, got := "http://user1:@example.com", u; exp != got {
t.Fatalf("wrong URL created, exp %s, got %s", exp, got)
}
u, err = AddBasicAuth("http://example.com", "", "pass1")
if err != nil {
t.Fatalf("failed to add user info: %s", err.Error())
}
if exp, got := "http://example.com", u; exp != got {
t.Fatalf("wrong URL created, exp %s, got %s", exp, got)
}
u, err = AddBasicAuth("http://user1:pass1@example.com", "user2", "pass2")
if err == nil {
t.Fatalf("failed to get expected error when UserInfo exists")
}
}
func Test_RemoveBasicAuth(t *testing.T) {
tests := []struct {
orig string
removed string
}{
{
orig: "localhost",
removed: "localhost",
},
{
orig: "http://localhost:4001",
removed: "http://localhost:4001",
},
{
orig: "https://foo:bar@localhost",
removed: "https://localhost",
},
{
orig: "https://foo:bar@localhost:4001",
removed: "https://localhost:4001",
},
{
orig: "http://foo:bar@localhost:4001/path",
removed: "http://localhost:4001/path",
},
}
for _, tt := range tests {
if e := RemoveBasicAuth(tt.orig); e != tt.removed {
t.Fatalf("%s BasicAuth not removed correctly, exp %s, got %s", tt.orig, tt.removed, e)
}
}
}
func Test_NewService(t *testing.T) {
m := &MockStore{}
c := &mockClusterService{}
@ -355,6 +423,14 @@ func Test_405Routes(t *testing.T) {
t.Fatalf("failed to get expected 405, got %d", resp.StatusCode)
}
resp, err = client.Get(host + "/notify")
if err != nil {
t.Fatalf("failed to make request")
}
if resp.StatusCode != 405 {
t.Fatalf("failed to get expected 405, got %d", resp.StatusCode)
}
resp, err = client.Post(host+"/db/backup", "", nil)
if err != nil {
t.Fatalf("failed to make request")
@ -415,6 +491,7 @@ func Test_401Routes_NoBasicAuth(t *testing.T) {
"/db/backup",
"/db/load",
"/join",
"/notify",
"/remove",
"/status",
"/nodes",
@ -456,6 +533,7 @@ func Test_401Routes_BasicAuthBadPassword(t *testing.T) {
"/db/backup",
"/db/load",
"/join",
"/notify",
"/status",
"/nodes",
"/readyz",
@ -502,6 +580,7 @@ func Test_401Routes_BasicAuthBadPerm(t *testing.T) {
"/db/backup",
"/db/load",
"/join",
"/notify",
"/status",
"/nodes",
"/readyz",
@ -993,6 +1072,10 @@ func (m *MockStore) Join(id, addr string, voter bool) error {
return nil
}
func (m *MockStore) Notify(id, addr string) error {
return nil
}
func (m *MockStore) Remove(id string) error {
return nil
}

@ -176,6 +176,11 @@ type Store struct {
logger *log.Logger
notifyMu sync.Mutex
BootstrapExpect int
bootstrapped bool
notifyingNodes map[string]*Server
// StartupOnDisk disables in-memory initialization of on-disk databases.
// Restarting a node with an on-disk database can be slow so, by default,
// rqlite creates on-disk databases in memory first, and then moves the
@ -242,6 +247,7 @@ func New(ln Listener, c *Config) *Store {
leaderObservers: make([]chan<- struct{}, 0),
reqMarshaller: command.NewRequestMarshaler(),
logger: logger,
notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout,
}
}
@ -833,6 +839,50 @@ func (s *Store) Backup(leader bool, fmt BackupFormat, dst io.Writer) error {
return nil
}
// Notify notifies this Store that a node is ready for bootstrapping at the
// given address. Once the number of known nodes reaches the expected level,
// bootstrapping will be attempted using this Store. The expected level includes
// this node, so this node must self-notify to ensure the cluster bootstraps
// using its advertised Raft address, which the Store does not otherwise know.
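//
// A sketch of the expected sequence, assuming BootstrapExpect is 3 and using
// illustrative IDs and addresses (one of which is this node's own
// self-notification):
//
//	s.Notify("node1", "10.0.0.1:4002") // recorded, no bootstrap yet
//	s.Notify("node2", "10.0.0.2:4002") // recorded, no bootstrap yet
//	s.Notify("node3", "10.0.0.3:4002") // expected count reached, cluster bootstrap attempted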
func (s *Store) Notify(id, addr string) error {
s.notifyMu.Lock()
defer s.notifyMu.Unlock()
if s.BootstrapExpect == 0 || s.bootstrapped || s.raft.Leader() != "" {
// There is no reason this node will bootstrap.
return nil
}
if _, ok := s.notifyingNodes[id]; ok {
return nil
}
s.notifyingNodes[id] = &Server{id, addr, "voter"}
if len(s.notifyingNodes) < s.BootstrapExpect {
return nil
}
raftServers := make([]raft.Server, 0)
for _, n := range s.notifyingNodes {
raftServers = append(raftServers, raft.Server{
ID: raft.ServerID(n.ID),
Address: raft.ServerAddress(n.Addr),
})
}
s.logger.Printf("reached expected bootstrap count of %d, starting cluster bootstrap",
s.BootstrapExpect)
bf := s.raft.BootstrapCluster(raft.Configuration{
Servers: raftServers,
}).(raft.Future)
if bf.Error() != nil {
s.logger.Printf("cluster bootstrap failed: %s", bf.Error())
} else {
s.logger.Printf("cluster bootstrap successful")
}
s.bootstrapped = true
return nil
}
// Join joins a node, identified by id and located at addr, to this store.
// The node must be ready to respond to Raft communications at that address.
func (s *Store) Join(id, addr string, voter bool) error {

@ -1073,6 +1073,92 @@ func Test_MultiNodeJoinRemove(t *testing.T) {
}
}
func Test_MultiNodeStoreNotifyBootstrap(t *testing.T) {
s0, ln0 := mustNewStore(true)
defer os.RemoveAll(s0.Path())
defer ln0.Close()
if err := s0.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s0.Close(true)
s1, ln1 := mustNewStore(true)
defer os.RemoveAll(s1.Path())
defer ln1.Close()
if err := s1.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s1.Close(true)
s2, ln2 := mustNewStore(true)
defer os.RemoveAll(s2.Path())
defer ln2.Close()
if err := s2.Open(); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s2.Close(true)
s0.BootstrapExpect = 3
if err := s0.Notify(s0.ID(), ln0.Addr().String()); err != nil {
t.Fatalf("failed to notify store: %s", err.Error())
}
if err := s0.Notify(s0.ID(), ln0.Addr().String()); err != nil {
t.Fatalf("failed to notify store -- not idempotent: %s", err.Error())
}
if err := s0.Notify(s1.ID(), ln1.Addr().String()); err != nil {
t.Fatalf("failed to notify store: %s", err.Error())
}
if err := s0.Notify(s2.ID(), ln2.Addr().String()); err != nil {
t.Fatalf("failed to notify store: %s", err.Error())
}
// Check that the cluster bootstrapped properly.
leader0, err := s0.WaitForLeader(10 * time.Second)
if err != nil {
t.Fatalf("failed to get leader: %s", err.Error())
}
nodes, err := s0.Nodes()
if err != nil {
t.Fatalf("failed to get nodes: %s", err.Error())
}
if len(nodes) != 3 {
t.Fatalf("size of bootstrapped cluster is not correct")
}
leader1, err := s1.WaitForLeader(10 * time.Second)
if err != nil {
t.Fatalf("failed to get leader: %s", err.Error())
}
nodes, err = s1.Nodes()
if err != nil {
t.Fatalf("failed to get nodes: %s", err.Error())
}
if len(nodes) != 3 {
t.Fatalf("size of bootstrapped cluster is not correct")
}
leader2, err := s2.WaitForLeader(10 * time.Second)
if err != nil {
t.Fatalf("failed to get leader: %s", err.Error())
}
nodes, err = s2.Nodes()
if err != nil {
t.Fatalf("failed to get nodes: %s", err.Error())
}
if len(nodes) != 3 {
t.Fatalf("size of bootstrapped cluster is not correct")
}
if leader0 == leader1 && leader0 == leader2 {
return
}
t.Fatalf("leader not the same on each node")
// Calling Notify() on a node that is part of a cluster should
// be a no-op.
if err := s0.Notify(s1.ID(), ln1.Addr().String()); err != nil {
t.Fatalf("failed to notify store that is part of cluster: %s", err.Error())
}
}
func Test_MultiNodeJoinNonVoterRemove(t *testing.T) {
s0, ln0 := mustNewStore(true)
defer os.RemoveAll(s0.Path())

@ -3,9 +3,11 @@ package system
import (
"fmt"
"net"
"sync"
"testing"
"time"
"github.com/rqlite/rqlite/cluster"
"github.com/rqlite/rqlite/tcp"
)
@ -160,6 +162,352 @@ func Test_MultiNodeCluster(t *testing.T) {
}
}
// Test_MultiNodeClusterBootstrap tests formation of a 3-node cluster via bootstrapping,
// and its operation.
func Test_MultiNodeClusterBootstrap(t *testing.T) {
node1 := mustNewNode(false)
node1.Store.BootstrapExpect = 3
defer node1.Deprovision()
node2 := mustNewNode(false)
node2.Store.BootstrapExpect = 3
defer node2.Deprovision()
node3 := mustNewNode(false)
node3.Store.BootstrapExpect = 3
defer node3.Deprovision()
provider := cluster.NewAddressProviderString(
[]string{node1.APIAddr, node2.APIAddr, node3.APIAddr})
node1Bs := cluster.NewBootstrapper(provider, 3, nil)
node2Bs := cluster.NewBootstrapper(provider, 3, nil)
node3Bs := cluster.NewBootstrapper(provider, 3, nil)
// Have all nodes start a bootstrap basically in parallel,
// ensure only 1 leader actually gets elected.
var wg sync.WaitGroup
wg.Add(3)
go func() {
done := func() bool {
addr, _ := node1.Store.LeaderAddr()
return addr != ""
}
node1Bs.Boot(node1.ID, node1.RaftAddr, done, 10*time.Second)
wg.Done()
}()
go func() {
done := func() bool {
addr, _ := node2.Store.LeaderAddr()
return addr != ""
}
node2Bs.Boot(node2.ID, node2.RaftAddr, done, 10*time.Second)
wg.Done()
}()
go func() {
done := func() bool {
addr, _ := node3.Store.LeaderAddr()
return addr != ""
}
node3Bs.Boot(node3.ID, node3.RaftAddr, done, 10*time.Second)
wg.Done()
}()
wg.Wait()
// Wait for leader election
_, err := node1.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for a leader: %s", err.Error())
}
c := Cluster{node1, node2, node3}
leader, err := c.Leader()
if err != nil {
t.Fatalf("failed to find cluster leader: %s", err.Error())
}
// Run queries against cluster.
tests := []struct {
stmt string
expected string
execute bool
}{
{
stmt: `CREATE TABLE foo (id integer not null primary key, name text)`,
expected: `{"results":[{}]}`,
execute: true,
},
{
stmt: `INSERT INTO foo(name) VALUES("fiona")`,
expected: `{"results":[{"last_insert_id":1,"rows_affected":1}]}`,
execute: true,
},
{
stmt: `SELECT * FROM foo`,
expected: `{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}]}`,
execute: false,
},
}
for i, tt := range tests {
var r string
var err error
if tt.execute {
r, err = leader.Execute(tt.stmt)
} else {
r, err = leader.Query(tt.stmt)
}
if err != nil {
t.Fatalf(`test %d failed "%s": %s`, i, tt.stmt, err.Error())
}
if r != tt.expected {
t.Fatalf(`test %d received wrong result "%s" got: %s exp: %s`, i, tt.stmt, r, tt.expected)
}
}
// Kill the leader and wait for the new leader.
leader.Deprovision()
c.RemoveNode(leader)
leader, err = c.WaitForNewLeader(leader)
if err != nil {
t.Fatalf("failed to find new cluster leader after killing leader: %s", err.Error())
}
// Run queries against the now 2-node cluster.
tests = []struct {
stmt string
expected string
execute bool
}{
{
stmt: `CREATE TABLE foo (id integer not null primary key, name text)`,
expected: `{"results":[{"error":"table foo already exists"}]}`,
execute: true,
},
{
stmt: `INSERT INTO foo(name) VALUES("sinead")`,
expected: `{"results":[{"last_insert_id":2,"rows_affected":1}]}`,
execute: true,
},
{
stmt: `SELECT * FROM foo`,
expected: `{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"sinead"]]}]}`,
execute: false,
},
}
for i, tt := range tests {
var r string
var err error
if tt.execute {
r, err = leader.Execute(tt.stmt)
} else {
r, err = leader.Query(tt.stmt)
}
if err != nil {
t.Fatalf(`test %d failed "%s": %s`, i, tt.stmt, err.Error())
}
if r != tt.expected {
t.Fatalf(`test %d received wrong result "%s" got: %s exp: %s`, i, tt.stmt, r, tt.expected)
}
}
}
// Test_MultiNodeClusterBootstrapLaterJoin tests formation of a 3-node cluster and
// then checks that a 4th node can join later with the same bootstrap parameters.
func Test_MultiNodeClusterBootstrapLaterJoin(t *testing.T) {
node1 := mustNewNode(false)
node1.Store.BootstrapExpect = 3
defer node1.Deprovision()
node2 := mustNewNode(false)
node2.Store.BootstrapExpect = 3
defer node2.Deprovision()
node3 := mustNewNode(false)
node3.Store.BootstrapExpect = 3
defer node3.Deprovision()
provider := cluster.NewAddressProviderString(
[]string{node1.APIAddr, node2.APIAddr, node3.APIAddr})
node1Bs := cluster.NewBootstrapper(provider, 3, nil)
node1Bs.Interval = time.Second
node2Bs := cluster.NewBootstrapper(provider, 3, nil)
node2Bs.Interval = time.Second
node3Bs := cluster.NewBootstrapper(provider, 3, nil)
node3Bs.Interval = time.Second
// Have all nodes start a bootstrap basically in parallel,
// ensure only 1 leader actually gets elected.
var wg sync.WaitGroup
wg.Add(3)
go func() {
done := func() bool {
addr, _ := node1.Store.LeaderAddr()
return addr != ""
}
node1Bs.Boot(node1.ID, node1.RaftAddr, done, 10*time.Second)
wg.Done()
}()
go func() {
done := func() bool {
addr, _ := node2.Store.LeaderAddr()
return addr != ""
}
node2Bs.Boot(node2.ID, node2.RaftAddr, done, 10*time.Second)
wg.Done()
}()
go func() {
done := func() bool {
addr, _ := node3.Store.LeaderAddr()
return addr != ""
}
node3Bs.Boot(node3.ID, node3.RaftAddr, done, 10*time.Second)
wg.Done()
}()
wg.Wait()
// Check leaders
node1Leader, err := node1.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for a leader: %s", err.Error())
}
node2Leader, err := node2.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for a leader: %s", err.Error())
}
node3Leader, err := node3.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for a leader: %s", err.Error())
}
if got, exp := node2Leader, node1Leader; got != exp {
t.Fatalf("leader mismatch between node 1 and node 2, got %s, exp %s", got, exp)
}
if got, exp := node3Leader, node1Leader; got != exp {
t.Fatalf("leader mismatch between node 1 and node 3, got %s, exp %s", got, exp)
}
// Ensure a 4th node can join the cluster with exactly the same launch
// params. Under the covers it should just do a join.
node4 := mustNewNode(false)
node4.Store.BootstrapExpect = 3
defer node4.Deprovision()
node4Bs := cluster.NewBootstrapper(provider, 3, nil)
node4Bs.Interval = time.Second
done := func() bool {
addr, _ := node4.Store.LeaderAddr()
return addr != ""
}
if err := node4Bs.Boot(node4.ID, node4.RaftAddr, done, 10*time.Second); err != nil {
t.Fatalf("node 4 failed to boot")
}
node4Leader, err := node4.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for a leader: %s", err.Error())
}
if got, exp := node4Leader, node1Leader; got != exp {
t.Fatalf("leader mismatch between node 4 and node 1, got %s, exp %s", got, exp)
}
}
// Test_MultiNodeClusterBootstrapLaterJoinHTTPS tests formation of a 3-node cluster which
// uses HTTP and TLS, and then checks that a 4th node can join later with the same bootstrap parameters.
func Test_MultiNodeClusterBootstrapLaterJoinHTTPS(t *testing.T) {
node1 := mustNewNodeEncrypted(false, true, true)
node1.Store.BootstrapExpect = 3
defer node1.Deprovision()
node2 := mustNewNodeEncrypted(false, true, true)
node2.Store.BootstrapExpect = 3
defer node2.Deprovision()
node3 := mustNewNodeEncrypted(false, true, true)
node3.Store.BootstrapExpect = 3
defer node3.Deprovision()
provider := cluster.NewAddressProviderString(
[]string{node1.APIAddr, node2.APIAddr, node3.APIAddr})
node1Bs := cluster.NewBootstrapper(provider, 3, nil)
node1Bs.Interval = time.Second
node2Bs := cluster.NewBootstrapper(provider, 3, nil)
node2Bs.Interval = time.Second
node3Bs := cluster.NewBootstrapper(provider, 3, nil)
node3Bs.Interval = time.Second
// Have all nodes start a bootstrap basically in parallel,
// ensure only 1 leader actually gets elected.
var wg sync.WaitGroup
wg.Add(3)
go func() {
done := func() bool {
addr, _ := node1.Store.LeaderAddr()
return addr != ""
}
node1Bs.Boot(node1.ID, node1.RaftAddr, done, 10*time.Second)
wg.Done()
}()
go func() {
done := func() bool {
addr, _ := node2.Store.LeaderAddr()
return addr != ""
}
node2Bs.Boot(node2.ID, node2.RaftAddr, done, 10*time.Second)
wg.Done()
}()
go func() {
done := func() bool {
addr, _ := node3.Store.LeaderAddr()
return addr != ""
}
node3Bs.Boot(node3.ID, node3.RaftAddr, done, 10*time.Second)
wg.Done()
}()
wg.Wait()
// Check leaders
node1Leader, err := node1.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for a leader: %s", err.Error())
}
node2Leader, err := node2.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for a leader: %s", err.Error())
}
node3Leader, err := node3.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for a leader: %s", err.Error())
}
if got, exp := node2Leader, node1Leader; got != exp {
t.Fatalf("leader mismatch between node 1 and node 2, got %s, exp %s", got, exp)
}
if got, exp := node3Leader, node1Leader; got != exp {
t.Fatalf("leader mismatch between node 1 and node 3, got %s, exp %s", got, exp)
}
// Ensure a 4th node can join the cluster with exactly the same launch
// params. Under the covers it should just do a join.
node4 := mustNewNodeEncrypted(false, true, true)
node4.Store.BootstrapExpect = 3
defer node4.Deprovision()
node4Bs := cluster.NewBootstrapper(provider, 3, nil)
node4Bs.Interval = time.Second
done := func() bool {
addr, _ := node4.Store.LeaderAddr()
return addr != ""
}
if err := node4Bs.Boot(node4.ID, node4.RaftAddr, done, 10*time.Second); err != nil {
t.Fatalf("node 4 failed to boot")
}
node4Leader, err := node4.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for a leader: %s", err.Error())
}
if got, exp := node4Leader, node1Leader; got != exp {
t.Fatalf("leader mismatch between node 4 and node 1, got %s, exp %s", got, exp)
}
}
// Test_MultiNodeClusterRaftAdv tests 3-node cluster with advertised Raft addresses usage.
func Test_MultiNodeClusterRaftAdv(t *testing.T) {
ln1 := mustTCPListener("0.0.0.0:0")

@ -43,6 +43,7 @@ def write_random_file(data):
class Node(object):
def __init__(self, path, node_id,
api_addr=None, api_adv=None,
bootstrap_expect=0,
raft_addr=None, raft_adv=None,
raft_voter=True,
raft_snap_threshold=8192, raft_snap_int="1s",
@ -66,6 +67,7 @@ class Node(object):
self.node_id = node_id
self.api_addr = api_addr
self.api_adv = api_adv
self.bootstrap_expect = bootstrap_expect
self.raft_addr = raft_addr
self.raft_adv = raft_adv
self.raft_voter = raft_voter
@ -117,6 +119,7 @@ class Node(object):
command = [self.path,
'-node-id', self.node_id,
'-http-addr', self.api_addr,
'-bootstrap-expect', str(self.bootstrap_expect),
'-raft-addr', self.raft_addr,
'-raft-snap', str(self.raft_snap_threshold),
'-raft-snap-int', self.raft_snap_int,
@ -130,7 +133,9 @@ class Node(object):
if self.auth is not None:
command += ['-auth', self.auth]
if join is not None:
command += ['-join', 'http://' + join]
if join.startswith('http://') is False:
join = 'http://' + join
command += ['-join', join]
if join_as is not None:
command += ['-join-as', join_as]
if join_attempts is not None:
@ -708,6 +713,25 @@ class TestEndToEndAdvAddr(TestEndToEnd):
self.cluster = Cluster([n0, n1, n2])
class TestBootstrapping(unittest.TestCase):
'''Test simple bootstrapping works via -bootstrap-expect'''
def test(self):
n0 = Node(RQLITED_PATH, '0', bootstrap_expect=2)
n1 = Node(RQLITED_PATH, '1', bootstrap_expect=2)
n0.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr()]))
n1.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr()]))
self.assertEqual(n0.wait_for_leader(), n1.wait_for_leader())
# Ensure a third node can join later, with same launch params.
n2 = Node(RQLITED_PATH, '2', bootstrap_expect=2)
n2.start(join=','.join([n0.APIProtoAddr(), n1.APIProtoAddr()]))
self.assertEqual(n2.wait_for_leader(), n1.wait_for_leader())
deprovision_node(n0)
deprovision_node(n1)
deprovision_node(n2)
class TestAutoClustering(unittest.TestCase):
DiscoModeConsulKV = "consul-kv"
DiscoModeEtcdKV = "etcd-kv"
@ -1186,8 +1210,10 @@ class TestEndToEndSnapRestoreCluster(unittest.TestCase):
self.n2.start()
self.n2.wait_for_leader()
# Force the Apply loop to run on the node, so fsm_index is updated.
# Force the Apply loop to run on the node twice, so fsm_index is updated on n2 to a point
# where the SQLite database on n2 is updated.
self.n0.execute('INSERT INTO foo(name) VALUES("fiona")')
self.n2.query('SELECT count(*) FROM foo', level='strong')
self.n2.wait_for_fsm_index(self.n0.fsm_index())
j = self.n2.query('SELECT count(*) FROM foo', level='none')

@ -2,6 +2,8 @@ package system
import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
@ -19,7 +21,7 @@ import (
httpd "github.com/rqlite/rqlite/http"
"github.com/rqlite/rqlite/store"
"github.com/rqlite/rqlite/tcp"
"github.com/rqlite/rqlite/testdata/x509"
rX509 "github.com/rqlite/rqlite/testdata/x509"
)
const (
@ -37,6 +39,7 @@ type Node struct {
NodeKeyPath string
HTTPCertPath string
HTTPKeyPath string
TLSConfig *tls.Config
PeersPath string
Store *store.Store
Service *httpd.Service
@ -186,6 +189,19 @@ func (n *Node) JoinAsNonVoter(leader *Node) error {
return nil
}
// Notify notifies this node of the existence of another node
func (n *Node) Notify(id, raftAddr string) error {
resp, err := DoNotify(n.APIAddr, id, raftAddr)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("failed to notify node: %s", resp.Status)
}
defer resp.Body.Close()
return nil
}
// NodesStatus is the Go type /nodes endpoint response is marshaled into.
type NodesStatus map[string]struct {
APIAddr string `json:"api_addr,omitempty"`
@ -459,6 +475,21 @@ func DoJoinRequest(nodeAddr, raftID, raftAddr string, voter bool) (*http.Respons
return resp, nil
}
// DoNotify notifies the node at nodeAddr about node with ID, and Raft address of raftAddr.
func DoNotify(nodeAddr, id, raftAddr string) (*http.Response, error) {
b, err := json.Marshal(map[string]interface{}{"id": id, "addr": raftAddr})
if err != nil {
return nil, err
}
resp, err := http.Post("http://"+nodeAddr+"/notify", "application/json", bytes.NewReader(b))
if err != nil {
return nil, err
}
return resp, nil
}
func mustNewNode(enableSingle bool) *Node {
return mustNewNodeEncrypted(enableSingle, false, false)
}
@ -467,7 +498,7 @@ func mustNewNodeEncrypted(enableSingle, httpEncrypt, nodeEncrypt bool) *Node {
dir := mustTempDir()
var mux *tcp.Mux
if nodeEncrypt {
mux = mustNewOpenTLSMux(x509.CertFile(dir), x509.KeyFile(dir), "")
mux = mustNewOpenTLSMux(rX509.CertFile(dir), rX509.KeyFile(dir), "")
} else {
mux, _ = mustNewOpenMux("")
}
@ -481,8 +512,8 @@ func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux,
}
func mustNodeEncryptedOnDisk(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux, nodeID string, onDisk bool) *Node {
nodeCertPath := x509.CertFile(dir)
nodeKeyPath := x509.KeyFile(dir)
nodeCertPath := rX509.CertFile(dir)
nodeKeyPath := rX509.KeyFile(dir)
httpCertPath := nodeCertPath
httpKeyPath := nodeKeyPath
@ -492,6 +523,7 @@ func mustNodeEncryptedOnDisk(dir string, enableSingle, httpEncrypt bool, mux *tc
NodeKeyPath: nodeKeyPath,
HTTPCertPath: httpCertPath,
HTTPKeyPath: httpKeyPath,
TLSConfig: mustCreateTLSConfig(nodeCertPath, nodeKeyPath, ""),
PeersPath: filepath.Join(dir, "raft/peers.json"),
}
@ -661,6 +693,34 @@ func mustWriteFile(path, contents string) {
}
}
// mustCreateTLSConfig returns a TLS config from the given cert, key and optionally
// Certificate Authority cert. Config doesn't verify certs.
func mustCreateTLSConfig(certFile, keyFile, caCertFile string) *tls.Config {
var err error
config := &tls.Config{
InsecureSkipVerify: true,
}
config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
panic(err.Error())
}
if caCertFile != "" {
asn1Data, err := ioutil.ReadFile(caCertFile)
if err != nil {
panic(err.Error())
}
config.RootCAs = x509.NewCertPool()
ok := config.RootCAs.AppendCertsFromPEM(asn1Data)
if !ok {
panic("failed to append CA certificate to pool")
}
}
return config
}
/* MIT License
*
* Copyright (c) 2017 Roland Singer [roland.singer@desertbit.com]
