diff --git a/cmd/rqbench/http.go b/cmd/rqbench/http.go index 2e734a47..b1b6b759 100644 --- a/cmd/rqbench/http.go +++ b/cmd/rqbench/http.go @@ -73,3 +73,8 @@ func (h *HTTPTester) Once() (time.Duration, error) { return dur, nil } + +// Close closes the tester +func (h *HTTPTester) Close() error { + return nil +} diff --git a/cmd/rqbench/main.go b/cmd/rqbench/main.go index 23446018..d83bec98 100644 --- a/cmd/rqbench/main.go +++ b/cmd/rqbench/main.go @@ -14,6 +14,7 @@ var numReqs int var batchSz int var modPrint int var tx bool +var qw bool var tp string var path string var oneshot string @@ -27,6 +28,7 @@ func init() { flag.IntVar(&batchSz, "b", 1, "Statements per request") flag.IntVar(&modPrint, "m", 0, "Print progress every m requests") flag.BoolVar(&tx, "x", false, "Use explicit transaction per request") + flag.BoolVar(&qw, "q", false, "Use queued writes") flag.StringVar(&tp, "t", "http", "Transport to use") flag.StringVar(&path, "p", "/db/execute", "Endpoint to use") flag.StringVar(&oneshot, "o", "", "One-shot execute statement to preload") @@ -64,7 +66,12 @@ func main() { } } - tester := NewHTTPTester(addr, path) + var tester Tester + tester = NewHTTPTester(addr, path) + if qw { + fmt.Println("using queued write tester") + tester = NewQueuedHTTPTester(addr, path) + } if err := tester.Prepare(stmt, batchSz, tx); err != nil { fmt.Println("failed to prepare test:", err.Error()) os.Exit(1) @@ -83,8 +90,10 @@ func main() { // Tester is the interface test executors must implement. type Tester interface { + fmt.Stringer Prepare(stmt string, bSz int, tx bool) error Once() (time.Duration, error) + Close() error } func run(t Tester, n int) (time.Duration, error) { @@ -101,5 +110,5 @@ func run(t Tester, n int) (time.Duration, error) { fmt.Printf("%d requests completed in %s\n", i, d) } } - return dur, nil + return dur, t.Close() } diff --git a/cmd/rqbench/queued_http.go b/cmd/rqbench/queued_http.go new file mode 100644 index 00000000..3c891a50 --- /dev/null +++ b/cmd/rqbench/queued_http.go @@ -0,0 +1,89 @@ +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" +) + +// QueuedHTTPTester represents an HTTP transport tester that uses +// queued writes +type QueuedHTTPTester struct { + client http.Client + url string + waitURL string + br *bytes.Reader +} + +// NewHTTPTester returns an instantiated HTTP tester. +func NewQueuedHTTPTester(addr, path string) *QueuedHTTPTester { + return &QueuedHTTPTester{ + client: http.Client{}, + url: fmt.Sprintf("http://%s%s?queue", addr, path), + waitURL: fmt.Sprintf("http://%s%s?queue&wait", addr, path), + } +} + +// String returns a string representation of the tester. +func (h *QueuedHTTPTester) String() string { + return h.url +} + +// Prepare prepares the tester for execution. +func (h *QueuedHTTPTester) Prepare(stmt string, bSz int, _ bool) error { + s := make([]string, bSz) + for i := 0; i < len(s); i++ { + s[i] = stmt + } + + b, err := json.Marshal(s) + if err != nil { + return err + } + h.br = bytes.NewReader(b) + + return nil +} + +// Once executes a single test request. +func (h *QueuedHTTPTester) Once() (time.Duration, error) { + h.br.Seek(0, io.SeekStart) + + start := time.Now() + resp, err := h.client.Post(h.url, "application/json", h.br) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + _, err = ioutil.ReadAll(resp.Body) + if err != nil { + return 0, err + } + + if resp.StatusCode != http.StatusOK { + return 0, fmt.Errorf("received %s", resp.Status) + } + dur := time.Since(start) + + return dur, nil +} + +// Close closes the tester +func (h *QueuedHTTPTester) Close() error { + start := time.Now() + resp, err := h.client.Post(h.waitURL, "application/json", h.br) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("Close() received %s", resp.Status) + } + fmt.Println("Queued tester wait request took", time.Since(start)) + return nil +}