From e8ea575a572567adfa0cb5f9621a5fcb2884e8b4 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Dec 2023 08:12:29 -0500 Subject: [PATCH 1/6] Reorder unexpected function --- tcp/mux.go | 46 +++++++++++++++++++++++----------------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/tcp/mux.go b/tcp/mux.go index fc40c4a4..4f8f3839 100644 --- a/tcp/mux.go +++ b/tcp/mux.go @@ -161,6 +161,29 @@ func (mux *Mux) Stats() (interface{}, error) { return s, nil } +// Listen returns a Layer associated with the given header. Any connection +// accepted by mux is multiplexed based on the initial header byte. +func (mux *Mux) Listen(header byte) *Layer { + // Ensure two listeners are not created for the same header byte. + if _, ok := mux.m[header]; ok { + panic(fmt.Sprintf("listener already registered under header byte: %d", header)) + } + + // Create a new listener and assign it. + ln := &listener{ + c: make(chan net.Conn), + } + mux.m[header] = ln + + layer := &Layer{ + ln: ln, + addr: mux.addr, + } + layer.dialer = NewDialer(header, mux.tlsConfig) + + return layer +} + func (mux *Mux) handleConn(conn net.Conn) { stats.Add(numConnectionsHandled, 1) @@ -201,29 +224,6 @@ func (mux *Mux) handleConn(conn net.Conn) { handler.c <- conn } -// Listen returns a Layer associated with the given header. Any connection -// accepted by mux is multiplexed based on the initial header byte. -func (mux *Mux) Listen(header byte) *Layer { - // Ensure two listeners are not created for the same header byte. - if _, ok := mux.m[header]; ok { - panic(fmt.Sprintf("listener already registered under header byte: %d", header)) - } - - // Create a new listener and assign it. - ln := &listener{ - c: make(chan net.Conn), - } - mux.m[header] = ln - - layer := &Layer{ - ln: ln, - addr: mux.addr, - } - layer.dialer = NewDialer(header, mux.tlsConfig) - - return layer -} - // listener is a receiver for connections received by Mux. type listener struct { c chan net.Conn From a7423b1b8fb47f5b7b114bb5f5bd7bca7e6e6d33 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Dec 2023 08:20:06 -0500 Subject: [PATCH 2/6] Cluster Transport doesn't need Dialer --- cluster/service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cluster/service.go b/cluster/service.go index 0e61f84d..65dbf239 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -114,7 +114,6 @@ type CredentialStore interface { // Transport is the interface the network layer must provide. type Transport interface { net.Listener - Dialer } // Service provides information about the node and cluster. From 3f658a7cd87de7bc342af83c6c101760c513ccdf Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Dec 2023 08:29:27 -0500 Subject: [PATCH 3/6] Cluster service just needs a net.Listener --- cluster/service.go | 21 ++++++++------------- cmd/rqlited/main.go | 4 ++-- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/cluster/service.go b/cluster/service.go index 65dbf239..42defc21 100644 --- a/cluster/service.go +++ b/cluster/service.go @@ -111,15 +111,10 @@ type CredentialStore interface { AA(username, password, perm string) bool } -// Transport is the interface the network layer must provide. -type Transport interface { - net.Listener -} - // Service provides information about the node and cluster. type Service struct { - tn Transport // Network layer this service uses - addr net.Addr // Address on which this service is listening + ln net.Listener // Incoming connections to the service + addr net.Addr // Address on which this service is listening db Database // The queryable system. mgr Manager // The cluster management system. @@ -134,10 +129,10 @@ type Service struct { } // New returns a new instance of the cluster service -func New(tn Transport, db Database, m Manager, credentialStore CredentialStore) *Service { +func New(ln net.Listener, db Database, m Manager, credentialStore CredentialStore) *Service { return &Service{ - tn: tn, - addr: tn.Addr(), + ln: ln, + addr: ln.Addr(), db: db, mgr: m, logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags), @@ -148,13 +143,13 @@ func New(tn Transport, db Database, m Manager, credentialStore CredentialStore) // Open opens the Service. func (s *Service) Open() error { go s.serve() - s.logger.Println("service listening on", s.tn.Addr()) + s.logger.Println("service listening on", s.addr) return nil } // Close closes the service. func (s *Service) Close() error { - s.tn.Close() + s.ln.Close() return nil } @@ -210,7 +205,7 @@ func (s *Service) Stats() (map[string]interface{}, error) { func (s *Service) serve() error { for { - conn, err := s.tn.Accept() + conn, err := s.ln.Accept() if err != nil { return err } diff --git a/cmd/rqlited/main.go b/cmd/rqlited/main.go index 64ab003f..f9fa5387 100644 --- a/cmd/rqlited/main.go +++ b/cmd/rqlited/main.go @@ -424,8 +424,8 @@ func credentialStore(cfg *Config) (*auth.CredentialsStore, error) { return auth.NewCredentialsStoreFromFile(cfg.AuthFile) } -func clusterService(cfg *Config, tn cluster.Transport, db cluster.Database, mgr cluster.Manager, credStr *auth.CredentialsStore) (*cluster.Service, error) { - c := cluster.New(tn, db, mgr, credStr) +func clusterService(cfg *Config, ln net.Listener, db cluster.Database, mgr cluster.Manager, credStr *auth.CredentialsStore) (*cluster.Service, error) { + c := cluster.New(ln, db, mgr, credStr) c.SetAPIAddr(cfg.HTTPAdv) c.EnableHTTPS(cfg.HTTPx509Cert != "" && cfg.HTTPx509Key != "") // Conditions met for an HTTPS API if err := c.Open(); err != nil { From 954503dc660c42808fbd02e31fa04bd58eb10e2a Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Dec 2023 09:23:52 -0500 Subject: [PATCH 4/6] Move Store to Layer and create NewLayer in tcp --- store/store.go | 12 ++++++------ store/store_test.go | 22 +++++++++++----------- store/transport.go | 19 ++++++++++--------- tcp/mux.go | 30 +++++++++++++++++------------- tcp/mux_test.go | 8 ++++---- 5 files changed, 48 insertions(+), 43 deletions(-) diff --git a/store/store.go b/store/store.go index 248a32e8..25082d5a 100644 --- a/store/store.go +++ b/store/store.go @@ -208,7 +208,7 @@ type Store struct { restoreDoneCh chan struct{} raft *raft.Raft // The consensus mechanism. - ln Listener + ly Layer raftTn *NodeTransport raftID string // Node ID. dbConf *DBConfig // SQLite database config. @@ -309,7 +309,7 @@ type Config struct { } // New returns a new Store. -func New(ln Listener, c *Config) *Store { +func New(ly Layer, c *Config) *Store { logger := c.Logger if logger == nil { logger = log.New(os.Stderr, "[store] ", log.LstdFlags) @@ -321,7 +321,7 @@ func New(ln Listener, c *Config) *Store { } return &Store{ - ln: ln, + ly: ly, raftDir: c.Dir, peersPath: filepath.Join(c.Dir, peersPath), peersInfoPath: filepath.Join(c.Dir, peersInfoPath), @@ -376,7 +376,7 @@ func (s *Store) Open() (retErr error) { } s.openT = time.Now() - s.logger.Printf("opening store with node ID %s, listening on %s", s.raftID, s.ln.Addr().String()) + s.logger.Printf("opening store with node ID %s, listening on %s", s.raftID, s.ly.Addr().String()) // Create all the required Raft directories. s.logger.Printf("ensuring data directory exists at %s", s.raftDir) @@ -403,7 +403,7 @@ func (s *Store) Open() (retErr error) { } // Create Raft-compatible network layer. - nt := raft.NewNetworkTransport(NewTransport(s.ln), connectionPoolCount, connectionTimeout, nil) + nt := raft.NewNetworkTransport(NewTransport(s.ly), connectionPoolCount, connectionTimeout, nil) s.raftTn = NewNodeTransport(nt) // Don't allow control over trailing logs directly, just implement a policy. @@ -583,7 +583,7 @@ func (s *Store) Ready() bool { func (s *Store) Close(wait bool) (retErr error) { defer func() { if retErr == nil { - s.logger.Printf("store closed with node ID %s, listening on %s", s.raftID, s.ln.Addr().String()) + s.logger.Printf("store closed with node ID %s, listening on %s", s.raftID, s.ly.Addr().String()) s.open = false } }() diff --git a/store/store_test.go b/store/store_test.go index f71128c5..67504c07 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -2061,7 +2061,7 @@ func Test_MultiNodeStoreAutoRestoreBootstrap(t *testing.T) { // Trigger a bootstrap. s0.BootstrapExpect = 3 for _, s := range []*Store{s0, s1, s2} { - if err := s0.Notify(notifyRequest(s.ID(), s.ln.Addr().String())); err != nil { + if err := s0.Notify(notifyRequest(s.ID(), s.ly.Addr().String())); err != nil { t.Fatalf("failed to notify store: %s", err.Error()) } } @@ -2797,8 +2797,8 @@ func mustNewStoreAtPathsLn(id, dataPath, sqlitePath string, fk bool) (*Store, ne cfg.FKConstraints = fk cfg.OnDiskPath = sqlitePath - ln := mustMockLister("localhost:0") - s := New(ln, &Config{ + ly := mustMockLayer("localhost:0") + s := New(ly, &Config{ DBConf: cfg, Dir: dataPath, ID: id, @@ -2806,7 +2806,7 @@ func mustNewStoreAtPathsLn(id, dataPath, sqlitePath string, fk bool) (*Store, ne if s == nil { panic("failed to create new store") } - return s, ln + return s, ly } func mustNewStore(t *testing.T) (*Store, net.Listener) { @@ -2837,27 +2837,27 @@ func (m *mockSnapshotSink) Cancel() error { return nil } -type mockListener struct { +type mockLayer struct { ln net.Listener } -func mustMockLister(addr string) Listener { +func mustMockLayer(addr string) Layer { ln, err := net.Listen("tcp", addr) if err != nil { panic("failed to create new listner") } - return &mockListener{ln} + return &mockLayer{ln} } -func (m *mockListener) Dial(addr string, timeout time.Duration) (net.Conn, error) { +func (m *mockLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout("tcp", addr, timeout) } -func (m *mockListener) Accept() (net.Conn, error) { return m.ln.Accept() } +func (m *mockLayer) Accept() (net.Conn, error) { return m.ln.Accept() } -func (m *mockListener) Close() error { return m.ln.Close() } +func (m *mockLayer) Close() error { return m.ln.Close() } -func (m *mockListener) Addr() net.Addr { return m.ln.Addr() } +func (m *mockLayer) Addr() net.Addr { return m.ln.Addr() } func mustCreateTempFile() string { f, err := os.CreateTemp("", "rqlite-temp") diff --git a/store/transport.go b/store/transport.go index 6ddde994..6400d3af 100644 --- a/store/transport.go +++ b/store/transport.go @@ -9,42 +9,43 @@ import ( "github.com/rqlite/rqlite/v8/store/gzip" ) -// Listener is the interface expected by the Store for Transports. -type Listener interface { +// Layer is the interface expected by the Store for network communication +// between nodes, which is used for Raft distributed consensus. +type Layer interface { net.Listener Dial(address string, timeout time.Duration) (net.Conn, error) } // Transport is the network service provided to Raft, and wraps a Listener. type Transport struct { - ln Listener + ly Layer } // NewTransport returns an initialized Transport. -func NewTransport(ln Listener) *Transport { +func NewTransport(ly Layer) *Transport { return &Transport{ - ln: ln, + ly: ly, } } // Dial creates a new network connection. func (t *Transport) Dial(addr raft.ServerAddress, timeout time.Duration) (net.Conn, error) { - return t.ln.Dial(string(addr), timeout) + return t.ly.Dial(string(addr), timeout) } // Accept waits for the next connection. func (t *Transport) Accept() (net.Conn, error) { - return t.ln.Accept() + return t.ly.Accept() } // Close closes the transport func (t *Transport) Close() error { - return t.ln.Close() + return t.ly.Close() } // Addr returns the binding address of the transport. func (t *Transport) Addr() net.Addr { - return t.ln.Addr() + return t.ly.Addr() } // NodeTransport is a wrapper around the Raft NetworkTransport, which allows diff --git a/tcp/mux.go b/tcp/mux.go index 4f8f3839..a2baa957 100644 --- a/tcp/mux.go +++ b/tcp/mux.go @@ -43,6 +43,15 @@ type Layer struct { dialer *Dialer } +// NewLayer returns a new instance of Layer. +func NewLayer(ln net.Listener, dialer *Dialer) *Layer { + return &Layer{ + ln: ln, + addr: ln.Addr(), + dialer: dialer, + } +} + // Dial creates a new network connection. func (l *Layer) Dial(addr string, timeout time.Duration) (net.Conn, error) { return l.dialer.Dial(addr, timeout) @@ -161,9 +170,9 @@ func (mux *Mux) Stats() (interface{}, error) { return s, nil } -// Listen returns a Layer associated with the given header. Any connection +// Listen returns a Listener associated with the given header. Any connection // accepted by mux is multiplexed based on the initial header byte. -func (mux *Mux) Listen(header byte) *Layer { +func (mux *Mux) Listen(header byte) net.Listener { // Ensure two listeners are not created for the same header byte. if _, ok := mux.m[header]; ok { panic(fmt.Sprintf("listener already registered under header byte: %d", header)) @@ -171,17 +180,11 @@ func (mux *Mux) Listen(header byte) *Layer { // Create a new listener and assign it. ln := &listener{ - c: make(chan net.Conn), - } - mux.m[header] = ln - - layer := &Layer{ - ln: ln, + c: make(chan net.Conn), addr: mux.addr, } - layer.dialer = NewDialer(header, mux.tlsConfig) - - return layer + mux.m[header] = ln + return ln } func (mux *Mux) handleConn(conn net.Conn) { @@ -226,7 +229,8 @@ func (mux *Mux) handleConn(conn net.Conn) { // listener is a receiver for connections received by Mux. type listener struct { - c chan net.Conn + c chan net.Conn + addr net.Addr } // Accept waits for and returns the next connection to the listener. @@ -242,4 +246,4 @@ func (ln *listener) Accept() (c net.Conn, err error) { func (ln *listener) Close() error { return nil } // Addr always returns nil -func (ln *listener) Addr() net.Addr { return nil } +func (ln *listener) Addr() net.Addr { return ln.addr } diff --git a/tcp/mux_test.go b/tcp/mux_test.go index 8ef17a1c..9e53b1c1 100644 --- a/tcp/mux_test.go +++ b/tcp/mux_test.go @@ -142,10 +142,10 @@ func TestMux_Advertise(t *testing.T) { mux.Logger = log.New(io.Discard, "", 0) } - layer := mux.Listen(1) - if layer.Addr().String() != addr.Addr { - t.Fatalf("layer advertise address not correct, exp %s, got %s", - layer.Addr().String(), addr.Addr) + ln := mux.Listen(1) + if ln.Addr().String() != addr.Addr { + t.Fatalf("listener advertise address not correct, exp %s, got %s", + ln.Addr().String(), addr.Addr) } } From 7517c7df52fe046504123316509bf3b78875e90f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Dec 2023 09:25:11 -0500 Subject: [PATCH 5/6] CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c3f9147..b7c7e7f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ### Implementation changes and bug fixes - [PR #1515](https://github.com/rqlite/rqlite/pull/1515): Fix a log message related to mutual TLS. - [PR #1516](https://github.com/rqlite/rqlite/pull/1516): Add support to Python end-to-end test helpers for mTLS. +- [PR #1518](https://github.com/rqlite/rqlite/pull/1518): Refactor muxed internode communications. ## 8.13.2 (December 21st 2023) ### Implementation changes and bug fixes From 0155ef8aa0a15b5e56f8be1e7f298ac5429275ed Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Dec 2023 09:47:21 -0500 Subject: [PATCH 6/6] Fix system testing --- cmd/rqlited/main.go | 22 +++++++++++++++++++++- system_test/cluster_test.go | 29 +++++++++++++++++++---------- system_test/helpers.go | 19 ++++++++++++------- system_test/single_node_test.go | 33 +++++++++++++++++++++++---------- 4 files changed, 75 insertions(+), 28 deletions(-) diff --git a/cmd/rqlited/main.go b/cmd/rqlited/main.go index f9fa5387..009b8a38 100644 --- a/cmd/rqlited/main.go +++ b/cmd/rqlited/main.go @@ -91,8 +91,15 @@ func main() { if err != nil { log.Fatalf("failed to start node mux: %s", err.Error()) } - raftTn := mux.Listen(cluster.MuxRaftHeader) + + // Raft internode layer + raftLn := mux.Listen(cluster.MuxRaftHeader) log.Printf("Raft TCP mux Listener registered with byte header %d", cluster.MuxRaftHeader) + raftDialer, err := createRaftDialer(cfg) + if err != nil { + log.Fatalf("failed to create Raft dialer: %s", err.Error()) + } + raftTn := tcp.NewLayer(raftLn, raftDialer) // Create the store. str, err := createStore(cfg, raftTn) @@ -452,6 +459,19 @@ func createClusterClient(cfg *Config, clstr *cluster.Service) (*cluster.Client, return clstrClient, nil } +func createRaftDialer(cfg *Config) (*tcp.Dialer, error) { + var dialerTLSConfig *tls.Config + var err error + if cfg.NodeX509Cert != "" || cfg.NodeX509CACert != "" { + dialerTLSConfig, err = rtls.CreateClientConfig(cfg.NodeX509Cert, cfg.NodeX509Key, + cfg.NodeX509CACert, cfg.NodeVerifyServerName, cfg.NoNodeVerify) + if err != nil { + return nil, fmt.Errorf("failed to create TLS config for Raft dialer: %s", err.Error()) + } + } + return tcp.NewDialer(cluster.MuxRaftHeader, dialerTLSConfig), nil +} + func createCluster(cfg *Config, hasPeers bool, client *cluster.Client, str *store.Store, httpServ *httpd.Service, credStr *auth.CredentialsStore) error { joins := cfg.JoinAddresses() if err := networkCheckJoinAddrs(cfg, joins); err != nil { diff --git a/system_test/cluster_test.go b/system_test/cluster_test.go index f5b9ebb2..31e29604 100644 --- a/system_test/cluster_test.go +++ b/system_test/cluster_test.go @@ -629,8 +629,11 @@ func Test_MultiNodeClusterRaftAdv(t *testing.T) { } go mux2.Serve() + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + // Start two nodes, and ensure a cluster can be formed. - node1 := mustNodeEncrypted(mustTempDir(), true, false, mux1, "1") + node1 := mustNodeEncrypted(mustTempDir(), true, false, mux1, raftDialer, clstrDialer, "1") defer node1.Deprovision() leader, err := node1.WaitForLeader() if err != nil { @@ -640,7 +643,7 @@ func Test_MultiNodeClusterRaftAdv(t *testing.T) { t.Fatalf("node return wrong leader from leader, exp: %s, got %s", exp, got) } - node2 := mustNodeEncrypted(mustTempDir(), false, false, mux2, "2") + node2 := mustNodeEncrypted(mustTempDir(), false, false, mux2, raftDialer, clstrDialer, "2") defer node2.Deprovision() if err := node2.Join(node1); err != nil { t.Fatalf("node2 failed to join leader: %s", err.Error()) @@ -1415,11 +1418,14 @@ func Test_MultiNodeClusterRecoverSingle(t *testing.T) { t.Fatalf("failed to close node3: %s", err.Error()) } + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + // Create a single node using the node's data directory. It should fail because // quorum can't be met. This isn't quite right since the Raft address is also // changing, but it generally proves it doesn't come up. mux0, ln0 := mustNewOpenMux("127.0.0.1:10000") - failedSingle := mustNodeEncrypted(node1.Dir, true, false, mux0, node1.Store.ID()) + failedSingle := mustNodeEncrypted(node1.Dir, true, false, mux0, raftDialer, clstrDialer, node1.Store.ID()) _, err = failedSingle.WaitForLeader() if err == nil { t.Fatalf("no error waiting for leader") @@ -1432,7 +1438,7 @@ func Test_MultiNodeClusterRecoverSingle(t *testing.T) { peers := fmt.Sprintf(`[{"id": "%s","address": "%s"}]`, node1.Store.ID(), "127.0.0.1:10001") mustWriteFile(node1.PeersPath, peers) - okSingle := mustNodeEncrypted(node1.Dir, true, false, mux1, node1.Store.ID()) + okSingle := mustNodeEncrypted(node1.Dir, true, false, mux1, raftDialer, clstrDialer, node1.Store.ID()) _, err = okSingle.WaitForLeader() if err != nil { t.Fatalf("failed waiting for leader: %s", err.Error()) @@ -1449,15 +1455,18 @@ func Test_MultiNodeClusterRecoverSingle(t *testing.T) { func Test_MultiNodeClusterRecoverFull(t *testing.T) { var err error + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + mux1, ln1 := mustNewOpenMux("127.0.0.1:10001") - node1 := mustNodeEncrypted(mustTempDir(), true, false, mux1, "1") + node1 := mustNodeEncrypted(mustTempDir(), true, false, mux1, raftDialer, clstrDialer, "1") _, err = node1.WaitForLeader() if err != nil { t.Fatalf("failed waiting for leader: %s", err.Error()) } mux2, ln2 := mustNewOpenMux("127.0.0.1:10002") - node2 := mustNodeEncrypted(mustTempDir(), false, false, mux2, "2") + node2 := mustNodeEncrypted(mustTempDir(), false, false, mux2, raftDialer, clstrDialer, "2") if err := node2.Join(node1); err != nil { t.Fatalf("node failed to join leader: %s", err.Error()) } @@ -1467,7 +1476,7 @@ func Test_MultiNodeClusterRecoverFull(t *testing.T) { } mux3, ln3 := mustNewOpenMux("127.0.0.1:10003") - node3 := mustNodeEncrypted(mustTempDir(), false, false, mux3, "3") + node3 := mustNodeEncrypted(mustTempDir(), false, false, mux3, raftDialer, clstrDialer, "3") if err := node3.Join(node1); err != nil { t.Fatalf("node failed to join leader: %s", err.Error()) } @@ -1515,17 +1524,17 @@ func Test_MultiNodeClusterRecoverFull(t *testing.T) { mustWriteFile(node3.PeersPath, peers) mux4, ln4 := mustNewOpenMux("127.0.0.1:11001") - node4 := mustNodeEncrypted(node1.Dir, false, false, mux4, "1") + node4 := mustNodeEncrypted(node1.Dir, false, false, mux4, raftDialer, clstrDialer, "1") defer node4.Deprovision() defer ln4.Close() mux5, ln5 := mustNewOpenMux("127.0.0.1:11002") - node5 := mustNodeEncrypted(node2.Dir, false, false, mux5, "2") + node5 := mustNodeEncrypted(node2.Dir, false, false, mux5, raftDialer, clstrDialer, "2") defer node5.Deprovision() defer ln5.Close() mux6, ln6 := mustNewOpenMux("127.0.0.1:11003") - node6 := mustNodeEncrypted(node3.Dir, false, false, mux6, "3") + node6 := mustNodeEncrypted(node3.Dir, false, false, mux6, raftDialer, clstrDialer, "3") defer node6.Deprovision() defer ln6.Close() diff --git a/system_test/helpers.go b/system_test/helpers.go index eaebe576..a1e63403 100644 --- a/system_test/helpers.go +++ b/system_test/helpers.go @@ -632,17 +632,22 @@ func mustNewNode(enableSingle bool) *Node { func mustNewNodeEncrypted(enableSingle, httpEncrypt, nodeEncrypt bool) *Node { dir := mustTempDir() var mux *tcp.Mux + var raftDialer *tcp.Dialer + var clstrDialer *tcp.Dialer if nodeEncrypt { mux = mustNewOpenTLSMux(rX509.CertExampleDotComFile(dir), rX509.KeyExampleDotComFile(dir), "") + raftDialer = tcp.NewDialer(cluster.MuxRaftHeader, mustCreateTLSConfig(rX509.CertExampleDotComFile(dir), rX509.KeyExampleDotComFile(dir), "")) + clstrDialer = tcp.NewDialer(cluster.MuxClusterHeader, mustCreateTLSConfig(rX509.CertExampleDotComFile(dir), rX509.KeyExampleDotComFile(dir), "")) } else { mux, _ = mustNewOpenMux("") + raftDialer = tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer = tcp.NewDialer(cluster.MuxClusterHeader, nil) } go mux.Serve() - - return mustNodeEncrypted(dir, enableSingle, httpEncrypt, mux, "") + return mustNodeEncrypted(dir, enableSingle, httpEncrypt, mux, raftDialer, clstrDialer, "") } -func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux, nodeID string) *Node { +func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux, raftDialer, clstrDialer *tcp.Dialer, nodeID string) *Node { nodeCertPath := rX509.CertExampleDotComFile(dir) nodeKeyPath := rX509.KeyExampleDotComFile(dir) httpCertPath := nodeCertPath @@ -654,13 +659,14 @@ func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux, NodeKeyPath: nodeKeyPath, HTTPCertPath: httpCertPath, HTTPKeyPath: httpKeyPath, - TLSConfig: mustCreateTLSConfig(nodeCertPath, nodeKeyPath, ""), - PeersPath: filepath.Join(dir, "raft/peers.json"), + //TLSConfig: mustCreateTLSConfig(nodeCertPath, nodeKeyPath, ""), + PeersPath: filepath.Join(dir, "raft/peers.json"), } dbConf := store.NewDBConfig() - raftTn := mux.Listen(cluster.MuxRaftHeader) + raftLn := mux.Listen(cluster.MuxRaftHeader) + raftTn := tcp.NewLayer(raftLn, raftDialer) id := nodeID if id == "" { id = raftTn.Addr().String() @@ -693,7 +699,6 @@ func mustNodeEncrypted(dir string, enableSingle, httpEncrypt bool, mux *tcp.Mux, } node.Cluster = clstr - clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) clstrClient := cluster.NewClient(clstrDialer, 30*time.Second) node.Client = clstrClient node.Service = httpd.New("localhost:0", node.Store, clstrClient, nil) diff --git a/system_test/single_node_test.go b/system_test/single_node_test.go index 2c9e34db..00528c5a 100644 --- a/system_test/single_node_test.go +++ b/system_test/single_node_test.go @@ -31,7 +31,9 @@ func Test_SingleNodeBasicEndpoint(t *testing.T) { dir := mustTempDir() mux, ln := mustNewOpenMux("") defer ln.Close() - node = mustNodeEncrypted(dir, true, false, mux, "") + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + node = mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "") if _, err := node.WaitForLeader(); err != nil { t.Fatalf("node never became leader") } @@ -863,8 +865,9 @@ func Test_SingleNodeUpgrades_NoSnapshots(t *testing.T) { mux, ln := mustNewOpenMux("") defer ln.Close() - - node := mustNodeEncrypted(destdir, true, false, mux, "node1") + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + node := mustNodeEncrypted(destdir, true, false, mux, raftDialer, clstrDialer, "node1") defer node.Deprovision() if _, err := node.WaitForLeader(); err != nil { t.Fatalf("node never became leader with %s data:", dir) @@ -922,8 +925,9 @@ func Test_SingleNodeUpgrades_Snapshots(t *testing.T) { mux, ln := mustNewOpenMux("") defer ln.Close() - - node := mustNodeEncrypted(destdir, true, false, mux, "node1") + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + node := mustNodeEncrypted(destdir, true, false, mux, raftDialer, clstrDialer, "node1") defer node.Deprovision() if _, err := node.WaitForLeader(); err != nil { t.Fatalf("node never became leader with %s data:", dir) @@ -1019,7 +1023,9 @@ func Test_SingleNodeReopen(t *testing.T) { dir := mustTempDir() mux, ln := mustNewOpenMux("") defer ln.Close() - node := mustNodeEncrypted(dir, true, false, mux, "") + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + node := mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "") if _, err := node.WaitForLeader(); err != nil { t.Fatalf("node never became leader") @@ -1052,7 +1058,9 @@ func Test_SingleNodeNoopReopen(t *testing.T) { dir := mustTempDir() mux, ln := mustNewOpenMux("") defer ln.Close() - node := mustNodeEncrypted(dir, true, false, mux, "") + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + node := mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "") if _, err := node.WaitForLeader(); err != nil { t.Fatalf("node never became leader") @@ -1133,7 +1141,9 @@ func Test_SingleNodeNoopSnapReopen(t *testing.T) { dir := mustTempDir() mux, ln := mustNewOpenMux("") defer ln.Close() - node := mustNodeEncrypted(dir, true, false, mux, "") + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + node := mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "") if _, err := node.WaitForLeader(); err != nil { t.Fatalf("node never became leader") @@ -1219,7 +1229,9 @@ func Test_SingleNodeNoopSnapLogsReopen(t *testing.T) { dir := mustTempDir() mux, ln := mustNewOpenMux("") defer ln.Close() - node := mustNodeEncrypted(dir, true, false, mux, "") + raftDialer := tcp.NewDialer(cluster.MuxRaftHeader, nil) + clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nil) + node := mustNodeEncrypted(dir, true, false, mux, raftDialer, clstrDialer, "") if _, err := node.WaitForLeader(); err != nil { t.Fatalf("node never became leader") @@ -1310,7 +1322,8 @@ func Test_SingleNodeAutoRestore(t *testing.T) { mux, _ := mustNewOpenMux("") go mux.Serve() - raftTn := mux.Listen(cluster.MuxRaftHeader) + raftLn := mux.Listen(cluster.MuxRaftHeader) + raftTn := tcp.NewLayer(raftLn, tcp.NewDialer(cluster.MuxRaftHeader, nil)) node.Store = store.New(raftTn, &store.Config{ DBConf: store.NewDBConfig(), Dir: node.Dir,