1
0
Fork 0

Merge pull request #1667 from rqlite/query-timeout-tweaks

Execute, Query, Request timeout tweaks
master
Philip O'Toole 8 months ago committed by GitHub
commit 9e2a726999
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,4 +1,7 @@
## 8.18.8 (unreleased) ## 8.19.0 (unreleased)
### New features
- [PR #1666](https://github.com/rqlite/rqlite/pull/1667), [PR #1667](https://github.com/rqlite/rqlite/pull/1667): Support timing out if query doesn't finish within specified interval. Fixes issue [#1657](https://github.com/rqlite/rqlite/issues/1657). Thanks @mauri870
### Implementation changes and bug fixes ### Implementation changes and bug fixes
- [PR #1665](https://github.com/rqlite/rqlite/pull/1665): Minor improvements to `random` module. - [PR #1665](https://github.com/rqlite/rqlite/pull/1665): Minor improvements to `random` module.

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.32.0 // protoc-gen-go v1.32.0
// protoc v3.12.4 // protoc v3.6.1
// source: message.proto // source: message.proto
package proto package proto

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.32.0 // protoc-gen-go v1.32.0
// protoc v3.12.4 // protoc v3.6.1
// source: command.proto // source: command.proto
package proto package proto
@ -375,7 +375,7 @@ type Request struct {
Transaction bool `protobuf:"varint,1,opt,name=transaction,proto3" json:"transaction,omitempty"` Transaction bool `protobuf:"varint,1,opt,name=transaction,proto3" json:"transaction,omitempty"`
Statements []*Statement `protobuf:"bytes,2,rep,name=statements,proto3" json:"statements,omitempty"` Statements []*Statement `protobuf:"bytes,2,rep,name=statements,proto3" json:"statements,omitempty"`
DBTimeout int64 `protobuf:"varint,3,opt,name=DBTimeout,proto3" json:"DBTimeout,omitempty"` DbTimeout int64 `protobuf:"varint,3,opt,name=dbTimeout,proto3" json:"dbTimeout,omitempty"`
} }
func (x *Request) Reset() { func (x *Request) Reset() {
@ -424,9 +424,9 @@ func (x *Request) GetStatements() []*Statement {
return nil return nil
} }
func (x *Request) GetDBTimeout() int64 { func (x *Request) GetDbTimeout() int64 {
if x != nil { if x != nil {
return x.DBTimeout return x.DbTimeout
} }
return 0 return 0
} }
@ -1415,8 +1415,8 @@ var file_command_proto_rawDesc = []byte{
0x6f, 0x6e, 0x12, 0x32, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x6f, 0x6e, 0x12, 0x32, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73,
0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74,
0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x44, 0x42, 0x54, 0x69, 0x6d, 0x65, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x1c, 0x0a, 0x09, 0x64, 0x62, 0x54, 0x69, 0x6d, 0x65,
0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x44, 0x42, 0x54, 0x69, 0x6d, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x64, 0x62, 0x54, 0x69, 0x6d,
0x65, 0x6f, 0x75, 0x74, 0x22, 0x8a, 0x02, 0x0a, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x8a, 0x02, 0x0a, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x65,
0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,

@ -23,7 +23,7 @@ message Statement {
message Request { message Request {
bool transaction = 1; bool transaction = 1;
repeated Statement statements = 2; repeated Statement statements = 2;
int64 DBTimeout = 3; int64 dbTimeout = 3;
} }
message QueryRequest { message QueryRequest {

@ -542,7 +542,7 @@ func (db *DB) ExecuteStringStmtWithTimeout(query string, timeout time.Duration)
Sql: query, Sql: query,
}, },
}, },
DBTimeout: int64(timeout), DbTimeout: int64(timeout),
} }
return db.Execute(r, false) return db.Execute(r, false)
} }
@ -619,7 +619,7 @@ func (db *DB) executeWithConn(req *command.Request, xTime bool, conn *sql.Conn)
continue continue
} }
result, err := db.executeStmtWithConn(stmt, xTime, execer, time.Duration(req.DBTimeout)) result, err := db.executeStmtWithConn(stmt, xTime, execer, time.Duration(req.DbTimeout))
if err != nil { if err != nil {
if handleError(result, err) { if handleError(result, err) {
continue continue
@ -702,7 +702,7 @@ func (db *DB) QueryStringStmtWithTimeout(query string, timeout time.Duration) ([
Sql: query, Sql: query,
}, },
}, },
DBTimeout: int64(timeout), DbTimeout: int64(timeout),
} }
return db.Query(r, false) return db.Query(r, false)
} }
@ -767,7 +767,7 @@ func (db *DB) queryWithConn(req *command.Request, xTime bool, conn *sql.Conn) ([
continue continue
} }
rows, err = db.queryStmtWithConn(stmt, xTime, queryer, time.Duration(req.DBTimeout)) rows, err = db.queryStmtWithConn(stmt, xTime, queryer, time.Duration(req.DbTimeout))
if err != nil { if err != nil {
stats.Add(numQueryErrors, 1) stats.Add(numQueryErrors, 1)
rows = &command.QueryRows{ rows = &command.QueryRows{
@ -876,6 +876,18 @@ func (db *DB) RequestStringStmts(stmts []string) ([]*command.ExecuteQueryRespons
return db.Request(req, false) return db.Request(req, false)
} }
// RequestStringStmtsWithTimeout processes a request that can contain both executes and queries.
func (db *DB) RequestStringStmtsWithTimeout(stmts []string, timeout time.Duration) ([]*command.ExecuteQueryResponse, error) {
req := &command.Request{}
for _, q := range stmts {
req.Statements = append(req.Statements, &command.Statement{
Sql: q,
})
}
req.DbTimeout = int64(timeout)
return db.Request(req, false)
}
// Request processes a request that can contain both executes and queries. // Request processes a request that can contain both executes and queries.
func (db *DB) Request(req *command.Request, xTime bool) ([]*command.ExecuteQueryResponse, error) { func (db *DB) Request(req *command.Request, xTime bool) ([]*command.ExecuteQueryResponse, error) {
stats.Add(numRequests, int64(len(req.Statements))) stats.Add(numRequests, int64(len(req.Statements)))
@ -931,13 +943,13 @@ func (db *DB) Request(req *command.Request, xTime bool) ([]*command.ExecuteQuery
} }
if ro { if ro {
rows, opErr := db.queryStmtWithConn(stmt, xTime, queryer, time.Duration(req.DBTimeout)) rows, opErr := db.queryStmtWithConn(stmt, xTime, queryer, time.Duration(req.DbTimeout))
eqResponse = append(eqResponse, createEQQueryResponse(rows, opErr)) eqResponse = append(eqResponse, createEQQueryResponse(rows, opErr))
if abortOnError(opErr) { if abortOnError(opErr) {
break break
} }
} else { } else {
result, opErr := db.executeStmtWithConn(stmt, xTime, execer, time.Duration(req.DBTimeout)) result, opErr := db.executeStmtWithConn(stmt, xTime, execer, time.Duration(req.DbTimeout))
eqResponse = append(eqResponse, createEQExecuteResponse(result, opErr)) eqResponse = append(eqResponse, createEQExecuteResponse(result, opErr))
if abortOnError(opErr) { if abortOnError(opErr) {
break break

@ -1044,7 +1044,19 @@ func Test_ExecShouldTimeout(t *testing.T) {
INSERT INTO test_table (key1, key_id, key2, key3, key4, key5, key6, data) INSERT INTO test_table (key1, key_id, key2, key3, key4, key5, key6, data)
SELECT t1.key1 || t2.key1, t1.key_id || t2.key_id, t1.key2 || t2.key2, t1.key3 || t2.key3, t1.key4 || t2.key4, t1.key5 || t2.key5, t1.key6 || t2.key6, t1.data || t2.data SELECT t1.key1 || t2.key1, t1.key_id || t2.key_id, t1.key2 || t2.key2, t1.key3 || t2.key3, t1.key4 || t2.key4, t1.key5 || t2.key5, t1.key6 || t2.key6, t1.data || t2.data
FROM test_table t1 LEFT OUTER JOIN test_table t2` FROM test_table t1 LEFT OUTER JOIN test_table t2`
assertExecTimeoutReached(t, db, q, 1*time.Millisecond) r, err := db.ExecuteStringStmtWithTimeout(q, 1*time.Millisecond)
if err != nil {
t.Fatalf("failed to execute: %s", err.Error())
}
if len(r) != 1 {
t.Fatalf("expected one result, got %d: %s", len(r), asJSON(r))
}
res := r[0]
if !strings.Contains(res.Error, "context deadline exceeded") {
t.Fatalf("expected context.DeadlineExceeded, got %s", res.Error)
}
qr, err := db.QueryStringStmt("SELECT COUNT(*) FROM test_table") qr, err := db.QueryStringStmt("SELECT COUNT(*) FROM test_table")
if err != nil { if err != nil {
@ -1063,13 +1075,9 @@ func Test_QueryShouldTimeout(t *testing.T) {
q := `SELECT key1, key_id, key2, key3, key4, key5, key6, data q := `SELECT key1, key_id, key2, key3, key4, key5, key6, data
FROM test_table FROM test_table
ORDER BY key2 ASC` ORDER BY key2 ASC`
assertQueryTimeoutReached(t, db, q, 1*time.Microsecond) r, err := db.QueryStringStmtWithTimeout(q, 1*time.Microsecond)
}
func assertExecTimeoutReached(t *testing.T, db *DB, stmt string, timeout time.Duration) {
r, err := db.ExecuteStringStmtWithTimeout(stmt, timeout)
if err != nil { if err != nil {
t.Fatalf("failed to execute: %s", err.Error()) t.Fatalf("failed to run query: %s", err.Error())
} }
if len(r) != 1 { if len(r) != 1 {
@ -1082,19 +1090,26 @@ func assertExecTimeoutReached(t *testing.T, db *DB, stmt string, timeout time.Du
} }
} }
func assertQueryTimeoutReached(t *testing.T, db *DB, stmt string, timeout time.Duration) { func Test_RequestShouldTimeout(t *testing.T) {
r, err := db.QueryStringStmtWithTimeout(stmt, timeout) db, path := mustSetupDBForTimeoutTests(t, 1000)
defer db.Close()
defer os.Remove(path)
q := `SELECT key1, key_id, key2, key3, key4, key5, key6, data
FROM test_table
ORDER BY key2 ASC`
res, err := db.RequestStringStmtsWithTimeout([]string{q}, 1*time.Microsecond)
if err != nil { if err != nil {
t.Fatalf("failed to run query: %s", err.Error()) t.Fatalf("failed to run query: %s", err.Error())
} }
if len(r) != 1 { if len(res) != 1 {
t.Fatalf("expected one result, got %d: %s", len(r), asJSON(r)) t.Fatalf("expected one result, got %d: %s", len(res), asJSON(res))
} }
res := r[0] r := res[0]
if !strings.Contains(res.Error, "context deadline exceeded") { if !strings.Contains(r.GetQ().Error, "context deadline exceeded") {
t.Fatalf("expected context.DeadlineExceeded, got %s", res.Error) t.Fatalf("expected context.DeadlineExceeded, got %s", r.GetQ().Error)
} }
} }

@ -1114,7 +1114,7 @@ func (s *Service) execute(w http.ResponseWriter, r *http.Request, qp QueryParams
er := &proto.ExecuteRequest{ er := &proto.ExecuteRequest{
Request: &proto.Request{ Request: &proto.Request{
Transaction: qp.Tx(), Transaction: qp.Tx(),
DBTimeout: int64(qp.DBTimeout(0)), DbTimeout: int64(qp.DBTimeout(0)),
Statements: stmts, Statements: stmts,
}, },
Timings: qp.Timings(), Timings: qp.Timings(),
@ -1204,7 +1204,7 @@ func (s *Service) handleQuery(w http.ResponseWriter, r *http.Request, qp QueryPa
qr := &proto.QueryRequest{ qr := &proto.QueryRequest{
Request: &proto.Request{ Request: &proto.Request{
Transaction: qp.Tx(), Transaction: qp.Tx(),
DBTimeout: int64(qp.DBTimeout(0)), DbTimeout: int64(qp.DBTimeout(0)),
Statements: queries, Statements: queries,
}, },
Timings: qp.Timings(), Timings: qp.Timings(),
@ -1293,6 +1293,7 @@ func (s *Service) handleRequest(w http.ResponseWriter, r *http.Request, qp Query
Request: &proto.Request{ Request: &proto.Request{
Transaction: qp.Tx(), Transaction: qp.Tx(),
Statements: stmts, Statements: stmts,
DbTimeout: int64(qp.DBTimeout(0)),
}, },
Timings: qp.Timings(), Timings: qp.Timings(),
Level: qp.Level(), Level: qp.Level(),

Loading…
Cancel
Save