From f16b6ba55be58487644190014e59bece16d87d36 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 1 May 2016 15:03:27 -0700 Subject: [PATCH] Allow Raft address advertise (#115) * Use resolved Raft address as API peer key * Allow Raft advertise address to be set * Better log message for mux * CHANGELOG updates * Unit test mux layer address advertise --- CHANGELOG.md | 3 ++- cmd/rqlited/main.go | 14 ++++++++++++-- tcp/mux.go | 14 ++++++++++---- tcp/mux_test.go | 43 +++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 65 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a15caf6..563bf3dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## 3.0.0 (unreleased) +- [PR #115](https://github.com/rqlite/rqlite/pull/115): Support advertising address different than Raft bind address. - [PR #113](https://github.com/rqlite/rqlite/pull/113): Switch to in-memory SQLite databases by default. -- [PR #109](https://github.com/rqlite/rqlite/pull/109): Nodes broadcast meta to cluster via Raft +- [PR #109](https://github.com/rqlite/rqlite/pull/109): Nodes broadcast meta to cluster via Raft. - [PR #109](https://github.com/rqlite/rqlite/pull/109), [PR #111](https://github.com/rqlite/rqlite/pull/111): Leader redirection - [PR #104](https://github.com/rqlite/rqlite/pull/104): Handle the `-join` option sensibly when already member of cluster. diff --git a/cmd/rqlited/main.go b/cmd/rqlited/main.go index d07b8890..af3ef339 100644 --- a/cmd/rqlited/main.go +++ b/cmd/rqlited/main.go @@ -67,6 +67,7 @@ var authFile string var x509Cert string var x509Key string var raftAddr string +var raftAdv string var joinAddr string var noVerify bool var expvar bool @@ -83,6 +84,7 @@ func init() { flag.StringVar(&x509Key, "x509key", "", "Path to X.509 private key for certificate") flag.StringVar(&authFile, "auth", "", "Path to authentication and authorization file. If not set, not enabled.") flag.StringVar(&raftAddr, "raft", "localhost:4002", "Raft communication bind address") + flag.StringVar(&raftAdv, "raftadv", "", "Raft advertise address. If not set, same as bind") flag.StringVar(&joinAddr, "join", "", "protocol://host:port of leader to join") flag.BoolVar(&noVerify, "noverify", false, "Skip verification of any HTTPS cert when joining") flag.BoolVar(&expvar, "expvar", true, "Serve expvar data on HTTP server") @@ -154,7 +156,14 @@ func main() { if err != nil { log.Fatalf("failed to listen on %s: %s", raftAddr, err.Error()) } - mux := tcp.NewMux(ln) + var adv net.Addr + if raftAdv != "" { + adv, err = net.ResolveTCPAddr("tcp", raftAdv) + if err != nil { + log.Fatalf("failed to resolve advertise address %s: %s", raftAdv, err.Error()) + } + } + mux := tcp.NewMux(ln, adv) go mux.Serve() // Start up mux and get transports for cluster. @@ -191,7 +200,8 @@ func main() { } // Publish to the cluster the mapping between this Raft address and API address. - if err := publishAPIAddr(cs, raftAddr, httpAddr, publishPeerTimeout); err != nil { + // The Raft layer broadcasts the resolved address, so use that as the key. + if err := publishAPIAddr(cs, raftTn.Addr().String(), httpAddr, publishPeerTimeout); err != nil { log.Fatalf("failed to set peer for %s to %s: %s", raftAddr, httpAddr, err.Error()) } log.Printf("set peer for %s to %s", raftAddr, httpAddr) diff --git a/tcp/mux.go b/tcp/mux.go index 0165a8e6..ceda1527 100644 --- a/tcp/mux.go +++ b/tcp/mux.go @@ -65,11 +65,17 @@ type Mux struct { Logger *log.Logger } -// NewMux returns a new instance of Mux for ln. -func NewMux(ln net.Listener) *Mux { +// NewMux returns a new instance of Mux for ln. If adv is nil, +// then the addr of ln is used. +func NewMux(ln net.Listener, adv net.Addr) *Mux { + addr := adv + if addr == nil { + addr = ln.Addr() + } + return &Mux{ ln: ln, - addr: ln.Addr(), + addr: addr, m: make(map[byte]*listener), Timeout: DefaultTimeout, Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags), @@ -78,7 +84,7 @@ func NewMux(ln net.Listener) *Mux { // Serve handles connections from ln and multiplexes then across registered listener. func (mux *Mux) Serve() error { - mux.Logger.Println("mux serving on", mux.addr) + mux.Logger.Printf("mux serving on %s, advertising %s", mux.ln.Addr().String(), mux.addr) for { // Wait for the next connection. diff --git a/tcp/mux_test.go b/tcp/mux_test.go index 11c7cff1..293db4a1 100644 --- a/tcp/mux_test.go +++ b/tcp/mux_test.go @@ -34,7 +34,7 @@ func TestMux(t *testing.T) { defer tcpListener.Close() // Setup muxer & listeners. - mux := NewMux(tcpListener) + mux := NewMux(tcpListener, nil) mux.Timeout = 200 * time.Millisecond if !testing.Verbose() { mux.Logger = log.New(ioutil.Discard, "", 0) @@ -120,6 +120,32 @@ func TestMux(t *testing.T) { } } +func TestMux_Advertise(t *testing.T) { + // Setup muxer. + tcpListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer tcpListener.Close() + + addr := &mockAddr{ + Nwk: "tcp", + Addr: "rqlite.com:8081", + } + + mux := NewMux(tcpListener, addr) + mux.Timeout = 200 * time.Millisecond + if !testing.Verbose() { + mux.Logger = log.New(ioutil.Discard, "", 0) + } + + layer := mux.Listen(1) + if layer.Addr().String() != addr.Addr { + t.Fatalf("layer advertise address not correct, exp %s, got %s", + layer.Addr().String(), addr.Addr) + } +} + // Ensure two handlers cannot be registered for the same header byte. func TestMux_Listen_ErrAlreadyRegistered(t *testing.T) { defer func() { @@ -133,7 +159,20 @@ func TestMux_Listen_ErrAlreadyRegistered(t *testing.T) { if err != nil { t.Fatal(err) } - mux := NewMux(tcpListener) + mux := NewMux(tcpListener, nil) mux.Listen(5) mux.Listen(5) } + +type mockAddr struct { + Nwk string + Addr string +} + +func (m *mockAddr) Network() string { + return m.Nwk +} + +func (m *mockAddr) String() string { + return m.Addr +}