1
0
Fork 0

Move system testing to Raft Join and Notify

master
Philip O'Toole 10 months ago
parent 2da8a76060
commit 12c20a99c5

@ -513,7 +513,15 @@ func writeCommand(conn net.Conn, c *Command, timeout time.Duration) error {
return nil
}
func readResponse(conn net.Conn, timeout time.Duration) ([]byte, error) {
func readResponse(conn net.Conn, timeout time.Duration) (buf []byte, retErr error) {
defer func() {
// Connecting to an open port, but not a rqlite Raft API, may cause a panic
// when the system tries to read the response. This is a workaround.
if r := recover(); r != nil {
retErr = fmt.Errorf("panic reading response from node: %v", r)
}
}()
// Read length of incoming response.
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
return nil, err

@ -19,6 +19,7 @@ import (
"time"
"github.com/rqlite/rqlite/cluster"
"github.com/rqlite/rqlite/command"
"github.com/rqlite/rqlite/command/encoding"
httpd "github.com/rqlite/rqlite/http"
"github.com/rqlite/rqlite/store"
@ -58,6 +59,7 @@ type Node struct {
Store *store.Store
Service *httpd.Service
Cluster *cluster.Service
Client *cluster.Client
}
// SameAs returns true if this node is the same as node o.
@ -197,41 +199,25 @@ func (n *Node) Noop(id string) error {
// Join instructs this node to join the leader.
func (n *Node) Join(leader *Node) error {
resp, err := DoJoinRequest(leader.APIAddr, n.Store.ID(), n.RaftAddr, true)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("failed to join as voter, leader returned: %s", resp.Status)
}
defer resp.Body.Close()
return nil
joiner := cluster.NewJoiner(n.Client, 3, 1*time.Second)
_, err := joiner.Do([]string{leader.RaftAddr}, n.Store.ID(), n.RaftAddr, true)
return err
}
// JoinAsNonVoter instructs this node to join the leader, but as a non-voting node.
func (n *Node) JoinAsNonVoter(leader *Node) error {
resp, err := DoJoinRequest(leader.APIAddr, n.Store.ID(), n.RaftAddr, false)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("failed to join as non-voter, leader returned: %s", resp.Status)
}
defer resp.Body.Close()
return nil
joiner := cluster.NewJoiner(n.Client, 3, 1*time.Second)
_, err := joiner.Do([]string{leader.RaftAddr}, n.Store.ID(), n.RaftAddr, false)
return err
}
// Notify notifies this node of the existence of another node
func (n *Node) Notify(id, raftAddr string) error {
resp, err := DoNotify(n.APIAddr, id, raftAddr)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("failed to notify node: %s", resp.Status)
nr := &command.NotifyRequest{
Id: n.Store.ID(),
Address: n.RaftAddr,
}
defer resp.Body.Close()
return nil
return n.Client.Notify(nr, raftAddr, 5*time.Second)
}
// NodesStatus is the Go type /nodes endpoint response is marshaled into.
@ -616,36 +602,6 @@ func Remove(n *Node, addr string) error {
return nil
}
// DoJoinRequest sends a join request to nodeAddr, for raftID, reachable at raftAddr.
func DoJoinRequest(nodeAddr, raftID, raftAddr string, voter bool) (*http.Response, error) {
b, err := json.Marshal(map[string]interface{}{"id": raftID, "addr": raftAddr, "voter": voter})
if err != nil {
return nil, err
}
resp, err := http.Post("http://"+nodeAddr+"/join", "application/json", bytes.NewReader(b))
if err != nil {
return nil, err
}
return resp, nil
}
// DoNotify notifies the node at nodeAddr about node with ID, and Raft address of raftAddr.
func DoNotify(nodeAddr, id, raftAddr string) (*http.Response, error) {
b, err := json.Marshal(map[string]interface{}{"id": id, "addr": raftAddr})
if err != nil {
return nil, err
}
resp, err := http.Post("http://"+nodeAddr+"/notify", "application/json", bytes.NewReader(b))
if err != nil {
return nil, err
}
return resp, nil
}
func mustNewNode(enableSingle bool) *Node {
return mustNewNodeEncrypted(enableSingle, false, false)
}
@ -715,6 +671,7 @@ func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux,
clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
clstrClient := cluster.NewClient(clstrDialer, 30*time.Second)
node.Client = clstrClient
node.Service = httpd.New("localhost:0", node.Store, clstrClient, nil)
if httpEncrypt {
node.Service.CertFile = node.HTTPCertPath

Loading…
Cancel
Save