1
0
Fork 0

Support statement-less queue waits

master
Philip O'Toole 2 years ago
parent f58440511f
commit 71b0a5a3bf

@ -1020,24 +1020,26 @@ func (s *Service) queuedExecute(w http.ResponseWriter, r *http.Request) {
}
}
b, err := ioutil.ReadAll(r.Body)
wait, err := isWait(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
r.Body.Close()
stmts, err := ParseRequest(b)
b, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
r.Body.Close()
wait, err := isWait(r)
stmts, err := ParseRequest(b)
if err != nil {
if !wait || errors.Is(err, ErrNoStatements) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
timeout, err := timeoutParam(r, defaultTimeout)
if err != nil {
@ -1353,6 +1355,9 @@ func (s *Service) runQueue() {
},
}
for {
// Nil statements are valid, as clients may want to just send
// a "checkpoint" through the queue.
if er.Request.Statements != nil {
_, err = s.store.Execute(er)
if err != nil {
if err == store.ErrNotLeader {
@ -1374,6 +1379,7 @@ func (s *Service) runQueue() {
stats.Add(numRemoteExecutions, 1)
}
}
}
s.seqNumMu.Lock()
s.seqNum = req.SequenceNumber
s.seqNumMu.Unlock()

@ -40,7 +40,6 @@ func mergeQueued(qs []*queuedStatements) *Request {
if o == nil {
o = &Request{
SequenceNumber: qs[i].SequenceNumber,
Statements: make([]*command.Statement, 0),
flushChans: make([]FlushChannel, 0),
}
} else {

@ -14,6 +14,7 @@ var (
testStmtQux = &command.Statement{Sql: "SELECT * FROM qux"}
testStmtsFoo = []*command.Statement{testStmtFoo}
testStmtsBar = []*command.Statement{testStmtBar}
testStmtsNilFoo = []*command.Statement{nil, testStmtFoo}
testStmtsFooBar = []*command.Statement{testStmtFoo, testStmtBar}
testStmtsFooBarFoo = []*command.Statement{testStmtFoo, testStmtBar, testStmtFoo}
flushChan1 = make(FlushChannel)
@ -29,6 +30,19 @@ func Test_MergeQueuedStatements(t *testing.T) {
qs []*queuedStatements
exp *Request
}{
{
qs: []*queuedStatements{
{1, nil, flushChan1},
},
exp: &Request{1, nil, []FlushChannel{flushChan1}},
},
{
qs: []*queuedStatements{
{1, nil, flushChan1},
{2, testStmtsFoo, nil},
},
exp: &Request{2, testStmtsFoo, []FlushChannel{flushChan1}},
},
{
qs: []*queuedStatements{
{1, testStmtsFoo, nil},
@ -172,6 +186,28 @@ func Test_NewQueueWriteBatchSizeDouble(t *testing.T) {
}
}
func Test_NewQueueWriteNilAndOne(t *testing.T) {
q := New(1024, 2, 60*time.Second)
defer q.Close()
if _, err := q.Write(nil, nil); err != nil {
t.Fatalf("failed to write nil: %s", err.Error())
}
if _, err := q.Write(testStmtsFoo, nil); err != nil {
t.Fatalf("failed to write: %s", err.Error())
}
select {
case req := <-q.C:
if exp, got := 2, len(req.Statements); exp != got {
t.Fatalf("received wrong length slice, exp %d, got %d", exp, got)
}
req.Close()
case <-time.NewTimer(5 * time.Second).C:
t.Fatalf("timed out waiting for statement")
}
}
func Test_NewQueueWriteBatchSizeSingleChan(t *testing.T) {
q := New(1024, 1, 60*time.Second)
defer q.Close()
@ -203,6 +239,34 @@ func Test_NewQueueWriteBatchSizeSingleChan(t *testing.T) {
}
}
func Test_NewQueueWriteNilSingleChan(t *testing.T) {
q := New(1024, 1, 60*time.Second)
defer q.Close()
fc := make(FlushChannel)
if _, err := q.Write(nil, fc); err != nil {
t.Fatalf("failed to write nil: %s", err.Error())
}
select {
case req := <-q.C:
if req.Statements != nil {
t.Fatalf("statements slice is not nil")
}
req.Close()
case <-time.NewTimer(5 * time.Second).C:
t.Fatalf("timed out waiting for statement")
}
select {
case <-fc:
// nothing to do.
default:
// Not closed, something is wrong.
t.Fatalf("flush channel not closed")
}
}
func Test_NewQueueWriteBatchSizeMulti(t *testing.T) {
q := New(1024, 5, 60*time.Second)
defer q.Close()

Loading…
Cancel
Save