1
0
Fork 0

Start supporting /nodes endpoint

master
Philip O'Toole 3 years ago
parent 7280ff26b8
commit 805074f8f5

@ -17,6 +17,7 @@ import (
"net/http/pprof"
"os"
"runtime"
"strconv"
"strings"
"sync"
"time"
@ -61,6 +62,9 @@ type Store interface {
// Stats returns stats on the Store.
Stats() (map[string]interface{}, error)
// Nodes returns the slice of store.Servers in the cluster
Nodes() ([]*store.Server, error)
// Backup wites backup of the node state to dst
Backup(leader bool, f store.BackupFormat, dst io.Writer) error
}
@ -273,6 +277,8 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.handleRemove(w, r)
case strings.HasPrefix(r.URL.Path, "/status"):
s.handleStatus(w, r)
case strings.HasPrefix(r.URL.Path, "/nodes"):
s.handleNodes(w, r)
case r.URL.Path == "/debug/vars" && s.Expvar:
s.handleExpvar(w, r)
case strings.HasPrefix(r.URL.Path, "/debug/pprof") && s.Pprof:
@ -595,6 +601,70 @@ func (s *Service) handleStatus(w http.ResponseWriter, r *http.Request) {
}
}
// handleNodes returns status on the other nodes in the system. This
// attempts to contact all the nodes in the cluster, so may take some
// time to return.
func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
if !s.CheckRequestPerm(r, PermStatus) {
w.WriteHeader(http.StatusUnauthorized)
return
}
if r.Method != "GET" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
_, err := timeout(r, time.Duration(1))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
nodes, err := s.store.Nodes()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
// Attempt contact with each node.
//var wg sync.WaitGroup
online := make(map[string]bool)
for _, n := range nodes {
_, err := s.cluster.GetNodeAPIAddr(n.Addr)
if err != nil {
online[n.ID] = false
}
}
resp := make([]map[string]string, len(nodes))
for i, n := range nodes {
resp[i] = make(map[string]string)
resp[i]["id"] = n.ID
resp[i]["addr"] = n.Addr
resp[i]["reachable"] = strconv.FormatBool(online[n.ID])
}
pretty, _ := isPretty(r)
var b []byte
if pretty {
b, err = json.MarshalIndent(resp, "", " ")
} else {
b, err = json.Marshal(resp)
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
} else {
_, err = w.Write([]byte(b))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
// handleExecute handles queries that modify the database.
func (s *Service) handleExecute(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=utf-8")
@ -979,6 +1049,21 @@ func timings(req *http.Request) (bool, error) {
return queryParam(req, "timings")
}
// timeout returns the timeout included in the query, or the given default
func timeout(req *http.Request, d time.Duration) (time.Duration, error) {
q := req.URL.Query()
tStr := q.Get("timeout")
if tStr == "" {
return d, nil
}
t, err := time.ParseDuration(tStr)
if err != nil {
return d, nil
}
return t, nil
}
// level returns the requested consistency level for a query
func level(req *http.Request) (command.QueryRequest_Level, error) {
q := req.URL.Query()

@ -6,7 +6,9 @@ import (
"io"
"io/ioutil"
"net/http"
"net/url"
"testing"
"time"
"github.com/rqlite/rqlite/command"
sql "github.com/rqlite/rqlite/db"
@ -633,6 +635,49 @@ func Test_TLSServce(t *testing.T) {
}
}
func Test_timeoutQueryParam(t *testing.T) {
var req http.Request
defStr := "10s"
def := mustParseDuration(defStr)
tests := []struct {
u string
dur string
}{
{
u: "http://localhost:4001/nodes?timeout=5s",
dur: "5s",
},
{
u: "http://localhost:4001/nodes?timeout=2m",
dur: "2m",
},
{
u: "http://localhost:4001/nodes?x=777&timeout=5s",
dur: "5s",
},
{
u: "http://localhost:4001/nodes",
dur: defStr,
},
{
u: "http://localhost:4001/nodes?timeout=zdfjkh",
dur: defStr,
},
}
for _, tt := range tests {
req.URL = mustURLParse(tt.u)
timeout, err := timeout(&req, def)
if err != nil {
t.Fatalf("failed to get timeout: %s", err)
}
if timeout != mustParseDuration(tt.dur) {
t.Fatalf("got wrong timeout, expected %s, got %s", mustParseDuration(tt.dur), timeout)
}
}
}
type MockStore struct {
executeFn func(queries []string, tx bool) ([]*sql.Result, error)
queryFn func(queries []string, tx, leader, verify bool) ([]*sql.Rows, error)
@ -674,6 +719,10 @@ func (m *MockStore) Stats() (map[string]interface{}, error) {
return nil, nil
}
func (m *MockStore) Nodes() ([]*store.Server, error) {
return nil, nil
}
func (m *MockStore) Backup(leader bool, f store.BackupFormat, w io.Writer) error {
if m.backupFn == nil {
return nil
@ -733,3 +782,19 @@ func mustTempDir() string {
}
return path
}
func mustURLParse(s string) *url.URL {
u, err := url.Parse(s)
if err != nil {
panic("failed to URL parse string")
}
return u
}
func mustParseDuration(d string) time.Duration {
if dur, err := time.ParseDuration(d); err != nil {
panic("failed to parse duration")
} else {
return dur
}
}

Loading…
Cancel
Save