diff --git a/CHANGELOG.md b/CHANGELOG.md index 84772319..30f54fba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ When officially released 8.0 will support (mostly) seamless upgrades from the 7. - [PR #1418](https://github.com/rqlite/rqlite/pull/1418): Add basic CORS support. Fixes [issue #687](https://github.com/rqlite/rqlite/issues/687). Thanks @kkoreilly - [PR #1422](https://github.com/rqlite/rqlite/pull/1422): Add mTLS support to rqlite CLI. Fixes [issue #1421](https://github.com/rqlite/rqlite/issues/1421) - [PR #1427](https://github.com/rqlite/rqlite/pull/1427): Upgrade to SQLite 3.44.0. +- [PR #1433](https://github.com/rqlite/rqlite/pull/1433): Support an optional better form for the `nodes/` output. Fixes [issue #1415](https://github.com/rqlite/rqlite/issues/1415) ### Implementation changes and bug fixes - [PR #1368](https://github.com/rqlite/rqlite/pull/1374): Switch to always-on expvar and pprof. diff --git a/http/nodes.go b/http/nodes.go new file mode 100644 index 00000000..f41e2e0b --- /dev/null +++ b/http/nodes.go @@ -0,0 +1,210 @@ +package http + +import ( + "bytes" + "encoding/json" + "io" + "sort" + "sync" + "time" + + "github.com/rqlite/rqlite/store" +) + +// Node represents a single node in the cluster and can include +// information about the node's reachability and leadership status. +// If there was an error communicating with the node, the Error +// field will be populated. +type Node struct { + ID string `json:"id,omitempty"` + APIAddr string `json:"api_addr,omitempty"` + Addr string `json:"addr,omitempty"` + Voter bool `json:"voter"` + Reachable bool `json:"reachable"` + Leader bool `json:"leader"` + Time float64 `json:"time,omitempty"` + Error string `json:"error,omitempty"` +} + +// NewNodeFromServer creates a Node from a Server. +func NewNodeFromServer(s *store.Server) *Node { + return &Node{ + ID: s.ID, + Addr: s.Addr, + Voter: s.Suffrage == "Voter", + } +} + +// Test tests the node's reachability and leadership status. If an error +// occurs, the Error field will be populated. +func (n *Node) Test(ga GetAddresser, leaderAddr string, timeout time.Duration) { + start := time.Now() + apiAddr, err := ga.GetNodeAPIAddr(n.Addr, timeout) + if err != nil { + n.Error = err.Error() + n.Reachable = false + return + } + n.Time = time.Since(start).Seconds() + n.APIAddr = apiAddr + n.Reachable = true + n.Leader = n.Addr == leaderAddr +} + +type Nodes []*Node + +func (n Nodes) Len() int { return len(n) } +func (n Nodes) Swap(i, j int) { n[i], n[j] = n[j], n[i] } +func (n Nodes) Less(i, j int) bool { return n[i].ID < n[j].ID } + +// NewNodesFromServers creates a slice of Nodes from a slice of Servers. +func NewNodesFromServers(servers []*store.Server) Nodes { + nodes := make([]*Node, len(servers)) + for i, s := range servers { + nodes[i] = NewNodeFromServer(s) + } + sort.Sort(Nodes(nodes)) + return nodes +} + +// Voters returns a slice of Nodes that are voters. +func (n Nodes) Voters() Nodes { + v := make(Nodes, 0) + for _, node := range n { + if node.Voter { + v = append(v, node) + } + } + sort.Sort(v) + return v +} + +// HasAddr returns whether any node in the Nodes slice has the given Raft address. +func (n Nodes) HasAddr(addr string) bool { + for _, node := range n { + if node.Addr == addr { + return true + } + } + return false +} + +// GetNode returns the Node with the given ID, or nil if no such node exists. +func (n Nodes) GetNode(id string) *Node { + for _, node := range n { + if node.ID == id { + return node + } + } + return nil +} + +// Test tests the reachability and leadership status of all nodes. It does this +// in parallel, and blocks until all nodes have been tested. +func (n Nodes) Test(ga GetAddresser, leaderAddr string, timeout time.Duration) { + var wg sync.WaitGroup + for _, nn := range n { + wg.Add(1) + go func(nnn *Node) { + defer wg.Done() + nnn.Test(ga, leaderAddr, timeout) + }(nn) + } + wg.Wait() +} + +// NodesRespEncoder encodes Nodes into JSON with an option for legacy format. +type NodesRespEncoder struct { + writer io.Writer + legacy bool + prefix string + indent string +} + +// NewNodesRespEncoder creates a new NodesRespEncoder instance with the specified +// io.Writer and legacy flag. +func NewNodesRespEncoder(w io.Writer, legacy bool) *NodesRespEncoder { + return &NodesRespEncoder{ + writer: w, + legacy: legacy, + } +} + +// SetIndent sets the indentation format for the JSON output. +func (e *NodesRespEncoder) SetIndent(prefix, indent string) { + e.prefix = prefix + e.indent = indent +} + +// Encode takes a slice of Nodes and encodes it into JSON, +// writing the output to the Encoder's writer. +func (e *NodesRespEncoder) Encode(nodes Nodes) error { + var data []byte + var err error + + if e.legacy { + data, err = e.encodeLegacy(nodes) + } else { + data, err = e.encode(nodes) + } + + if err != nil { + return err + } + + if e.indent != "" { + var buf bytes.Buffer + err = json.Indent(&buf, data, e.prefix, e.indent) + if err != nil { + return err + } + data = buf.Bytes() + } + + _, err = e.writer.Write(data) + return err +} + +// encode encodes the nodes in the standard format. +func (e *NodesRespEncoder) encode(nodes Nodes) ([]byte, error) { + nodeOutput := &struct { + Nodes Nodes `json:"nodes"` + }{ + Nodes: nodes, + } + return json.Marshal(nodeOutput) +} + +// encodeLegacy encodes the nodes in the legacy format. +func (e *NodesRespEncoder) encodeLegacy(nodes Nodes) ([]byte, error) { + legacyOutput := make(map[string]*Node) + for _, node := range nodes { + legacyOutput[node.ID] = node + } + return json.Marshal(legacyOutput) +} + +// NodesRespDecoder decodes JSON data into a slice of Nodes. +type NodesRespDecoder struct { + reader io.Reader +} + +// NewNodesRespDecoder creates a new Decoder instance with the specified io.Reader. +func NewNodesRespDecoder(r io.Reader) *NodesRespDecoder { + return &NodesRespDecoder{reader: r} +} + +// Decode reads JSON from its reader and decodes it into the provided Nodes slice. +func (d *NodesRespDecoder) Decode(nodes *Nodes) error { + // Temporary structure to facilitate decoding. + var data struct { + Nodes Nodes `json:"nodes"` + } + + if err := json.NewDecoder(d.reader).Decode(&data); err != nil { + return err + } + + *nodes = data.Nodes + return nil +} diff --git a/http/nodes_test.go b/http/nodes_test.go new file mode 100644 index 00000000..a57f0abd --- /dev/null +++ b/http/nodes_test.go @@ -0,0 +1,269 @@ +package http + +import ( + "bytes" + "encoding/json" + "fmt" + "strings" + "testing" + "time" + + "github.com/rqlite/rqlite/store" +) + +func Test_NewNodeFromServer(t *testing.T) { + server := &store.Server{ID: "1", Addr: "192.168.1.1", Suffrage: "Voter"} + node := NewNodeFromServer(server) + + if node.ID != server.ID || node.Addr != server.Addr || !node.Voter { + t.Fatalf("NewNodeFromServer did not correctly initialize Node from Server") + } +} + +func Test_NewNodesFromServers(t *testing.T) { + servers := []*store.Server{ + {ID: "1", Addr: "192.168.1.1", Suffrage: "Voter"}, + {ID: "2", Addr: "192.168.1.2", Suffrage: "Nonvoter"}, + } + nodes := NewNodesFromServers(servers) + + if len(nodes) != len(servers) { + t.Fatalf("NewNodesFromServers did not create the correct number of nodes") + } + for i, node := range nodes { + if node.ID != servers[i].ID || node.Addr != servers[i].Addr { + t.Fatalf("NewNodesFromServers did not correctly initialize Node %d from Server", i) + } + } +} + +func Test_NodesVoters(t *testing.T) { + nodes := Nodes{ + {ID: "1", Voter: true}, + {ID: "2", Voter: false}, + } + voters := nodes.Voters() + + if len(voters) != 1 || !voters[0].Voter { + t.Fatalf("Voters method did not correctly filter voter nodes") + } +} + +func Test_NodeTestLeader(t *testing.T) { + node := &Node{ID: "1", Addr: "leader-raft-addr", APIAddr: "leader-api-addr"} + mockGA := newMockGetAddresser("leader-api-addr", nil) + + node.Test(mockGA, "leader-raft-addr", 10*time.Second) + if !node.Reachable || !node.Leader { + t.Fatalf("Test method did not correctly update node status %s", asJSON(node)) + } +} + +func Test_NodeTestNotLeader(t *testing.T) { + node := &Node{ID: "1", Addr: "follower-raft-addr", APIAddr: "follower-api-addr"} + mockGA := newMockGetAddresser("follower-api-addr", nil) + + node.Test(mockGA, "leader-raft-addr", 10*time.Second) + if !node.Reachable || node.Leader { + t.Fatalf("Test method did not correctly update node status %s", asJSON(node)) + } +} + +func Test_NodeTestDouble(t *testing.T) { + node1 := &Node{ID: "1", Addr: "leader-raft-addr", APIAddr: "leader-api-addr"} + node2 := &Node{ID: "2", Addr: "follower-raft-addr", APIAddr: "follower-api-addr"} + mockGA := &mockGetAddresser{} + mockGA.getAddrFn = func(addr string, timeout time.Duration) (string, error) { + if addr == "leader-raft-addr" { + return "leader-api-addr", nil + } + return "", fmt.Errorf("not reachable") + } + + nodes := Nodes{node1, node2} + nodes.Test(mockGA, "leader-raft-addr", 10*time.Second) + if !node1.Reachable || !node1.Leader || node2.Reachable || node2.Leader || node2.Error != "not reachable" { + t.Fatalf("Test method did not correctly update node status %s", asJSON(nodes)) + } + + if !nodes.HasAddr("leader-raft-addr") { + t.Fatalf("HasAddr method did not correctly find node") + } + if nodes.HasAddr("not-found") { + t.Fatalf("HasAddr method incorrectly found node") + } +} + +func Test_NodesRespEncodeStandard(t *testing.T) { + nodes := mockNodes() + buffer := new(bytes.Buffer) + encoder := NewNodesRespEncoder(buffer, false) + + err := encoder.Encode(nodes) + if err != nil { + t.Errorf("Encode failed: %v", err) + } + + m := make(map[string]interface{}) + if err := json.Unmarshal(buffer.Bytes(), &m); err != nil { + t.Errorf("Encode failed: %v", err) + } + if len(m) != 1 { + t.Errorf("unexpected number of keys") + } + if _, ok := m["nodes"]; !ok { + t.Errorf("nodes key missing") + } + nodesArray, ok := m["nodes"].([]interface{}) + if !ok { + t.Errorf("nodes key is not an array") + } + if len(nodesArray) != 1 { + t.Errorf("unexpected number of nodes") + } + node, ok := nodesArray[0].(map[string]interface{}) + if !ok { + t.Errorf("node is not a map") + } + checkNode(t, node) +} + +func Test_NodeRespEncodeLegacy(t *testing.T) { + nodes := mockNodes() + buffer := new(bytes.Buffer) + encoder := NewNodesRespEncoder(buffer, true) + + err := encoder.Encode(nodes) + if err != nil { + t.Errorf("Encode failed: %v", err) + } + + m := make(map[string]interface{}) + if err := json.Unmarshal(buffer.Bytes(), &m); err != nil { + t.Errorf("Encode failed: %v", err) + } + if len(m) != 1 { + t.Errorf("unexpected number of keys") + } + if _, ok := m["1"]; !ok { + t.Errorf("node key missing") + } + node, ok := m["1"].(map[string]interface{}) + if !ok { + t.Errorf("nodes key is not an map") + } + checkNode(t, node) +} + +func Test_NodesRespDecoder_Decode_ValidJSON(t *testing.T) { + jsonInput := `{"nodes":[{"id":"1","addr":"192.168.1.1","voter":true},{"id":"2","addr":"192.168.1.2","voter":false}]}` + reader := strings.NewReader(jsonInput) + decoder := NewNodesRespDecoder(reader) + + var nodes Nodes + err := decoder.Decode(&nodes) + if err != nil { + t.Errorf("Decode failed with valid JSON: %v", err) + } + + if len(nodes) != 2 || nodes[0].ID != "1" || nodes[1].ID != "2" { + t.Errorf("Decode did not properly decode the JSON into Nodes") + } +} + +func Test_NodesRespDecoder_Decode_InvalidJSON(t *testing.T) { + invalidJsonInput := `{"nodes": "invalid"}` + reader := strings.NewReader(invalidJsonInput) + decoder := NewNodesRespDecoder(reader) + + var nodes Nodes + err := decoder.Decode(&nodes) + if err == nil { + t.Error("Decode should fail with invalid JSON") + } +} + +func Test_NodesRespDecoder_Decode_EmptyJSON(t *testing.T) { + emptyJsonInput := `{}` + reader := strings.NewReader(emptyJsonInput) + decoder := NewNodesRespDecoder(reader) + + var nodes Nodes + err := decoder.Decode(&nodes) + if err != nil { + t.Errorf("Decode failed with empty JSON: %v", err) + } + + if len(nodes) != 0 { + t.Errorf("Decode should result in an empty Nodes slice for empty JSON") + } +} + +// mockGetAddresser is a mock implementation of the GetAddresser interface. +type mockGetAddresser struct { + apiAddr string + err error + getAddrFn func(addr string, timeout time.Duration) (string, error) +} + +// newMockGetAddresser creates a new instance of mockGetAddresser. +// You can customize the return values for GetNodeAPIAddr by setting apiAddr and err. +func newMockGetAddresser(apiAddr string, err error) *mockGetAddresser { + return &mockGetAddresser{apiAddr: apiAddr, err: err} +} + +// GetNodeAPIAddr is the mock implementation of the GetNodeAPIAddr method. +func (m *mockGetAddresser) GetNodeAPIAddr(addr string, timeout time.Duration) (string, error) { + if m.getAddrFn != nil { + return m.getAddrFn(addr, timeout) + } + return m.apiAddr, m.err +} + +func mockNodes() Nodes { + return Nodes{ + &Node{ID: "1", APIAddr: "http://localhost:4001", Addr: "localhost:4002", Reachable: true, Leader: true}, + } +} + +func checkNode(t *testing.T, node map[string]interface{}) { + t.Helper() + if _, ok := node["id"]; !ok { + t.Errorf("node is missing id") + } + if node["id"] != "1" { + t.Errorf("unexpected node id") + } + if _, ok := node["api_addr"]; !ok { + t.Errorf("node is missing api_addr") + } + if node["api_addr"] != "http://localhost:4001" { + t.Errorf("unexpected node api_addr") + } + if _, ok := node["addr"]; !ok { + t.Errorf("node is missing addr") + } + if node["addr"] != "localhost:4002" { + t.Errorf("unexpected node addr") + } + if _, ok := node["reachable"]; !ok { + t.Errorf("node is missing reachable") + } + if node["reachable"] != true { + t.Errorf("unexpected node reachable") + } + if _, ok := node["leader"]; !ok { + t.Errorf("node is missing leader") + } + if node["leader"] != true { + t.Errorf("unexpected node leader") + } +} + +func asJSON(v interface{}) string { + b, err := json.Marshal(v) + if err != nil { + panic(fmt.Sprintf("failed to JSON marshal value: %s", err.Error())) + } + return string(b) +} diff --git a/http/service.go b/http/service.go index a64e39bf..5c7a2080 100644 --- a/http/service.go +++ b/http/service.go @@ -91,10 +91,15 @@ type Store interface { Backup(br *command.BackupRequest, dst io.Writer) error } +// GetAddresser is the interface that wraps the GetNodeAPIAddr method. +// GetNodeAPIAddr returns the HTTP API URL for the node at the given Raft address. +type GetAddresser interface { + GetNodeAPIAddr(addr string, timeout time.Duration) (string, error) +} + // Cluster is the interface node API services must provide type Cluster interface { - // GetNodeAPIAddr returns the HTTP API URL for the node at the given Raft address. - GetNodeAPIAddr(nodeAddr string, timeout time.Duration) (string, error) + GetAddresser // Execute performs an Execute Request on a remote node. Execute(er *command.ExecuteRequest, nodeAddr string, creds *cluster.Credentials, timeout time.Duration) ([]*command.ExecuteResult, error) @@ -1003,7 +1008,7 @@ func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request) { } // Get nodes in the cluster, and possibly filter out non-voters. - nodes, err := s.store.Nodes() + sNodes, err := s.store.Nodes() if err != nil { statusCode := http.StatusInternalServerError if err == store.ErrNotOpen { @@ -1012,64 +1017,34 @@ func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf("store nodes: %s", err.Error()), statusCode) return } - - filteredNodes := make([]*store.Server, 0) - for _, n := range nodes { - if n.Suffrage != "Voter" && !includeNonVoters { - continue - } - filteredNodes = append(filteredNodes, n) + nodes := NewNodesFromServers(sNodes) + if !includeNonVoters { + nodes = nodes.Voters() } + // Now test the nodes lAddr, err := s.store.LeaderAddr() if err != nil { http.Error(w, fmt.Sprintf("leader address: %s", err.Error()), http.StatusInternalServerError) return } + nodes.Test(s.cluster, lAddr, timeout) - nodesResp, err := s.checkNodes(filteredNodes, timeout) + ver, err := verParam(r) if err != nil { - http.Error(w, fmt.Sprintf("check nodes: %s", err.Error()), - http.StatusInternalServerError) + http.Error(w, err.Error(), http.StatusInternalServerError) return } - - resp := make(map[string]struct { - APIAddr string `json:"api_addr,omitempty"` - Addr string `json:"addr,omitempty"` - Reachable bool `json:"reachable"` - Leader bool `json:"leader"` - Time float64 `json:"time,omitempty"` - Error string `json:"error,omitempty"` - }) - - for _, n := range filteredNodes { - nn := resp[n.ID] - nn.Addr = n.Addr - nn.Leader = nn.Addr == lAddr - nn.APIAddr = nodesResp[n.ID].apiAddr - nn.Reachable = nodesResp[n.ID].reachable - nn.Time = nodesResp[n.ID].time.Seconds() - nn.Error = nodesResp[n.ID].error - resp[n.ID] = nn - } - + enc := NewNodesRespEncoder(w, ver != "2") pretty, _ := isPretty(r) - var b []byte if pretty { - b, err = json.MarshalIndent(resp, "", " ") - } else { - b, err = json.Marshal(resp) + enc.SetIndent("", " ") } + err = enc.Encode(nodes) if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - _, err = w.Write(b) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return + http.Error(w, fmt.Sprintf("JSON marshal: %s", err.Error()), + http.StatusInternalServerError) } } @@ -1728,48 +1703,6 @@ func (s *Service) runQueue() { } } -type checkNodesResponse struct { - apiAddr string - reachable bool - time time.Duration - error string -} - -// checkNodes returns a map of node ID to node responsivness, reachable -// being defined as node responds to a simple request over the network. -func (s *Service) checkNodes(nodes []*store.Server, timeout time.Duration) (map[string]*checkNodesResponse, error) { - var wg sync.WaitGroup - var mu sync.Mutex - resp := make(map[string]*checkNodesResponse) - - for _, n := range nodes { - resp[n.ID] = &checkNodesResponse{} - } - - // Now confirm. - for _, n := range nodes { - wg.Add(1) - go func(id, raftAddr string) { - defer wg.Done() - mu.Lock() - defer mu.Unlock() - - start := time.Now() - apiAddr, err := s.cluster.GetNodeAPIAddr(raftAddr, timeout) - if err != nil { - resp[id].error = err.Error() - return - } - resp[id].reachable = true - resp[id].apiAddr = apiAddr - resp[id].time = time.Since(start) - }(n.ID, n.Addr) - } - wg.Wait() - - return resp, nil -} - // addBuildVersion adds the build version to the HTTP response. func (s *Service) addBuildVersion(w http.ResponseWriter) { // Add version header to every response, if available. @@ -1884,6 +1817,12 @@ func fmtParam(req *http.Request) (string, error) { return strings.TrimSpace(q.Get("fmt")), nil } +// verParam returns the requested version, if present. +func verParam(req *http.Request) (string, error) { + q := req.URL.Query() + return strings.TrimSpace(q.Get("ver")), nil +} + // isPretty returns whether the HTTP response body should be pretty-printed. func isPretty(req *http.Request) (bool, error) { return queryParam(req, "pretty") diff --git a/system_test/cluster_test.go b/system_test/cluster_test.go index 6819e3c2..75feda49 100644 --- a/system_test/cluster_test.go +++ b/system_test/cluster_test.go @@ -701,8 +701,8 @@ func Test_MultiNodeClusterNodes(t *testing.T) { if len(nodes) != len(c) { t.Fatalf("nodes/ output returned wrong number of nodes, got %d, exp %d", len(nodes), len(c)) } - ns, ok := nodes[leader.ID] - if !ok { + ns := nodes.GetNode(leader.ID) + if ns == nil { t.Fatalf("failed to find leader with ID %s in node status", leader.ID) } if !ns.Leader { @@ -728,7 +728,10 @@ func Test_MultiNodeClusterNodes(t *testing.T) { t.Fatalf("got incorrect number of followers: %d", len(followers)) } f := followers[0] - ns = nodes[f.ID] + ns = nodes.GetNode(f.ID) + if ns == nil { + t.Fatalf("failed to find follower with ID %s in node status", f.ID) + } if ns.Addr != f.RaftAddr { t.Fatalf("node has wrong Raft address for follower") } diff --git a/system_test/helpers.go b/system_test/helpers.go index 9e929cf7..6c8d956a 100644 --- a/system_test/helpers.go +++ b/system_test/helpers.go @@ -228,27 +228,9 @@ func (n *Node) Notify(id, raftAddr string) error { return n.Client.Notify(nr, raftAddr, nil, 5*time.Second) } -// NodesStatus is the Go type /nodes endpoint response is marshaled into. -type NodesStatus map[string]struct { - APIAddr string `json:"api_addr,omitempty"` - Addr string `json:"addr,omitempty"` - Reachable bool `json:"reachable,omitempty"` - Leader bool `json:"leader,omitempty"` -} - -// HasAddr returns whether any node in the NodeStatus has the given Raft address. -func (n NodesStatus) HasAddr(addr string) bool { - for i := range n { - if n[i].Addr == addr { - return true - } - } - return false -} - -// Nodes returns the sNodes endpoint output for node. -func (n *Node) Nodes(includeNonVoters bool) (NodesStatus, error) { - v, _ := url.Parse("http://" + n.APIAddr + "/nodes") +// Nodes returns the Nodes endpoint output for node. +func (n *Node) Nodes(includeNonVoters bool) (httpd.Nodes, error) { + v, _ := url.Parse("http://" + n.APIAddr + "/nodes?ver=2") if includeNonVoters { q := v.Query() q.Set("nonvoters", "true") @@ -263,16 +245,13 @@ func (n *Node) Nodes(includeNonVoters bool) (NodesStatus, error) { return nil, fmt.Errorf("nodes endpoint returned: %s", resp.Status) } defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - var nstatus NodesStatus - if err = json.Unmarshal(body, &nstatus); err != nil { + dec := httpd.NewNodesRespDecoder(resp.Body) + var nodes httpd.Nodes + if err := dec.Decode(&nodes); err != nil { return nil, err } - return nstatus, nil + return nodes, nil } // Status returns the status and diagnostic output for node. diff --git a/system_test/single_node_test.go b/system_test/single_node_test.go index b93eb72b..a0398e99 100644 --- a/system_test/single_node_test.go +++ b/system_test/single_node_test.go @@ -974,10 +974,7 @@ func Test_SingleNodeNodes(t *testing.T) { if len(nodes) != 1 { t.Fatalf("wrong number of nodes in response") } - n, ok := nodes[node.ID] - if !ok { - t.Fatalf("node not found by ID in response") - } + n := nodes[0] if n.Addr != node.RaftAddr { t.Fatalf("node has wrong Raft address") }