1
0
Fork 0

Merge pull request #1689 from rqlite/ready-commit

readyz can wait for commit
master
Philip O'Toole 7 months ago committed by GitHub
commit f641193f37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -1,3 +1,7 @@
## 8.21.0 (unreleased)
### New features
- [PR #1689](https://github.com/rqlite/rqlite/pull/1689): `/readyz` can wait for Commit Index.
## 8.20.3 (Feburary 17th 2024) ## 8.20.3 (Feburary 17th 2024)
### Implementation changes and bug fixes ### Implementation changes and bug fixes
- [PR #1690](https://github.com/rqlite/rqlite/pull/1690): Check for `isTextType` in panic-proof way. - [PR #1690](https://github.com/rqlite/rqlite/pull/1690): Check for `isTextType` in panic-proof way.

@ -181,6 +181,11 @@ func (qp QueryParams) FreshnessStrict() bool {
return qp.HasKey("freshness_strict") return qp.HasKey("freshness_strict")
} }
// Sync returns whether the sync flag is set.
func (qp QueryParams) Sync() bool {
return qp.HasKey("sync")
}
// Timeout returns the requested timeout duration. // Timeout returns the requested timeout duration.
func (qp QueryParams) Timeout(def time.Duration) time.Duration { func (qp QueryParams) Timeout(def time.Duration) time.Duration {
t, ok := qp["timeout"] t, ok := qp["timeout"]

@ -33,6 +33,7 @@ func Test_NewQueryParams(t *testing.T) {
{"Invalid URL Encoding", "invalid=%ZZ", nil, true}, {"Invalid URL Encoding", "invalid=%ZZ", nil, true},
{"freshness_strict", "&freshness=5s&freshness_strict", QueryParams{"freshness_strict": "", "freshness": "5s"}, false}, {"freshness_strict", "&freshness=5s&freshness_strict", QueryParams{"freshness_strict": "", "freshness": "5s"}, false},
{"freshness_strict requires freshness", "freshness_strict", nil, true}, {"freshness_strict requires freshness", "freshness_strict", nil, true},
{"sync with timeout", "sync&timeout=2s", QueryParams{"sync": "", "timeout": "2s"}, false},
} }
for _, tc := range testCases { for _, tc := range testCases {

@ -76,6 +76,10 @@ type Store interface {
// Ready returns whether the Store is ready to service requests. // Ready returns whether the Store is ready to service requests.
Ready() bool Ready() bool
// Committed blocks until the local commit index is greater than or
// equal to the Leader index, as checked when the function is called.
Committed(timeout time.Duration) (uint64, error)
// Stats returns stats on the Store. // Stats returns stats on the Store.
Stats() (map[string]interface{}, error) Stats() (map[string]interface{}, error)
@ -1003,8 +1007,17 @@ func (s *Service) handleReadyz(w http.ResponseWriter, r *http.Request, qp QueryP
return return
} }
okMsg := "[+]node ok\n[+]leader ok\n[+]store ok"
if qp.Sync() {
if _, err := s.store.Committed(qp.Timeout(defaultTimeout)); err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(fmt.Sprintf("[+]node ok\n[+]leader ok\n[+]store ok\n[+]sync %s", err.Error())))
return
}
okMsg += "\n[+]commit ok"
}
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
w.Write([]byte("[+]node ok\n[+]leader ok\n[+]store ok")) w.Write([]byte(okMsg))
} }
func (s *Service) handleExecute(w http.ResponseWriter, r *http.Request, qp QueryParams) { func (s *Service) handleExecute(w http.ResponseWriter, r *http.Request, qp QueryParams) {

@ -10,6 +10,7 @@ import (
"net/url" "net/url"
"os" "os"
"strings" "strings"
"sync/atomic"
"testing" "testing"
"time" "time"
@ -1039,21 +1040,56 @@ func Test_Readyz(t *testing.T) {
host := fmt.Sprintf("http://%s", s.Addr().String()) host := fmt.Sprintf("http://%s", s.Addr().String())
resp, err := client.Get(host + "/readyz") resp, err := client.Get(host + "/readyz")
if err != nil { if err != nil {
t.Fatalf("failed to make nodes request") t.Fatalf("failed to make readyz request")
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
t.Fatalf("failed to get expected StatusOK for nodes, got %d", resp.StatusCode) t.Fatalf("failed to get expected StatusOK for node, got %d", resp.StatusCode)
} }
m.notReady = true m.notReady = true
m.committedFn = func(timeout time.Duration) (uint64, error) {
t.Fatal("committedFn should not have been called")
return 0, nil
}
resp, err = client.Get(host + "/readyz") resp, err = client.Get(host + "/readyz")
if err != nil { if err != nil {
t.Fatalf("failed to make nodes request") t.Fatalf("failed to make readyz request")
} }
if resp.StatusCode != http.StatusServiceUnavailable { if resp.StatusCode != http.StatusServiceUnavailable {
t.Fatalf("failed to get expected StatusServiceUnavailable for nodes, got %d", resp.StatusCode) t.Fatalf("failed to get expected StatusServiceUnavailable, got %d", resp.StatusCode)
} }
cnt := &atomic.Uint32{}
m.notReady = false
m.committedFn = func(timeout time.Duration) (uint64, error) {
cnt.Store(1)
return 0, fmt.Errorf("timeout")
}
resp, err = client.Get(host + "/readyz?sync")
if err != nil {
t.Fatalf("failed to make readyz request with sync set")
}
if resp.StatusCode != http.StatusServiceUnavailable {
t.Fatalf("failed to get expected StatusServiceUnavailable, got %d", resp.StatusCode)
}
if cnt.Load() != 1 {
t.Fatalf("failed to call committedFn")
}
m.notReady = false
m.committedFn = func(timeout time.Duration) (uint64, error) {
cnt.Store(2)
return 0, nil
}
resp, err = client.Get(host + "/readyz?sync")
if err != nil {
t.Fatalf("failed to make readyz request with sync set")
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("failed to get expected StatusOK, got %d", resp.StatusCode)
}
if cnt.Load() != 2 {
t.Fatalf("failed to call committedFn")
}
} }
func Test_ForwardingRedirectQuery(t *testing.T) { func Test_ForwardingRedirectQuery(t *testing.T) {
@ -1368,14 +1404,15 @@ func Test_DBTimeoutQueryParam(t *testing.T) {
} }
type MockStore struct { type MockStore struct {
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error) queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)
requestFn func(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error) requestFn func(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error)
backupFn func(br *command.BackupRequest, dst io.Writer) error backupFn func(br *command.BackupRequest, dst io.Writer) error
loadFn func(lr *command.LoadRequest) error loadFn func(lr *command.LoadRequest) error
readFromFn func(r io.Reader) (int64, error) readFromFn func(r io.Reader) (int64, error)
leaderAddr string committedFn func(timeout time.Duration) (uint64, error)
notReady bool // Default value is true, easier to test. leaderAddr string
notReady bool // Default value is true, easier to test.
} }
func (m *MockStore) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { func (m *MockStore) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
@ -1419,6 +1456,13 @@ func (m *MockStore) Ready() bool {
return !m.notReady return !m.notReady
} }
func (m *MockStore) Committed(timeout time.Duration) (uint64, error) {
if m.committedFn != nil {
return m.committedFn(timeout)
}
return 0, nil
}
func (m *MockStore) Stats() (map[string]interface{}, error) { func (m *MockStore) Stats() (map[string]interface{}, error) {
return nil, nil return nil, nil
} }

@ -98,6 +98,7 @@ const (
sqliteFile = "db.sqlite" sqliteFile = "db.sqlite"
leaderWaitDelay = 100 * time.Millisecond leaderWaitDelay = 100 * time.Millisecond
appliedWaitDelay = 100 * time.Millisecond appliedWaitDelay = 100 * time.Millisecond
commitEquivalenceDelay = 50 * time.Millisecond
appliedIndexUpdateInterval = 5 * time.Second appliedIndexUpdateInterval = 5 * time.Second
connectionPoolCount = 5 connectionPoolCount = 5
connectionTimeout = 10 * time.Second connectionTimeout = 10 * time.Second
@ -632,6 +633,21 @@ func (s *Store) Ready() bool {
}() }()
} }
// Committed blocks until the local commit index is greater than or
// equal to the Leader index, as checked when the function is called.
// It returns the committed index. If the Leader index is 0, then the
// system waits until the commit index is at least 1.
func (s *Store) Committed(timeout time.Duration) (uint64, error) {
lci, err := s.LeaderCommitIndex()
if err != nil {
return lci, err
}
if lci == 0 {
lci = 1
}
return lci, s.WaitForCommitIndex(lci, timeout)
}
// Close closes the store. If wait is true, waits for a graceful shutdown. // Close closes the store. If wait is true, waits for a graceful shutdown.
func (s *Store) Close(wait bool) (retErr error) { func (s *Store) Close(wait bool) (retErr error) {
defer func() { defer func() {
@ -712,6 +728,33 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
} }
} }
// WaitForCommitIndex blocks until the local Raft commit index is equal to
// or greater the given index, or the timeout expires.
func (s *Store) WaitForCommitIndex(idx uint64, timeout time.Duration) error {
tck := time.NewTicker(commitEquivalenceDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
checkFn := func() bool {
return s.raft.CommitIndex() >= idx
}
// Try the fast path.
if checkFn() {
return nil
}
for {
select {
case <-tck.C:
if checkFn() {
return nil
}
case <-tmr.C:
return fmt.Errorf("timeout expired")
}
}
}
// DBAppliedIndex returns the index of the last Raft log that changed the // DBAppliedIndex returns the index of the last Raft log that changed the
// underlying database. If the index is unknown then 0 is returned. // underlying database. If the index is unknown then 0 is returned.
func (s *Store) DBAppliedIndex() uint64 { func (s *Store) DBAppliedIndex() uint64 {
@ -869,7 +912,6 @@ func (s *Store) WaitForRemoval(id string, timeout time.Duration) error {
if check() { if check() {
return nil return nil
} }
tck := time.NewTicker(appliedWaitDelay) tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop() defer tck.Stop()
tmr := time.NewTimer(timeout) tmr := time.NewTimer(timeout)
@ -903,7 +945,6 @@ func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
if check() { if check() {
return leaderAddr, nil return leaderAddr, nil
} }
tck := time.NewTicker(leaderWaitDelay) tck := time.NewTicker(leaderWaitDelay)
defer tck.Stop() defer tck.Stop()
tmr := time.NewTimer(timeout) tmr := time.NewTimer(timeout)
@ -1071,7 +1112,6 @@ func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error
if !s.Ready() { if !s.Ready() {
return nil, ErrNotReady return nil, ErrNotReady
} }
return s.execute(ex) return s.execute(ex)
} }

@ -76,6 +76,13 @@ func Test_MultiNodeSimple(t *testing.T) {
t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp) t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp)
} }
if err := s0.WaitForCommitIndex(4, time.Second); err != nil {
t.Fatalf("failed to wait for commit index: %s", err.Error())
}
if err := s0.WaitForCommitIndex(5, 500*time.Millisecond); err == nil {
t.Fatalf("unexpectedly waited successfully for commit index")
}
// Now, do a NONE consistency query on each node, to actually confirm the data // Now, do a NONE consistency query on each node, to actually confirm the data
// has been replicated. // has been replicated.
testFn1 := func(t *testing.T, s *Store) { testFn1 := func(t *testing.T, s *Store) {
@ -108,6 +115,13 @@ func Test_MultiNodeSimple(t *testing.T) {
t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp) t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp)
} }
if err := s1.WaitForCommitIndex(4, time.Second); err != nil {
t.Fatalf("failed to wait for commit index: %s", err.Error())
}
if err := s1.WaitForCommitIndex(5, 500*time.Millisecond); err == nil {
t.Fatalf("unexpectedly waited successfully for commit index")
}
// Write another row using Request // Write another row using Request
rr := executeQueryRequestFromString("INSERT INTO foo(id, name) VALUES(2, 'fiona')", proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG, false, false) rr := executeQueryRequestFromString("INSERT INTO foo(id, name) VALUES(2, 'fiona')", proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG, false, false)
_, err = s0.Request(rr) _, err = s0.Request(rr)

@ -281,8 +281,8 @@ class Node(object):
raise_for_status(r) raise_for_status(r)
return r.json() return r.json()
def ready(self, noleader=False): def ready(self, noleader=False, sync=False):
r = requests.get(self._ready_url(noleader)) r = requests.get(self._ready_url(noleader, sync))
return r.status_code == 200 return r.status_code == 200
def expvar(self): def expvar(self):
@ -342,10 +342,10 @@ class Node(object):
raise Exception('leader is available but node %s at %s reports empty leader addr' % (self.node_id, self.APIAddr())) raise Exception('leader is available but node %s at %s reports empty leader addr' % (self.node_id, self.APIAddr()))
return lr return lr
def wait_for_ready(self, timeout=TIMEOUT): def wait_for_ready(self, sync=False, timeout=TIMEOUT):
deadline = time.time() + timeout deadline = time.time() + timeout
while time.time() < deadline: while time.time() < deadline:
if self.ready(): if self.ready(sync):
return return
time.sleep(0.1) time.sleep(0.1)
raise Exception('rqlite node failed to become ready within %d seconds' % timeout) raise Exception('rqlite node failed to become ready within %d seconds' % timeout)
@ -602,10 +602,14 @@ class Node(object):
return 'http://' + self.APIAddr() + '/status' return 'http://' + self.APIAddr() + '/status'
def _nodes_url(self): def _nodes_url(self):
return 'http://' + self.APIAddr() + '/nodes?nonvoters' # Getting all nodes back makes testing easier return 'http://' + self.APIAddr() + '/nodes?nonvoters' # Getting all nodes back makes testing easier
def _ready_url(self, noleader=False): def _ready_url(self, noleader=False, sync=False):
vals = []
nl = "" nl = ""
if noleader: if noleader:
nl = "?noleader" vals = vals + ["noleader"]
if sync:
vals = vals + ["sync"]
nl = '?' + '&'.join(vals)
return 'http://' + self.APIAddr() + '/readyz' + nl return 'http://' + self.APIAddr() + '/readyz' + nl
def _expvar_url(self): def _expvar_url(self):
return 'http://' + self.APIAddr() + '/debug/vars' return 'http://' + self.APIAddr() + '/debug/vars'

@ -649,6 +649,7 @@ class TestShutdown(unittest.TestCase):
# Check that we have a working single-node cluster with a leader by doing # Check that we have a working single-node cluster with a leader by doing
# a write. # a write.
n1.wait_for_ready() n1.wait_for_ready()
n1.wait_for_ready(sync=True)
j = n1.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') j = n1.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)')
self.assertEqual(j, d_("{'results': [{}]}")) self.assertEqual(j, d_("{'results': [{}]}"))

@ -237,6 +237,7 @@ class TestSingleNodeReadyz(unittest.TestCase):
self.assertEqual(False, n0.ready()) self.assertEqual(False, n0.ready())
self.assertEqual(True, n0.ready(noleader=True)) self.assertEqual(True, n0.ready(noleader=True))
self.assertEqual(False, n0.ready(noleader=False)) self.assertEqual(False, n0.ready(noleader=False))
self.assertEqual(False, n0.ready(sync=True))
deprovision_node(n0) deprovision_node(n0)
class TestEndToEndSnapshotRestoreSingle(unittest.TestCase): class TestEndToEndSnapshotRestoreSingle(unittest.TestCase):

Loading…
Cancel
Save