1
0
Fork 0

Builds, but needs unit tests

master
Philip O'Toole 10 months ago
parent a106df55f9
commit bd8297b6d7

@ -1,6 +1,7 @@
package http
import (
"sync"
"time"
"github.com/rqlite/rqlite/store"
@ -21,6 +22,15 @@ type Node struct {
Error string `json:"error,omitempty"`
}
// NewNodeFromServer creates a Node from a Server.
func NewNodeFromServer(s *store.Server) *Node {
return &Node{
ID: s.ID,
Addr: s.Addr,
Voter: s.Suffrage == "Voter",
}
}
// Test tests the node's reachability and leadership status. If an error
// occurs, the Error field will be populated.
func (n *Node) Test(ga GetAddresser, leaderAddr string, timeout time.Duration) {
@ -37,20 +47,38 @@ func (n *Node) Test(ga GetAddresser, leaderAddr string, timeout time.Duration) {
n.Leader = apiAddr == leaderAddr
}
type Nodes []*Node
// NewNodesFromServers creates a slice of Nodes from a slice of Servers.
func NewNodesFromServers(servers []*store.Server) ([]*Node, error) {
func NewNodesFromServers(servers []*store.Server) Nodes {
nodes := make([]*Node, len(servers))
for i, s := range servers {
nodes[i] = NewNodeFromServer(s)
}
return nodes, nil
return nodes
}
// NewNodeFromServer creates a Node from a Server.
func NewNodeFromServer(s *store.Server) *Node {
return &Node{
ID: s.ID,
Addr: s.Addr,
Voter: s.Suffrage == "Voter",
// Voters returns a slice of Nodes that are voters.
func (n Nodes) Voters() Nodes {
v := make(Nodes, 0)
for _, node := range n {
if node.Voter {
v = append(v, node)
}
}
return v
}
// Test tests the reachability and leadership status of all nodes. It does this
// in parallel, and blocks until all nodes have been tested.
func (n Nodes) Test(ga GetAddresser, leaderAddr string, timeout time.Duration) {
var wg sync.WaitGroup
for _, nn := range n {
wg.Add(1)
go func(nnn *Node) {
defer wg.Done()
nnn.Test(ga, leaderAddr, timeout)
}(nn)
}
wg.Wait()
}

@ -1008,7 +1008,7 @@ func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request) {
}
// Get nodes in the cluster, and possibly filter out non-voters.
nodes, err := s.store.Nodes()
sNodes, err := s.store.Nodes()
if err != nil {
statusCode := http.StatusInternalServerError
if err == store.ErrNotOpen {
@ -1017,55 +1017,26 @@ func (s *Service) handleNodes(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf("store nodes: %s", err.Error()), statusCode)
return
}
filteredNodes := make([]*store.Server, 0)
for _, n := range nodes {
if n.Suffrage != "Voter" && !includeNonVoters {
continue
}
filteredNodes = append(filteredNodes, n)
nodes := NewNodesFromServers(sNodes)
if !includeNonVoters {
nodes = nodes.Voters()
}
// Now test the nodes
lAddr, err := s.store.LeaderAddr()
if err != nil {
http.Error(w, fmt.Sprintf("leader address: %s", err.Error()),
http.StatusInternalServerError)
return
}
nodesResp, err := s.checkNodes(filteredNodes, timeout)
if err != nil {
http.Error(w, fmt.Sprintf("check nodes: %s", err.Error()),
http.StatusInternalServerError)
return
}
resp := make(map[string]struct {
APIAddr string `json:"api_addr,omitempty"`
Addr string `json:"addr,omitempty"`
Reachable bool `json:"reachable"`
Leader bool `json:"leader"`
Time float64 `json:"time,omitempty"`
Error string `json:"error,omitempty"`
})
for _, n := range filteredNodes {
nn := resp[n.ID]
nn.Addr = n.Addr
nn.Leader = nn.Addr == lAddr
nn.APIAddr = nodesResp[n.ID].apiAddr
nn.Reachable = nodesResp[n.ID].reachable
nn.Time = nodesResp[n.ID].time.Seconds()
nn.Error = nodesResp[n.ID].error
resp[n.ID] = nn
}
nodes.Test(s.cluster, lAddr, timeout)
pretty, _ := isPretty(r)
var b []byte
if pretty {
b, err = json.MarshalIndent(resp, "", " ")
b, err = json.MarshalIndent(nodes, "", " ")
} else {
b, err = json.Marshal(resp)
b, err = json.Marshal(nodes)
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
@ -1733,48 +1704,6 @@ func (s *Service) runQueue() {
}
}
type checkNodesResponse struct {
apiAddr string
reachable bool
time time.Duration
error string
}
// checkNodes returns a map of node ID to node responsivness, reachable
// being defined as node responds to a simple request over the network.
func (s *Service) checkNodes(nodes []*store.Server, timeout time.Duration) (map[string]*checkNodesResponse, error) {
var wg sync.WaitGroup
var mu sync.Mutex
resp := make(map[string]*checkNodesResponse)
for _, n := range nodes {
resp[n.ID] = &checkNodesResponse{}
}
// Now confirm.
for _, n := range nodes {
wg.Add(1)
go func(id, raftAddr string) {
defer wg.Done()
mu.Lock()
defer mu.Unlock()
start := time.Now()
apiAddr, err := s.cluster.GetNodeAPIAddr(raftAddr, timeout)
if err != nil {
resp[id].error = err.Error()
return
}
resp[id].reachable = true
resp[id].apiAddr = apiAddr
resp[id].time = time.Since(start)
}(n.ID, n.Addr)
}
wg.Wait()
return resp, nil
}
// addBuildVersion adds the build version to the HTTP response.
func (s *Service) addBuildVersion(w http.ResponseWriter) {
// Add version header to every response, if available.

Loading…
Cancel
Save