diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b5d0451..1da5612f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 8.19.1 (unreleased) ### Implementation changes and bug fixes - [PR #1670](https://github.com/rqlite/rqlite/pull/1670): Improve error message when query on remote node fails. +- [PR #1671](https://github.com/rqlite/rqlite/pull/1671): Minor optimizations to Unified Request processing. ## 8.19.0 (February 3rd 2024) This release allows you to set a maximum amount of a time a query will run. If the query does not complete within the set time, an error will be returned. diff --git a/store/store.go b/store/store.go index 8229553a..fb2dfbd5 100644 --- a/store/store.go +++ b/store/store.go @@ -1158,15 +1158,18 @@ func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryRe if !s.open { return nil, ErrNotOpen } + nRW, _ := s.RORWCount(eqr) + isLeader := s.raft.State() == raft.Leader - if s.QueriesOnly(eqr) { + if nRW == 0 && eqr.Level != proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG { + // It's a little faster just to do a Query of the DB if we know there is no need + // for consensus. if eqr.Request.Transaction { // Transaction requested during query, but not going through consensus. This means // we need to block any database serialization during the query. 
s.queryTxMu.RLock() defer s.queryTxMu.RUnlock() } - convertFn := func(qr []*proto.QueryRows) []*proto.ExecuteQueryResponse { resp := make([]*proto.ExecuteQueryResponse, len(qr)) for i := range qr { @@ -1176,41 +1179,39 @@ func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryRe } return resp } - if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE { - if s.raft.State() != raft.Leader && eqr.Freshness > 0 && + if !isLeader && eqr.Freshness > 0 && time.Since(s.raft.LastContact()).Nanoseconds() > eqr.Freshness { return nil, ErrStaleRead } - qr, err := s.db.Query(eqr.Request, eqr.Timings) - return convertFn(qr), err } else if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK { - if s.raft.State() != raft.Leader { + if !isLeader { return nil, ErrNotLeader } - qr, err := s.db.Query(eqr.Request, eqr.Timings) - return convertFn(qr), err } + qr, err := s.db.Query(eqr.Request, eqr.Timings) + return convertFn(qr), err } - if s.raft.State() != raft.Leader { + // At least one write in the request, or STRONG consistency requested, so + // we need to go through consensus. Check that we can do that. + if !isLeader { return nil, ErrNotLeader } if !s.Ready() { return nil, ErrNotReady } + // Send the request through consensus. b, compressed, err := s.tryCompress(eqr) if err != nil { return nil, err } - c := &proto.Command{ Type: proto.Command_COMMAND_TYPE_EXECUTE_QUERY, SubCommand: b, Compressed: compressed, } - b, err = command.Marshal(c) if err != nil { return nil, err @@ -1659,20 +1660,22 @@ func (s *Store) Noop(id string) (raft.ApplyFuture, error) { return s.raft.Apply(bc, s.ApplyTimeout), nil } -// QueriesOnly returns whether the given ExecuteQueryRequest contains only -// queries. -func (s *Store) QueriesOnly(eqr *proto.ExecuteQueryRequest) bool { +// RORWCount returns the number of read-write and read-only statements (in +// that order) in the given ExecuteQueryRequest. 
+func (s *Store) RORWCount(eqr *proto.ExecuteQueryRequest) (nRW, nRO int) { for _, stmt := range eqr.Request.Statements { sql := stmt.Sql if sql == "" { continue } ro, err := s.db.StmtReadOnly(sql) - if err != nil || !ro { - return false + if err == nil && ro { + nRO++ + } else { + nRW++ } } - return true + return } // setLogInfo records some key indexes about the log. diff --git a/store/store_multi_test.go b/store/store_multi_test.go index 998798fc..0264d8af 100644 --- a/store/store_multi_test.go +++ b/store/store_multi_test.go @@ -12,6 +12,100 @@ import ( "github.com/rqlite/rqlite/v8/command/proto" ) +// Test_MultiNodeSimple tests that the core operation of a multi-node +// cluster works as expected. That is, with a two node cluster, writes +// actually replicate, and reads are consistent. +func Test_MultiNodeSimple(t *testing.T) { + s0, ln := mustNewStore(t) + defer ln.Close() + + if err := s0.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s0.Close(true) + if err := s0.Bootstrap(NewServer(s0.ID(), s0.Addr(), true)); err != nil { + t.Fatalf("failed to bootstrap single-node store: %s", err.Error()) + } + if _, err := s0.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + s1, ln1 := mustNewStore(t) + defer ln1.Close() + + if err := s1.Open(); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s1.Close(true) + + if err := s0.Join(joinRequest(s1.ID(), s1.Addr(), true)); err != nil { + t.Fatalf("failed to join single-node store: %s", err.Error()) + } + if _, err := s1.WaitForLeader(10 * time.Second); err != nil { + t.Fatalf("Error waiting for leader: %s", err) + } + + // Write some data. 
+ er := executeRequestFromStrings([]string{ + `CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)`, + `INSERT INTO foo(id, name) VALUES(1, "fiona")`, + }, false, false) + _, err := s0.Execute(er) + if err != nil { + t.Fatalf("failed to execute on single node: %s", err.Error()) + } + if _, err := s0.WaitForAppliedFSM(5 * time.Second); err != nil { + t.Fatalf("failed to wait for FSM to apply on leader") + } + testPoll(t, func() bool { + return s0.DBAppliedIndex() == s1.DBAppliedIndex() + }, 250*time.Millisecond, 3*time.Second) + + // Now, do a NONE consistency query on each node, to actually confirm the data + // has been replicated. + testFn1 := func(t *testing.T, s *Store) { + t.Helper() + qr := queryRequestFromString("SELECT * FROM foo", false, false) + qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE + r, err := s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]}]`, asJSON(r); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + } + testFn1(t, s0) + testFn1(t, s1) + + // Write another row using Request + rr := executeQueryRequestFromString("INSERT INTO foo(id, name) VALUES(2, 'fiona')", proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG, false, false) + _, err = s0.Request(rr) + if err != nil { + t.Fatalf("failed to execute on single node: %s", err.Error()) + } + testPoll(t, func() bool { + return s0.DBAppliedIndex() == s1.DBAppliedIndex() + }, 250*time.Millisecond, 3*time.Second) + + // Now, do a NONE consistency query on each node, to actually confirm the data + // has been replicated. 
+ testFn2 := func(t *testing.T, s *Store) { + t.Helper() + qr := queryRequestFromString("SELECT COUNT(*) FROM foo", false, false) + qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE + r, err := s.Query(qr) + if err != nil { + t.Fatalf("failed to query single node: %s", err.Error()) + } + if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[2]]}]`, asJSON(r); exp != got { + t.Fatalf("unexpected results for query\nexp: %s\ngot: %s", exp, got) + } + } + testFn2(t, s0) + testFn2(t, s1) +} + // Test_MultiNodeSnapshot_ErrorMessage tests that a snapshot fails with a specific // error message when the snapshot is attempted too soon after joining a cluster. // Hashicorp Raft doesn't expose a typed error, so we have to check the error diff --git a/store/store_test.go b/store/store_test.go index a8667a12..eabe3a6b 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -789,7 +789,14 @@ func Test_SingleNodeRequest(t *testing.T) { }, { stmts: []string{ - `SELECT * FROM foo`, + `INSERT INTO foo(id, name) VALUES(1234, "dana")`, + `INSERT INTO foo(id, name) VALUES(5678, "bob")`, + }, + expected: `[{"last_insert_id":1234,"rows_affected":1},{"last_insert_id":5678,"rows_affected":1}]`, + }, + { + stmts: []string{ + `SELECT * FROM foo WHERE name='fiona'`, `INSERT INTO foo(id, name) VALUES(66, "declan")`, }, expected: `[{"columns":["id","name"],"types":["integer","text"],"values":[[1,"fiona"]]},{"last_insert_id":66,"rows_affected":1}]`, @@ -799,7 +806,7 @@ func Test_SingleNodeRequest(t *testing.T) { `INSERT INTO foo(id, name) VALUES(77, "fiona")`, `SELECT COUNT(*) FROM foo`, }, - expected: `[{"last_insert_id":77,"rows_affected":1},{"columns":["COUNT(*)"],"types":["integer"],"values":[[3]]}]`, + expected: `[{"last_insert_id":77,"rows_affected":1},{"columns":["COUNT(*)"],"types":["integer"],"values":[[5]]}]`, }, { stmts: []string{ @@ -2448,7 +2455,7 @@ func Test_IsVoter(t *testing.T) { } } -func Test_QueriesOnly(t *testing.T) { +func Test_RWROCount(t 
*testing.T) { s, ln := mustNewStore(t) defer ln.Close() @@ -2471,62 +2478,66 @@ func Test_QueriesOnly(t *testing.T) { } tests := []struct { - name string - stmts []string - queriesOnly bool + name string + stmts []string + expRW int + expRO int }{ { - name: "Empty SQL", - stmts: []string{""}, - queriesOnly: true, + name: "Empty SQL", + stmts: []string{""}, }, { - name: "Junk SQL", - stmts: []string{"asdkflj asgkdj"}, - queriesOnly: false, + name: "Junk SQL", + stmts: []string{"asdkflj asgkdj"}, + expRW: 1, }, { - name: "CREATE TABLE statement, already exists", - stmts: []string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, - queriesOnly: false, + name: "CREATE TABLE statement, already exists", + stmts: []string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"}, + expRW: 1, }, { - name: "Single INSERT", - stmts: []string{"INSERT INTO foo(id, name) VALUES(1, 'fiona')"}, - queriesOnly: false, + name: "Single INSERT", + stmts: []string{"INSERT INTO foo(id, name) VALUES(1, 'fiona')"}, + expRW: 1, }, { - name: "Single INSERT, non-existent table", - stmts: []string{"INSERT INTO qux(id, name) VALUES(1, 'fiona')"}, - queriesOnly: false, + name: "Single INSERT, non-existent table", + stmts: []string{"INSERT INTO qux(id, name) VALUES(1, 'fiona')"}, + expRW: 1, }, { - name: "Single SELECT", - stmts: []string{"SELECT * FROM foo"}, - queriesOnly: true, + name: "Single SELECT", + stmts: []string{"SELECT * FROM foo"}, + expRO: 1, }, { - name: "Single SELECT from non-existent table", - stmts: []string{"SELECT * FROM qux"}, - queriesOnly: false, + name: "Single SELECT from non-existent table", + stmts: []string{"SELECT * FROM qux"}, + expRW: 1, // Yeah, this is unfortunate, but it's how SQLite works. 
}, { - name: "Double SELECT", - stmts: []string{"SELECT * FROM foo", "SELECT * FROM foo WHERE id = 1"}, - queriesOnly: true, + name: "Double SELECT", + stmts: []string{"SELECT * FROM foo", "SELECT * FROM foo WHERE id = 1"}, + expRO: 2, }, { - name: "Mix queries and executes", - stmts: []string{"SELECT * FROM foo", "INSERT INTO foo(id, name) VALUES(1, 'fiona')"}, - queriesOnly: false, + name: "Mix queries and executes", + stmts: []string{"SELECT * FROM foo", "INSERT INTO foo(id, name) VALUES(1, 'fiona')"}, + expRW: 1, + expRO: 1, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - requires := s.QueriesOnly(executeQueryRequestFromStrings(tt.stmts, proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE, false, false)) - if requires != tt.queriesOnly { - t.Fatalf(" test %s failed, unexpected requires: expected %v, got %v", tt.name, tt.queriesOnly, requires) + rwN, roN := s.RORWCount(executeQueryRequestFromStrings(tt.stmts, proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE, false, false)) + if rwN != tt.expRW { + t.Fatalf("wrong number of RW statements, exp %d, got %d", tt.expRW, rwN) + } + if roN != tt.expRO { + t.Fatalf("wrong number of RO statements, exp %d, got %d", tt.expRO, roN) } }) }