package store

import (
	"errors"
	"expvar"
	"path/filepath"
	"sort"
	"strings"
	"testing"
	"time"

	"github.com/rqlite/rqlite/v8/command/proto"
)

// Test_MultiNodeSimple tests that the core operations of a multi-node
// cluster work as expected. That is, with a two-node cluster, writes
// actually replicate, and reads are consistent.
func Test_MultiNodeSimple(t *testing.T) {
	s0, ln := mustNewStore(t)
	defer ln.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)

	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join single-node store: %s", err.Error())
	}
	if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// Write some data.
	er := executeRequestFromStrings([]string{
		`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
	}, false, false)
	_, err := s0.Execute(er)
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}
	if _, err := s0.WaitForAppliedFSM(5 * time.Second); err != nil {
		t.Fatalf("failed to wait for FSM to apply on leader")
	}
	testPoll(t, func() bool {
		return s0.DBAppliedIndex() == s1.DBAppliedIndex()
	}, 250*time.Millisecond, 3*time.Second)

	ci, err := s0.CommitIndex()
	if err != nil {
		t.Fatalf("failed to retrieve commit index: %s", err.Error())
	}
	if exp, got := uint64(4), ci; exp != got {
		t.Fatalf("wrong commit index, got: %d, exp: %d", got, exp)
	}
	lci, err := s0.LeaderCommitIndex()
	if err != nil {
		t.Fatalf("failed to retrieve leader commit index: %s", err.Error())
	}
	if exp, got := uint64(4), lci; exp != got {
		t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp)
	}
	if err := s0.WaitForCommitIndex(4, time.Second); err != nil {
		t.Fatalf("failed to wait for commit index: %s", err.Error())
	}
	if err := s0.WaitForCommitIndex(5, 500*time.Millisecond); err == nil {
		t.Fatalf("unexpectedly waited successfully for commit index")
	}
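	// Why 4? Assuming hashicorp/raft's usual log layout: index 1 is the
	// bootstrap configuration, index 2 the no-op entry the new leader
	// appends on election, index 3 the configuration change from s1's
	// join, and index 4 the Execute request above (both statements are
	// batched into a single log entry).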
	// Now, do a NONE consistency query on each node, to actually confirm
	// the data has been replicated.
	testFn1 := func(t *testing.T, s *Store) {
		t.Helper()
		qr := queryRequestFromString("SELECT * FROM foo", false, false)
		qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE
		r, err := s.Query(qr)
		if err != nil {
			t.Fatalf("failed to query single node: %s", err.Error())
		}
		if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}]`, asJSON(r); exp != got {
			t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
		}
	}
	testFn1(t, s0)
	testFn1(t, s1)

	ci, err = s1.CommitIndex()
	if err != nil {
		t.Fatalf("failed to retrieve commit index: %s", err.Error())
	}
	if exp, got := uint64(4), ci; exp != got {
		t.Fatalf("wrong commit index, got: %d, exp: %d", got, exp)
	}
	lci, err = s1.LeaderCommitIndex()
	if err != nil {
		t.Fatalf("failed to retrieve leader commit index: %s", err.Error())
	}
	if exp, got := uint64(4), lci; exp != got {
		t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp)
	}
	if err := s1.WaitForCommitIndex(4, time.Second); err != nil {
		t.Fatalf("failed to wait for commit index: %s", err.Error())
	}
	if err := s1.WaitForCommitIndex(5, 500*time.Millisecond); err == nil {
		t.Fatalf("unexpectedly waited successfully for commit index")
	}

	// Write another row using Request.
	rr := executeQueryRequestFromString("INSERT INTO foo(id, name) VALUES(2, 'fiona')",
		proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG, false, false)
	_, err = s0.Request(rr)
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}
	testPoll(t, func() bool {
		return s0.DBAppliedIndex() == s1.DBAppliedIndex()
	}, 250*time.Millisecond, 3*time.Second)

	// Now, do a NONE consistency query on each node, to actually confirm
	// the data has been replicated.
	testFn2 := func(t *testing.T, s *Store) {
		t.Helper()
		qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, false)
		qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE
		r, err := s.Query(qr)
		if err != nil {
			t.Fatalf("failed to query single node: %s", err.Error())
		}
		if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[2]]}]`, asJSON(r); exp != got {
			t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
		}
	}
	testFn2(t, s0)
	testFn2(t, s1)
}

// Test_MultiNodeNode_CommitIndexes tests that the commit indexes are
// correctly updated as nodes join and leave the cluster, and as
// commands are committed through the Raft log.
func Test_MultiNodeNode_CommitIndexes(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer s0.Close(true)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	_, err := s0.WaitForLeader(10 * time.Second)
	if err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join single-node store: %s", err.Error())
	}
	if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}
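	// At this point the log should hold three entries (assuming the usual
	// hashicorp/raft layout): the bootstrap configuration, the leader's
	// election no-op, and the configuration change from s1's join. None of
	// these is an FSM command, so CommandCommitIndex should still be zero.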
	testPoll(t, func() bool {
		// The config change command coming through the log due to s1 joining is not instant.
		return s1.raft.CommitIndex() == 3
	}, 50*time.Millisecond, 2*time.Second)
	if exp, got := uint64(0), s1.raftTn.CommandCommitIndex(); exp != got {
		t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
	}

	// Send an FSM command through the log, and ensure the indexes are
	// correctly updated on the follower.
	s0.Noop("don't care")
	testPoll(t, func() bool {
		return s1.numNoops.Load() == 1
	}, 50*time.Millisecond, 2*time.Second)
	if exp, got := uint64(4), s1.raft.CommitIndex(); exp != got {
		t.Fatalf("wrong commit index, got: %d, exp %d", got, exp)
	}
	if exp, got := uint64(4), s1.raftTn.CommandCommitIndex(); exp != got {
		t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
	}

	// Join another node to the cluster, which will result in Raft cluster
	// config commands through the log, but no FSM commands.
	s2, ln2 := mustNewStore(t)
	defer ln2.Close()
	if err := s2.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s2.Close(true)
	if err := s0.Join(joinRequest(s2.ID(), s2.Addr(), true)); err != nil {
		t.Fatalf("failed to join single-node store: %s", err.Error())
	}
	if _, err := s2.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}
	testPoll(t, func() bool {
		// The config change command coming through the log due to s2 joining is not instant.
		return s2.raft.CommitIndex() == 5
	}, 50*time.Millisecond, 2*time.Second)
	if exp, got := uint64(4), s2.raftTn.CommandCommitIndex(); exp != got {
		t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
	}

	// The first node to join should also reflect the new cluster config
	// command.
	testPoll(t, func() bool {
		// The config change command coming through the log due to s2 joining is not instant.
		return s1.raft.CommitIndex() == 5
	}, 50*time.Millisecond, 2*time.Second)
	if exp, got := uint64(4), s1.raftTn.CommandCommitIndex(); exp != got {
		t.Fatalf("wrong command commit index, got: %d, exp %d", got, exp)
	}
}

// Test_MultiNodeSnapshot_ErrorMessage tests that a snapshot fails with a specific
// error message when the snapshot is attempted too soon after joining a cluster.
// Hashicorp Raft doesn't expose a typed error, so we have to check the error
// message in the production code. This test makes sure the text doesn't change.
func Test_MultiNodeSnapshot_ErrorMessage(t *testing.T) {
	s0, ln := mustNewStore(t)
	defer ln.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}
	s0.Noop("don't care") // If it fails, we'll find out later.
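	// The join below adds a fresh configuration entry to the log.
	// hashicorp/raft declines to snapshot until that entry has been applied,
	// which is what should make the immediate Snapshot call below fail with
	// the expected text.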
	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join single-node store: %s", err.Error())
	}
	if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	err := s0.Snapshot(0)
	if err == nil {
		t.Fatalf("expected error when snapshotting multi-node store immediately after joining")
	}
	if !strings.Contains(err.Error(), "wait until the configuration entry at") {
		t.Fatalf("expected error to contain 'wait until the configuration entry at', got %s", err.Error())
	}
}

func Test_MultiNodeDBAppliedIndex(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// Join a second node to the first.
	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	_, err := s1.WaitForLeader(10 * time.Second)
	if err != nil {
		t.Fatalf("failed to wait for leader on follower: %s", err.Error())
	}

	// Check that the DBAppliedIndex is the same on both nodes.
	if s0.DBAppliedIndex() != s1.DBAppliedIndex() {
		t.Fatalf("applied index mismatch")
	}

	// Write some data, and check that the DBAppliedIndex converges on both nodes.
	er := executeRequestFromStrings([]string{
		`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
		`INSERT INTO foo(id, name) VALUES(2, "fiona")`,
		`INSERT INTO foo(id, name) VALUES(3, "fiona")`,
	}, false, false)
	_, err = s0.Execute(er)
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}
	if _, err := s0.WaitForAppliedFSM(5 * time.Second); err != nil {
		t.Fatalf("failed to wait for FSM to apply on leader")
	}
	testPoll(t, func() bool {
		return s0.DBAppliedIndex() == s1.DBAppliedIndex()
	}, 250*time.Millisecond, 3*time.Second)

	// Create a third node, make sure it joins the cluster, and check that
	// the DBAppliedIndex is correct.
	s2, ln2 := mustNewStore(t)
	defer ln2.Close()
	if err := s2.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s2.Close(true)
	if err := s0.Join(joinRequest(s2.ID(), s2.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	_, err = s2.WaitForLeader(10 * time.Second)
	if err != nil {
		t.Fatalf("failed to wait for leader on follower: %s", err.Error())
	}
	testPoll(t, func() bool {
		return s0.DBAppliedIndex() == s2.DBAppliedIndex()
	}, 250*time.Millisecond, 3*time.Second)
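	// DBAppliedIndex tracks the Raft index of the last entry applied to the
	// SQLite database itself, so membership changes alone shouldn't advance
	// it, and after replication it should agree across nodes.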
	// Noop, then snapshot, truncating all logs. Then have another node join the cluster.
	af, err := s0.Noop("don't care")
	if err != nil {
		t.Fatalf("failed to noop on single node: %s", err.Error())
	}
	if af.Error() != nil {
		t.Fatalf("noop failed on single node: %s", af.Error().Error())
	}
	if err := s0.Snapshot(1); err != nil {
		t.Fatalf("failed to snapshot single-node store: %s", err.Error())
	}

	s3, ln3 := mustNewStore(t)
	defer ln3.Close()
	if err := s3.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s3.Close(true)
	if err := s0.Join(joinRequest(s3.ID(), s3.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	_, err = s3.WaitForLeader(10 * time.Second)
	if err != nil {
		t.Fatalf("failed to wait for leader on follower: %s", err.Error())
	}
	testPoll(t, func() bool {
		return s0.DBAppliedIndex() <= s3.DBAppliedIndex()
	}, 250*time.Millisecond, 5*time.Second)

	// Write one last row, and everything should be in sync.
	er = executeRequestFromStrings([]string{
		`INSERT INTO foo(id, name) VALUES(4, "fiona")`,
	}, false, false)
	_, err = s0.Execute(er)
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}
	if _, err := s0.WaitForAppliedFSM(5 * time.Second); err != nil {
		t.Fatalf("failed to wait for FSM to apply on leader")
	}
	testPoll(t, func() bool {
		i := s0.DBAppliedIndex()
		return i == s1.DBAppliedIndex() && i == s2.DBAppliedIndex() && i == s3.DBAppliedIndex()
	}, 100*time.Millisecond, 5*time.Second)
}

func Test_MultiNodeJoinRemove(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)
	if err := s1.Bootstrap(NewServer(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}

	// Get sorted list of cluster nodes.
	storeNodes := []string{s0.ID(), s1.ID()}
	sort.Strings(storeNodes)

	// Join the second node to the first.
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	_, err := s1.WaitForLeader(10 * time.Second)
	if err != nil {
		t.Fatalf("failed to get leader address on follower: %s", err.Error())
	}

	nodes, err := s0.Nodes()
	if err != nil {
		t.Fatalf("failed to get nodes: %s", err.Error())
	}
	if len(nodes) != len(storeNodes) {
		t.Fatalf("size of cluster is not correct")
	}
	if storeNodes[0] != nodes[0].ID || storeNodes[1] != nodes[1].ID {
		t.Fatalf("cluster does not have correct nodes")
	}

	// Should time out waiting for removal of the other node, since it is
	// still a member of the cluster.
	err = s0.WaitForRemoval(s1.ID(), time.Second)
	if err == nil {
		t.Fatalf("no error waiting for removal of node still in cluster")
	}
	if !errors.Is(err, ErrWaitForRemovalTimeout) {
		t.Fatalf("waiting for removal resulted in wrong error: %s", err.Error())
	}
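	// Removal is itself a Raft configuration change committed through the
	// log; once it applies, WaitForRemoval should return promptly.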
	// Remove a node.
	if err := s0.Remove(removeNodeRequest(s1.ID())); err != nil {
		t.Fatalf("failed to remove %s from cluster: %s", s1.ID(), err.Error())
	}

	nodes, err = s0.Nodes()
	if err != nil {
		t.Fatalf("failed to get nodes post remove: %s", err.Error())
	}
	if len(nodes) != 1 {
		t.Fatalf("size of cluster is not correct post remove")
	}
	if s0.ID() != nodes[0].ID {
		t.Fatalf("cluster does not have correct nodes post remove")
	}

	// There should be no error now waiting for removal of the other node.
	err = s0.WaitForRemoval(s1.ID(), time.Second)
	if err != nil {
		t.Fatalf("error waiting for removal of removed node")
	}
}

func Test_MultiNodeStepdown(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)

	s2, ln2 := mustNewStore(t)
	defer ln2.Close()
	if err := s2.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s2.Close(true)

	// Form the 3-node cluster.
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}
	if err := s0.Join(joinRequest(s2.ID(), s2.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	if _, err := s2.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// Tell the leader to step down. After this finishes there should be a new leader.
	if err := s0.Stepdown(true); err != nil {
		t.Fatalf("leader failed to step down: %s", err.Error())
	}
	check := func() bool {
		leader, err := s1.WaitForLeader(10 * time.Second)
		return err == nil && leader != s0.Addr()
	}
	testPoll(t, check, 250*time.Millisecond, 10*time.Second)
}

func Test_MultiNodeStoreNotifyBootstrap(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)

	s2, ln2 := mustNewStore(t)
	defer ln2.Close()
	if err := s2.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s2.Close(true)

	s0.BootstrapExpect = 3
	if err := s0.Notify(notifyRequest(s0.ID(), ln0.Addr().String())); err != nil {
		t.Fatalf("failed to notify store: %s", err.Error())
	}
	if err := s0.Notify(notifyRequest(s0.ID(), ln0.Addr().String())); err != nil {
		t.Fatalf("failed to notify store -- not idempotent: %s", err.Error())
	}
	if err := s0.Notify(notifyRequest(s1.ID(), ln1.Addr().String())); err != nil {
		t.Fatalf("failed to notify store: %s", err.Error())
	}
	if err := s0.Notify(notifyRequest(s2.ID(), ln2.Addr().String())); err != nil {
		t.Fatalf("failed to notify store: %s", err.Error())
	}
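	// With BootstrapExpect set to 3, s0 collects Notify requests and, once it
	// has seen three distinct node IDs (duplicates are ignored, as the
	// repeated notify above checks), should bootstrap the cluster from that set.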
	// Check that the cluster bootstrapped properly.
	reportedLeaders := make([]string, 3)
	for i, n := range []*Store{s0, s1, s2} {
		check := func() bool {
			nodes, err := n.Nodes()
			return err == nil && len(nodes) == 3
		}
		testPoll(t, check, 250*time.Millisecond, 10*time.Second)

		var err error
		reportedLeaders[i], err = n.WaitForLeader(10 * time.Second)
		if err != nil {
			t.Fatalf("failed to get leader on node %d (id=%s): %s", i, n.raftID, err.Error())
		}
	}
	if reportedLeaders[0] != reportedLeaders[1] || reportedLeaders[0] != reportedLeaders[2] {
		t.Fatalf("leader not the same on each node")
	}

	// Calling Notify() on a node that is part of a cluster should
	// be a no-op.
	if err := s0.Notify(notifyRequest(s1.ID(), ln1.Addr().String())); err != nil {
		t.Fatalf("failed to notify store that is part of cluster: %s", err.Error())
	}
}

// Test_MultiNodeStoreAutoRestoreBootstrap tests that a cluster will
// bootstrap correctly when each node is supplied with an auto-restore
// file. Only one node should be able to restore from the file.
func Test_MultiNodeStoreAutoRestoreBootstrap(t *testing.T) {
	ResetStats()
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	s2, ln2 := mustNewStore(t)
	defer ln2.Close()

	path0 := mustCopyFileToTempFile(filepath.Join("testdata", "load.sqlite"))
	path1 := mustCopyFileToTempFile(filepath.Join("testdata", "load.sqlite"))
	path2 := mustCopyFileToTempFile(filepath.Join("testdata", "load.sqlite"))

	s0.SetRestorePath(path0)
	s1.SetRestorePath(path1)
	s2.SetRestorePath(path2)

	for i, s := range []*Store{s0, s1, s2} {
		if err := s.Open(); err != nil {
			t.Fatalf("failed to open store %d: %s", i, err.Error())
		}
		defer s.Close(true)
	}

	// Trigger a bootstrap.
	s0.BootstrapExpect = 3
	for _, s := range []*Store{s0, s1, s2} {
		if err := s0.Notify(notifyRequest(s.ID(), s.ly.Addr().String())); err != nil {
			t.Fatalf("failed to notify store: %s", err.Error())
		}
	}

	// Wait for the cluster to bootstrap.
	_, err := s0.WaitForLeader(10 * time.Second)
	if err != nil {
		t.Fatalf("failed to get leader: %s", err.Error())
	}
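	// Each node was handed its own copy of the restore file, but only the
	// node that wins the election should actually load it; the others should
	// detect the restore has already happened and skip theirs, which the
	// expvar counters below confirm.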
	// Ultimately there is a hard-to-control timing issue here. Knowing
	// exactly when the leader has applied the restore is difficult, so
	// just wait a bit.
	time.Sleep(2 * time.Second)

	if !s0.Ready() {
		t.Fatalf("node is not ready")
	}

	qr := queryRequestFromString("SELECT * FROM foo WHERE id=2", false, true)
	r, err := s0.Query(qr)
	if err != nil {
		t.Fatalf("failed to query single node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[2,"fiona"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}

	if pathExists(path0) || pathExists(path1) || pathExists(path2) {
		t.Fatalf("an auto-restore file was not removed")
	}

	numAuto := stats.Get(numAutoRestores).(*expvar.Int).Value()
	numAutoSkipped := stats.Get(numAutoRestoresSkipped).(*expvar.Int).Value()
	if exp, got := int64(1), numAuto; exp != got {
		t.Fatalf("unexpected number of auto-restores\nexp: %d\ngot: %d", exp, got)
	}
	if exp, got := int64(2), numAutoSkipped; exp != got {
		t.Fatalf("unexpected number of auto-restores skipped\nexp: %d\ngot: %d", exp, got)
	}
}

func Test_MultiNodeJoinNonVoterRemove(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)

	// Get sorted list of cluster nodes.
	storeNodes := []string{s0.ID(), s1.ID()}
	sort.Strings(storeNodes)

	// Join the second node to the first, as a non-voter.
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), false)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// Check leader state on follower.
	got, err := s1.LeaderAddr()
	if err != nil {
		t.Fatalf("failed to get leader address: %s", err.Error())
	}
	if exp := s0.Addr(); got != exp {
		t.Fatalf("wrong leader address returned, got: %s, exp %s", got, exp)
	}
	id, err := waitForLeaderID(s1, 10*time.Second)
	if err != nil {
		t.Fatalf("failed to retrieve leader ID: %s", err.Error())
	}
	if got, exp := id, s0.raftID; got != exp {
		t.Fatalf("wrong leader ID returned, got: %s, exp %s", got, exp)
	}

	nodes, err := s0.Nodes()
	if err != nil {
		t.Fatalf("failed to get nodes: %s", err.Error())
	}
	if len(nodes) != len(storeNodes) {
		t.Fatalf("size of cluster is not correct")
	}
	if storeNodes[0] != nodes[0].ID || storeNodes[1] != nodes[1].ID {
		t.Fatalf("cluster does not have correct nodes")
	}
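	// A non-voter receives the log like any follower but does not count
	// toward quorum or leadership elections, so it can be added and removed
	// without affecting cluster availability.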
	// Remove the non-voter.
	if err := s0.Remove(removeNodeRequest(s1.ID())); err != nil {
		t.Fatalf("failed to remove %s from cluster: %s", s1.ID(), err.Error())
	}
	nodes, err = s0.Nodes()
	if err != nil {
		t.Fatalf("failed to get nodes post remove: %s", err.Error())
	}
	if len(nodes) != 1 {
		t.Fatalf("size of cluster is not correct post remove")
	}
	if s0.ID() != nodes[0].ID {
		t.Fatalf("cluster does not have correct nodes post remove")
	}
}

func Test_MultiNodeExecuteQuery(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open node for multi-node test: %s", err.Error())
	}
	defer s1.Close(true)

	s2, ln2 := mustNewStore(t)
	defer ln2.Close()
	if err := s2.Open(); err != nil {
		t.Fatalf("failed to open node for multi-node test: %s", err.Error())
	}
	defer s2.Close(true)

	// Join the second node to the first as a voting node.
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}

	// Join the third node to the first as a non-voting node.
	if err := s0.Join(joinRequest(s2.ID(), s2.Addr(), false)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}

	er := executeRequestFromStrings([]string{
		`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
	}, false, false)
	_, err := s0.Execute(er)
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}
	s0FsmIdx, err := s0.WaitForAppliedFSM(5 * time.Second)
	if err != nil {
		t.Fatalf("failed to wait for fsmIndex: %s", err.Error())
	}

	qr := queryRequestFromString("SELECT * FROM foo", false, false)
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE
	r, err := s0.Query(qr)
	if err != nil {
		t.Fatalf("failed to query leader node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
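	// NONE-level queries are served directly from the local SQLite copy,
	// while WEAK and STRONG must be handled by the leader (STRONG goes
	// through the Raft log), so followers should reject those below.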
	// Wait until the log entries have been applied to the voting follower,
	// and then query.
	if _, err := s1.WaitForFSMIndex(s0FsmIdx, 5*time.Second); err != nil {
		t.Fatalf("error waiting for follower to apply index: %s", err.Error())
	}

	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK
	_, err = s1.Query(qr)
	if err == nil {
		t.Fatalf("successfully queried non-leader node")
	}
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
	_, err = s1.Query(qr)
	if err == nil {
		t.Fatalf("successfully queried non-leader node")
	}
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE
	r, err = s1.Query(qr)
	if err != nil {
		t.Fatalf("failed to query follower node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}

	// Wait until the 3 log entries have been applied to the non-voting follower,
	// and then query.
	if err := s2.WaitForAppliedIndex(3, 5*time.Second); err != nil {
		t.Fatalf("error waiting for follower to apply index: %s", err.Error())
	}

	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK
	_, err = s2.Query(qr)
	if err == nil {
		t.Fatalf("successfully queried non-voting node with Weak")
	}
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
	_, err = s2.Query(qr)
	if err == nil {
		t.Fatalf("successfully queried non-voting node with Strong")
	}
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE
	r, err = s2.Query(qr)
	if err != nil {
		t.Fatalf("failed to query non-voting node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
}

func Test_MultiNodeExecuteQueryFreshness(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open node for multi-node test: %s", err.Error())
	}
	defer s1.Close(true)

	// Join the second node to the first.
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}

	er := executeRequestFromStrings([]string{
		`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
	}, false, false)
	_, err := s0.Execute(er)
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}

	qr := queryRequestFromString("SELECT * FROM foo", false, false)
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE
	r, err := s0.Query(qr)
	if err != nil {
		t.Fatalf("failed to query leader node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
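	// Freshness only applies to NONE-level reads: it bounds how long a node
	// may have been out of contact with the leader before its local data is
	// considered too stale to serve.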
	// Wait until the 3 log entries have been applied to the follower,
	// and then query.
	if err := s1.WaitForAppliedIndex(3, 5*time.Second); err != nil {
		t.Fatalf("error waiting for follower to apply index: %s", err.Error())
	}

	// "Weak" consistency queries with 1 nanosecond freshness should pass, because
	// freshness is ignored in this case.
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK
	qr.Freshness = mustParseDuration("1ns").Nanoseconds()
	_, err = s0.Query(qr)
	if err != nil {
		t.Fatalf("Failed to ignore freshness if level is Weak: %s", err.Error())
	}
	// "Strong" consistency queries with 1 nanosecond freshness should pass, because
	// freshness is ignored in this case.
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG
	_, err = s0.Query(qr)
	if err != nil {
		t.Fatalf("Failed to ignore freshness if level is Strong: %s", err.Error())
	}

	// Kill the leader.
	s0.Close(true)

	// "None" consistency queries should still work.
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE
	qr.Freshness = 0
	r, err = s1.Query(qr)
	if err != nil {
		t.Fatalf("failed to query follower node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}

	// Wait for the freshness interval to pass.
	time.Sleep(mustParseDuration("1s"))

	// "None" consistency queries with 1 nanosecond freshness should fail, because
	// at least one nanosecond *should* have passed since the leader died (surely!).
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE
	qr.Freshness = mustParseDuration("1ns").Nanoseconds()
	_, err = s1.Query(qr)
	if err == nil {
		t.Fatalf("freshness violating query didn't return an error")
	}
	if err != ErrStaleRead {
		t.Fatalf("freshness violating query returned wrong error: %s", err.Error())
	}

	// Freshness of 0 is ignored.
	qr.Freshness = 0
	r, err = s1.Query(qr)
	if err != nil {
		t.Fatalf("failed to query follower node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}

	// "None" consistency queries with 1 hour freshness should pass, because it should
	// not be that long since the leader died.
	qr.Freshness = mustParseDuration("1h").Nanoseconds()
	r, err = s1.Query(qr)
	if err != nil {
		t.Fatalf("failed to query follower node: %s", err.Error())
	}
	if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
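	// Request() serves both reads and writes through one call; its read path
	// should honour the same stale-read rule for NONE-level queries.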
	// Check that stale-read detection works with Requests too.
	eqr := executeQueryRequestFromString("SELECT * FROM foo",
		proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE, false, false)
	eqr.Freshness = mustParseDuration("1ns").Nanoseconds()
	_, err = s1.Request(eqr)
	if err == nil {
		t.Fatalf("freshness violating request didn't return an error")
	}
	if err != ErrStaleRead {
		t.Fatalf("freshness violating request returned wrong error: %s", err.Error())
	}
	eqr.Freshness = 0
	eqresp, err := s1.Request(eqr)
	if err != nil {
		t.Fatalf("request with freshness disabled returned an error")
	}
	if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}]`, asJSON(eqresp); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
}

func Test_MultiNodeIsLeaderHasLeader(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)
	if err := s1.Bootstrap(NewServer(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}

	// Join the second node to the first.
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	leader, err := s1.WaitForLeader(10 * time.Second)
	if err != nil {
		t.Fatalf("failed to get leader address on follower: %s", err.Error())
	}

	if !s0.HasLeader() {
		t.Fatalf("s0 does not have a leader")
	}
	if !s0.IsLeader() {
		t.Fatalf("s0 is not leader, leader is %s", leader)
	}
	if !s1.HasLeader() {
		t.Fatalf("s1 does not have a leader")
	}
	if s1.IsLeader() {
		t.Fatalf("s1 is leader, leader is %s", leader)
	}
}

func Test_MultiNodeStoreLogTruncation(t *testing.T) {
	s0, ln0 := mustNewStore(t)
	defer ln0.Close()
	s0.SnapshotThreshold = 4
	s0.SnapshotInterval = 100 * time.Millisecond
	if err := s0.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s0.Close(true)
	if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil {
		t.Fatalf("failed to bootstrap single-node store: %s", err.Error())
	}
	if _, err := s0.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	nSnaps := stats.Get(numSnapshots).String()

	// Write more than s.SnapshotThreshold statements.
	queries := []string{
		`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
		`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
		`INSERT INTO foo(id, name) VALUES(2, "fiona")`,
		`INSERT INTO foo(id, name) VALUES(3, "fiona")`,
		`INSERT INTO foo(id, name) VALUES(4, "fiona")`,
		`INSERT INTO foo(id, name) VALUES(5, "fiona")`,
	}
	for i := range queries {
		_, err := s0.Execute(executeRequestFromString(queries[i], false, false))
		if err != nil {
			t.Fatalf("failed to execute on single node: %s", err.Error())
		}
	}

	// Wait for the snapshot to happen and the log to be truncated.
	f := func() bool {
		return stats.Get(numSnapshots).String() != nSnaps
	}
	testPoll(t, f, 100*time.Millisecond, 2*time.Second)
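	// With SnapshotThreshold set to 4, the periodic snapshot check (run every
	// SnapshotInterval) should fire once more than four new log entries have
	// accumulated, snapshotting the FSM and truncating the Raft log. A node
	// joining later must therefore be brought up to date via the snapshot
	// plus any log entries that follow it.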
	// Do one more execute, to ensure there is at least one log entry that is
	// not captured by the snapshot. Without this, there is no guarantee
	// fsmIndex will be set on s1.
	_, err := s0.Execute(executeRequestFromString(`INSERT INTO foo(id, name) VALUES(6, "fiona")`, false, false))
	if err != nil {
		t.Fatalf("failed to execute on single node: %s", err.Error())
	}

	// Fire up a new node and ensure it picks up all changes. This will
	// involve getting a snapshot and the truncated log.
	s1, ln1 := mustNewStore(t)
	defer ln1.Close()
	if err := s1.Open(); err != nil {
		t.Fatalf("failed to open single-node store: %s", err.Error())
	}
	defer s1.Close(true)

	// Join the second node to the first.
	if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil {
		t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
	}
	if _, err := s1.WaitForLeader(10 * time.Second); err != nil {
		t.Fatalf("Error waiting for leader: %s", err)
	}

	// Wait until the log entries have been applied to the follower,
	// and then query.
	if err := s1.WaitForAppliedIndex(8, 5*time.Second); err != nil {
		t.Fatalf("error waiting for follower to apply index: %s", err.Error())
	}
	qr := queryRequestFromString("SELECT count(*) FROM foo", false, true)
	qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE
	r, err := s1.Query(qr)
	if err != nil {
		t.Fatalf("failed to query single node: %s", err.Error())
	}
	if exp, got := `["count(*)"]`, asJSON(r[0].Columns); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
	if exp, got := `[[6]]`, asJSON(r[0].Values); exp != got {
		t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
	}
}