1
0
Fork 0

Finish multi-node cluster testing

master
Philip O'Toole 9 years ago
parent d5e2bfadbc
commit 7fd7a9a385

@ -186,7 +186,7 @@ func main() {
terminate := make(chan os.Signal, 1) terminate := make(chan os.Signal, 1)
signal.Notify(terminate, os.Interrupt) signal.Notify(terminate, os.Interrupt)
<-terminate <-terminate
if err := store.Close(); err != nil { if err := store.Close(true); err != nil {
log.Printf("failed to close store: %s", err.Error()) log.Printf("failed to close store: %s", err.Error())
} }
log.Println("rqlite server stopped") log.Println("rqlite server stopped")

@ -162,15 +162,17 @@ func (s *Store) Open(enableSingle bool) error {
return nil return nil
} }
// Close closes the store. // Close closes the store. If wait is true, waits for a graceful shutdown.
func (s *Store) Close() error { func (s *Store) Close(wait bool) error {
if err := s.db.Close(); err != nil { if err := s.db.Close(); err != nil {
return err return err
} }
f := s.raft.Shutdown() f := s.raft.Shutdown()
if wait {
if e := f.(raft.Future); e.Error() != nil { if e := f.(raft.Future); e.Error() != nil {
return e.Error() return e.Error()
} }
}
return nil return nil
} }

@ -26,7 +26,7 @@ func Test_OpenStoreCloseSingleNode(t *testing.T) {
if err := s.Open(true); err != nil { if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error()) t.Fatalf("failed to open single-node store: %s", err.Error())
} }
if err := s.Close(); err != nil { if err := s.Close(true); err != nil {
t.Fatalf("failed to close single-node store: %s", err.Error()) t.Fatalf("failed to close single-node store: %s", err.Error())
} }
} }
@ -38,7 +38,7 @@ func Test_SingleNodeExecuteQuery(t *testing.T) {
if err := s.Open(true); err != nil { if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error()) t.Fatalf("failed to open single-node store: %s", err.Error())
} }
defer s.Close() defer s.Close(true)
s.WaitForLeader(10 * time.Second) s.WaitForLeader(10 * time.Second)
queries := []string{ queries := []string{
@ -76,7 +76,7 @@ func Test_SingleNodeExecuteQueryTx(t *testing.T) {
if err := s.Open(true); err != nil { if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error()) t.Fatalf("failed to open single-node store: %s", err.Error())
} }
defer s.Close() defer s.Close(true)
s.WaitForLeader(10 * time.Second) s.WaitForLeader(10 * time.Second)
queries := []string{ queries := []string{
@ -117,7 +117,7 @@ func Test_MultiNodeExecuteQuery(t *testing.T) {
if err := s0.Open(true); err != nil { if err := s0.Open(true); err != nil {
t.Fatalf("failed to open node for multi-node test: %s", err.Error()) t.Fatalf("failed to open node for multi-node test: %s", err.Error())
} }
defer s0.Close() defer s0.Close(true)
s0.WaitForLeader(10 * time.Second) s0.WaitForLeader(10 * time.Second)
s1 := mustNewStore() s1 := mustNewStore()
@ -125,7 +125,7 @@ func Test_MultiNodeExecuteQuery(t *testing.T) {
if err := s1.Open(false); err != nil { if err := s1.Open(false); err != nil {
t.Fatalf("failed to open node for multi-node test: %s", err.Error()) t.Fatalf("failed to open node for multi-node test: %s", err.Error())
} }
defer s1.Close() defer s1.Close(true)
// Join the second node to the first. // Join the second node to the first.
if err := s0.Join(s1.Addr().String()); err != nil { if err := s0.Join(s1.Addr().String()); err != nil {

@ -98,4 +98,51 @@ func Test_MultiNodeCluster(t *testing.T) {
t.Fatalf(`test %d received wrong result "%s" got: %s exp: %s`, i, tt.stmt, r, tt.expected) t.Fatalf(`test %d received wrong result "%s" got: %s exp: %s`, i, tt.stmt, r, tt.expected)
} }
} }
// Kill the leader and wait for the new leader.
leader.Deprovision()
c.RemoveNode(leader)
leader, err = c.WaitForNewLeader(leader)
if err != nil {
t.Fatalf("failed to find new cluster leader after killing leader: %s", err.Error())
}
// Run queries against 2-node cluster.
tests = []struct {
stmt string
expected string
execute bool
}{
{
stmt: `CREATE TABLE foo (id integer not null primary key, name text)`,
expected: `{"results":[{"error":"table foo already exists"}]}`,
execute: true,
},
{
stmt: `INSERT INTO foo(name) VALUES("sinead")`,
expected: `{"results":[{"last_insert_id":2,"rows_affected":1}]}`,
execute: true,
},
{
stmt: `SELECT * FROM foo`,
expected: `{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"sinead"]]}]}`,
execute: false,
},
}
for i, tt := range tests {
var r string
var err error
if tt.execute {
r, err = leader.Execute(tt.stmt)
} else {
r, err = leader.Query(tt.stmt)
}
if err != nil {
t.Fatalf(`test %d failed "%s": %s`, i, tt.stmt, err.Error())
}
if r != tt.expected {
t.Fatalf(`test %d received wrong result "%s" got: %s exp: %s`, i, tt.stmt, r, tt.expected)
}
}
} }

@ -18,22 +18,20 @@ import (
// Node represents a node under test. // Node represents a node under test.
type Node struct { type Node struct {
APIAddr string
RaftAddr string
Dir string Dir string
Store *store.Store Store *store.Store
Service *httpd.Service Service *httpd.Service
} }
func (n *Node) APIAddr() string { func (n *Node) SameAs(o *Node) bool {
return n.Service.Addr().String() return n.RaftAddr == o.RaftAddr
} }
func (n *Node) RaftAddr() string { // Deprovisions shuts down and removes all resources associated with the node.
return n.Store.Addr().String()
}
// Deprovisions removes all resources associated with the node.
func (n *Node) Deprovision() { func (n *Node) Deprovision() {
n.Store.Close() n.Store.Close(false)
n.Service.Close() n.Service.Close()
os.RemoveAll(n.Dir) os.RemoveAll(n.Dir)
} }
@ -59,7 +57,7 @@ func (n *Node) ExecuteMulti(stmts []string) (string, error) {
// Query runs a single query against the node. // Query runs a single query against the node.
func (n *Node) Query(stmt string) (string, error) { func (n *Node) Query(stmt string) (string, error) {
v, _ := url.Parse("http://" + n.APIAddr() + "/db/query") v, _ := url.Parse("http://" + n.APIAddr + "/db/query")
v.RawQuery = url.Values{"q": []string{stmt}}.Encode() v.RawQuery = url.Values{"q": []string{stmt}}.Encode()
resp, err := http.Get(v.String()) resp, err := http.Get(v.String())
@ -76,13 +74,13 @@ func (n *Node) Query(stmt string) (string, error) {
// Join instructs this node to join the leader. // Join instructs this node to join the leader.
func (n *Node) Join(leader *Node) error { func (n *Node) Join(leader *Node) error {
b, err := json.Marshal(map[string]string{"addr": n.RaftAddr()}) b, err := json.Marshal(map[string]string{"addr": n.RaftAddr})
if err != nil { if err != nil {
return err return err
} }
// Attempt to join leader // Attempt to join leader
resp, err := http.Post("http://"+leader.APIAddr()+"/join", "application-type/json", bytes.NewReader(b)) resp, err := http.Post("http://"+leader.APIAddr+"/join", "application-type/json", bytes.NewReader(b))
if err != nil { if err != nil {
return err return err
} }
@ -94,7 +92,7 @@ func (n *Node) Join(leader *Node) error {
} }
func (n *Node) postExecute(stmt string) (string, error) { func (n *Node) postExecute(stmt string) (string, error) {
resp, err := http.Post("http://"+n.APIAddr()+"/db/execute", "application/json", strings.NewReader(stmt)) resp, err := http.Post("http://"+n.APIAddr+"/db/execute", "application/json", strings.NewReader(stmt))
if err != nil { if err != nil {
return "", err return "", err
} }
@ -118,16 +116,55 @@ func (c Cluster) Leader() (*Node, error) {
return c.FindNodeByRaftAddr(l) return c.FindNodeByRaftAddr(l)
} }
// WaitForNewLeader returns the leader of the cluster as long as it's not "old".
func (c Cluster) WaitForNewLeader(old *Node) (*Node, error) {
timer := time.NewTimer(30 * time.Second)
defer timer.Stop()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-timer.C:
return nil, fmt.Errorf("timed out waiting for new leader")
case <-ticker.C:
l, err := c.Leader()
if err != nil {
continue
}
if !l.SameAs(old) {
return l, nil
}
}
}
}
// RemoveNode removes the given node from the cluster
func (c Cluster) RemoveNode(node *Node) {
for i, n := range c {
if n.RaftAddr == node.RaftAddr {
c = append(c[:i], c[i+1:]...)
return
}
}
}
// FindNodeByRaftAddr returns the node with the given Raft address. // FindNodeByRaftAddr returns the node with the given Raft address.
func (c Cluster) FindNodeByRaftAddr(addr string) (*Node, error) { func (c Cluster) FindNodeByRaftAddr(addr string) (*Node, error) {
for _, n := range c { for _, n := range c {
if n.RaftAddr() == addr { if n.RaftAddr == addr {
return n, nil return n, nil
} }
} }
return nil, fmt.Errorf("node not found") return nil, fmt.Errorf("node not found")
} }
func (c Cluster) Deprovision() {
for _, n := range c {
n.Deprovision()
}
}
func mustNewNode(enableSingle bool) *Node { func mustNewNode(enableSingle bool) *Node {
node := &Node{ node := &Node{
Dir: mustTempDir(), Dir: mustTempDir(),
@ -139,12 +176,14 @@ func mustNewNode(enableSingle bool) *Node {
node.Deprovision() node.Deprovision()
panic(fmt.Sprintf("failed to open store: %s", err.Error())) panic(fmt.Sprintf("failed to open store: %s", err.Error()))
} }
node.RaftAddr = node.Store.Addr().String()
node.Service = httpd.New("localhost:0", node.Store, nil) node.Service = httpd.New("localhost:0", node.Store, nil)
if err := node.Service.Start(); err != nil { if err := node.Service.Start(); err != nil {
node.Deprovision() node.Deprovision()
panic(fmt.Sprintf("failed to start HTTP server: %s", err.Error())) panic(fmt.Sprintf("failed to start HTTP server: %s", err.Error()))
} }
node.APIAddr = node.Service.Addr().String()
return node return node
} }

Loading…
Cancel
Save