1
0
Fork 0

Expose Leader Commit Index

master
Philip O'Toole 7 months ago
parent bffd47ce46
commit 71ed3c9657

@ -2,6 +2,7 @@
### Implementation changes and bug fixes
- [PR #1685](https://github.com/rqlite/rqlite/pull/1685): Rename a Proto (but not its fields).
- [PR #1686](https://github.com/rqlite/rqlite/pull/1686): Node returns _Meta_, not just Address.
- [PR #1688](https://github.com/rqlite/rqlite/pull/1688): Expose Leader Commit Index, as read from latest AppendEntries RPC.
## 8.20.1 (February 13th 2024)
### Implementation changes and bug fixes

@ -819,6 +819,16 @@ func (s *Store) CommitIndex() (uint64, error) {
return s.raft.CommitIndex(), nil
}
// LeaderCommitIndex returns the Raft leader commit index, as indicated
// by the latest AppendEntries RPC. If this node is the Leader then the
// commit index is returned directly from the Raft object.
func (s *Store) LeaderCommitIndex() (uint64, error) {
if s.raft.State() == raft.Leader {
return s.raft.CommitIndex(), nil
}
return s.raftTn.LeaderCommitIndex(), nil
}
// Nodes returns the slice of nodes in the cluster, sorted by ID ascending.
func (s *Store) Nodes() ([]*Server, error) {
if !s.open {

@ -61,6 +61,21 @@ func Test_MultiNodeSimple(t *testing.T) {
return s0.DBAppliedIndex() == s1.DBAppliedIndex()
}, 250*time.Millisecond, 3*time.Second)
ci, err := s0.CommitIndex()
if err != nil {
t.Fatalf("failed to retrieve commit index: %s", err.Error())
}
if exp, got := uint64(4), ci; exp != got {
t.Fatalf("wrong commit index, got: %d, exp: %d", got, exp)
}
lci, err := s0.LeaderCommitIndex()
if err != nil {
t.Fatalf("failed to retrieve commit index: %s", err.Error())
}
if exp, got := uint64(4), lci; exp != got {
t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp)
}
// Now, do a NONE consistency query on each node, to actually confirm the data
// has been replicated.
testFn1 := func(t *testing.T, s *Store) {
@ -78,6 +93,21 @@ func Test_MultiNodeSimple(t *testing.T) {
testFn1(t, s0)
testFn1(t, s1)
ci, err = s1.CommitIndex()
if err != nil {
t.Fatalf("failed to retrieve commit index: %s", err.Error())
}
if exp, got := uint64(4), ci; exp != got {
t.Fatalf("wrong commit index, got: %d, exp: %d", got, exp)
}
lci, err = s1.LeaderCommitIndex()
if err != nil {
t.Fatalf("failed to retrieve commit index: %s", err.Error())
}
if exp, got := uint64(4), lci; exp != got {
t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp)
}
// Write another row using Request
rr := executeQueryRequestFromString("INSERT INTO foo(id, name) VALUES(2, 'fiona')", proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG, false, false)
_, err = s0.Request(rr)

@ -90,6 +90,13 @@ func Test_SingleNodeOnDiskSQLitePath(t *testing.T) {
if exp, got := uint64(3), ci; exp != got {
t.Fatalf("wrong commit index, got: %d, exp: %d", got, exp)
}
lci, err := s.LeaderCommitIndex()
if err != nil {
t.Fatalf("failed to retrieve commit index: %s", err.Error())
}
if exp, got := uint64(3), lci; exp != got {
t.Fatalf("wrong leader commit index, got: %d, exp: %d", got, exp)
}
qr := queryRequestFromString("SELECT * FROM foo", false, false)
qr.Level = proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE

@ -54,6 +54,7 @@ func (t *Transport) Addr() net.Addr {
type NodeTransport struct {
*raft.NetworkTransport
commandCommitIndex *atomic.Uint64
leaderCommitIndex *atomic.Uint64
done chan struct{}
closed bool
}
@ -63,6 +64,7 @@ func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport {
return &NodeTransport{
NetworkTransport: transport,
commandCommitIndex: &atomic.Uint64{},
leaderCommitIndex: &atomic.Uint64{},
done: make(chan struct{}),
}
}
@ -73,6 +75,12 @@ func (n *NodeTransport) CommandCommitIndex() uint64 {
return n.commandCommitIndex.Load()
}
// LeaderCommitIndex returns the index of the latest committed log entry
// which is known to be replicated to the majority of the cluster.
func (n *NodeTransport) LeaderCommitIndex() uint64 {
return n.leaderCommitIndex.Load()
}
// Close closes the transport
func (n *NodeTransport) Close() error {
if n.closed {
@ -120,6 +128,7 @@ func (n *NodeTransport) Consumer() <-chan raft.RPC {
n.commandCommitIndex.Store(e.Index)
}
}
n.leaderCommitIndex.Store(cmd.LeaderCommitIndex)
}
ch <- rpc
}

Loading…
Cancel
Save