1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

147 lines
3.8 KiB
Go

package store
import (
"io"
"net"
"sync/atomic"
"time"
"github.com/hashicorp/raft"
"github.com/rqlite/rqlite/v8/store/gzip"
)
// Layer abstracts the network substrate the Store relies on to talk to
// other nodes for Raft consensus. It must accept inbound connections (via
// the embedded net.Listener) and be able to open outbound ones (via Dial).
type Layer interface {
	// Dial opens an outbound connection to addr. timeout is the caller's
	// limit for the dial attempt; how it is enforced is up to the
	// implementation.
	Dial(addr string, timeout time.Duration) (net.Conn, error)

	net.Listener
}
// Transport is the network service handed to Raft. It wraps a Layer and
// forwards every operation to it, converting Raft's address type to a
// plain string when dialing.
type Transport struct {
	ly Layer
}

// NewTransport returns a Transport that delegates to ly.
func NewTransport(ly Layer) *Transport {
	tr := new(Transport)
	tr.ly = ly
	return tr
}

// Dial opens a connection to the Raft server at addr through the
// underlying Layer, passing timeout through unchanged.
func (tr *Transport) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
	target := string(addr)
	return tr.ly.Dial(target, timeout)
}

// Accept waits for and returns the next inbound connection from the
// underlying Layer.
func (tr *Transport) Accept() (net.Conn, error) {
	layer := tr.ly
	return layer.Accept()
}

// Close closes the underlying Layer's listener.
func (tr *Transport) Close() error {
	err := tr.ly.Close()
	return err
}

// Addr reports the address the underlying Layer is bound to.
func (tr *Transport) Addr() net.Addr {
	layer := tr.ly
	return layer.Addr()
}
// NodeTransport wraps a Raft NetworkTransport. Its InstallSnapshot method
// gzip-compresses outgoing snapshot data, and its Consumer method inspects
// inbound RPCs to track commit indexes; everything else is provided by the
// embedded NetworkTransport.
type NodeTransport struct {
	*raft.NetworkTransport

	// Index of the most recent LogCommand entry seen in an inbound
	// AppendEntries request.
	commandCommitIndex *atomic.Uint64
	// LeaderCommitIndex carried by the most recent inbound AppendEntries
	// request.
	leaderCommitIndex *atomic.Uint64

	// done is closed by Close to signal Consumer's forwarding goroutines.
	done chan struct{}
	// closed records that Close has already run.
	closed bool
}

// NewNodeTransport wraps transport in a NodeTransport whose commit-index
// counters start at zero.
func NewNodeTransport(transport *raft.NetworkTransport) *NodeTransport {
	nt := &NodeTransport{NetworkTransport: transport}
	nt.commandCommitIndex = new(atomic.Uint64)
	nt.leaderCommitIndex = new(atomic.Uint64)
	nt.done = make(chan struct{})
	return nt
}
// CommandCommitIndex reports the index of the most recent LogCommand entry
// this node has received in an AppendEntries request (0 if none yet). The
// value is read atomically, so it may be called while Consumer's goroutine
// is updating it.
func (n *NodeTransport) CommandCommitIndex() uint64 {
	idx := n.commandCommitIndex.Load()
	return idx
}
// LeaderCommitIndex reports the commit index the leader advertised in the
// most recent AppendEntries request this node has received (0 if none
// yet). The value is read atomically, so it may be called while Consumer's
// goroutine is updating it.
func (n *NodeTransport) LeaderCommitIndex() uint64 {
	idx := n.leaderCommitIndex.Load()
	return idx
}
// Close shuts the transport down. The first call closes the done channel,
// which signals Consumer's forwarding goroutines to exit. It then closes
// the wrapped NetworkTransport, if there is one, and returns that error.
// Subsequent calls do nothing and return nil. The closed flag is a plain
// bool, so concurrent calls to Close are not synchronized.
func (n *NodeTransport) Close() error {
	if !n.closed {
		n.closed = true
		close(n.done)
		if n.NetworkTransport != nil {
			return n.NetworkTransport.Close()
		}
	}
	return nil
}
// InstallSnapshot pushes a snapshot to the follower id at target. The
// snapshot bytes in data are wrapped in a gzip compressor before being
// handed to the wrapped NetworkTransport; on the receiving side, Consumer
// wraps the incoming reader in a matching decompressor. The compressor is
// closed when this method returns.
func (n *NodeTransport) InstallSnapshot(
	id raft.ServerID,
	target raft.ServerAddress,
	args *raft.InstallSnapshotRequest,
	resp *raft.InstallSnapshotResponse,
	data io.Reader,
) error {
	compressed, err := gzip.NewCompressor(data, gzip.DefaultBufferSize)
	if err != nil {
		return err
	}
	defer compressed.Close()

	inner := n.NetworkTransport
	return inner.InstallSnapshot(id, target, args, resp, compressed)
}
// Consumer returns a channel of RPC requests to be consumed.
//
// Each call starts a goroutine that forwards RPCs from the wrapped
// NetworkTransport's consumer channel to the returned channel, inspecting
// each one on the way:
//   - InstallSnapshot requests with a non-nil Reader have it wrapped in a
//     gzip decompressor, matching the compression InstallSnapshot applies
//     on the sending side.
//   - AppendEntries requests update the values reported by
//     CommandCommitIndex (index of the last LogCommand entry in the
//     request, if any) and LeaderCommitIndex.
//
// The goroutine exits when Close is called, including while it is waiting
// to hand an RPC to a reader that has stopped receiving. It also exits if
// the source channel is closed. The returned channel is never closed.
func (n *NodeTransport) Consumer() <-chan raft.RPC {
	ch := make(chan raft.RPC)
	srcCh := n.NetworkTransport.Consumer()
	go func() {
		for {
			var rpc raft.RPC
			var ok bool
			select {
			case <-n.done:
				return
			case rpc, ok = <-srcCh:
			}
			if !ok {
				// Source closed: stop instead of spinning and forwarding
				// zero-value RPCs (which carry a nil RespChan).
				return
			}

			switch cmd := rpc.Command.(type) {
			case *raft.InstallSnapshotRequest:
				if rpc.Reader != nil {
					rpc.Reader = gzip.NewDecompressor(rpc.Reader)
				}
			case *raft.AppendEntriesRequest:
				for _, e := range cmd.Entries {
					if e.Type == raft.LogCommand {
						n.commandCommitIndex.Store(e.Index)
					}
				}
				n.leaderCommitIndex.Store(cmd.LeaderCommitIndex)
			}

			// Never block past Close: a bare send would leak this goroutine
			// if the reader has already stopped consuming.
			select {
			case ch <- rpc:
			case <-n.done:
				return
			}
		}
	}()
	return ch
}
// Stats returns a point-in-time view of the transport's commit-index
// counters, keyed by "command_commit_index" and "leader_commit_index".
func (n *NodeTransport) Stats() map[string]interface{} {
	stats := make(map[string]interface{}, 2)
	stats["command_commit_index"] = n.CommandCommitIndex()
	stats["leader_commit_index"] = n.LeaderCommitIndex()
	return stats
}