1
0
Fork 0

Support leader freshness with None consistency

This change allows the read request to specify the maximum time the node
receiving the request may have last heard from the cluster leader. It
only applies to a read consistency selection of None.
master
Philip O'Toole 5 years ago
parent 002c41fcaf
commit abb7772105

@ -1,15 +1,24 @@
# Read Consistency
Even though serving queries does not require consensus (because the database is not changed), [queries should generally be served by the leader](https://github.com/rqlite/rqlite/issues/5). Why is this? Because, without this check, queries on a node could return results that are significantly out-of-date. This could happen for one, or both, of the following two reasons:
Even though serving queries does not require consensus (because the database is not changed), [queries should generally be served by the leader](https://github.com/rqlite/rqlite/issues/5). Why is this? Because, without this check, queries on a node could return results that are out-of-date. This could happen for one, or both, of the following two reasons:
* The node, while still part of the cluster, has fallen behind the leader in terms of updates to its underlying database.
* The node is no longer part of the cluster, and has stopped receiving Raft log updates.
This is why rqlite offers selectable read consistency levels of _none_, _weak_, and _strong_. Each is explained below.
With _none_, the node simply queries its local SQLite database, and does not even check if it is the leader. This offers the fastest query response, but suffers from the potential issues listed above. _Weak_ instructs the node to check that it is the leader, before querying the local SQLite file. Checking leader state only involves checking local state, so is still very fast. There is, however, a very small window of time (milliseconds by default) during which the node may return stale data. This is because after the leader check, but before the local SQLite database is read, another node could be elected leader and make changes to the cluster. As result the node may not be quite up-to-date with the rest of cluster.
## None
With _none_, the node simply queries its local SQLite database, and does not even check if it is the leader. This offers the fastest query response, but suffers from the potential issues listed above.
To avoid even this last possibility, rqlite also offers _strong_. In this mode, rqlite sends the query through the Raft consensus system, ensuring that the node remains the leader at all times during query processing. However, this will involve the leader contacting at least a quorum of nodes, and will therefore increase query response times.
You can tell the node not return results (effectively) older than a certain time, however. If the read request sets the query parameter `freshess` to a [Go duration string](https://golang.org/pkg/time/#Duration), the node will check that less time has passed since it was last in contact with the leader, than that specified via freshness. `freshness` is ignored for all consistency levels except _none`, and is also ignored if set to zero.
If you decide to deploy [read-only nodes](https://github.com/rqlite/rqlite/blob/master/DOC/READ_ONLY_NODES.md) however, _none_ combined with `freshness` can be quite effective.
## Weak
_Weak_ instructs the node to check that it is the leader, before querying the local SQLite file. Checking leader state only involves checking local state, so is still very fast. There is, however, a very small window of time (milliseconds by default) during which the node may return stale data. This is because after the leader check, but before the local SQLite database is read, another node could be elected leader and make changes to the cluster. As result the node may not be quite up-to-date with the rest of cluster.
## Strong
To avoid even the issues associated with _weak_ consistency, rqlite also offers _strong_. In this mode, rqlite sends the query through the Raft consensus system, ensuring that the node remains the leader at all times during query processing. However, this will involve the leader contacting at least a quorum of nodes, and will therefore increase query response times.
_Weak_ is probably sufficient for most applications, and is the default read consistency level. To explicitly select consistency, set the query param `level` to the desired level.
@ -18,6 +27,7 @@ Examples of enabling each read consistency level for a simple query is shown bel
```bash
curl -G 'localhost:4001/db/query?level=none' --data-urlencode 'q=SELECT * FROM foo'
curl -G 'localhost:4001/db/query?level=none&freshness=1s' --data-urlencode 'q=SELECT * FROM foo'
curl -G 'localhost:4001/db/query?level=weak' --data-urlencode 'q=SELECT * FROM foo'
curl -G 'localhost:4001/db/query' --data-urlencode 'q=SELECT * FROM foo' # Same as weak
curl -G 'localhost:4001/db/query?level=strong' --data-urlencode 'q=SELECT * FROM foo'

@ -642,19 +642,25 @@ func (s *Service) handleQuery(w http.ResponseWriter, r *http.Request) {
isTx, err := isTx(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
timings, err := timings(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
lvl, err := level(r)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
frsh, err := freshness(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
@ -665,7 +671,7 @@ func (s *Service) handleQuery(w http.ResponseWriter, r *http.Request) {
return
}
results, err := s.store.Query(&store.QueryRequest{queries, timings, isTx, lvl})
results, err := s.store.Query(&store.QueryRequest{queries, timings, isTx, lvl, frsh})
if err != nil {
if err == store.ErrNotLeader {
leader := s.leaderAPIAddr()
@ -911,6 +917,21 @@ func level(req *http.Request) (store.ConsistencyLevel, error) {
}
}
// freshness returns any freshness requested with a query.
func freshness(req *http.Request) (time.Duration, error) {
q := req.URL.Query()
f := strings.TrimSpace(q.Get("freshness"))
if f == "" {
return 0, nil
}
d, err := time.ParseDuration(f)
if err != nil {
return 0, err
}
return d, nil
}
// backupFormat returns the request backup format, setting the response header
// accordingly.
func backupFormat(w http.ResponseWriter, r *http.Request) (store.BackupFormat, error) {

@ -29,6 +29,10 @@ var (
// operation.
ErrNotLeader = errors.New("not leader")
// ErrStaleRead is returned if the executing the query would violate the
// requested freshness.
ErrStaleRead = errors.New("stale read")
// ErrOpenTimeout is returned when the Store does not apply its initial
// logs within the specified time.
ErrOpenTimeout = errors.New("timeout waiting for initial logs application")
@ -80,10 +84,11 @@ func init() {
// QueryRequest represents a query that returns rows, and does not modify
// the database.
type QueryRequest struct {
Queries []string
Timings bool
Tx bool
Lvl ConsistencyLevel
Queries []string
Timings bool
Tx bool
Lvl ConsistencyLevel
Freshness time.Duration
}
// ExecuteRequest represents a query that returns now rows, but does modify
@ -565,8 +570,11 @@ func (s *Store) Query(qr *QueryRequest) ([]*sql.Rows, error) {
return nil, ErrNotLeader
}
r, err := s.db.Query(qr.Queries, qr.Tx, qr.Timings)
return r, err
// Read straight from database.
if qr.Freshness > 0 && time.Since(s.raft.LastContact()) > qr.Freshness {
return nil, ErrStaleRead
}
return s.db.Query(qr.Queries, qr.Tx, qr.Timings)
}
// Join joins a node, identified by id and located at addr, to this store.

@ -66,7 +66,7 @@ func Test_SingleNodeInMemExecuteQuery(t *testing.T) {
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -119,15 +119,15 @@ func Test_SingleNodeFileExecuteQuery(t *testing.T) {
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
r, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
r, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
r, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
r, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -157,15 +157,15 @@ func Test_SingleNodeExecuteQueryTx(t *testing.T) {
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, true, None})
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, true, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
r, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, true, Weak})
r, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, true, Weak, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
r, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, true, Strong})
r, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, true, Strong, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -292,7 +292,7 @@ COMMIT;
}
// Check that data were loaded correctly.
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, true, Strong})
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, true, Strong, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -455,7 +455,7 @@ func Test_SingleNodeLoadChinook(t *testing.T) {
// Check that data were loaded correctly.
r, err := s.Query(&QueryRequest{[]string{`SELECT count(*) FROM track`}, false, true, Strong})
r, err := s.Query(&QueryRequest{[]string{`SELECT count(*) FROM track`}, false, true, Strong, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -466,7 +466,7 @@ func Test_SingleNodeLoadChinook(t *testing.T) {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
r, err = s.Query(&QueryRequest{[]string{`SELECT count(*) FROM album`}, false, true, Strong})
r, err = s.Query(&QueryRequest{[]string{`SELECT count(*) FROM album`}, false, true, Strong, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -477,7 +477,7 @@ func Test_SingleNodeLoadChinook(t *testing.T) {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
r, err = s.Query(&QueryRequest{[]string{`SELECT count(*) FROM artist`}, false, true, Strong})
r, err = s.Query(&QueryRequest{[]string{`SELECT count(*) FROM artist`}, false, true, Strong, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -667,7 +667,7 @@ func Test_MultiNodeExecuteQuery(t *testing.T) {
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
r, err := s0.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
r, err := s0.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query leader node: %s", err.Error())
}
@ -683,15 +683,15 @@ func Test_MultiNodeExecuteQuery(t *testing.T) {
if err := s1.WaitForAppliedIndex(3, 5*time.Second); err != nil {
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
}
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, Weak})
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, Weak, 0})
if err == nil {
t.Fatalf("successfully queried non-leader node")
}
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, Strong})
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, Strong, 0})
if err == nil {
t.Fatalf("successfully queried non-leader node")
}
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query follower node: %s", err.Error())
}
@ -707,15 +707,15 @@ func Test_MultiNodeExecuteQuery(t *testing.T) {
if err := s2.WaitForAppliedIndex(3, 5*time.Second); err != nil {
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
}
r, err = s2.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, Weak})
r, err = s2.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, Weak, 0})
if err == nil {
t.Fatalf("successfully queried non-voting node")
t.Fatalf("successfully queried non-voting node with Weak")
}
r, err = s2.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, Strong})
r, err = s2.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, Strong, 0})
if err == nil {
t.Fatalf("successfully queried non-voting node")
t.Fatalf("successfully queried non-voting node with Strong")
}
r, err = s2.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
r, err = s2.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query non-voting node: %s", err.Error())
}
@ -727,6 +727,103 @@ func Test_MultiNodeExecuteQuery(t *testing.T) {
}
}
func Test_MultiNodeExecuteQueryFreshness(t *testing.T) {
s0 := mustNewStore(true)
defer os.RemoveAll(s0.Path())
if err := s0.Open(true); err != nil {
t.Fatalf("failed to open node for multi-node test: %s", err.Error())
}
defer s0.Close(true)
s0.WaitForLeader(10 * time.Second)
s1 := mustNewStore(true)
defer os.RemoveAll(s1.Path())
if err := s1.Open(false); err != nil {
t.Fatalf("failed to open node for multi-node test: %s", err.Error())
}
defer s1.Close(true)
// Join the second node to the first.
if err := s0.Join(s1.ID(), s1.Addr(), true, nil); err != nil {
t.Fatalf("failed to join to node at %s: %s", s0.Addr(), err.Error())
}
queries := []string{
`CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`,
`INSERT INTO foo(id, name) VALUES(1, "fiona")`,
}
_, err := s0.Execute(&ExecuteRequest{queries, false, false})
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
r, err := s0.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query leader node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// Wait until the 3 log entries have been applied to the follower,
// and then query.
if err := s1.WaitForAppliedIndex(3, 5*time.Second); err != nil {
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
}
// Kill leader.
s0.Close(true)
// "None" consistency queries should still work.
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query follower node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// "None" consistency queries with 1 nanosecond freshness should fail, because at least
// one nanosecond *should* have passed since leader died (surely!).
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, mustParseDuration("1ns")})
if err == nil {
t.Fatalf("freshness violating query didn't return an error")
}
if err != ErrStaleRead {
t.Fatalf("freshness violating query didn't returned wrong error: %s", err.Error())
}
// Freshness of 0 is ignored.
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query follower node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
// "None" consistency queries with 1 hour freshness should pass, because it should
// not be that long since the leader died.
r, err = s1.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, mustParseDuration("1h")})
if err != nil {
t.Fatalf("failed to query follower node: %s", err.Error())
}
if exp, got := `["id","name"]`, asJSON(r[0].Columns); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
if exp, got := `[[1,"fiona"]]`, asJSON(r[0].Values); exp != got {
t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got)
}
}
func Test_StoreLogTruncationMultinode(t *testing.T) {
s0 := mustNewStore(true)
defer os.RemoveAll(s0.Path())
@ -783,7 +880,7 @@ func Test_StoreLogTruncationMultinode(t *testing.T) {
if err := s1.WaitForAppliedIndex(8, 5*time.Second); err != nil {
t.Fatalf("error waiting for follower to apply index: %s:", err.Error())
}
r, err := s1.Query(&QueryRequest{[]string{`SELECT count(*) FROM foo`}, false, true, None})
r, err := s1.Query(&QueryRequest{[]string{`SELECT count(*) FROM foo`}, false, true, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -813,7 +910,7 @@ func Test_SingleNodeSnapshotOnDisk(t *testing.T) {
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
_, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
_, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -845,7 +942,7 @@ func Test_SingleNodeSnapshotOnDisk(t *testing.T) {
}
// Ensure database is back in the correct state.
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -875,7 +972,7 @@ func Test_SingleNodeSnapshotInMem(t *testing.T) {
if err != nil {
t.Fatalf("failed to execute on single node: %s", err.Error())
}
_, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
_, err = s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -907,7 +1004,7 @@ func Test_SingleNodeSnapshotInMem(t *testing.T) {
}
// Ensure database is back in the correct state.
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None})
r, err := s.Query(&QueryRequest{[]string{`SELECT * FROM foo`}, false, false, None, 0})
if err != nil {
t.Fatalf("failed to query single node: %s", err.Error())
}
@ -1075,6 +1172,14 @@ func mustTempDir() string {
return path
}
func mustParseDuration(t string) time.Duration {
d, err := time.ParseDuration(t)
if err != nil {
panic("failed to parse duration")
}
return d
}
func asJSON(v interface{}) string {
b, err := json.Marshal(v)
if err != nil {

@ -145,3 +145,141 @@ func Test_MultiNodeCluster(t *testing.T) {
}
}
}
// Test_MultiNodeClusterWithNonVoter tests formation of a 4-node cluster, one of which is
// a non-voter
func Test_MultiNodeClusterWithNonVoter(t *testing.T) {
node1 := mustNewLeaderNode()
defer node1.Deprovision()
node2 := mustNewNode(false)
defer node2.Deprovision()
if err := node2.Join(node1); err != nil {
t.Fatalf("node failed to join leader: %s", err.Error())
}
_, err := node2.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for leader: %s", err.Error())
}
// Get the new leader, in case it changed.
c := Cluster{node1, node2}
leader, err := c.Leader()
if err != nil {
t.Fatalf("failed to find cluster leader: %s", err.Error())
}
node3 := mustNewNode(false)
defer node3.Deprovision()
if err := node3.Join(leader); err != nil {
t.Fatalf("node failed to join leader: %s", err.Error())
}
_, err = node3.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for leader: %s", err.Error())
}
// Get the new leader, in case it changed.
c = Cluster{node1, node2, node3}
leader, err = c.Leader()
if err != nil {
t.Fatalf("failed to find cluster leader: %s", err.Error())
}
nonVoter := mustNewNode(false)
defer nonVoter.Deprovision()
if err := nonVoter.JoinAsNonVoter(leader); err != nil {
t.Fatalf("non-voting node failed to join leader: %s", err.Error())
}
_, err = nonVoter.WaitForLeader()
if err != nil {
t.Fatalf("failed waiting for leader: %s", err.Error())
}
c = Cluster{node1, node2, node3, nonVoter}
// Run queries against cluster.
tests := []struct {
stmt string
expected string
execute bool
}{
{
stmt: `CREATE TABLE foo (id integer not null primary key, name text)`,
expected: `{"results":[{}]}`,
execute: true,
},
{
stmt: `INSERT INTO foo(name) VALUES("fiona")`,
expected: `{"results":[{"last_insert_id":1,"rows_affected":1}]}`,
execute: true,
},
{
stmt: `SELECT * FROM foo`,
expected: `{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}]}`,
execute: false,
},
}
for i, tt := range tests {
var r string
var err error
if tt.execute {
r, err = leader.Execute(tt.stmt)
} else {
r, err = leader.Query(tt.stmt)
}
if err != nil {
t.Fatalf(`test %d failed "%s": %s`, i, tt.stmt, err.Error())
}
if r != tt.expected {
t.Fatalf(`test %d received wrong result "%s" got: %s exp: %s`, i, tt.stmt, r, tt.expected)
}
}
// Kill the leader and wait for the new leader.
leader.Deprovision()
c.RemoveNode(leader)
leader, err = c.WaitForNewLeader(leader)
if err != nil {
t.Fatalf("failed to find new cluster leader after killing leader: %s", err.Error())
}
// Run queries against the now 3-node cluster.
tests = []struct {
stmt string
expected string
execute bool
}{
{
stmt: `CREATE TABLE foo (id integer not null primary key, name text)`,
expected: `{"results":[{"error":"table foo already exists"}]}`,
execute: true,
},
{
stmt: `INSERT INTO foo(name) VALUES("sinead")`,
expected: `{"results":[{"last_insert_id":2,"rows_affected":1}]}`,
execute: true,
},
{
stmt: `SELECT * FROM foo`,
expected: `{"results":[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"],[2,"sinead"]]}]}`,
execute: false,
},
}
for i, tt := range tests {
var r string
var err error
if tt.execute {
r, err = leader.Execute(tt.stmt)
} else {
r, err = leader.Query(tt.stmt)
}
if err != nil {
t.Fatalf(`test %d failed "%s": %s`, i, tt.stmt, err.Error())
}
if r != tt.expected {
t.Fatalf(`test %d received wrong result "%s" got: %s exp: %s`, i, tt.stmt, r, tt.expected)
}
}
}

@ -85,12 +85,25 @@ func (n *Node) QueryMulti(stmts []string) (string, error) {
// Join instructs this node to join the leader.
func (n *Node) Join(leader *Node) error {
resp, err := DoJoinRequest(leader.APIAddr, n.Store.ID(), n.RaftAddr)
resp, err := DoJoinRequest(leader.APIAddr, n.Store.ID(), n.RaftAddr, true)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("failed to join, leader returned: %s", resp.Status)
return fmt.Errorf("failed to join as voter, leader returned: %s", resp.Status)
}
defer resp.Body.Close()
return nil
}
// JoinAsNonVoter instructs this node to join the leader, but as a non-voting node.
func (n *Node) JoinAsNonVoter(leader *Node) error {
resp, err := DoJoinRequest(leader.APIAddr, n.Store.ID(), n.RaftAddr, false)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("failed to join as non-voter, leader returned: %s", resp.Status)
}
defer resp.Body.Close()
return nil
@ -283,8 +296,8 @@ func Remove(n *Node, addr string) error {
}
// DoJoinRequest sends a join request to nodeAddr, for raftID, reachable at raftAddr.
func DoJoinRequest(nodeAddr, raftID, raftAddr string) (*http.Response, error) {
b, err := json.Marshal(map[string]string{"id": raftID, "addr": raftAddr})
func DoJoinRequest(nodeAddr, raftID, raftAddr string, voter bool) (*http.Response, error) {
b, err := json.Marshal(map[string]interface{}{"id": raftID, "addr": raftAddr, "voter": voter})
if err != nil {
return nil, err
}

Loading…
Cancel
Save