1
0
Fork 0

Track latest committed FSM command

Not all Raft log entries represent changes to the FSM. Some are cluster
config changes, which are never passed to rqlite. This means that
CommitIndex can be ahead of FSM index, even though the FSM is fully
caught up.
master
Philip O'Toole 7 months ago
parent 542d1f6086
commit 7f1977176d

@ -3,6 +3,7 @@ package store
import (
"io"
"net"
"sync/atomic"
"time"
"github.com/hashicorp/raft"
@ -52,18 +53,26 @@ func (t *Transport) Addr() net.Addr {
// custom configuration of the InstallSnapshot method.
type NodeTransport struct {
*raft.NetworkTransport
done chan struct{}
closed bool
commandCommitIndex *atomic.Uint64
done chan struct{}
closed bool
}
// NewNodeTransport returns an initialized NodeTransport.
func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport {
return &NodeTransport{
NetworkTransport: transport,
done: make(chan struct{}),
NetworkTransport: transport,
commandCommitIndex: &atomic.Uint64{},
done: make(chan struct{}),
}
}
// CommandCommitIndex returns the index of the latest committed log entry
// which is applied to the FSM.
func (n *NodeTransport) CommandCommitIndex() uint64 {
return n.commandCommitIndex.Load()
}
// Close closes the transport
func (n *NodeTransport) Close() error {
if n.closed {
@ -100,8 +109,17 @@ func (n *NodeTransport) Consumer() <-chan raft.RPC {
case <-n.done:
return
case rpc := <-srcCh:
if rpc.Reader != nil {
rpc.Reader = gzip.NewDecompressor(rpc.Reader)
switch cmd := rpc.Command.(type) {
case *raft.InstallSnapshotRequest:
if rpc.Reader != nil {
rpc.Reader = gzip.NewDecompressor(rpc.Reader)
}
case raft.AppendEntriesRequest:
for _, e := range cmd.Entries {
if e.Type == raft.LogCommand {
n.commandCommitIndex.Store(e.Index)
}
}
}
ch <- rpc
}

Loading…
Cancel
Save