From 71b0a5a3bf64b208548f7d9356fc73b5a8350ddf Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Mon, 23 May 2022 12:43:28 -0400 Subject: [PATCH] Support statement-less queue waits --- http/service.go | 54 +++++++++++++++++++++----------------- queue/queue.go | 1 - queue/queue_test.go | 64 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 25 deletions(-) diff --git a/http/service.go b/http/service.go index 0652b53a..9f4818a1 100644 --- a/http/service.go +++ b/http/service.go @@ -1020,23 +1020,25 @@ func (s *Service) queuedExecute(w http.ResponseWriter, r *http.Request) { } } - b, err := ioutil.ReadAll(r.Body) + wait, err := isWait(r) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - r.Body.Close() - stmts, err := ParseRequest(b) + b, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } + r.Body.Close() - wait, err := isWait(r) + stmts, err := ParseRequest(b) if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + if !wait || errors.Is(err, ErrNoStatements) { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } } timeout, err := timeoutParam(r, defaultTimeout) @@ -1353,25 +1355,29 @@ func (s *Service) runQueue() { }, } for { - _, err = s.store.Execute(er) - if err != nil { - if err == store.ErrNotLeader { - addr, err := s.store.LeaderAddr() - if err != nil || addr == "" { - s.logger.Printf("execute queue can't find leader for sequence number %d", - req.SequenceNumber) - stats.Add(numQueuedExecutionsFailed, 1) - time.Sleep(retryDelay) - continue - } - _, err = s.cluster.Execute(er, addr, defaultTimeout) - if err != nil { - s.logger.Printf("execute queue write failed for sequence number %d: %s", - req.SequenceNumber, err.Error()) - time.Sleep(retryDelay) - continue + // Nil statements are valid, as clients may want to just send + // a "checkpoint" through the queue. + if er.Request.Statements != nil { + _, err = s.store.Execute(er) + if err != nil { + if err == store.ErrNotLeader { + addr, err := s.store.LeaderAddr() + if err != nil || addr == "" { + s.logger.Printf("execute queue can't find leader for sequence number %d", + req.SequenceNumber) + stats.Add(numQueuedExecutionsFailed, 1) + time.Sleep(retryDelay) + continue + } + _, err = s.cluster.Execute(er, addr, defaultTimeout) + if err != nil { + s.logger.Printf("execute queue write failed for sequence number %d: %s", + req.SequenceNumber, err.Error()) + time.Sleep(retryDelay) + continue + } + stats.Add(numRemoteExecutions, 1) } - stats.Add(numRemoteExecutions, 1) } } s.seqNumMu.Lock() diff --git a/queue/queue.go b/queue/queue.go index 3cc92a45..db40394f 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -40,7 +40,6 @@ func mergeQueued(qs []*queuedStatements) *Request { if o == nil { o = &Request{ SequenceNumber: qs[i].SequenceNumber, - Statements: make([]*command.Statement, 0), flushChans: make([]FlushChannel, 0), } } else { diff --git a/queue/queue_test.go b/queue/queue_test.go index 2fedc232..d612d4c7 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -14,6 +14,7 @@ var ( testStmtQux = &command.Statement{Sql: "SELECT * FROM qux"} testStmtsFoo = []*command.Statement{testStmtFoo} testStmtsBar = []*command.Statement{testStmtBar} + testStmtsNilFoo = []*command.Statement{nil, testStmtFoo} testStmtsFooBar = []*command.Statement{testStmtFoo, testStmtBar} testStmtsFooBarFoo = []*command.Statement{testStmtFoo, testStmtBar, testStmtFoo} flushChan1 = make(FlushChannel) @@ -29,6 +30,19 @@ func Test_MergeQueuedStatements(t *testing.T) { qs []*queuedStatements exp *Request }{ + { + qs: []*queuedStatements{ + {1, nil, flushChan1}, + }, + exp: &Request{1, nil, []FlushChannel{flushChan1}}, + }, + { + qs: []*queuedStatements{ + {1, nil, flushChan1}, + {2, testStmtsFoo, nil}, + }, + exp: &Request{2, testStmtsFoo, []FlushChannel{flushChan1}}, + }, { qs: []*queuedStatements{ {1, testStmtsFoo, nil}, @@ -172,6 +186,28 @@ func Test_NewQueueWriteBatchSizeDouble(t *testing.T) { } } +func Test_NewQueueWriteNilAndOne(t *testing.T) { + q := New(1024, 2, 60*time.Second) + defer q.Close() + + if _, err := q.Write(nil, nil); err != nil { + t.Fatalf("failed to write nil: %s", err.Error()) + } + if _, err := q.Write(testStmtsFoo, nil); err != nil { + t.Fatalf("failed to write: %s", err.Error()) + } + + select { + case req := <-q.C: + if exp, got := 2, len(req.Statements); exp != got { + t.Fatalf("received wrong length slice, exp %d, got %d", exp, got) + } + req.Close() + case <-time.NewTimer(5 * time.Second).C: + t.Fatalf("timed out waiting for statement") + } +} + func Test_NewQueueWriteBatchSizeSingleChan(t *testing.T) { q := New(1024, 1, 60*time.Second) defer q.Close() @@ -203,6 +239,34 @@ func Test_NewQueueWriteBatchSizeSingleChan(t *testing.T) { } } +func Test_NewQueueWriteNilSingleChan(t *testing.T) { + q := New(1024, 1, 60*time.Second) + defer q.Close() + + fc := make(FlushChannel) + if _, err := q.Write(nil, fc); err != nil { + t.Fatalf("failed to write nil: %s", err.Error()) + } + + select { + case req := <-q.C: + if req.Statements != nil { + t.Fatalf("statements slice is not nil") + } + req.Close() + case <-time.NewTimer(5 * time.Second).C: + t.Fatalf("timed out waiting for statement") + } + + select { + case <-fc: + // nothing to do. + default: + // Not closed, something is wrong. + t.Fatalf("flush channel not closed") + } +} + func Test_NewQueueWriteBatchSizeMulti(t *testing.T) { q := New(1024, 5, 60*time.Second) defer q.Close()