diff --git a/CHANGELOG.md b/CHANGELOG.md index 0147f1c9..f13c9607 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 8.21.0 (unreleased) +### New features +- [PR #1689](https://github.com/rqlite/rqlite/pull/1689): `/readyz` can wait for Commit Index. + ## 8.20.3 (Feburary 17th 2024) ### Implementation changes and bug fixes - [PR #1690](https://github.com/rqlite/rqlite/pull/1690): Check for `isTextType` in panic-proof way. diff --git a/http/query_params.go b/http/query_params.go index 5fbd938a..5079b220 100644 --- a/http/query_params.go +++ b/http/query_params.go @@ -181,6 +181,11 @@ func (qp QueryParams) FreshnessStrict() bool { return qp.HasKey("freshness_strict") } +// Sync returns whether the sync flag is set. +func (qp QueryParams) Sync() bool { + return qp.HasKey("sync") +} + // Timeout returns the requested timeout duration. func (qp QueryParams) Timeout(def time.Duration) time.Duration { t, ok := qp["timeout"] diff --git a/http/query_params_test.go b/http/query_params_test.go index 7883ee0d..79e48c16 100644 --- a/http/query_params_test.go +++ b/http/query_params_test.go @@ -33,6 +33,7 @@ func Test_NewQueryParams(t *testing.T) { {"Invalid URL Encoding", "invalid=%ZZ", nil, true}, {"freshness_strict", "&freshness=5s&freshness_strict", QueryParams{"freshness_strict": "", "freshness": "5s"}, false}, {"freshness_strict requires freshness", "freshness_strict", nil, true}, + {"sync with timeout", "sync&timeout=2s", QueryParams{"sync": "", "timeout": "2s"}, false}, } for _, tc := range testCases { diff --git a/http/service.go b/http/service.go index 3be6d0ff..183dc5fc 100644 --- a/http/service.go +++ b/http/service.go @@ -76,6 +76,10 @@ type Store interface { // Ready returns whether the Store is ready to service requests. Ready() bool + // Committed blocks until the local commit index is greater than or + // equal to the Leader index, as checked when the function is called. + Committed(timeout time.Duration) (uint64, error) + // Stats returns stats on the Store. Stats() (map[string]interface{}, error) @@ -1003,8 +1007,17 @@ func (s *Service) handleReadyz(w http.ResponseWriter, r *http.Request, qp QueryP return } + okMsg := "[+]node ok\n[+]leader ok\n[+]store ok" + if qp.Sync() { + if _, err := s.store.Committed(qp.Timeout(defaultTimeout)); err != nil { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte(fmt.Sprintf("[+]node ok\n[+]leader ok\n[+]store ok\n[+]sync %s", err.Error()))) + return + } + okMsg += "\n[+]commit ok" + } w.WriteHeader(http.StatusOK) - w.Write([]byte("[+]node ok\n[+]leader ok\n[+]store ok")) + w.Write([]byte(okMsg)) } func (s *Service) handleExecute(w http.ResponseWriter, r *http.Request, qp QueryParams) { diff --git a/http/service_test.go b/http/service_test.go index bd5239f5..302e4a61 100644 --- a/http/service_test.go +++ b/http/service_test.go @@ -10,6 +10,7 @@ import ( "net/url" "os" "strings" + "sync/atomic" "testing" "time" @@ -1039,21 +1040,56 @@ func Test_Readyz(t *testing.T) { host := fmt.Sprintf("http://%s", s.Addr().String()) resp, err := client.Get(host + "/readyz") if err != nil { - t.Fatalf("failed to make nodes request") + t.Fatalf("failed to make readyz request") } if resp.StatusCode != http.StatusOK { - t.Fatalf("failed to get expected StatusOK for nodes, got %d", resp.StatusCode) + t.Fatalf("failed to get expected StatusOK for node, got %d", resp.StatusCode) } m.notReady = true + m.committedFn = func(timeout time.Duration) (uint64, error) { + t.Fatal("committedFn should not have been called") + return 0, nil + } resp, err = client.Get(host + "/readyz") if err != nil { - t.Fatalf("failed to make nodes request") + t.Fatalf("failed to make readyz request") } if resp.StatusCode != http.StatusServiceUnavailable { - t.Fatalf("failed to get expected StatusServiceUnavailable for nodes, got %d", resp.StatusCode) + t.Fatalf("failed to get expected StatusServiceUnavailable, got %d", resp.StatusCode) } + cnt := &atomic.Uint32{} + m.notReady = false + m.committedFn = func(timeout time.Duration) (uint64, error) { + cnt.Store(1) + return 0, fmt.Errorf("timeout") + } + resp, err = client.Get(host + "/readyz?sync") + if err != nil { + t.Fatalf("failed to make readyz request with sync set") + } + if resp.StatusCode != http.StatusServiceUnavailable { + t.Fatalf("failed to get expected StatusServiceUnavailable, got %d", resp.StatusCode) + } + if cnt.Load() != 1 { + t.Fatalf("failed to call committedFn") + } + m.notReady = false + m.committedFn = func(timeout time.Duration) (uint64, error) { + cnt.Store(2) + return 0, nil + } + resp, err = client.Get(host + "/readyz?sync") + if err != nil { + t.Fatalf("failed to make readyz request with sync set") + } + if resp.StatusCode != http.StatusOK { + t.Fatalf("failed to get expected StatusOK, got %d", resp.StatusCode) + } + if cnt.Load() != 2 { + t.Fatalf("failed to call committedFn") + } } func Test_ForwardingRedirectQuery(t *testing.T) { @@ -1368,14 +1404,15 @@ func Test_DBTimeoutQueryParam(t *testing.T) { } type MockStore struct { - executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) - queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error) - requestFn func(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error) - backupFn func(br *command.BackupRequest, dst io.Writer) error - loadFn func(lr *command.LoadRequest) error - readFromFn func(r io.Reader) (int64, error) - leaderAddr string - notReady bool // Default value is true, easier to test. + executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) + queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error) + requestFn func(eqr *command.ExecuteQueryRequest) ([]*command.ExecuteQueryResponse, error) + backupFn func(br *command.BackupRequest, dst io.Writer) error + loadFn func(lr *command.LoadRequest) error + readFromFn func(r io.Reader) (int64, error) + committedFn func(timeout time.Duration) (uint64, error) + leaderAddr string + notReady bool // Default value is true, easier to test. } func (m *MockStore) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) { @@ -1419,6 +1456,13 @@ func (m *MockStore) Ready() bool { return !m.notReady } +func (m *MockStore) Committed(timeout time.Duration) (uint64, error) { + if m.committedFn != nil { + return m.committedFn(timeout) + } + return 0, nil +} + func (m *MockStore) Stats() (map[string]interface{}, error) { return nil, nil } diff --git a/store/store.go b/store/store.go index e1d26929..ace7df9c 100644 --- a/store/store.go +++ b/store/store.go @@ -98,6 +98,7 @@ const ( sqliteFile = "db.sqlite" leaderWaitDelay = 100 * time.Millisecond appliedWaitDelay = 100 * time.Millisecond + commitEquivalenceDelay = 50 * time.Millisecond appliedIndexUpdateInterval = 5 * time.Second connectionPoolCount = 5 connectionTimeout = 10 * time.Second @@ -632,6 +633,21 @@ func (s *Store) Ready() bool { }() } +// Committed blocks until the local commit index is greater than or +// equal to the Leader index, as checked when the function is called. +// It returns the committed index. If the Leader index is 0, then the +// system waits until the commit index is at least 1. +func (s *Store) Committed(timeout time.Duration) (uint64, error) { + lci, err := s.LeaderCommitIndex() + if err != nil { + return lci, err + } + if lci == 0 { + lci = 1 + } + return lci, s.WaitForCommitIndex(lci, timeout) +} + // Close closes the store. If wait is true, waits for a graceful shutdown. func (s *Store) Close(wait bool) (retErr error) { defer func() { @@ -712,6 +728,33 @@ func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error { } } +// WaitForCommitIndex blocks until the local Raft commit index is equal to +// or greater the given index, or the timeout expires. +func (s *Store) WaitForCommitIndex(idx uint64, timeout time.Duration) error { + tck := time.NewTicker(commitEquivalenceDelay) + defer tck.Stop() + tmr := time.NewTimer(timeout) + defer tmr.Stop() + checkFn := func() bool { + return s.raft.CommitIndex() >= idx + } + + // Try the fast path. + if checkFn() { + return nil + } + for { + select { + case <-tck.C: + if checkFn() { + return nil + } + case <-tmr.C: + return fmt.Errorf("timeout expired") + } + } +} + // DBAppliedIndex returns the index of the last Raft log that changed the // underlying database. If the index is unknown then 0 is returned. func (s *Store) DBAppliedIndex() uint64 { @@ -869,7 +912,6 @@ func (s *Store) WaitForRemoval(id string, timeout time.Duration) error { if check() { return nil } - tck := time.NewTicker(appliedWaitDelay) defer tck.Stop() tmr := time.NewTimer(timeout) @@ -903,7 +945,6 @@ func (s *Store) WaitForLeader(timeout time.Duration) (string, error) { if check() { return leaderAddr, nil } - tck := time.NewTicker(leaderWaitDelay) defer tck.Stop() tmr := time.NewTimer(timeout) @@ -1071,7 +1112,6 @@ func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error if !s.Ready() { return nil, ErrNotReady } - return s.execute(ex) } diff --git a/store/store_multi_test.go b/store/store_multi_test.go index ac84a506..940d5bbb 100644 --- a/store/store_multi_test.go +++ b/store/store_multi_test.go @@ -76,6 +76,13 @@ func Test_MultiNodeSimple(t *testing.T) { t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp) } + if err := s0.WaitForCommitIndex(4, time.Second); err != nil { + t.Fatalf("failed to wait for commit index: %s", err.Error()) + } + if err := s0.WaitForCommitIndex(5, 500*time.Millisecond); err == nil { + t.Fatalf("unexpectedly waited successfully for commit index") + } + // Now, do a NONE consistency query on each node, to actually confirm the data // has been replicated. testFn1 := func(t *testing.T, s *Store) { @@ -108,6 +115,13 @@ func Test_MultiNodeSimple(t *testing.T) { t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp) } + if err := s1.WaitForCommitIndex(4, time.Second); err != nil { + t.Fatalf("failed to wait for commit index: %s", err.Error()) + } + if err := s1.WaitForCommitIndex(5, 500*time.Millisecond); err == nil { + t.Fatalf("unexpectedly waited successfully for commit index") + } + // Write another row using Request rr := executeQueryRequestFromString("INSERT INTO foo(id, name) VALUES(2, 'fiona')", proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG, false, false) _, err = s0.Request(rr) diff --git a/system_test/e2e/helpers.py b/system_test/e2e/helpers.py index 119af4f6..d75339c1 100644 --- a/system_test/e2e/helpers.py +++ b/system_test/e2e/helpers.py @@ -281,8 +281,8 @@ class Node(object): raise_for_status(r) return r.json() - def ready(self, noleader=False): - r = requests.get(self._ready_url(noleader)) + def ready(self, noleader=False, sync=False): + r = requests.get(self._ready_url(noleader, sync)) return r.status_code == 200 def expvar(self): @@ -342,10 +342,10 @@ class Node(object): raise Exception('leader is available but node %s at %s reports empty leader addr' % (self.node_id, self.APIAddr())) return lr - def wait_for_ready(self, timeout=TIMEOUT): + def wait_for_ready(self, sync=False, timeout=TIMEOUT): deadline = time.time() + timeout while time.time() < deadline: - if self.ready(): + if self.ready(sync): return time.sleep(0.1) raise Exception('rqlite node failed to become ready within %d seconds' % timeout) @@ -602,10 +602,14 @@ class Node(object): return 'http://' + self.APIAddr() + '/status' def _nodes_url(self): return 'http://' + self.APIAddr() + '/nodes?nonvoters' # Getting all nodes back makes testing easier - def _ready_url(self, noleader=False): + def _ready_url(self, noleader=False, sync=False): + vals = [] nl = "" if noleader: - nl = "?noleader" + vals = vals + ["noleader"] + if sync: + vals = vals + ["sync"] + nl = '?' + '&'.join(vals) return 'http://' + self.APIAddr() + '/readyz' + nl def _expvar_url(self): return 'http://' + self.APIAddr() + '/debug/vars' diff --git a/system_test/e2e/multi_node.py b/system_test/e2e/multi_node.py index 46e4299f..5daa3e0b 100644 --- a/system_test/e2e/multi_node.py +++ b/system_test/e2e/multi_node.py @@ -649,6 +649,7 @@ class TestShutdown(unittest.TestCase): # Check that we have a working single-node cluster with a leader by doing # a write. n1.wait_for_ready() + n1.wait_for_ready(sync=True) j = n1.execute('CREATE TABLE foo (id INTEGER NOT NULL PRIMARY KEY, name TEXT)') self.assertEqual(j, d_("{'results': [{}]}")) diff --git a/system_test/e2e/single_node.py b/system_test/e2e/single_node.py index 58c71cac..b77a1df1 100644 --- a/system_test/e2e/single_node.py +++ b/system_test/e2e/single_node.py @@ -237,6 +237,7 @@ class TestSingleNodeReadyz(unittest.TestCase): self.assertEqual(False, n0.ready()) self.assertEqual(True, n0.ready(noleader=True)) self.assertEqual(False, n0.ready(noleader=False)) + self.assertEqual(False, n0.ready(sync=True)) deprovision_node(n0) class TestEndToEndSnapshotRestoreSingle(unittest.TestCase):