1
0
Fork 0

Merge pull request #1635 from rqlite/queries-only-do-local

Always Execute ExecuteRequests without Raft if possible
master
Philip O'Toole 8 months ago committed by GitHub
commit fb23ff0dc2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,5 +1,6 @@
## 8.18.2 (unreleased)
### Implementation changes and bug fixes
- [PR #1635](https://github.com/rqlite/rqlite/pull/1635): Always execute ExecuteRequests locally if possible.
- [PR #1636](https://github.com/rqlite/rqlite/pull/1636): Move to explicit choice of SQLite Checkpointing mode.
- [PR #1637](https://github.com/rqlite/rqlite/pull/1637): Remove unneeded SetFullNeeded post WAL checkpoint failure.

@ -165,7 +165,6 @@ func main() {
if err != nil {
log.Fatalf("failed to start HTTP server: %s", err.Error())
}
log.Printf("HTTP server started")
// Now, open store. How long this takes does depend on how much data is being stored by rqlite.
if err := str.Open(); err != nil {

@ -1156,24 +1156,42 @@ func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryRe
return nil, ErrNotOpen
}
if !s.RequiresLeader(eqr) {
if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE && eqr.Freshness > 0 &&
time.Since(s.raft.LastContact()).Nanoseconds() > eqr.Freshness {
return nil, ErrStaleRead
}
if s.QueriesOnly(eqr) {
if eqr.Request.Transaction {
// Transaction requested during query, but not going through consensus. This means
// we need to block any database serialization during the query.
s.queryTxMu.RLock()
defer s.queryTxMu.RUnlock()
}
return s.db.Request(eqr.Request, eqr.Timings)
convertFn := func(qr []*proto.QueryRows) []*proto.ExecuteQueryResponse {
resp := make([]*proto.ExecuteQueryResponse, len(qr))
for i := range qr {
resp[i] = &proto.ExecuteQueryResponse{
Result: &proto.ExecuteQueryResponse_Q{Q: qr[i]},
}
}
return resp
}
if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE {
if eqr.Freshness > 0 && time.Since(s.raft.LastContact()).Nanoseconds() > eqr.Freshness {
return nil, ErrStaleRead
}
qr, err := s.db.Query(eqr.Request, eqr.Timings)
return convertFn(qr), err
} else if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
qr, err := s.db.Query(eqr.Request, eqr.Timings)
return convertFn(qr), err
}
}
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
if !s.Ready() {
return nil, ErrNotReady
}
@ -1637,24 +1655,20 @@ func (s *Store) Noop(id string) (raft.ApplyFuture, error) {
return s.raft.Apply(bc, s.ApplyTimeout), nil
}
// RequiresLeader returns whether the given ExecuteQueryRequest must be
// processed on the cluster Leader.
func (s *Store) RequiresLeader(eqr *proto.ExecuteQueryRequest) bool {
if eqr.Level != proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE {
return true
}
// QueriesOnly returns whether the given ExecuteQueryRequest contains only
// queries.
func (s *Store) QueriesOnly(eqr *proto.ExecuteQueryRequest) bool {
for _, stmt := range eqr.Request.Statements {
sql := stmt.Sql
if sql == "" {
continue
}
ro, err := s.db.StmtReadOnly(sql)
if !ro || err != nil {
return true
if err != nil || !ro {
return false
}
}
return false
return true
}
// setLogInfo records some key indexes about the log.

@ -2436,7 +2436,7 @@ func Test_IsVoter(t *testing.T) {
}
}
func Test_RequiresLeader(t *testing.T) {
func Test_QueriesOnly(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
@ -2459,95 +2459,62 @@ func Test_RequiresLeader(t *testing.T) {
}
tests := []struct {
name string
stmts []string
lvl proto.QueryRequest_Level
requires bool
name string
stmts []string
queriesOnly bool
}{
{
name: "Empty SQL",
stmts: []string{""},
requires: false,
},
{
name: "Junk SQL",
stmts: []string{"asdkflj asgkdj"},
requires: true,
},
{
name: "CREATE TABLE statement, already exists",
stmts: []string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"},
requires: true,
},
{
name: "CREATE TABLE statement, does not exists",
stmts: []string{"CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"},
requires: true,
},
{
name: "Single INSERT",
stmts: []string{"INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
requires: true,
},
{
name: "Single INSERT, non-existent table",
stmts: []string{"INSERT INTO qux(id, name) VALUES(1, 'fiona')"},
requires: true,
name: "Empty SQL",
stmts: []string{""},
queriesOnly: true,
},
{
name: "Single SELECT with implicit NONE",
stmts: []string{"SELECT * FROM foo"},
requires: false,
name: "Junk SQL",
stmts: []string{"asdkflj asgkdj"},
queriesOnly: false,
},
{
name: "Single SELECT with NONE",
stmts: []string{"SELECT * FROM foo"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE,
requires: false,
name: "CREATE TABLE statement, already exists",
stmts: []string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"},
queriesOnly: false,
},
{
name: "Single SELECT from non-existent table with NONE",
stmts: []string{"SELECT * FROM qux"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE,
requires: true,
name: "Single INSERT",
stmts: []string{"INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
queriesOnly: false,
},
{
name: "Double SELECT with NONE",
stmts: []string{"SELECT * FROM foo", "SELECT * FROM foo WHERE id = 1"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE,
requires: false,
name: "Single INSERT, non-existent table",
stmts: []string{"INSERT INTO qux(id, name) VALUES(1, 'fiona')"},
queriesOnly: false,
},
{
name: "Single SELECT with STRONG",
stmts: []string{"SELECT * FROM foo"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG,
requires: true,
name: "Single SELECT",
stmts: []string{"SELECT * FROM foo"},
queriesOnly: true,
},
{
name: "Single SELECT with WEAK",
stmts: []string{"SELECT * FROM foo"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK,
requires: true,
name: "Single SELECT from non-existent table",
stmts: []string{"SELECT * FROM qux"},
queriesOnly: false,
},
{
name: "Mix queries and executes with NONE",
stmts: []string{"SELECT * FROM foo", "INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE,
requires: true,
name: "Double SELECT",
stmts: []string{"SELECT * FROM foo", "SELECT * FROM foo WHERE id = 1"},
queriesOnly: true,
},
{
name: "Mix queries and executes with WEAK",
stmts: []string{"SELECT * FROM foo", "INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK,
requires: true,
name: "Mix queries and executes",
stmts: []string{"SELECT * FROM foo", "INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
queriesOnly: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requires := s.RequiresLeader(executeQueryRequestFromStrings(tt.stmts, tt.lvl, false, false))
if requires != tt.requires {
t.Fatalf(" test %s failed, unexpected requires: expected %v, got %v", tt.name, tt.requires, requires)
requires := s.QueriesOnly(executeQueryRequestFromStrings(tt.stmts, proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE, false, false))
if requires != tt.queriesOnly {
t.Fatalf(" test %s failed, unexpected requires: expected %v, got %v", tt.name, tt.queriesOnly, requires)
}
})
}

Loading…
Cancel
Save