1
0
Fork 0

Remove older TCP transport code

Still unclear about whether tn.Open() is required with new mux.
master
Philip O'Toole 3 years ago
parent f32e71c00f
commit 38b2abca84

@ -185,31 +185,13 @@ func main() {
// Start requested profiling.
startProfile(cpuProfile, memProfile)
// Create internode network layer and mux.
ln, err := net.Listen("tcp", raftAddr)
// Create internode network mux and configure.
mux, err := startNodeMux()
if err != nil {
log.Fatalf("failed to listen on %s: %s", raftAddr, err.Error())
}
var adv net.Addr
if raftAdv != "" {
adv, err = net.ResolveTCPAddr("tcp", raftAdv)
if err != nil {
log.Fatalf("failed to resolve advertise address %s: %s", raftAdv, err.Error())
}
log.Fatalf("failed to start node mux: %s", err.Error())
}
var mux *tcp.Mux
if nodeEncrypt {
log.Printf("enabling node-to-node encryption with cert: %s, key: %s", nodeX509Cert, nodeX509Key)
mux, err = tcp.NewTLSMux(ln, adv, nodeX509Cert, nodeX509Key, nodeX509CACert)
} else {
mux, err = tcp.NewMux(ln, adv)
}
if err != nil {
log.Fatalf("failed to create node-to-node mux: %s", err.Error())
}
go mux.Serve()
tn := mux.Listen(muxRaftHeader)
raftTn := mux.Listen(muxRaftHeader)
//clusterTn := mux.Listen(muxClusterHeader)
// Create and open the store.
dataPath, err = filepath.Abs(dataPath)
@ -218,7 +200,7 @@ func main() {
}
dbConf := store.NewDBConfig(dsn, !onDisk)
str := store.New(tn, &store.StoreConfig{
str := store.New(raftTn, &store.StoreConfig{
DBConf: dbConf,
Dir: dataPath,
ID: idOrRaftAddr(),
@ -445,6 +427,35 @@ func startHTTPService(str *store.Store) error {
return s.Start()
}
func startNodeMux() (*tcp.Mux, error) {
ln, err := net.Listen("tcp", raftAddr)
if err != nil {
return nil, fmt.Errorf("failed to listen on %s: %s", raftAddr, err.Error())
}
var adv net.Addr
if raftAdv != "" {
adv, err = net.ResolveTCPAddr("tcp", raftAdv)
if err != nil {
return nil, fmt.Errorf("failed to resolve advertise address %s: %s", raftAdv, err.Error())
}
}
var mux *tcp.Mux
if nodeEncrypt {
log.Printf("enabling node-to-node encryption with cert: %s, key: %s", nodeX509Cert, nodeX509Key)
mux, err = tcp.NewTLSMux(ln, adv, nodeX509Cert, nodeX509Key, nodeX509CACert)
} else {
mux, err = tcp.NewMux(ln, adv)
}
if err != nil {
return nil, fmt.Errorf("failed to create node-to-node mux: %s", err.Error())
}
mux.InsecureSkipVerify = noNodeVerify
go mux.Serve()
return mux, nil
}
func credentialStore() (*auth.CredentialsStore, error) {
if authFile == "" {
return nil, nil

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
@ -410,11 +411,11 @@ func mustNewNode(enableSingle bool) *Node {
func mustNewNodeEncrypted(enableSingle, httpEncrypt, nodeEncrypt bool) *Node {
dir := mustTempDir()
var tn *tcp.Transport
var tn *tcp.Layer
if nodeEncrypt {
tn = mustNewOpenTLSTransport(x509.CertFile(dir), x509.KeyFile(dir), "")
_, tn = mustNewOpenTLSTransport(x509.CertFile(dir), x509.KeyFile(dir), "")
} else {
tn = mustNewOpenTransport("")
_, tn = mustNewOpenTransport("")
}
return mustNodeEncrypted(dir, enableSingle, httpEncrypt, tn, "")
@ -493,28 +494,45 @@ func mustTempDir() string {
return path
}
func mustNewOpenTransport(addr string) *tcp.Transport {
func mustNewOpenTransport(addr string) (*tcp.Mux, *tcp.Layer) {
if addr == "" {
addr = "localhost:0"
}
tn := tcp.NewTransport()
if err := tn.Open(addr); err != nil {
panic(fmt.Sprintf("failed to open transport: %s", err))
ln, err := net.Listen("tcp", addr)
if err != nil {
panic(fmt.Sprintf("failed to listen on %s: %s", addr, err.Error()))
}
return tn
var mux *tcp.Mux
mux, err = tcp.NewMux(ln, nil)
if err != nil {
panic(fmt.Sprintf("failed to create node-to-node mux: %s", err.Error()))
}
go mux.Serve()
return mux, mux.Listen(1) // Could be any byte value.
}
func mustNewOpenTLSTransport(certFile, keyPath, addr string) *tcp.Transport {
func mustNewOpenTLSTransport(certFile, keyPath, addr string) (*tcp.Mux, *tcp.Layer) {
if addr == "" {
addr = "localhost:0"
}
tn := tcp.NewTLSTransport(certFile, keyPath, "", true)
if err := tn.Open(addr); err != nil {
panic(fmt.Sprintf("failed to open transport: %s", err))
ln, err := net.Listen("tcp", addr)
if err != nil {
panic(fmt.Sprintf("failed to listen on %s: %s", addr, err.Error()))
}
return tn
var mux *tcp.Mux
mux, err = tcp.NewTLSMux(ln, nil, certFile, keyPath, "")
if err != nil {
panic(fmt.Sprintf("failed to create node-to-node mux: %s", err.Error()))
}
mux.InsecureSkipVerify = true
go mux.Serve()
return mux, mux.Listen(1) // Could be any byte value.
}
func mustParseDuration(d string) time.Duration {

@ -356,7 +356,7 @@ func Test_SingleNodeRestart(t *testing.T) {
t.Fatalf("failed to copy node test directory: %s", err)
}
tn := mustNewOpenTransport("")
_, tn := mustNewOpenTransport("")
defer tn.Close()
node := mustNodeEncrypted(destdir, true, false, tn, "node1")
@ -421,7 +421,7 @@ func Test_SingleNodeReopen(t *testing.T) {
t.Logf("running test %s, on-disk=%v", t.Name(), onDisk)
dir := mustTempDir()
tn := mustNewOpenTransport("")
_, tn := mustNewOpenTransport("")
node := mustNodeEncrypted(dir, true, false, tn, "")
if _, err := node.WaitForLeader(); err != nil {
@ -432,9 +432,9 @@ func Test_SingleNodeReopen(t *testing.T) {
t.Fatalf("failed to close node")
}
if err := tn.Open("localhost:0"); err != nil {
t.Fatalf("failed to re-open transport: %s", err)
}
// if err := tn.Open("localhost:0"); err != nil {
// t.Fatalf("failed to re-open transport: %s", err)
// }
if err := node.Store.Open(true); err != nil {
t.Fatalf("failed to re-open store: %s", err)
}
@ -464,7 +464,7 @@ func Test_SingleNodeNoopReopen(t *testing.T) {
t.Logf("running test %s, on-disk=%v", t.Name(), onDisk)
dir := mustTempDir()
tn := mustNewOpenTransport("")
_, tn := mustNewOpenTransport("")
node := mustNodeEncryptedOnDisk(dir, true, false, tn, "", false)
if _, err := node.WaitForLeader(); err != nil {
@ -479,9 +479,9 @@ func Test_SingleNodeNoopReopen(t *testing.T) {
t.Fatalf("failed to close node")
}
if err := tn.Open("localhost:0"); err != nil {
t.Fatalf("failed to re-open transport: %s", err)
}
// if err := tn.Open("localhost:0"); err != nil {
// t.Fatalf("failed to re-open transport: %s", err)
// }
if err := node.Store.Open(true); err != nil {
t.Fatalf("failed to re-open store: %s", err)
}
@ -555,7 +555,7 @@ func Test_SingleNodeNoopSnapReopen(t *testing.T) {
t.Logf("running test %s, on-disk=%v", t.Name(), onDisk)
dir := mustTempDir()
tn := mustNewOpenTransport("")
_, tn := mustNewOpenTransport("")
node := mustNodeEncryptedOnDisk(dir, true, false, tn, "", onDisk)
if _, err := node.WaitForLeader(); err != nil {
@ -575,9 +575,9 @@ func Test_SingleNodeNoopSnapReopen(t *testing.T) {
t.Fatalf("failed to close node")
}
if err := tn.Open("localhost:0"); err != nil {
t.Fatalf("failed to re-open transport: %s", err)
}
// if err := tn.Open("localhost:0"); err != nil {
// t.Fatalf("failed to re-open transport: %s", err)
// }
if err := node.Store.Open(true); err != nil {
t.Fatalf("failed to re-open store: %s", err)
}
@ -651,7 +651,7 @@ func Test_SingleNodeNoopSnapLogsReopen(t *testing.T) {
t.Logf("running test %s, on-disk=%v", t.Name(), onDisk)
dir := mustTempDir()
tn := mustNewOpenTransport("")
_, tn := mustNewOpenTransport("")
node := mustNodeEncryptedOnDisk(dir, true, false, tn, "", onDisk)
raftAddr = node.RaftAddr
t.Logf("node listening for Raft on %s", raftAddr)
@ -679,9 +679,9 @@ func Test_SingleNodeNoopSnapLogsReopen(t *testing.T) {
}
// Reset network state in node.
if err := tn.Open(raftAddr); err != nil {
t.Fatalf("failed to re-open transport: %s", err)
}
// if err := tn.Open(raftAddr); err != nil {
// t.Fatalf("failed to re-open transport: %s", err)
// }
if err := node.Store.Open(true); err != nil {
t.Fatalf("failed to re-open store: %s", err)

@ -2,9 +2,11 @@ package tcp
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
@ -39,7 +41,10 @@ func (l *Layer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
var err error
var conn net.Conn
if l.remoteEncrypted {
conn, err = tls.DialWithDialer(dialer, "tcp", addr, l.tlsConfig)
conf := &tls.Config{
InsecureSkipVerify: l.skipVerify,
}
conn, err = tls.DialWithDialer(dialer, "tcp", addr, conf)
} else {
conn, err = dialer.Dial("tcp", addr)
}
@ -276,3 +281,29 @@ func newTLSListener(ln net.Listener, certFile, keyFile, caCertFile string) (net.
return tls.NewListener(ln, config), nil
}
// createTLSConfig returns a TLS config from the given cert, key and optionally
// Certificate Authority cert.
func createTLSConfig(certFile, keyFile, caCertFile string) (*tls.Config, error) {
var err error
config := &tls.Config{}
config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
if caCertFile != "" {
asn1Data, err := ioutil.ReadFile(caCertFile)
if err != nil {
return nil, err
}
config.RootCAs = x509.NewCertPool()
ok := config.RootCAs.AppendCertsFromPEM([]byte(asn1Data))
if !ok {
return nil, fmt.Errorf("failed to parse root certificate(s) in %q", caCertFile)
}
}
return config, nil
}

@ -1,130 +0,0 @@
package tcp
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"time"
)
// Transport is the network layer for internode communications.
type Transport struct {
ln net.Listener
certFile string // Path to local X.509 cert.
certKey string // Path to corresponding X.509 key.
caCertFile string // Path to root X.509 certificate.
remoteEncrypted bool // Remote nodes use encrypted communication.
skipVerify bool // Skip verification of remote node certs.
srcIP string // The specified source IP is optional
}
// NewTransport returns an initialized unencrypted Transport.
func NewTransport() *Transport {
return &Transport{}
}
// NewTLSTransport returns an initialized TLS-encrypted Transport.
func NewTLSTransport(certFile, keyPath, caCertFile string, skipVerify bool) *Transport {
return &Transport{
certFile: certFile,
certKey: keyPath,
caCertFile: caCertFile,
remoteEncrypted: true,
skipVerify: skipVerify,
}
}
// Open opens the transport, binding to the supplied address.
func (t *Transport) Open(addr string) error {
ln, err := net.Listen("tcp", addr)
if err != nil {
return err
}
if t.certFile != "" {
config, err := createTLSConfig(t.certFile, t.certKey, t.caCertFile)
if err != nil {
return err
}
ln = tls.NewListener(ln, config)
}
t.ln = ln
return nil
}
// Dial opens a network connection.
func (t *Transport) Dial(addr string, timeout time.Duration) (net.Conn, error) {
var dialer *net.Dialer
dialer = &net.Dialer{Timeout: timeout}
if t.srcIP != "" {
netAddr := &net.TCPAddr{
IP: net.ParseIP(t.srcIP),
Port: 0,
}
dialer = &net.Dialer{Timeout: timeout, LocalAddr: netAddr}
}
var err error
var conn net.Conn
if t.remoteEncrypted {
conf := &tls.Config{
InsecureSkipVerify: t.skipVerify,
}
conn, err = tls.DialWithDialer(dialer, "tcp", addr, conf)
} else {
conn, err = dialer.Dial("tcp", addr)
}
return conn, err
}
// Accept waits for the next connection.
func (t *Transport) Accept() (net.Conn, error) {
c, err := t.ln.Accept()
if err != nil {
fmt.Println("error accepting: ", err.Error())
}
return c, err
}
// Close closes the transport
func (t *Transport) Close() error {
if t.ln != nil {
return t.ln.Close()
}
return nil
}
// Addr returns the binding address of the transport.
func (t *Transport) Addr() net.Addr {
return t.ln.Addr()
}
// createTLSConfig returns a TLS config from the given cert, key and optionally
// Certificate Authority cert.
func createTLSConfig(certFile, keyFile, caCertFile string) (*tls.Config, error) {
var err error
config := &tls.Config{}
config.Certificates = make([]tls.Certificate, 1)
config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
if caCertFile != "" {
asn1Data, err := ioutil.ReadFile(caCertFile)
if err != nil {
return nil, err
}
config.RootCAs = x509.NewCertPool()
ok := config.RootCAs.AppendCertsFromPEM([]byte(asn1Data))
if !ok {
return nil, fmt.Errorf("failed to parse root certificate(s) in %q", caCertFile)
}
}
return config, nil
}

@ -1,71 +0,0 @@
package tcp
import (
"os"
"testing"
"time"
"github.com/rqlite/rqlite/testdata/x509"
)
func Test_NewTransport(t *testing.T) {
if NewTransport() == nil {
t.Fatal("failed to create new Transport")
}
}
func Test_TransportOpenClose(t *testing.T) {
tn := NewTransport()
if err := tn.Open("localhost:0"); err != nil {
t.Fatalf("failed to open transport: %s", err.Error())
}
if tn.Addr().String() == "localhost:0" {
t.Fatalf("transport address set incorrectly, got: %s", tn.Addr().String())
}
if err := tn.Close(); err != nil {
t.Fatalf("failed to close transport: %s", err.Error())
}
}
func Test_TransportDial(t *testing.T) {
tn1 := NewTransport()
defer tn1.Close()
tn1.Open("localhost:0")
go tn1.Accept()
tn2 := NewTransport()
defer tn2.Close()
_, err := tn2.Dial(tn1.Addr().String(), time.Second)
if err != nil {
t.Fatalf("failed to connect to first transport: %s", err.Error())
}
}
func Test_NewTLSTransport(t *testing.T) {
c := x509.CertFile("")
defer os.Remove(c)
k := x509.KeyFile("")
defer os.Remove(k)
if NewTLSTransport(c, k, "", true) == nil {
t.Fatal("failed to create new TLS Transport")
}
}
func Test_TLSTransportOpenClose(t *testing.T) {
c := x509.CertFile("")
defer os.Remove(c)
k := x509.KeyFile("")
defer os.Remove(k)
tn := NewTLSTransport(c, k, "", true)
if err := tn.Open("localhost:0"); err != nil {
t.Fatalf("failed to open TLS transport: %s", err.Error())
}
if tn.Addr().String() == "localhost:0" {
t.Fatalf("TLS transport address set incorrectly, got: %s", tn.Addr().String())
}
if err := tn.Close(); err != nil {
t.Fatalf("failed to close TLS transport: %s", err.Error())
}
}
Loading…
Cancel
Save