1
0
Fork 0

Execute ExecuteRequests locally if possible

master
Philip O'Toole 8 months ago
parent 355c4baa06
commit 5261b7058e

@ -1154,24 +1154,30 @@ func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryRe
return nil, ErrNotOpen
}
if !s.RequiresLeader(eqr) {
if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE && eqr.Freshness > 0 &&
time.Since(s.raft.LastContact()).Nanoseconds() > eqr.Freshness {
return nil, ErrStaleRead
}
if s.QueriesOnly(eqr) {
if eqr.Request.Transaction {
// Transaction requested during query, but not going through consensus. This means
// we need to block any database serialization during the query.
s.queryTxMu.RLock()
defer s.queryTxMu.RUnlock()
}
return s.db.Request(eqr.Request, eqr.Timings)
if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE {
if eqr.Freshness > 0 && time.Since(s.raft.LastContact()).Nanoseconds() > eqr.Freshness {
return nil, ErrStaleRead
}
return s.db.Request(eqr.Request, eqr.Timings)
} else if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
return s.db.Request(eqr.Request, eqr.Timings)
}
}
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
if !s.Ready() {
return nil, ErrNotReady
}
@ -1635,24 +1641,21 @@ func (s *Store) Noop(id string) (raft.ApplyFuture, error) {
return s.raft.Apply(bc, s.ApplyTimeout), nil
}
// RequiresLeader returns whether the given ExecuteQueryRequest must be
// processed on the cluster Leader.
func (s *Store) RequiresLeader(eqr *proto.ExecuteQueryRequest) bool {
if eqr.Level != proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE {
return true
}
// QueriesOnly returns whether the given ExecuteQueryRequest contains only
// queries.
func (s *Store) QueriesOnly(eqr *proto.ExecuteQueryRequest) bool {
for _, stmt := range eqr.Request.Statements {
sql := stmt.Sql
if sql == "" {
continue
}
ro, err := s.db.StmtReadOnly(sql)
if !ro || err != nil {
return true
if ro && err == nil {
continue
}
return false
}
return false
return true
}
// setLogInfo records some key indexes about the log.

@ -2506,7 +2506,7 @@ func Test_IsVoter(t *testing.T) {
}
}
func Test_RequiresLeader(t *testing.T) {
func Test_QueriesOnly(t *testing.T) {
s, ln := mustNewStore(t)
defer ln.Close()
@ -2529,95 +2529,62 @@ func Test_RequiresLeader(t *testing.T) {
}
tests := []struct {
name string
stmts []string
lvl proto.QueryRequest_Level
requires bool
name string
stmts []string
queriesOnly bool
}{
{
name: "Empty SQL",
stmts: []string{""},
requires: false,
},
{
name: "Junk SQL",
stmts: []string{"asdkflj asgkdj"},
requires: true,
},
{
name: "CREATE TABLE statement, already exists",
stmts: []string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"},
requires: true,
},
{
name: "CREATE TABLE statement, does not exists",
stmts: []string{"CREATE TABLE bar (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"},
requires: true,
},
{
name: "Single INSERT",
stmts: []string{"INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
requires: true,
},
{
name: "Single INSERT, non-existent table",
stmts: []string{"INSERT INTO qux(id, name) VALUES(1, 'fiona')"},
requires: true,
name: "Empty SQL",
stmts: []string{""},
queriesOnly: true,
},
{
name: "Single SELECT with implicit NONE",
stmts: []string{"SELECT * FROM foo"},
requires: false,
name: "Junk SQL",
stmts: []string{"asdkflj asgkdj"},
queriesOnly: false,
},
{
name: "Single SELECT with NONE",
stmts: []string{"SELECT * FROM foo"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE,
requires: false,
name: "CREATE TABLE statement, already exists",
stmts: []string{"CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)"},
queriesOnly: false,
},
{
name: "Single SELECT from non-existent table with NONE",
stmts: []string{"SELECT * FROM qux"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE,
requires: true,
name: "Single INSERT",
stmts: []string{"INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
queriesOnly: false,
},
{
name: "Double SELECT with NONE",
stmts: []string{"SELECT * FROM foo", "SELECT * FROM foo WHERE id = 1"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE,
requires: false,
name: "Single INSERT, non-existent table",
stmts: []string{"INSERT INTO qux(id, name) VALUES(1, 'fiona')"},
queriesOnly: false,
},
{
name: "Single SELECT with STRONG",
stmts: []string{"SELECT * FROM foo"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG,
requires: true,
name: "Single SELECT",
stmts: []string{"SELECT * FROM foo"},
queriesOnly: true,
},
{
name: "Single SELECT with WEAK",
stmts: []string{"SELECT * FROM foo"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK,
requires: true,
name: "Single SELECT from non-existent table",
stmts: []string{"SELECT * FROM qux"},
queriesOnly: false,
},
{
name: "Mix queries and executes with NONE",
stmts: []string{"SELECT * FROM foo", "INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE,
requires: true,
name: "Double SELECT",
stmts: []string{"SELECT * FROM foo", "SELECT * FROM foo WHERE id = 1"},
queriesOnly: true,
},
{
name: "Mix queries and executes with WEAK",
stmts: []string{"SELECT * FROM foo", "INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
lvl: proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK,
requires: true,
name: "Mix queries and executes",
stmts: []string{"SELECT * FROM foo", "INSERT INTO foo(id, name) VALUES(1, 'fiona')"},
queriesOnly: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
requires := s.RequiresLeader(executeQueryRequestFromStrings(tt.stmts, tt.lvl, false, false))
if requires != tt.requires {
t.Fatalf(" test %s failed, unexpected requires: expected %v, got %v", tt.name, tt.requires, requires)
requires := s.QueriesOnly(executeQueryRequestFromStrings(tt.stmts, proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE, false, false))
if requires != tt.queriesOnly {
t.Fatalf(" test %s failed, unexpected requires: expected %v, got %v", tt.name, tt.queriesOnly, requires)
}
})
}

Loading…
Cancel
Save