1
0
Fork 0

rqbench supports queued writes

master
Philip O'Toole 2 years ago
parent a357915a3d
commit 5668464ce5

@ -73,3 +73,8 @@ func (h *HTTPTester) Once() (time.Duration, error) {
return dur, nil
}
// Close closes the tester
func (h *HTTPTester) Close() error {
return nil
}

@ -14,6 +14,7 @@ var numReqs int
var batchSz int
var modPrint int
var tx bool
var qw bool
var tp string
var path string
var oneshot string
@ -27,6 +28,7 @@ func init() {
flag.IntVar(&batchSz, "b", 1, "Statements per request")
flag.IntVar(&modPrint, "m", 0, "Print progress every m requests")
flag.BoolVar(&tx, "x", false, "Use explicit transaction per request")
flag.BoolVar(&qw, "q", false, "Use queued writes")
flag.StringVar(&tp, "t", "http", "Transport to use")
flag.StringVar(&path, "p", "/db/execute", "Endpoint to use")
flag.StringVar(&oneshot, "o", "", "One-shot execute statement to preload")
@ -64,7 +66,12 @@ func main() {
}
}
tester := NewHTTPTester(addr, path)
var tester Tester
tester = NewHTTPTester(addr, path)
if qw {
fmt.Println("using queued write tester")
tester = NewQueuedHTTPTester(addr, path)
}
if err := tester.Prepare(stmt, batchSz, tx); err != nil {
fmt.Println("failed to prepare test:", err.Error())
os.Exit(1)
@ -83,8 +90,10 @@ func main() {
// Tester is the interface test executors must implement.
type Tester interface {
fmt.Stringer
Prepare(stmt string, bSz int, tx bool) error
Once() (time.Duration, error)
Close() error
}
func run(t Tester, n int) (time.Duration, error) {
@ -101,5 +110,5 @@ func run(t Tester, n int) (time.Duration, error) {
fmt.Printf("%d requests completed in %s\n", i, d)
}
}
return dur, nil
return dur, t.Close()
}

@ -0,0 +1,89 @@
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"time"
)
// QueuedHTTPTester represents an HTTP transport tester that uses
// queued writes
type QueuedHTTPTester struct {
client http.Client
url string
waitURL string
br *bytes.Reader
}
// NewHTTPTester returns an instantiated HTTP tester.
func NewQueuedHTTPTester(addr, path string) *QueuedHTTPTester {
return &QueuedHTTPTester{
client: http.Client{},
url: fmt.Sprintf("http://%s%s?queue", addr, path),
waitURL: fmt.Sprintf("http://%s%s?queue&wait", addr, path),
}
}
// String returns a string representation of the tester.
func (h *QueuedHTTPTester) String() string {
return h.url
}
// Prepare prepares the tester for execution.
func (h *QueuedHTTPTester) Prepare(stmt string, bSz int, _ bool) error {
s := make([]string, bSz)
for i := 0; i < len(s); i++ {
s[i] = stmt
}
b, err := json.Marshal(s)
if err != nil {
return err
}
h.br = bytes.NewReader(b)
return nil
}
// Once executes a single test request.
func (h *QueuedHTTPTester) Once() (time.Duration, error) {
h.br.Seek(0, io.SeekStart)
start := time.Now()
resp, err := h.client.Post(h.url, "application/json", h.br)
if err != nil {
return 0, err
}
defer resp.Body.Close()
_, err = ioutil.ReadAll(resp.Body)
if err != nil {
return 0, err
}
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("received %s", resp.Status)
}
dur := time.Since(start)
return dur, nil
}
// Close closes the tester
func (h *QueuedHTTPTester) Close() error {
start := time.Now()
resp, err := h.client.Post(h.waitURL, "application/json", h.br)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Close() received %s", resp.Status)
}
fmt.Println("Queued tester wait request took", time.Since(start))
return nil
}
Loading…
Cancel
Save