diff --git a/CHANGELOG.md b/CHANGELOG.md index 1bc05d66..a3eabcc6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Implementation changes and bug fixes - [PR #1685](https://github.com/rqlite/rqlite/pull/1685): Rename a Proto (but not its fields). - [PR #1686](https://github.com/rqlite/rqlite/pull/1686): Node returns _Meta_, not just Address. +- [PR #1688](https://github.com/rqlite/rqlite/pull/1688): Expose Leader Commit Index, as read from latest AppendEntries RPC. ## 8.20.1 (February 13th 2024) ### Implementation changes and bug fixes diff --git a/store/store.go b/store/store.go index 40534db3..97c3ec88 100644 --- a/store/store.go +++ b/store/store.go @@ -819,6 +819,16 @@ func (s *Store) CommitIndex() (uint64, error) { return s.raft.CommitIndex(), nil } +// LeaderCommitIndex returns the Raft leader commit index, as indicated +// by the latest AppendEntries RPC. If this node is the Leader then the +// commit index is returned directly from the Raft object. +func (s *Store) LeaderCommitIndex() (uint64, error) { + if s.raft.State() == raft.Leader { + return s.raft.CommitIndex(), nil + } + return s.raftTn.LeaderCommitIndex(), nil +} + // Nodes returns the slice of nodes in the cluster, sorted by ID ascending. func (s *Store) Nodes() ([]*Server, error) { if !s.open { diff --git a/store/store_multi_test.go b/store/store_multi_test.go index 6be265bd..1b5f03af 100644 --- a/store/store_multi_test.go +++ b/store/store_multi_test.go @@ -61,6 +61,21 @@ func Test_MultiNodeSimple(t *testing.T) { return s0.DBAppliedIndex() == s1.DBAppliedIndex() }, 250*time.Millisecond, 3*time.Second) + ci, err := s0.CommitIndex() + if err != nil { + t.Fatalf("failed to retrieve commit index: %s", err.Error()) + } + if exp, got := uint64(4), ci; exp != got { + t.Fatalf("wrong commit index, got: %d, exp: %d", got, exp) + } + lci, err := s0.LeaderCommitIndex() + if err != nil { + t.Fatalf("failed to retrieve commit index: %s", err.Error()) + } + if exp, got := uint64(4), lci; exp != got { + t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp) + } + // Now, do a NONE consistency query on each node, to actually confirm the data // has been replicated. testFn1 := func(t *testing.T, s *Store) { @@ -78,6 +93,21 @@ func Test_MultiNodeSimple(t *testing.T) { testFn1(t, s0) testFn1(t, s1) + ci, err = s1.CommitIndex() + if err != nil { + t.Fatalf("failed to retrieve commit index: %s", err.Error()) + } + if exp, got := uint64(4), ci; exp != got { + t.Fatalf("wrong commit index, got: %d, exp: %d", got, exp) + } + lci, err = s1.LeaderCommitIndex() + if err != nil { + t.Fatalf("failed to retrieve commit index: %s", err.Error()) + } + if exp, got := uint64(4), lci; exp != got { + t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp) + } + // Write another row using Request rr := executeQueryRequestFromString("INSERT INTO foo(id, name) VALUES(2, 'fiona')", proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG, false, false) _, err = s0.Request(rr) diff --git a/store/store_test.go b/store/store_test.go index 25ddd62e..5f1d6a6d 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -90,6 +90,13 @@ func Test_SingleNodeOnDiskSQLitePath(t *testing.T) { if exp, got := uint64(3), ci; exp != got { t.Fatalf("wrong commit index, got: %d, exp: %d", got, exp) } + lci, err := s.LeaderCommitIndex() + if err != nil { + t.Fatalf("failed to retrieve commit index: %s", err.Error()) + } + if exp, got := uint64(3), lci; exp != got { + t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp) + } qr := queryRequestFromString("SELECT * FROM foo", false, false) qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE diff --git a/store/transport.go b/store/transport.go index d391a64c..4701beba 100644 --- a/store/transport.go +++ b/store/transport.go @@ -54,6 +54,7 @@ func (t *Transport) Addr() net.Addr { type NodeTransport struct { *raft.NetworkTransport commandCommitIndex *atomic.Uint64 + leaderCommitIndex *atomic.Uint64 done chan struct{} closed bool } @@ -63,6 +64,7 @@ func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport { return &NodeTransport{ NetworkTransport: transport, commandCommitIndex: &atomic.Uint64{}, + leaderCommitIndex: &atomic.Uint64{}, done: make(chan struct{}), } } @@ -73,6 +75,12 @@ func (n *NodeTransport) CommandCommitIndex() uint64 { return n.commandCommitIndex.Load() } +// LeaderCommitIndex returns the index of the latest committed log entry +// which is known to be replicated to the majority of the cluster. +func (n *NodeTransport) LeaderCommitIndex() uint64 { + return n.leaderCommitIndex.Load() +} + // Close closes the transport func (n *NodeTransport) Close() error { if n.closed { @@ -120,6 +128,7 @@ func (n *NodeTransport) Consumer() <-chan raft.RPC { n.commandCommitIndex.Store(e.Index) } } + n.leaderCommitIndex.Store(cmd.LeaderCommitIndex) } ch <- rpc }