1
0
Fork 0

Allow Raft address advertise (#115)

* Use resolved Raft address as API peer key

* Allow Raft advertise address to be set

* Better log message for mux

* CHANGELOG updates

* Unit test mux layer address advertise
master
Philip O'Toole 9 years ago
parent 32514f4744
commit f16b6ba55b

@ -1,6 +1,7 @@
## 3.0.0 (unreleased)
- [PR #115](https://github.com/rqlite/rqlite/pull/115): Support advertising address different than Raft bind address.
- [PR #113](https://github.com/rqlite/rqlite/pull/113): Switch to in-memory SQLite databases by default.
- [PR #109](https://github.com/rqlite/rqlite/pull/109): Nodes broadcast meta to cluster via Raft
- [PR #109](https://github.com/rqlite/rqlite/pull/109): Nodes broadcast meta to cluster via Raft.
- [PR #109](https://github.com/rqlite/rqlite/pull/109), [PR #111](https://github.com/rqlite/rqlite/pull/111): Leader redirection
- [PR #104](https://github.com/rqlite/rqlite/pull/104): Handle the `-join` option sensibly when already member of cluster.

@ -67,6 +67,7 @@ var authFile string
var x509Cert string
var x509Key string
var raftAddr string
var raftAdv string
var joinAddr string
var noVerify bool
var expvar bool
@ -83,6 +84,7 @@ func init() {
flag.StringVar(&x509Key, "x509key", "", "Path to X.509 private key for certificate")
flag.StringVar(&authFile, "auth", "", "Path to authentication and authorization file. If not set, not enabled.")
flag.StringVar(&raftAddr, "raft", "localhost:4002", "Raft communication bind address")
flag.StringVar(&raftAdv, "raftadv", "", "Raft advertise address. If not set, same as bind")
flag.StringVar(&joinAddr, "join", "", "protocol://host:port of leader to join")
flag.BoolVar(&noVerify, "noverify", false, "Skip verification of any HTTPS cert when joining")
flag.BoolVar(&expvar, "expvar", true, "Serve expvar data on HTTP server")
@ -154,7 +156,14 @@ func main() {
if err != nil {
log.Fatalf("failed to listen on %s: %s", raftAddr, err.Error())
}
mux := tcp.NewMux(ln)
var adv net.Addr
if raftAdv != "" {
adv, err = net.ResolveTCPAddr("tcp", raftAdv)
if err != nil {
log.Fatalf("failed to resolve advertise address %s: %s", raftAdv, err.Error())
}
}
mux := tcp.NewMux(ln, adv)
go mux.Serve()
// Start up mux and get transports for cluster.
@ -191,7 +200,8 @@ func main() {
}
// Publish to the cluster the mapping between this Raft address and API address.
if err := publishAPIAddr(cs, raftAddr, httpAddr, publishPeerTimeout); err != nil {
// The Raft layer broadcasts the resolved address, so use that as the key.
if err := publishAPIAddr(cs, raftTn.Addr().String(), httpAddr, publishPeerTimeout); err != nil {
log.Fatalf("failed to set peer for %s to %s: %s", raftAddr, httpAddr, err.Error())
}
log.Printf("set peer for %s to %s", raftAddr, httpAddr)

@ -65,11 +65,17 @@ type Mux struct {
Logger *log.Logger
}
// NewMux returns a new instance of Mux for ln.
func NewMux(ln net.Listener) *Mux {
// NewMux returns a new instance of Mux for ln. If adv is nil,
// then the addr of ln is used.
func NewMux(ln net.Listener, adv net.Addr) *Mux {
addr := adv
if addr == nil {
addr = ln.Addr()
}
return &Mux{
ln: ln,
addr: ln.Addr(),
addr: addr,
m: make(map[byte]*listener),
Timeout: DefaultTimeout,
Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags),
@ -78,7 +84,7 @@ func NewMux(ln net.Listener) *Mux {
// Serve handles connections from ln and multiplexes then across registered listener.
func (mux *Mux) Serve() error {
mux.Logger.Println("mux serving on", mux.addr)
mux.Logger.Printf("mux serving on %s, advertising %s", mux.ln.Addr().String(), mux.addr)
for {
// Wait for the next connection.

@ -34,7 +34,7 @@ func TestMux(t *testing.T) {
defer tcpListener.Close()
// Setup muxer & listeners.
mux := NewMux(tcpListener)
mux := NewMux(tcpListener, nil)
mux.Timeout = 200 * time.Millisecond
if !testing.Verbose() {
mux.Logger = log.New(ioutil.Discard, "", 0)
@ -120,6 +120,32 @@ func TestMux(t *testing.T) {
}
}
func TestMux_Advertise(t *testing.T) {
// Setup muxer.
tcpListener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer tcpListener.Close()
addr := &mockAddr{
Nwk: "tcp",
Addr: "rqlite.com:8081",
}
mux := NewMux(tcpListener, addr)
mux.Timeout = 200 * time.Millisecond
if !testing.Verbose() {
mux.Logger = log.New(ioutil.Discard, "", 0)
}
layer := mux.Listen(1)
if layer.Addr().String() != addr.Addr {
t.Fatalf("layer advertise address not correct, exp %s, got %s",
layer.Addr().String(), addr.Addr)
}
}
// Ensure two handlers cannot be registered for the same header byte.
func TestMux_Listen_ErrAlreadyRegistered(t *testing.T) {
defer func() {
@ -133,7 +159,20 @@ func TestMux_Listen_ErrAlreadyRegistered(t *testing.T) {
if err != nil {
t.Fatal(err)
}
mux := NewMux(tcpListener)
mux := NewMux(tcpListener, nil)
mux.Listen(5)
mux.Listen(5)
}
type mockAddr struct {
Nwk string
Addr string
}
func (m *mockAddr) Network() string {
return m.Nwk
}
func (m *mockAddr) String() string {
return m.Addr
}

Loading…
Cancel
Save