1
0
Fork 0

add support for sql query timeout

master
Mauri de Souza Meneguzzo 8 months ago
parent 790874f60c
commit a9ed45b68d

@ -199,6 +199,14 @@ curl -XPOST 'localhost:4001/db/execute?timeout=2m' -H "Content-Type: application
]'
```
### SQL Query Timeout
By default, SQL queries do not have a predefined timeout. You can set a timeout by sending a `sql_timeout` query parameter in the request. This parameter allows you to specify the maximum amount of time to spend executing the query before it is interrupted.
```bash
curl -XPOST 'localhost:4001/db/execute?sql_timeout=2s' -H "Content-Type: application/json" -d '[
["INSERT INTO foo(name, age) VALUES(?, ?)", "fiona", 20]
]'
```
### Disabling Request Forwarding
If you do not wish a Follower to transparently forward a request to a Leader, add `redirect` to the URL as a query parameter. In that case if a Follower receives a request that can only be serviced by the Leader, the Follower will respond with [HTTP 301 Moved Permanently](https://en.wikipedia.org/wiki/HTTP_301) and include the address of the Leader as the `Location` header in the response. It is then up the clients to re-issue the command to the Leader.

@ -375,6 +375,7 @@ type Request struct {
Transaction bool `protobuf:"varint,1,opt,name=transaction,proto3" json:"transaction,omitempty"`
Statements []*Statement `protobuf:"bytes,2,rep,name=statements,proto3" json:"statements,omitempty"`
SqlTimeout int64 `protobuf:"varint,3,opt,name=sqlTimeout,proto3" json:"sqlTimeout,omitempty"`
}
func (x *Request) Reset() {
@ -423,6 +424,13 @@ func (x *Request) GetStatements() []*Statement {
return nil
}
func (x *Request) GetSqlTimeout() int64 {
if x != nil {
return x.SqlTimeout
}
return 0
}
type QueryRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -1401,13 +1409,15 @@ var file_command_proto_rawDesc = []byte{
0x6c, 0x12, 0x32, 0x0a, 0x0a, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x18,
0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x2e,
0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x52, 0x0a, 0x70, 0x61, 0x72, 0x61, 0x6d,
0x65, 0x74, 0x65, 0x72, 0x73, 0x22, 0x5f, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x65, 0x74, 0x65, 0x72, 0x73, 0x22, 0x7f, 0x0a, 0x07, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x12, 0x20, 0x0a, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18,
0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69,
0x6f, 0x6e, 0x12, 0x32, 0x0a, 0x0a, 0x73, 0x74, 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73,
0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64,
0x2e, 0x53, 0x74, 0x61, 0x74, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x0a, 0x73, 0x74, 0x61, 0x74,
0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x8a, 0x02, 0x0a, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x79,
0x65, 0x6d, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x71, 0x6c, 0x54, 0x69, 0x6d,
0x65, 0x6f, 0x75, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x73, 0x71, 0x6c, 0x54,
0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x8a, 0x02, 0x0a, 0x0c, 0x51, 0x75, 0x65, 0x72, 0x79,
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2a, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x61,
0x6e, 0x64, 0x2e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x52, 0x07, 0x72, 0x65, 0x71, 0x75,

@ -23,6 +23,7 @@ message Statement {
message Request {
bool transaction = 1;
repeated Statement statements = 2;
int64 sqlTimeout = 3;
}
message QueryRequest {

@ -533,6 +533,20 @@ func (db *DB) ConnectionPoolStats(sqlDB *sql.DB) *PoolStats {
}
// ExecuteStringStmtWithTimeout executes a single query that modifies the database.
// It also sets a timeout for the query. This is primarily a convenience function.
func (db *DB) ExecuteStringStmtWithTimeout(query string, timeout time.Duration) ([]*command.ExecuteResult, error) {
r := &command.Request{
Statements: []*command.Statement{
{
Sql: query,
},
},
SqlTimeout: int64(timeout),
}
return db.Execute(r, false)
}
// ExecuteStringStmt executes a single query that modifies the database. This is
// primarily a convenience function.
func (db *DB) ExecuteStringStmt(query string) ([]*command.ExecuteResult, error) {
@ -605,7 +619,7 @@ func (db *DB) executeWithConn(req *command.Request, xTime bool, conn *sql.Conn)
continue
}
result, err := db.executeStmtWithConn(stmt, xTime, execer)
result, err := db.executeStmtWithConn(stmt, xTime, execer, time.Duration(req.SqlTimeout))
if err != nil {
if handleError(result, err) {
continue
@ -621,7 +635,7 @@ func (db *DB) executeWithConn(req *command.Request, xTime bool, conn *sql.Conn)
return allResults, err
}
func (db *DB) executeStmtWithConn(stmt *command.Statement, xTime bool, e execer) (*command.ExecuteResult, error) {
func (db *DB) executeStmtWithConn(stmt *command.Statement, xTime bool, e execer, timeout time.Duration) (*command.ExecuteResult, error) {
result := &command.ExecuteResult{}
start := time.Now()
@ -631,7 +645,14 @@ func (db *DB) executeStmtWithConn(stmt *command.Statement, xTime bool, e execer)
return result, nil
}
r, err := e.ExecContext(context.Background(), stmt.Sql, parameters...)
ctx := context.Background()
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
r, err := e.ExecContext(ctx, stmt.Sql, parameters...)
if err != nil {
result.Error = err.Error()
return result, err
@ -672,6 +693,20 @@ func (db *DB) QueryStringStmt(query string) ([]*command.QueryRows, error) {
return db.Query(r, false)
}
// QueryStringStmtWithTimeout executes a single query that return rows, but don't modify database.
// It also sets a timeout for the query.
func (db *DB) QueryStringStmtWithTimeout(query string, timeout time.Duration) ([]*command.QueryRows, error) {
r := &command.Request{
Statements: []*command.Statement{
{
Sql: query,
},
},
SqlTimeout: int64(timeout),
}
return db.Query(r, false)
}
// Query executes queries that return rows, but don't modify the database.
func (db *DB) Query(req *command.Request, xTime bool) ([]*command.QueryRows, error) {
stats.Add(numQueries, int64(len(req.Statements)))
@ -732,7 +767,7 @@ func (db *DB) queryWithConn(req *command.Request, xTime bool, conn *sql.Conn) ([
continue
}
rows, err = db.queryStmtWithConn(stmt, xTime, queryer)
rows, err = db.queryStmtWithConn(stmt, xTime, queryer, time.Duration(req.SqlTimeout))
if err != nil {
stats.Add(numQueryErrors, 1)
rows = &command.QueryRows{
@ -748,7 +783,7 @@ func (db *DB) queryWithConn(req *command.Request, xTime bool, conn *sql.Conn) ([
return allRows, err
}
func (db *DB) queryStmtWithConn(stmt *command.Statement, xTime bool, q queryer) (*command.QueryRows, error) {
func (db *DB) queryStmtWithConn(stmt *command.Statement, xTime bool, q queryer, timeout time.Duration) (*command.QueryRows, error) {
rows := &command.QueryRows{}
start := time.Now()
@ -759,7 +794,14 @@ func (db *DB) queryStmtWithConn(stmt *command.Statement, xTime bool, q queryer)
return rows, nil
}
rs, err := q.QueryContext(context.Background(), stmt.Sql, parameters...)
ctx := context.Background()
if timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout)
defer cancel()
}
rs, err := q.QueryContext(ctx, stmt.Sql, parameters...)
if err != nil {
stats.Add(numQueryErrors, 1)
rows.Error = err.Error()
@ -889,13 +931,13 @@ func (db *DB) Request(req *command.Request, xTime bool) ([]*command.ExecuteQuery
}
if ro {
rows, opErr := db.queryStmtWithConn(stmt, xTime, queryer)
rows, opErr := db.queryStmtWithConn(stmt, xTime, queryer, time.Duration(req.SqlTimeout))
eqResponse = append(eqResponse, createEQQueryResponse(rows, opErr))
if abortOnError(opErr) {
break
}
} else {
result, opErr := db.executeStmtWithConn(stmt, xTime, execer)
result, opErr := db.executeStmtWithConn(stmt, xTime, execer, time.Duration(req.SqlTimeout))
eqResponse = append(eqResponse, createEQExecuteResponse(result, opErr))
if abortOnError(opErr) {
break

@ -5,12 +5,14 @@ import (
"fmt"
"io"
"os"
"strings"
"sync"
"testing"
"time"
"github.com/rqlite/rqlite/v8/command/encoding"
command "github.com/rqlite/rqlite/v8/command/proto"
"github.com/rqlite/rqlite/v8/random"
)
// Test_OpenNonExistentDatabase tests that opening a non-existent database
@ -986,6 +988,117 @@ func Test_ParallelOperationsInMemory(t *testing.T) {
exWg.Wait()
}
func mustSetupDbForTimeoutTests(t *testing.T, n int) (*DB, string) {
db, path := mustCreateOnDiskDatabase()
req := &command.Request{
Statements: []*command.Statement{
{
Sql: `CREATE TABLE IF NOT EXISTS test_table (
key1 VARCHAR(64) PRIMARY KEY,
key_id VARCHAR(64) NOT NULL,
key2 VARCHAR(64) NOT NULL,
key3 VARCHAR(64) NOT NULL,
key4 VARCHAR(64) NOT NULL,
key5 VARCHAR(64) NOT NULL,
key6 VARCHAR(64) NOT NULL,
data BLOB NOT NULL
);`,
},
},
}
for i := 0; i < n; i++ {
args := []any{
random.String(),
fmt.Sprint(i),
random.String(),
random.String(),
random.String(),
random.String(),
random.String(),
random.String(),
}
req.Statements = append(req.Statements, &command.Statement{
Sql: fmt.Sprintf(`INSERT INTO test_table
(key1, key_id, key2, key3, key4, key5, key6, data)
VALUES
(%q, %q, %q, %q, %q, %q, %q, %q);`, args...),
})
}
_, err := db.Execute(req, false)
if err != nil {
t.Fatalf("failed to insert records: %s", err.Error())
}
return db, path
}
func Test_ExecShouldTimeout(t *testing.T) {
db, path := mustSetupDbForTimeoutTests(t, 1000)
defer db.Close()
defer os.Remove(path)
q := `
INSERT INTO test_table (key1, key_id, key2, key3, key4, key5, key6, data)
SELECT t1.key1 || t2.key1, t1.key_id || t2.key_id, t1.key2 || t2.key2, t1.key3 || t2.key3, t1.key4 || t2.key4, t1.key5 || t2.key5, t1.key6 || t2.key6, t1.data || t2.data
FROM test_table t1 LEFT OUTER JOIN test_table t2`
mustTimeoutExecute(db, q, 1*time.Millisecond)
qr, err := db.QueryStringStmt("SELECT COUNT(*) FROM test_table")
if err != nil {
t.Fatalf("failed to query empty table: %s", err.Error())
}
if exp, got := `[{"columns":["COUNT(*)"],"types":["integer"],"values":[[1000]]}]`, asJSON(qr); exp != got {
t.Fatalf("unexpected results for query, expected %s, got %s", exp, got)
}
}
func Test_QueryShouldTimeout(t *testing.T) {
db, path := mustSetupDbForTimeoutTests(t, 1000)
defer db.Close()
defer os.Remove(path)
q := `SELECT key1, key_id, key2, key3, key4, key5, key6, data
FROM test_table
ORDER BY key2 ASC`
mustTimeoutQuery(db, q, 1*time.Microsecond)
}
// mustTimeoutExecute executes a statement with a given timeout, and panics if the timeout is NOT reached.
func mustTimeoutExecute(db *DB, stmt string, timeout time.Duration) {
r, err := db.ExecuteStringStmtWithTimeout(stmt, timeout)
if err != nil {
panic(fmt.Sprintf("unexpected error: %s", err.Error()))
}
if len(r) != 1 {
panic(fmt.Sprintf("expected one result, got %d", len(r)))
}
res := r[0]
if !strings.Contains(res.Error, "context deadline exceeded") {
panic(fmt.Sprintf("expected context.DeadlineExceeded, got %s", res.Error))
}
}
func mustTimeoutQuery(db *DB, stmt string, timeout time.Duration) {
r, err := db.QueryStringStmtWithTimeout(stmt, timeout)
if err != nil {
panic("unexpected error")
}
if len(r) != 1 {
panic(fmt.Sprintf("expected one result, got %d", len(r)))
}
res := r[0]
if !strings.Contains(res.Error, "context deadline exceeded") {
panic(fmt.Sprintf("expected context.DeadlineExceeded, got %s", res.Error))
}
}
func mustCreateOnDiskDatabase() (*DB, string) {
var err error
f := mustTempFile()

@ -123,6 +123,16 @@ func (qp QueryParams) Key() string {
return qp["key"]
}
// SqlTimeout returns the value of the key named "sql_timeout_seconds".
func (qp QueryParams) SqlTimeout(def time.Duration) time.Duration {
t, ok := qp["sql_timeout"]
if !ok {
return def
}
d, _ := time.ParseDuration(t)
return d
}
// Level returns the requested consistency level.
func (qp QueryParams) Level() command.QueryRequest_Level {
lvl := qp["level"]

@ -1114,6 +1114,7 @@ func (s *Service) execute(w http.ResponseWriter, r *http.Request, qp QueryParams
er := &proto.ExecuteRequest{
Request: &proto.Request{
Transaction: qp.Tx(),
SqlTimeout: int64(qp.SqlTimeout(0)),
Statements: stmts,
},
Timings: qp.Timings(),
@ -1203,6 +1204,7 @@ func (s *Service) handleQuery(w http.ResponseWriter, r *http.Request, qp QueryPa
qr := &proto.QueryRequest{
Request: &proto.Request{
Transaction: qp.Tx(),
SqlTimeout: int64(qp.SqlTimeout(0)),
Statements: queries,
},
Timings: qp.Timings(),

@ -1316,6 +1316,43 @@ func Test_timeoutVersionPrettyQueryParam(t *testing.T) {
}
}
func Test_SQLTimeoutQueryParam(t *testing.T) {
tests := []struct {
url string
want time.Duration
}{
{
url: "http://localhost:4001/execute?sql_timeout=1s",
want: 1 * time.Second,
},
{
url: "http://localhost:4001/execute?sql_timeout=100ms",
want: 100 * time.Millisecond,
},
{
url: "http://localhost:4001/execute?sql_timeout=xyz",
want: 0,
},
}
for i, tt := range tests {
mustURLParse(tt.url)
req, err := http.NewRequest("GET", tt.url, nil)
if err != nil {
t.Fatalf("failed to create request: %s", err)
}
qp, err := NewQueryParams(req)
if err != nil {
t.Fatalf(" unexpectedly failed to parse query params on test %d: %s", i, err)
}
got := qp.SqlTimeout(0)
if got != tt.want {
t.Fatalf("want %d, got %d", tt.want, got)
}
}
}
type MockStore struct {
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)

Loading…
Cancel
Save