1
0
Fork 0

Hook in explicit Cluster Dialer

master
Philip O'Toole 3 years ago
parent 281c454fc2
commit 796ef147f7

@ -37,15 +37,19 @@ func init() {
stats.Add(numGetNodeAPIResponse, 0)
}
// Transport is the interface the network layer must provide.
type Transport interface {
net.Listener
// Dialer is the interface dialers must implement.
type Dialer interface {
// Dial is used to create a connection to a service listening
// on an address.
Dial(address string, timeout time.Duration) (net.Conn, error)
}
// Transport is the interface the network layer must provide.
type Transport interface {
net.Listener
Dialer
}
// Service provides information about the node and cluster.
type Service struct {
tn Transport // Network layer this service uses

@ -305,7 +305,9 @@ func main() {
log.Println("store has reached consensus")
// Start the HTTP API server.
if err := startHTTPService(str, clstr); err != nil {
clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, nodeEncrypt, noNodeVerify)
clstrClient := cluster.NewClient(clstrDialer)
if err := startHTTPService(str, clstrClient); err != nil {
log.Fatalf("failed to start HTTP server: %s", err.Error())
}
log.Println("node is ready")
@ -375,7 +377,7 @@ func waitForConsensus(str *store.Store) error {
return nil
}
func startHTTPService(str *store.Store, cltr *cluster.Service) error {
func startHTTPService(str *store.Store, cltr *cluster.Client) error {
// Get the credential store.
credStr, err := credentialStore()
if err != nil {

@ -35,29 +35,31 @@ func init() {
stats.Add(numUnregisteredHandlers, 0)
}
// Layer represents the connection between nodes. It can be both used to
// make connections to other nodes, and receive connections from other
// nodes.
type Layer struct {
ln net.Listener
header byte
addr net.Addr
// NewDialer returns an initialized Dialer
func NewDialer(header byte, remoteEncrypted, skipVerify bool) *Dialer {
return &Dialer{
header: header,
remoteEncrypted: remoteEncrypted,
skipVerify: skipVerify,
}
}
// Dialer supports dialing a cluster service.
type Dialer struct {
header byte
remoteEncrypted bool
skipVerify bool
nodeX509CACert string
tlsConfig *tls.Config
}
// Dial creates a new network connection.
func (l *Layer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
// Dial dials the cluster service a the given addr and returns a connection.
func (d *Dialer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
dialer := &net.Dialer{Timeout: timeout}
var err error
var conn net.Conn
if l.remoteEncrypted {
if d.remoteEncrypted {
conf := &tls.Config{
InsecureSkipVerify: l.skipVerify,
InsecureSkipVerify: d.skipVerify,
}
conn, err = tls.DialWithDialer(dialer, "tcp", addr, conf)
} else {
@ -68,7 +70,7 @@ func (l *Layer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
}
// Write a marker byte to indicate message type.
_, err = conn.Write([]byte{l.header})
_, err = conn.Write([]byte{d.header})
if err != nil {
conn.Close()
return nil, err
@ -76,6 +78,24 @@ func (l *Layer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
return conn, err
}
// Layer represents the connection between nodes. It can be both used to
// make connections to other nodes, and receive connections from other
// nodes.
type Layer struct {
ln net.Listener
addr net.Addr
dialer *Dialer
nodeX509CACert string
tlsConfig *tls.Config
}
// Dial creates a new network connection.
func (l *Layer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
return l.dialer.Dial(addr, timeout)
}
// Accept waits for the next connection.
func (l *Layer) Accept() (net.Conn, error) { return l.ln.Accept() }
@ -263,14 +283,12 @@ func (mux *Mux) Listen(header byte) *Layer {
mux.m[header] = ln
layer := &Layer{
ln: ln,
header: header,
addr: mux.addr,
remoteEncrypted: mux.remoteEncrypted,
skipVerify: mux.InsecureSkipVerify,
nodeX509CACert: mux.x509CACert,
tlsConfig: mux.tlsConfig,
ln: ln,
addr: mux.addr,
nodeX509CACert: mux.x509CACert,
tlsConfig: mux.tlsConfig,
}
layer.dialer = NewDialer(header, mux.remoteEncrypted, mux.InsecureSkipVerify)
return layer
}

Loading…
Cancel
Save