1
0
Fork 0

Start removing HTTP join and notify code

master
Philip O'Toole 11 months ago
parent 5de1c17d8c
commit 16052d8020

@ -18,10 +18,6 @@ const (
// PermAll means all actions permitted.
PermAll = "all"
// PermJoin means user is permitted to join cluster.
PermJoin = "join"
// PermJoinReadOnly means user is permitted to join the cluster only as a read-only node
PermJoinReadOnly = "join-read-only"
// PermRemove means user is permitted to remove a node.
PermRemove = "remove"
// PermExecute means user can access execute endpoint.

@ -69,9 +69,6 @@ type Bootstrapper struct {
joiner *Joiner
username string
password string
logger *log.Logger
Interval time.Duration
@ -91,11 +88,6 @@ func NewBootstrapper(p AddressProvider, tlsConfig *tls.Config) *Bootstrapper {
return bs
}
// SetBasicAuth stores the credentials that subsequent bootstrap
// requests will present via HTTP Basic Auth.
func (b *Bootstrapper) SetBasicAuth(username, password string) {
	b.username = username
	b.password = password
}
// Boot performs the bootstrapping process for this node. This means it will
// ensure this node becomes part of a cluster. It does this by either joining
// an existing cluster by explicitly joining it through one of these nodes,
@ -140,7 +132,6 @@ func (b *Bootstrapper) Boot(id, raftAddr string, done func() bool, timeout time.
// Try an explicit join first. Joining an existing cluster is always given priority
// over trying to form a new cluster.
b.joiner.SetBasicAuth(b.username, b.password)
if j, err := b.joiner.Do(targets, id, raftAddr, true); err == nil {
b.logger.Printf("succeeded directly joining cluster via node at %s", j)
b.setBootStatus(BootJoin)
@ -197,9 +188,6 @@ func (b *Bootstrapper) notify(targets []string, id, raftAddr string) error {
if err != nil {
return err
}
if b.username != "" && b.password != "" {
req.SetBasicAuth(b.username, b.password)
}
req.Header.Add("Content-Type", "application/json")
resp, err := client.Do(req)

@ -220,48 +220,6 @@ func Test_BootstrapperBootSingleNotifyHTTPS(t *testing.T) {
}
}
// Test_BootstrapperBootSingleNotifyAuth verifies that Boot presents the
// configured Basic Auth credentials on both join and notify requests, and
// that bootstrapping completes once the notify target has been contacted.
func Test_BootstrapperBootSingleNotifyAuth(t *testing.T) {
	tsNotified := false
	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// This handler runs on a server goroutine, not the test goroutine.
		// Per the testing package docs, Fatal/Fatalf must only be called
		// from the test goroutine, so report failures with Errorf instead.
		username, password, ok := r.BasicAuth()
		if !ok {
			t.Errorf("request did not have Basic Auth credentials")
			return
		}
		if username != "username1" || password != "password1" {
			// Fixed format string: the original was missing the closing paren.
			t.Errorf("bad Basic Auth credentials received (%s, %s)", username, password)
			return
		}
		if r.URL.Path == "/join" {
			// Refuse the join so Boot falls back to the notify path.
			w.WriteHeader(http.StatusServiceUnavailable)
			return
		}
		tsNotified = true
	}))

	// done reports completion only after a few polls, giving Boot time
	// to contact the notify target first.
	n := -1
	done := func() bool {
		n++
		return n == 5
	}

	p := NewAddressProviderString([]string{ts.URL})
	bs := NewBootstrapper(p, nil)
	bs.SetBasicAuth("username1", "password1")
	bs.Interval = time.Second

	err := bs.Boot("node1", "192.168.1.1:1234", done, 60*time.Second)
	if err != nil {
		t.Fatalf("failed to boot: %s", err)
	}
	if tsNotified != true {
		t.Fatalf("notify target not contacted")
	}
	if exp, got := BootDone, bs.Status(); exp != got {
		t.Fatalf("wrong status, exp %s, got %s", exp, got)
	}
}
func Test_BootstrapperBootMultiNotify(t *testing.T) {
ts1Join := false
ts1Notified := false

@ -7,7 +7,6 @@ import (
"fmt"
"io"
"net"
"net/url"
"os"
"path/filepath"
"reflect"
@ -101,13 +100,8 @@ type Config struct {
// JoinSrcIP sets the source IP address during Join request. May not be set.
JoinSrcIP string
// JoinAddr is the list addresses to use for a join attempt. Each address
// will include the proto (HTTP or HTTPS) and will never include the node's
// own HTTP server address. May not be set.
JoinAddr string
// JoinAs sets the user join attempts should be performed as. May not be set.
JoinAs string
// JoinAddrs is the list of Raft addresses to use for a join attempt.
JoinAddrs string
// JoinAttempts is the number of times a node should attempt to join using a
// given address.
@ -286,15 +280,15 @@ func (c *Config) Validate() error {
}
// Join parameters OK?
if c.JoinAddr != "" {
addrs := strings.Split(c.JoinAddr, ",")
if c.JoinAddrs != "" {
addrs := strings.Split(c.JoinAddrs, ",")
for i := range addrs {
u, err := url.Parse(addrs[i])
if err != nil {
if _, _, err := net.SplitHostPort(addrs[i]); err != nil {
return fmt.Errorf("%s is an invalid join adddress", addrs[i])
}
if c.BootstrapExpect == 0 {
if u.Host == c.HTTPAdv || addrs[i] == c.HTTPAddr {
if addrs[i] == c.RaftAdv || addrs[i] == c.RaftAddr {
return errors.New("node cannot join with itself unless bootstrapping")
}
if c.AutoRestoreFile != "" {
@ -329,10 +323,10 @@ func (c *Config) Validate() error {
// JoinAddresses returns the join addresses set at the command line. Returns nil
// if no join addresses were set.
func (c *Config) JoinAddresses() []string {
if c.JoinAddr == "" {
if c.JoinAddrs == "" {
return nil
}
return strings.Split(c.JoinAddr, ",")
return strings.Split(c.JoinAddrs, ",")
}
// HTTPURL returns the fully-formed, advertised HTTP API address for this config, including
@ -427,8 +421,7 @@ func ParseFlags(name, desc string, build *BuildInfo) (*Config, error) {
flag.StringVar(&config.RaftAddr, RaftAddrFlag, "localhost:4002", "Raft communication bind address")
flag.StringVar(&config.RaftAdv, RaftAdvAddrFlag, "", "Advertised Raft communication address. If not set, same as Raft bind address")
flag.StringVar(&config.JoinSrcIP, "join-source-ip", "", "Set source IP address during HTTP Join request")
flag.StringVar(&config.JoinAddr, "join", "", "Comma-delimited list of nodes, through which a cluster can be joined (proto://host:port)")
flag.StringVar(&config.JoinAs, "join-as", "", "Username in authentication file to join as. If not set, joins anonymously")
flag.StringVar(&config.JoinAddrs, "join", "", "Comma-delimited list of nodes, through which a cluster can be joined (proto://host:port)")
flag.IntVar(&config.JoinAttempts, "join-attempts", 5, "Number of join attempts to make")
flag.DurationVar(&config.JoinInterval, "join-interval", 3*time.Second, "Period between join attempts")
flag.IntVar(&config.BootstrapExpect, "bootstrap-expect", 0, "Minimum number of nodes required for a bootstrap")

@ -431,15 +431,7 @@ func createJoiner(cfg *Config, credStr *auth.CredentialsStore) (*cluster.Joiner,
if err != nil {
return nil, err
}
joiner := cluster.NewJoiner(cfg.JoinSrcIP, cfg.JoinAttempts, cfg.JoinInterval, tlsConfig)
if cfg.JoinAs != "" {
pw, ok := credStr.Password(cfg.JoinAs)
if !ok {
return nil, fmt.Errorf("user %s does not exist in credential store", cfg.JoinAs)
}
joiner.SetBasicAuth(cfg.JoinAs, pw)
}
return joiner, nil
return cluster.NewJoiner(cfg.JoinSrcIP, cfg.JoinAttempts, cfg.JoinInterval, tlsConfig), nil
}
func clusterService(cfg *Config, tn cluster.Transport, db cluster.Database, mgr cluster.Manager, credStr *auth.CredentialsStore) (*cluster.Service, error) {
@ -509,13 +501,6 @@ func createCluster(cfg *Config, hasPeers bool, joiner *cluster.Joiner, str *stor
if joins != nil && cfg.BootstrapExpect > 0 {
// Bootstrap with explicit join addresses requests.
bs := cluster.NewBootstrapper(cluster.NewAddressProviderString(joins), tlsConfig)
if cfg.JoinAs != "" {
pw, ok := credStr.Password(cfg.JoinAs)
if !ok {
return fmt.Errorf("user %s does not exist in credential store", cfg.JoinAs)
}
bs.SetBasicAuth(cfg.JoinAs, pw)
}
return bs.Boot(str.ID(), cfg.RaftAdv, isClustered, cfg.BootstrapExpectTimeout)
}
@ -558,13 +543,6 @@ func createCluster(cfg *Config, hasPeers bool, joiner *cluster.Joiner, str *stor
}
bs := cluster.NewBootstrapper(provider, tlsConfig)
if cfg.JoinAs != "" {
pw, ok := credStr.Password(cfg.JoinAs)
if !ok {
return fmt.Errorf("user %s does not exist in credential store", cfg.JoinAs)
}
bs.SetBasicAuth(cfg.JoinAs, pw)
}
httpServ.RegisterStatus("disco", provider)
return bs.Boot(str.ID(), cfg.RaftAdv, isClustered, cfg.BootstrapExpectTimeout)

@ -1,5 +1,4 @@
// Package http provides the HTTP server for accessing the distributed database.
// It also provides the endpoint for other nodes to join an existing cluster.
package http
import (
@ -73,12 +72,6 @@ type Database interface {
type Store interface {
Database
// Join joins the node with the given ID, reachable at addr, to this node.
Join(jr *command.JoinRequest) error
// Notify notifies this node that a node is available at addr.
Notify(nr *command.NotifyRequest) error
// Remove removes the node from the cluster.
Remove(rn *command.RemoveNodeRequest) error
@ -226,8 +219,6 @@ const (
numStatus = "num_status"
numBackups = "backups"
numLoad = "loads"
numJoins = "joins"
numNotifies = "notifies"
numAuthOK = "authOK"
numAuthFail = "authFail"
@ -281,8 +272,6 @@ func ResetStats() {
stats.Add(numStatus, 0)
stats.Add(numBackups, 0)
stats.Add(numLoad, 0)
stats.Add(numJoins, 0)
stats.Add(numNotifies, 0)
stats.Add(numAuthOK, 0)
stats.Add(numAuthFail, 0)
}
@ -444,12 +433,6 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case strings.HasPrefix(r.URL.Path, "/db/load"):
stats.Add(numLoad, 1)
s.handleLoad(w, r)
case strings.HasPrefix(r.URL.Path, "/join"):
stats.Add(numJoins, 1)
s.handleJoin(w, r)
case strings.HasPrefix(r.URL.Path, "/notify"):
stats.Add(numNotifies, 1)
s.handleNotify(w, r)
case strings.HasPrefix(r.URL.Path, "/remove"):
s.handleRemove(w, r)
case strings.HasPrefix(r.URL.Path, "/status"):
@ -482,147 +465,6 @@ func (s *Service) RegisterStatus(key string, stat StatusReporter) error {
return nil
}
// handleJoin handles cluster-join requests from other nodes.
//
// The request body is untrusted JSON, so every field is extracted with a
// checked type assertion: a malformed payload (e.g. a numeric "id") now
// yields 400 Bad Request instead of panicking the handler, which is what
// the original unchecked rID.(string)/voter.(bool) assertions did.
func (s *Service) handleJoin(w http.ResponseWriter, r *http.Request) {
	if !s.CheckRequestPerm(r, auth.PermJoin) && !s.CheckRequestPerm(r, auth.PermJoinReadOnly) {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	b, err := io.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	md := map[string]interface{}{}
	if err := json.Unmarshal(b, &md); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	remoteID, ok := md["id"].(string)
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	remoteAddr, ok := md["addr"].(string)
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// "voter" is optional and defaults to true, matching the original code.
	voter := true
	if v, present := md["voter"]; present {
		if voter, ok = v.(bool); !ok {
			w.WriteHeader(http.StatusBadRequest)
			return
		}
	}

	if voter && !s.CheckRequestPerm(r, auth.PermJoin) {
		http.Error(w, "joining as voter not allowed", http.StatusUnauthorized)
		return
	}

	s.logger.Printf("received join request from node with ID %s at %s",
		remoteID, remoteAddr)

	// Confirm that this node can resolve the remote address. This can happen due
	// to incomplete DNS records across the underlying infrastructure. If it can't
	// then don't consider this join attempt successful -- so the joining node
	// will presumably try again.
	if addr, err := resolvableAddress(remoteAddr); err != nil {
		s.logger.Printf("failed to resolve %s (%s) while handling join request", addr, err)
		http.Error(w, fmt.Sprintf("can't resolve %s (%s)", addr, err.Error()),
			http.StatusServiceUnavailable)
		return
	}

	jr := &command.JoinRequest{
		Id:      remoteID,
		Address: remoteAddr,
		Voter:   voter,
	}
	if err := s.store.Join(jr); err != nil {
		if err == store.ErrNotLeader {
			// Not the leader: redirect the joiner to the leader's API address.
			leaderAPIAddr := s.LeaderAPIAddr()
			if leaderAPIAddr == "" {
				stats.Add(numLeaderNotFound, 1)
				http.Error(w, ErrLeaderNotFound.Error(), http.StatusServiceUnavailable)
				return
			}
			redirect := s.FormRedirect(r, leaderAPIAddr)
			http.Redirect(w, r, redirect, http.StatusMovedPermanently)
			return
		}
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
}
// handleNotify handles node-notify requests from other nodes.
//
// As with handleJoin, the body is untrusted JSON: "id" and "addr" are
// extracted with checked type assertions so a payload carrying the wrong
// type returns 400 Bad Request rather than panicking the handler (the
// original used unchecked rID.(string)/rAddr.(string) assertions).
func (s *Service) handleNotify(w http.ResponseWriter, r *http.Request) {
	if !s.CheckRequestPerm(r, auth.PermJoin) {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}

	if r.Method != "POST" {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	b, err := io.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	md := map[string]interface{}{}
	if err := json.Unmarshal(b, &md); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	remoteID, ok := md["id"].(string)
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	remoteAddr, ok := md["addr"].(string)
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	s.logger.Printf("received notify request from node with ID %s at %s",
		remoteID, remoteAddr)

	// Confirm that this node can resolve the remote address. This can happen due
	// to incomplete DNS records across the underlying infrastructure. If it can't
	// then don't consider this notify attempt successful -- so the notifying node
	// will presumably try again.
	if addr, err := resolvableAddress(remoteAddr); err != nil {
		s.logger.Printf("failed to resolve %s (%s) while handling notify request", addr, err)
		http.Error(w, fmt.Sprintf("can't resolve %s (%s)", addr, err.Error()),
			http.StatusServiceUnavailable)
		return
	}

	if err := s.store.Notify(&command.NotifyRequest{
		Id:      remoteID,
		Address: remoteAddr,
	}); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}
// handleRemove handles cluster-remove requests.
func (s *Service) handleRemove(w http.ResponseWriter, r *http.Request) {
if !s.CheckRequestPerm(r, auth.PermRemove) {

Loading…
Cancel
Save