
Merge pull request #1518 from rqlite/clearer-layer

Refactor internode networking implementation
master
Philip O'Toole authored 9 months ago, committed via GitHub
commit 042d001e98

@@ -2,6 +2,7 @@
 ### Implementation changes and bug fixes
 - [PR #1515](https://github.com/rqlite/rqlite/pull/1515): Fix a log message related to mutual TLS.
 - [PR #1516](https://github.com/rqlite/rqlite/pull/1516): Add support to Python end-to-end test helpers for mTLS.
+- [PR #1518](https://github.com/rqlite/rqlite/pull/1518): Refactor muxed internode communications.
 
 ## 8.13.2 (December 21st 2023)
 ### Implementation changes and bug fixes

@@ -111,16 +111,10 @@ type CredentialStore interface {
 	AA(username, password, perm string) bool
 }
 
-// Transport is the interface the network layer must provide.
-type Transport interface {
-	net.Listener
-	Dialer
-}
-
 // Service provides information about the node and cluster.
 type Service struct {
-	tn   Transport    // Network layer this service uses
+	ln   net.Listener // Incoming connections to the service
 	addr net.Addr     // Address on which this service is listening
 	db   Database     // The queryable system.
 	mgr  Manager      // The cluster management system.
@@ -135,10 +129,10 @@ type Service struct {
 }
 
 // New returns a new instance of the cluster service
-func New(tn Transport, db Database, m Manager, credentialStore CredentialStore) *Service {
+func New(ln net.Listener, db Database, m Manager, credentialStore CredentialStore) *Service {
 	return &Service{
-		tn:     tn,
-		addr:   tn.Addr(),
+		ln:     ln,
+		addr:   ln.Addr(),
 		db:     db,
 		mgr:    m,
 		logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags),
@@ -149,13 +143,13 @@ func New(tn Transport, db Database, m Manager, credentialStore CredentialStore)
 // Open opens the Service.
 func (s *Service) Open() error {
 	go s.serve()
-	s.logger.Println("service listening on", s.tn.Addr())
+	s.logger.Println("service listening on", s.addr)
 	return nil
 }
 
 // Close closes the service.
 func (s *Service) Close() error {
-	s.tn.Close()
+	s.ln.Close()
 	return nil
 }
@@ -211,7 +205,7 @@ func (s *Service) Stats() (map[string]interface{}, error) {
 func (s *Service) serve() error {
 	for {
-		conn, err := s.tn.Accept()
+		conn, err := s.ln.Accept()
 		if err != nil {
 			return err
 		}
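The net effect of the hunks above: cluster.Service no longer needs a combined listen-and-dial Transport, since it only ever accepts connections; any net.Listener suffices. A minimal sketch of constructing the refactored service follows, where db, mgr, and credStr are hypothetical stand-ins for real cluster.Database, cluster.Manager, and CredentialStore values, not code from this commit.

// Sketch only: wiring the refactored cluster.Service with a plain
// net.Listener. db, mgr, and credStr are assumed placeholders.
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
	log.Fatalf("failed to listen: %s", err.Error())
}
srv := cluster.New(ln, db, mgr, credStr) // previously cluster.New(tn, ...), with tn a Transport
if err := srv.Open(); err != nil {
	log.Fatalf("failed to open cluster service: %s", err.Error())
}
defer srv.Close()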

@@ -91,8 +91,15 @@ func main() {
 	if err != nil {
 		log.Fatalf("failed to start node mux: %s", err.Error())
 	}
-	raftTn := mux.Listen(cluster.MuxRaftHeader)
+
+	// Raft internode layer
+	raftLn := mux.Listen(cluster.MuxRaftHeader)
 	log.Printf("Raft TCP mux Listener registered with byte header %d", cluster.MuxRaftHeader)
+	raftDialer, err := createRaftDialer(cfg)
+	if err != nil {
+		log.Fatalf("failed to create Raft dialer: %s", err.Error())
+	}
+	raftTn := tcp.NewLayer(raftLn, raftDialer)
 
 	// Create the store.
 	str, err := createStore(cfg, raftTn)
@@ -424,8 +431,8 @@ func credentialStore(cfg *Config) (*auth.CredentialsStore, error) {
 	return auth.NewCredentialsStoreFromFile(cfg.AuthFile)
 }
 
-func clusterService(cfg *Config, tn cluster.Transport, db cluster.Database, mgr cluster.Manager, credStr *auth.CredentialsStore) (*cluster.Service, error) {
-	c := cluster.New(tn, db, mgr, credStr)
+func clusterService(cfg *Config, ln net.Listener, db cluster.Database, mgr cluster.Manager, credStr *auth.CredentialsStore) (*cluster.Service, error) {
+	c := cluster.New(ln, db, mgr, credStr)
 	c.SetAPIAddr(cfg.HTTPAdv)
 	c.EnableHTTPS(cfg.HTTPx509Cert != "" && cfg.HTTPx509Key != "") // Conditions met for an HTTPS API
 	if err := c.Open(); err != nil {
@@ -452,6 +459,19 @@ func createClusterClient(cfg *Config, clstr *cluster.Service) (*cluster.Client,
 	return clstrClient, nil
 }
 
+func createRaftDialer(cfg *Config) (*tcp.Dialer, error) {
+	var dialerTLSConfig *tls.Config
+	var err error
+	if cfg.NodeX509Cert != "" || cfg.NodeX509CACert != "" {
+		dialerTLSConfig, err = rtls.CreateClientConfig(cfg.NodeX509Cert, cfg.NodeX509Key,
+			cfg.NodeX509CACert, cfg.NodeVerifyServerName, cfg.NoNodeVerify)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create TLS config for Raft dialer: %s", err.Error())
+		}
+	}
+	return tcp.NewDialer(cluster.MuxRaftHeader, dialerTLSConfig), nil
+}
+
 func createCluster(cfg *Config, hasPeers bool, client *cluster.Client, str *store.Store, httpServ *httpd.Service, credStr *auth.CredentialsStore) error {
 	joins := cfg.JoinAddresses()
 	if err := networkCheckJoinAddrs(cfg, joins); err != nil {
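main() now assembles the Raft network layer from two explicit halves: a muxed listener for inbound connections and a header-tagged dialer for outbound ones. A simplified sketch of that wiring, with TLS and error handling trimmed (mux is the node's *tcp.Mux created earlier in main()):

// Simplified sketch of the new startup wiring; not a verbatim excerpt.
raftLn := mux.Listen(cluster.MuxRaftHeader)             // inbound Raft connections, demuxed by header byte
raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) // outbound; a nil TLS config means plaintext
raftTn := tcp.NewLayer(raftLn, raftDialer)              // pair both halves as a single Layer
str, err := createStore(cfg, raftTn)                    // the store accepts and dials via raftTn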

@@ -208,7 +208,7 @@ type Store struct {
 	restoreDoneCh chan struct{}
 
 	raft   *raft.Raft // The consensus mechanism.
-	ln     Listener
+	ly     Layer
 	raftTn *NodeTransport
 	raftID string    // Node ID.
 	dbConf *DBConfig // SQLite database config.
@@ -309,7 +309,7 @@ type Config struct {
 }
 
 // New returns a new Store.
-func New(ln Listener, c *Config) *Store {
+func New(ly Layer, c *Config) *Store {
 	logger := c.Logger
 	if logger == nil {
 		logger = log.New(os.Stderr, "[store] ", log.LstdFlags)
@@ -321,7 +321,7 @@ func New(ln Listener, c *Config) *Store {
 	}
 
 	return &Store{
-		ln:            ln,
+		ly:            ly,
 		raftDir:       c.Dir,
 		peersPath:     filepath.Join(c.Dir, peersPath),
 		peersInfoPath: filepath.Join(c.Dir, peersInfoPath),
@@ -376,7 +376,7 @@ func (s *Store) Open() (retErr error) {
 	}
 	s.openT = time.Now()
-	s.logger.Printf("opening store with node ID %s, listening on %s", s.raftID, s.ln.Addr().String())
+	s.logger.Printf("opening store with node ID %s, listening on %s", s.raftID, s.ly.Addr().String())
 
 	// Create all the required Raft directories.
 	s.logger.Printf("ensuring data directory exists at %s", s.raftDir)
@@ -403,7 +403,7 @@ func (s *Store) Open() (retErr error) {
 	}
 
 	// Create Raft-compatible network layer.
-	nt := raft.NewNetworkTransport(NewTransport(s.ln), connectionPoolCount, connectionTimeout, nil)
+	nt := raft.NewNetworkTransport(NewTransport(s.ly), connectionPoolCount, connectionTimeout, nil)
 	s.raftTn = NewNodeTransport(nt)
 
 	// Don't allow control over trailing logs directly, just implement a policy.
@@ -583,7 +583,7 @@ func (s *Store) Ready() bool {
 func (s *Store) Close(wait bool) (retErr error) {
 	defer func() {
 		if retErr == nil {
-			s.logger.Printf("store closed with node ID %s, listening on %s", s.raftID, s.ln.Addr().String())
+			s.logger.Printf("store closed with node ID %s, listening on %s", s.raftID, s.ly.Addr().String())
 			s.open = false
 		}
 	}()
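The Store itself is agnostic about where its Layer comes from: it reports the Layer's address in logs and hands the same Layer to the Raft network transport inside Open(). A hedged sketch of standing up a Store against the tcp Layer built above (the path and node ID are illustrative):

// Sketch only. Assumes raftLn and raftDialer as built in main() above.
ly := tcp.NewLayer(raftLn, raftDialer)
s := store.New(ly, &store.Config{
	DBConf: store.NewDBConfig(),
	Dir:    "/path/to/data", // illustrative path
	ID:     "node1",         // illustrative node ID
})
if err := s.Open(); err != nil { // Open wraps ly via NewTransport and raft.NewNetworkTransport
	log.Fatalf("failed to open store: %s", err.Error())
}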

@@ -2061,7 +2061,7 @@ func Test_MultiNodeStoreAutoRestoreBootstrap(t *testing.T) {
 	// Trigger a bootstrap.
 	s0.BootstrapExpect = 3
 	for _, s := range []*Store{s0, s1, s2} {
-		if err := s0.Notify(notifyRequest(s.ID(), s.ln.Addr().String())); err != nil {
+		if err := s0.Notify(notifyRequest(s.ID(), s.ly.Addr().String())); err != nil {
 			t.Fatalf("failed to notify store: %s", err.Error())
 		}
 	}
@@ -2797,8 +2797,8 @@ func mustNewStoreAtPathsLn(id, dataPath, sqlitePath string, fk bool) (*Store, ne
 	cfg.FKConstraints = fk
 	cfg.OnDiskPath = sqlitePath
 
-	ln := mustMockLister("localhost:0")
-	s := New(ln, &Config{
+	ly := mustMockLayer("localhost:0")
+	s := New(ly, &Config{
 		DBConf: cfg,
 		Dir:    dataPath,
 		ID:     id,
@@ -2806,7 +2806,7 @@ func mustNewStoreAtPathsLn(id, dataPath, sqlitePath string, fk bool) (*Store, ne
 	if s == nil {
 		panic("failed to create new store")
 	}
-	return s, ln
+	return s, ly
 }
 
 func mustNewStore(t *testing.T) (*Store, net.Listener) {
@@ -2837,27 +2837,27 @@ func (m *mockSnapshotSink) Cancel() error {
 	return nil
 }
 
-type mockListener struct {
+type mockLayer struct {
 	ln net.Listener
 }
 
-func mustMockLister(addr string) Listener {
+func mustMockLayer(addr string) Layer {
 	ln, err := net.Listen("tcp", addr)
 	if err != nil {
 		panic("failed to create new listner")
 	}
-	return &mockListener{ln}
+	return &mockLayer{ln}
 }
 
-func (m *mockListener) Dial(addr string, timeout time.Duration) (net.Conn, error) {
+func (m *mockLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
 	return net.DialTimeout("tcp", addr, timeout)
 }
 
-func (m *mockListener) Accept() (net.Conn, error) { return m.ln.Accept() }
+func (m *mockLayer) Accept() (net.Conn, error) { return m.ln.Accept() }
 
-func (m *mockListener) Close() error { return m.ln.Close() }
+func (m *mockLayer) Close() error { return m.ln.Close() }
 
-func (m *mockListener) Addr() net.Addr { return m.ln.Addr() }
+func (m *mockLayer) Addr() net.Addr { return m.ln.Addr() }
 
 func mustCreateTempFile() string {
 	f, err := os.CreateTemp("", "rqlite-temp")

@@ -9,42 +9,43 @@ import (
 	"github.com/rqlite/rqlite/v8/store/gzip"
 )
 
-// Listener is the interface expected by the Store for Transports.
-type Listener interface {
+// Layer is the interface expected by the Store for network communication
+// between nodes, which is used for Raft distributed consensus.
+type Layer interface {
 	net.Listener
 	Dial(address string, timeout time.Duration) (net.Conn, error)
 }
 
 // Transport is the network service provided to Raft, and wraps a Listener.
 type Transport struct {
-	ln Listener
+	ly Layer
 }
 
 // NewTransport returns an initialized Transport.
-func NewTransport(ln Listener) *Transport {
+func NewTransport(ly Layer) *Transport {
 	return &Transport{
-		ln: ln,
+		ly: ly,
 	}
 }
 
 // Dial creates a new network connection.
 func (t *Transport) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error) {
-	return t.ln.Dial(string(addr), timeout)
+	return t.ly.Dial(string(addr), timeout)
 }
 
 // Accept waits for the next connection.
 func (t *Transport) Accept() (net.Conn, error) {
-	return t.ln.Accept()
+	return t.ly.Accept()
 }
 
 // Close closes the transport
 func (t *Transport) Close() error {
-	return t.ln.Close()
+	return t.ly.Close()
 }
 
 // Addr returns the binding address of the transport.
 func (t *Transport) Addr() net.Addr {
-	return t.ln.Addr()
+	return t.ly.Addr()
 }
 
 // NodeTransport is a wrapper around the Raft NetworkTransport, which allows
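Because Layer embeds net.Listener and adds Dial, any type with Accept, Close, Addr, and Dial methods satisfies it, which is why both *tcp.Layer and the store tests' mockLayer work unchanged. Illustrative compile-time assertions one could add to make that explicit (these lines are not part of the commit):

// Illustrative compile-time checks, not part of this commit.
var _ store.Layer = (*tcp.Layer)(nil)            // production implementation from the tcp package
var _ raft.StreamLayer = (*store.Transport)(nil) // Transport is what raft.NewNetworkTransport consumes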

@@ -629,8 +629,11 @@ func Test_MultiNodeClusterRaftAdv(t *testing.T) {
 	}
 	go mux2.Serve()
 
+	raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+
 	// Start two nodes, and ensure a cluster can be formed.
-	node1 := mustNodeEncrypted(mustTempDir(), true, false, mux1, "1")
+	node1 := mustNodeEncrypted(mustTempDir(), true, false, mux1, raftDialer, clstrDialer, "1")
 	defer node1.Deprovision()
 	leader, err := node1.WaitForLeader()
 	if err != nil {
@@ -640,7 +643,7 @@ func Test_MultiNodeClusterRaftAdv(t *testing.T) {
 		t.Fatalf("node return wrong leader from leader, exp: %s, got %s", exp, got)
 	}
 
-	node2 := mustNodeEncrypted(mustTempDir(), false, false, mux2, "2")
+	node2 := mustNodeEncrypted(mustTempDir(), false, false, mux2, raftDialer, clstrDialer, "2")
 	defer node2.Deprovision()
 	if err := node2.Join(node1); err != nil {
 		t.Fatalf("node2 failed to join leader: %s", err.Error())
@@ -1415,11 +1418,14 @@ func Test_MultiNodeClusterRecoverSingle(t *testing.T) {
 		t.Fatalf("failed to close node3: %s", err.Error())
 	}
 
+	raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+
 	// Create a single node using the node's data directory. It should fail because
 	// quorum can't be met. This isn't quite right since the Raft address is also
 	// changing, but it generally proves it doesn't come up.
 	mux0, ln0 := mustNewOpenMux("127.0.0.1:10000")
-	failedSingle := mustNodeEncrypted(node1.Dir, true, false, mux0, node1.Store.ID())
+	failedSingle := mustNodeEncrypted(node1.Dir, true, false, mux0, raftDialer, clstrDialer, node1.Store.ID())
 	_, err = failedSingle.WaitForLeader()
 	if err == nil {
 		t.Fatalf("no error waiting for leader")
@@ -1432,7 +1438,7 @@ func Test_MultiNodeClusterRecoverSingle(t *testing.T) {
 	peers := fmt.Sprintf(`[{"id": "%s","address": "%s"}]`, node1.Store.ID(), "127.0.0.1:10001")
 	mustWriteFile(node1.PeersPath, peers)
 
-	okSingle := mustNodeEncrypted(node1.Dir, true, false, mux1, node1.Store.ID())
+	okSingle := mustNodeEncrypted(node1.Dir, true, false, mux1, raftDialer, clstrDialer, node1.Store.ID())
 	_, err = okSingle.WaitForLeader()
 	if err != nil {
 		t.Fatalf("failed waiting for leader: %s", err.Error())
@@ -1449,15 +1455,18 @@ func Test_MultiNodeClusterRecoverSingle(t *testing.T) {
 func Test_MultiNodeClusterRecoverFull(t *testing.T) {
 	var err error
 
+	raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+
 	mux1, ln1 := mustNewOpenMux("127.0.0.1:10001")
-	node1 := mustNodeEncrypted(mustTempDir(), true, false, mux1, "1")
+	node1 := mustNodeEncrypted(mustTempDir(), true, false, mux1, raftDialer, clstrDialer, "1")
 	_, err = node1.WaitForLeader()
 	if err != nil {
 		t.Fatalf("failed waiting for leader: %s", err.Error())
 	}
 
 	mux2, ln2 := mustNewOpenMux("127.0.0.1:10002")
-	node2 := mustNodeEncrypted(mustTempDir(), false, false, mux2, "2")
+	node2 := mustNodeEncrypted(mustTempDir(), false, false, mux2, raftDialer, clstrDialer, "2")
 	if err := node2.Join(node1); err != nil {
 		t.Fatalf("node failed to join leader: %s", err.Error())
 	}
@@ -1467,7 +1476,7 @@ func Test_MultiNodeClusterRecoverFull(t *testing.T) {
 	}
 
 	mux3, ln3 := mustNewOpenMux("127.0.0.1:10003")
-	node3 := mustNodeEncrypted(mustTempDir(), false, false, mux3, "3")
+	node3 := mustNodeEncrypted(mustTempDir(), false, false, mux3, raftDialer, clstrDialer, "3")
 	if err := node3.Join(node1); err != nil {
 		t.Fatalf("node failed to join leader: %s", err.Error())
 	}
@@ -1515,17 +1524,17 @@ func Test_MultiNodeClusterRecoverFull(t *testing.T) {
 	mustWriteFile(node3.PeersPath, peers)
 
 	mux4, ln4 := mustNewOpenMux("127.0.0.1:11001")
-	node4 := mustNodeEncrypted(node1.Dir, false, false, mux4, "1")
+	node4 := mustNodeEncrypted(node1.Dir, false, false, mux4, raftDialer, clstrDialer, "1")
 	defer node4.Deprovision()
 	defer ln4.Close()
 
 	mux5, ln5 := mustNewOpenMux("127.0.0.1:11002")
-	node5 := mustNodeEncrypted(node2.Dir, false, false, mux5, "2")
+	node5 := mustNodeEncrypted(node2.Dir, false, false, mux5, raftDialer, clstrDialer, "2")
 	defer node5.Deprovision()
 	defer ln5.Close()
 
 	mux6, ln6 := mustNewOpenMux("127.0.0.1:11003")
-	node6 := mustNodeEncrypted(node3.Dir, false, false, mux6, "3")
+	node6 := mustNodeEncrypted(node3.Dir, false, false, mux6, raftDialer, clstrDialer, "3")
 	defer node6.Deprovision()
 	defer ln6.Close()

@@ -632,17 +632,22 @@ func mustNewNode(enableSingle bool) *Node {
 func mustNewNodeEncrypted(enableSingle, httpEncrypt, nodeEncrypt bool) *Node {
 	dir := mustTempDir()
 	var mux *tcp.Mux
+	var raftDialer *tcp.Dialer
+	var clstrDialer *tcp.Dialer
 	if nodeEncrypt {
 		mux = mustNewOpenTLSMux(rX509.CertExampleDotComFile(dir), rX509.KeyExampleDotComFile(dir), "")
+		raftDialer = tcp.NewDialer(cluster.MuxRaftHeader, mustCreateTLSConfig(rX509.CertExampleDotComFile(dir), rX509.KeyExampleDotComFile(dir), ""))
+		clstrDialer = tcp.NewDialer(cluster.MuxClusterHeader, mustCreateTLSConfig(rX509.CertExampleDotComFile(dir), rX509.KeyExampleDotComFile(dir), ""))
 	} else {
 		mux, _ = mustNewOpenMux("")
+		raftDialer = tcp.NewDialer(cluster.MuxRaftHeader, nil)
+		clstrDialer = tcp.NewDialer(cluster.MuxClusterHeader, nil)
 	}
 	go mux.Serve()
 
-	return mustNodeEncrypted(dir, enableSingle, httpEncrypt, mux, "")
+	return mustNodeEncrypted(dir, enableSingle, httpEncrypt, mux, raftDialer, clstrDialer, "")
 }
 
-func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux, nodeID string) *Node {
+func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux, raftDialer, clstrDialer *tcp.Dialer, nodeID string) *Node {
 	nodeCertPath := rX509.CertExampleDotComFile(dir)
 	nodeKeyPath := rX509.KeyExampleDotComFile(dir)
 	httpCertPath := nodeCertPath
@@ -654,13 +659,14 @@ func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux,
 		NodeKeyPath:  nodeKeyPath,
 		HTTPCertPath: httpCertPath,
 		HTTPKeyPath:  httpKeyPath,
-		TLSConfig:    mustCreateTLSConfig(nodeCertPath, nodeKeyPath, ""),
+		//TLSConfig:    mustCreateTLSConfig(nodeCertPath, nodeKeyPath, ""),
 		PeersPath:    filepath.Join(dir, "raft/peers.json"),
 	}
 
 	dbConf := store.NewDBConfig()
 
-	raftTn := mux.Listen(cluster.MuxRaftHeader)
+	raftLn := mux.Listen(cluster.MuxRaftHeader)
+	raftTn := tcp.NewLayer(raftLn, raftDialer)
 	id := nodeID
 	if id == "" {
 		id = raftTn.Addr().String()
@@ -693,7 +699,6 @@ func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux,
 	}
 	node.Cluster = clstr
 
-	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
 	clstrClient := cluster.NewClient(clstrDialer, 30*time.Second)
 	node.Client = clstrClient
 	node.Service = httpd.New("localhost:0", node.Store, clstrClient, nil)

@@ -31,7 +31,9 @@ func Test_SingleNodeBasicEndpoint(t *testing.T) {
 	dir := mustTempDir()
 	mux, ln := mustNewOpenMux("")
 	defer ln.Close()
-	node = mustNodeEncrypted(dir, true, false, mux, "")
+	raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+	node = mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "")
 
 	if _, err := node.WaitForLeader(); err != nil {
 		t.Fatalf("node never became leader")
 	}
@@ -863,8 +865,9 @@ func Test_SingleNodeUpgrades_NoSnapshots(t *testing.T) {
 		mux, ln := mustNewOpenMux("")
 		defer ln.Close()
-		node := mustNodeEncrypted(destdir, true, false, mux, "node1")
+		raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+		clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+		node := mustNodeEncrypted(destdir, true, false, mux, raftDialer, clstrDialer, "node1")
 		defer node.Deprovision()
 		if _, err := node.WaitForLeader(); err != nil {
 			t.Fatalf("node never became leader with %s data:", dir)
@@ -922,8 +925,9 @@ func Test_SingleNodeUpgrades_Snapshots(t *testing.T) {
 		mux, ln := mustNewOpenMux("")
 		defer ln.Close()
-		node := mustNodeEncrypted(destdir, true, false, mux, "node1")
+		raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+		clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+		node := mustNodeEncrypted(destdir, true, false, mux, raftDialer, clstrDialer, "node1")
 		defer node.Deprovision()
 		if _, err := node.WaitForLeader(); err != nil {
 			t.Fatalf("node never became leader with %s data:", dir)
@@ -1019,7 +1023,9 @@ func Test_SingleNodeReopen(t *testing.T) {
 	dir := mustTempDir()
 	mux, ln := mustNewOpenMux("")
 	defer ln.Close()
-	node := mustNodeEncrypted(dir, true, false, mux, "")
+	raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+	node := mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "")
 
 	if _, err := node.WaitForLeader(); err != nil {
 		t.Fatalf("node never became leader")
@@ -1052,7 +1058,9 @@ func Test_SingleNodeNoopReopen(t *testing.T) {
 	dir := mustTempDir()
 	mux, ln := mustNewOpenMux("")
 	defer ln.Close()
-	node := mustNodeEncrypted(dir, true, false, mux, "")
+	raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+	node := mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "")
 
 	if _, err := node.WaitForLeader(); err != nil {
 		t.Fatalf("node never became leader")
@@ -1133,7 +1141,9 @@ func Test_SingleNodeNoopSnapReopen(t *testing.T) {
 	dir := mustTempDir()
 	mux, ln := mustNewOpenMux("")
 	defer ln.Close()
-	node := mustNodeEncrypted(dir, true, false, mux, "")
+	raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+	node := mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "")
 
 	if _, err := node.WaitForLeader(); err != nil {
 		t.Fatalf("node never became leader")
@@ -1219,7 +1229,9 @@ func Test_SingleNodeNoopSnapLogsReopen(t *testing.T) {
 	dir := mustTempDir()
 	mux, ln := mustNewOpenMux("")
 	defer ln.Close()
-	node := mustNodeEncrypted(dir, true, false, mux, "")
+	raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil)
+	clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil)
+	node := mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "")
 
 	if _, err := node.WaitForLeader(); err != nil {
 		t.Fatalf("node never became leader")
@@ -1310,7 +1322,8 @@ func Test_SingleNodeAutoRestore(t *testing.T) {
 	mux, _ := mustNewOpenMux("")
 	go mux.Serve()
-	raftTn := mux.Listen(cluster.MuxRaftHeader)
+	raftLn := mux.Listen(cluster.MuxRaftHeader)
+	raftTn := tcp.NewLayer(raftLn, tcp.NewDialer(cluster.MuxRaftHeader, nil))
 
 	node.Store = store.New(raftTn, &store.Config{
 		DBConf: store.NewDBConfig(),
 		Dir:    node.Dir,

@@ -43,6 +43,15 @@ type Layer struct {
 	dialer *Dialer
 }
 
+// NewLayer returns a new instance of Layer.
+func NewLayer(ln net.Listener, dialer *Dialer) *Layer {
+	return &Layer{
+		ln:     ln,
+		addr:   ln.Addr(),
+		dialer: dialer,
+	}
+}
+
 // Dial creates a new network connection.
 func (l *Layer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
 	return l.dialer.Dial(addr, timeout)
@@ -161,6 +170,23 @@ func (mux *Mux) Stats() (interface{}, error) {
 	return s, nil
 }
 
+// Listen returns a Listener associated with the given header. Any connection
+// accepted by mux is multiplexed based on the initial header byte.
+func (mux *Mux) Listen(header byte) net.Listener {
+	// Ensure two listeners are not created for the same header byte.
+	if _, ok := mux.m[header]; ok {
+		panic(fmt.Sprintf("listener already registered under header byte: %d", header))
+	}
+
+	// Create a new listener and assign it.
+	ln := &listener{
+		c:    make(chan net.Conn),
+		addr: mux.addr,
+	}
+	mux.m[header] = ln
+
+	return ln
+}
+
 func (mux *Mux) handleConn(conn net.Conn) {
 	stats.Add(numConnectionsHandled, 1)
@@ -201,32 +227,10 @@ func (mux *Mux) handleConn(conn net.Conn) {
 	handler.c <- conn
 }
 
-// Listen returns a Layer associated with the given header. Any connection
-// accepted by mux is multiplexed based on the initial header byte.
-func (mux *Mux) Listen(header byte) *Layer {
-	// Ensure two listeners are not created for the same header byte.
-	if _, ok := mux.m[header]; ok {
-		panic(fmt.Sprintf("listener already registered under header byte: %d", header))
-	}
-
-	// Create a new listener and assign it.
-	ln := &listener{
-		c: make(chan net.Conn),
-	}
-	mux.m[header] = ln
-
-	layer := &Layer{
-		ln:   ln,
-		addr: mux.addr,
-	}
-	layer.dialer = NewDialer(header, mux.tlsConfig)
-
-	return layer
-}
-
 // listener is a receiver for connections received by Mux.
 type listener struct {
 	c    chan net.Conn
+	addr net.Addr
 }
 
 // Accept waits for and returns the next connection to the listener.
@@ -242,4 +246,4 @@ func (ln *listener) Accept() (c net.Conn, err error) {
 func (ln *listener) Close() error { return nil }
 
 // Addr always returns nil
-func (ln *listener) Addr() net.Addr { return nil }
+func (ln *listener) Addr() net.Addr { return ln.addr }
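This inversion is the heart of the PR: Mux.Listen now returns a plain net.Listener that carries the mux's advertised address, and dial behavior moves out of the mux entirely, into whatever Dialer the caller constructs. A hedged usage sketch, assuming a mux already serving and a Dialer that tags each outbound connection with its header byte so the remote mux can route it:

// Sketch only: pairing a muxed listener with a caller-owned dialer.
ln := mux.Listen(cluster.MuxClusterHeader)                 // inbound, demuxed by header byte
dialer := tcp.NewDialer(cluster.MuxClusterHeader, tlsCfg)  // outbound; tlsCfg may be nil
layer := tcp.NewLayer(ln, dialer)                          // unified Accept+Dial view
conn, err := layer.Dial("other-node:4002", 5*time.Second)  // address is illustrative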

@@ -142,10 +142,10 @@ func TestMux_Advertise(t *testing.T) {
 		mux.Logger = log.New(io.Discard, "", 0)
 	}
 
-	layer := mux.Listen(1)
-	if layer.Addr().String() != addr.Addr {
-		t.Fatalf("layer advertise address not correct, exp %s, got %s",
-			layer.Addr().String(), addr.Addr)
+	ln := mux.Listen(1)
+	if ln.Addr().String() != addr.Addr {
+		t.Fatalf("listener advertise address not correct, exp %s, got %s",
+			ln.Addr().String(), addr.Addr)
 	}
 }
