1
0
Fork 0

Merge pull request #98 from otoolep/leader_forwarding_mux

Add mux for TCP connections
master
Philip O'Toole 9 years ago
commit 918f37d2f8

@ -12,10 +12,10 @@ type networkLayer struct {
} }
// newNetworkLayer returns a new instance of networkLayer. // newNetworkLayer returns a new instance of networkLayer.
func newNetworkLayer(ln net.Listener) *networkLayer { func newNetworkLayer(ln net.Listener, addr net.Addr) *networkLayer {
return &networkLayer{ return &networkLayer{
ln: ln, ln: ln,
addr: ln.Addr(), addr: addr,
} }
} }
@ -26,7 +26,18 @@ func (l *networkLayer) Addr() net.Addr {
// Dial creates a new network connection. // Dial creates a new network connection.
func (l *networkLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) { func (l *networkLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("tcp", addr, timeout) conn, err := net.DialTimeout("tcp", addr, timeout)
if err != nil {
return nil, err
}
// Write a marker byte for raft messages.
_, err = conn.Write([]byte{muxRaftHeader})
if err != nil {
conn.Close()
return nil, err
}
return conn, err
} }
// Accept waits for the next connection. // Accept waits for the next connection.

@ -5,6 +5,7 @@ package store
import ( import (
"bytes" "bytes"
"encoding/binary"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -20,6 +21,13 @@ import (
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb" "github.com/hashicorp/raft-boltdb"
sql "github.com/otoolep/rqlite/db" sql "github.com/otoolep/rqlite/db"
mux "github.com/otoolep/rqlite/tcp"
)
var (
// ErrFieldsRequired is returned when a node attempts to execute a leader-only
// operation.
ErrNotLeader = errors.New("not leader")
) )
const ( const (
@ -30,12 +38,48 @@ const (
appliedWaitDelay = 100 * time.Millisecond appliedWaitDelay = 100 * time.Millisecond
) )
var ( const (
// ErrFieldsRequired is returned when a node attempts to execute a leader-only muxRaftHeader = 1 // Raft consensus communications
// operation. muxMetaHeader = 2 // Cluster meta communications
ErrNotLeader = errors.New("not leader")
) )
// commandType are commands that affect the state of the cluster, and must go through Raft.
type commandType int
const (
execute commandType = iota // Commands which modify the database.
query // Commands which query the database.
peer // Commands that modify peers map.
)
type command struct {
Typ commandType `json:"typ,omitempty"`
Sub json.RawMessage `json:"sub,omitempty"`
}
func newCommand(t commandType, d interface{}) (*command, error) {
b, err := json.Marshal(d)
if err != nil {
return nil, err
}
return &command{
Typ: t,
Sub: b,
}, nil
}
// databaseSub is a command sub which involves interaction with the database.
type databaseSub struct {
Tx bool `json:"tx,omitempty"`
Queries []string `json:"queries,omitempty"`
Timings bool `json:"timings,omitempty"`
}
// peersSub is a command which sets the API address for a Raft address.
type peersSub map[string]string
// ConsistencyLevel represents the available read consistency levels.
type ConsistencyLevel int type ConsistencyLevel int
const ( const (
@ -44,18 +88,16 @@ const (
Strong Strong
) )
type commandType int // clusterMeta represents cluster meta which must be kept in consensus.
type clusterMeta struct {
const ( APIPeers map[string]string // Map from Raft address to API address
execute commandType = iota }
query
)
type command struct { // newClusterMeta returns an initialized cluster meta store.
Typ commandType `json:"typ,omitempty"` func newClusterMeta() *clusterMeta {
Tx bool `json:"tx,omitempty"` return &clusterMeta{
Queries []string `json:"queries,omitempty"` APIPeers: make(map[string]string),
Timings bool `json:"timings,omitempty"` }
} }
// DBConfig represents the configuration of the underlying SQLite database. // DBConfig represents the configuration of the underlying SQLite database.
@ -73,6 +115,7 @@ func NewDBConfig(dsn string, memory bool) *DBConfig {
type Store struct { type Store struct {
raftDir string raftDir string
raftBind string raftBind string
mux *mux.Mux
mu sync.RWMutex // Sync access between queries and snapshots. mu sync.RWMutex // Sync access between queries and snapshots.
@ -82,6 +125,10 @@ type Store struct {
dbPath string // Path to underlying SQLite file, if not in-memory. dbPath string // Path to underlying SQLite file, if not in-memory.
db *sql.DB // The underlying SQLite store. db *sql.DB // The underlying SQLite store.
metaMu sync.RWMutex
meta *clusterMeta
metaLn net.Listener
logger *log.Logger logger *log.Logger
} }
@ -90,8 +137,10 @@ func New(dbConf *DBConfig, dir, bind string) *Store {
return &Store{ return &Store{
raftDir: dir, raftDir: dir,
raftBind: bind, raftBind: bind,
mux: mux.NewMux(),
dbConf: dbConf, dbConf: dbConf,
dbPath: filepath.Join(dir, sqliteFile), dbPath: filepath.Join(dir, sqliteFile),
meta: newClusterMeta(),
logger: log.New(os.Stderr, "[store] ", log.LstdFlags), logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
} }
} }
@ -142,12 +191,19 @@ func (s *Store) Open(enableSingle bool) error {
config.DisableBootstrapAfterElect = false config.DisableBootstrapAfterElect = false
} }
// Setup Raft communication. // Set up TCP communication between nodes.
ln, err := net.Listen("tcp", s.raftBind) ln, err := net.Listen("tcp", s.raftBind)
if err != nil { if err != nil {
return err return err
} }
s.ln = newNetworkLayer(ln) go s.mux.Serve(ln)
// Setup meta updates communication
s.metaLn = s.mux.Listen(muxMetaHeader)
go s.serveMeta()
// Setup Raft communication.
s.ln = newNetworkLayer(s.mux.Listen(muxRaftHeader), ln.Addr())
transport := raft.NewNetworkTransport(s.ln, 3, 10*time.Second, os.Stderr) transport := raft.NewNetworkTransport(s.ln, 3, 10*time.Second, os.Stderr)
// Create peer storage. // Create peer storage.
@ -204,6 +260,18 @@ func (s *Store) Leader() string {
return s.raft.Leader() return s.raft.Leader()
} }
// APIPeers return the map of Raft addresses to API addresses.
func (s *Store) APIPeers() (map[string]string, error) {
s.metaMu.RLock()
defer s.metaMu.RUnlock()
peers := make(map[string]string, len(s.meta.APIPeers))
for k, v := range s.meta.APIPeers {
peers[k] = v
}
return peers, nil
}
// WaitForLeader blocks until a leader is detected, or the timeout expires. // WaitForLeader blocks until a leader is detected, or the timeout expires.
func (s *Store) WaitForLeader(timeout time.Duration) (string, error) { func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
tck := time.NewTicker(leaderWaitDelay) tck := time.NewTicker(leaderWaitDelay)
@ -276,12 +344,15 @@ func (s *Store) Execute(queries []string, timings, tx bool) ([]*sql.Result, erro
return nil, ErrNotLeader return nil, ErrNotLeader
} }
c := &command{ d := &databaseSub{
Typ: execute,
Tx: tx, Tx: tx,
Queries: queries, Queries: queries,
Timings: timings, Timings: timings,
} }
c, err := newCommand(execute, d)
if err != nil {
return nil, err
}
b, err := json.Marshal(c) b, err := json.Marshal(c)
if err != nil { if err != nil {
return nil, err return nil, err
@ -327,12 +398,15 @@ func (s *Store) Query(queries []string, timings, tx bool, lvl ConsistencyLevel)
defer s.mu.RUnlock() defer s.mu.RUnlock()
if lvl == Strong { if lvl == Strong {
c := &command{ d := &databaseSub{
Typ: query,
Tx: tx, Tx: tx,
Queries: queries, Queries: queries,
Timings: timings, Timings: timings,
} }
c, err := newCommand(query, d)
if err != nil {
return nil, err
}
b, err := json.Marshal(c) b, err := json.Marshal(c)
if err != nil { if err != nil {
return nil, err return nil, err
@ -355,6 +429,24 @@ func (s *Store) Query(queries []string, timings, tx bool, lvl ConsistencyLevel)
return r, err return r, err
} }
// UpdateAPIPeers updates the cluster-wide peer information.
func (s *Store) UpdateAPIPeers(peers map[string]string) error {
c, err := newCommand(peer, peers)
if err != nil {
return err
}
b, err := json.Marshal(c)
if err != nil {
return err
}
f := s.raft.Apply(b, raftTimeout)
if e := f.(raft.Future); e.Error() != nil {
return e.Error()
}
return nil
}
// Join joins a node, located at addr, to this store. The node must be ready to // Join joins a node, located at addr, to this store. The node must be ready to
// respond to Raft communications at that address. // respond to Raft communications at that address.
func (s *Store) Join(addr string) error { func (s *Store) Join(addr string) error {
@ -368,6 +460,35 @@ func (s *Store) Join(addr string) error {
return nil return nil
} }
// serveMeta accepts new connections to the meta server.
func (s *Store) serveMeta() error {
for {
conn, err := s.metaLn.Accept()
if err != nil {
return err
}
go s.handleMetaConn(conn)
}
}
// handleMetaConn processes individual connections to the meta server.
func (s *Store) handleMetaConn(conn net.Conn) error {
defer conn.Close()
// Only handles peers updates for now.
peers := make(map[string]string)
d := json.NewDecoder(conn)
err := d.Decode(&peers)
if err != nil {
return err
}
// Update the peers.
return s.UpdateAPIPeers(peers)
}
type fsmExecuteResponse struct { type fsmExecuteResponse struct {
results []*sql.Result results []*sql.Result
error error error error
@ -378,19 +499,45 @@ type fsmQueryResponse struct {
error error error error
} }
type fsmGenericResponse struct {
error error
}
// Apply applies a Raft log entry to the database. // Apply applies a Raft log entry to the database.
func (s *Store) Apply(l *raft.Log) interface{} { func (s *Store) Apply(l *raft.Log) interface{} {
var c command var c command
if err := json.Unmarshal(l.Data, &c); err != nil { if err := json.Unmarshal(l.Data, &c); err != nil {
panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error())) panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error()))
} }
if c.Typ == execute { switch c.Typ {
r, err := s.db.Execute(c.Queries, c.Tx, c.Timings) case execute, query:
return &fsmExecuteResponse{results: r, error: err} var d databaseSub
if err := json.Unmarshal(c.Sub, &d); err != nil {
return &fsmGenericResponse{error: err}
}
if c.Typ == execute {
r, err := s.db.Execute(d.Queries, d.Tx, d.Timings)
return &fsmExecuteResponse{results: r, error: err}
}
r, err := s.db.Query(d.Queries, d.Tx, d.Timings)
return &fsmQueryResponse{rows: r, error: err}
case peer:
var d peersSub
if err := json.Unmarshal(c.Sub, &d); err != nil {
return &fsmGenericResponse{error: err}
}
func() {
s.metaMu.Lock()
defer s.metaMu.Unlock()
for k, v := range d {
s.meta.APIPeers[k] = v
}
}()
return &fsmGenericResponse{}
default:
return &fsmGenericResponse{error: fmt.Errorf("unknown command: %v", c.Typ)}
} }
r, err := s.db.Query(c.Queries, c.Tx, c.Timings)
return &fsmQueryResponse{rows: r, error: err}
} }
// Snapshot returns a snapshot of the database. The caller must ensure that // Snapshot returns a snapshot of the database. The caller must ensure that
@ -415,29 +562,45 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
return nil, err return nil, err
} }
b, err := ioutil.ReadFile(f.Name()) fsm := &fsmSnapshot{}
fsm.database, err = ioutil.ReadFile(f.Name())
if err != nil {
log.Printf("Failed to read database for snapshot: %s", err.Error())
return nil, err
}
fsm.meta, err = json.Marshal(s.meta)
if err != nil { if err != nil {
log.Printf("Failed to generate snapshot: %s", err.Error()) log.Printf("Failed to encode meta for snapshot: %s", err.Error())
return nil, err return nil, err
} }
return &fsmSnapshot{data: b}, nil
return fsm, nil
} }
// Restore restores the database to a previous state. // Restore restores the node to a previous state.
func (s *Store) Restore(rc io.ReadCloser) error { func (s *Store) Restore(rc io.ReadCloser) error {
if err := s.db.Close(); err != nil { if err := s.db.Close(); err != nil {
return err return err
} }
b, err := ioutil.ReadAll(rc) // Get size of database.
if err != nil { var sz uint64
if err := binary.Read(rc, binary.LittleEndian, &sz); err != nil {
return err
}
// Now read in the database file data and restore.
database := make([]byte, sz)
if _, err := io.ReadFull(rc, database); err != nil {
return err return err
} }
var db *sql.DB var db *sql.DB
var err error
if !s.dbConf.Memory { if !s.dbConf.Memory {
// Write snapshot over any existing database file. // Write snapshot over any existing database file.
if err := ioutil.WriteFile(s.dbPath, b, 0660); err != nil { if err := ioutil.WriteFile(s.dbPath, database, 0660); err != nil {
return err return err
} }
@ -455,7 +618,7 @@ func (s *Store) Restore(rc io.ReadCloser) error {
f.Close() f.Close()
defer os.Remove(f.Name()) defer os.Remove(f.Name())
if err := ioutil.WriteFile(f.Name(), b, 0660); err != nil { if err := ioutil.WriteFile(f.Name(), database, 0660); err != nil {
return err return err
} }
@ -467,18 +630,45 @@ func (s *Store) Restore(rc io.ReadCloser) error {
} }
s.db = db s.db = db
return nil // Read remaining bytes, and set to cluster meta.
b, err := ioutil.ReadAll(rc)
if err != nil {
return err
}
return func() error {
s.metaMu.Lock()
defer s.metaMu.Unlock()
return json.Unmarshal(b, &s.meta)
}()
} }
type fsmSnapshot struct { type fsmSnapshot struct {
data []byte database []byte
meta []byte
} }
// Persist writes the snapshot to the give sink. // Persist writes the snapshot to the given sink.
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error { func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
err := func() error { err := func() error {
// Write data to sink. // Start by writing size of database.
if _, err := sink.Write(f.data); err != nil { b := new(bytes.Buffer)
sz := uint64(len(f.database))
err := binary.Write(b, binary.LittleEndian, sz)
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
// Next write database to sink.
if _, err := sink.Write(f.database); err != nil {
return err
}
// Finally write the meta.
if _, err := sink.Write(f.meta); err != nil {
return err return err
} }

@ -3,8 +3,10 @@ package store
import ( import (
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"net"
"os" "os"
"path/filepath" "path/filepath"
"reflect"
"testing" "testing"
"time" "time"
) )
@ -349,6 +351,58 @@ func Test_SingleNodeSnapshotInMem(t *testing.T) {
} }
} }
func Test_APIPeers(t *testing.T) {
s := mustNewStore(false)
defer os.RemoveAll(s.Path())
if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
s.WaitForLeader(10 * time.Second)
peers := map[string]string{
"localhost:4002": "localhost:4001",
"localhost:4004": "localhost:4003",
}
if err := s.UpdateAPIPeers(peers); err != nil {
t.Fatalf("failed to update API peers: %s", err.Error())
}
// Retrieve peers and verify them.
apiPeers, err := s.APIPeers()
if err != nil {
t.Fatalf("failed to retrieve API peers: %s", err.Error())
}
if !reflect.DeepEqual(peers, apiPeers) {
t.Fatalf("set and retrieved API peers not identical, got %v, exp %v",
apiPeers, peers)
}
}
func Test_MetaServer(t *testing.T) {
t.Skip()
s := mustNewStore(false)
defer os.RemoveAll(s.Path())
if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
s.WaitForLeader(10 * time.Second)
raddr, err := net.ResolveTCPAddr("tcp", "localhost:4002")
if err != nil {
t.Fatalf("failed to resolve remote address: %s", err.Error())
}
conn, err := net.DialTCP("tcp4", nil, raddr)
if err != nil {
t.Fatalf("failed to connect to remote address: %s", err.Error())
}
conn.Write([]byte{2})
conn.Write([]byte(`{"localhost:4002": "localhost:4001"}`))
}
func mustNewStore(inmem bool) *Store { func mustNewStore(inmem bool) *Store {
path := mustTempDir() path := mustTempDir()
defer os.RemoveAll(path) defer os.RemoveAll(path)

@ -0,0 +1,4 @@
/*
Package tcp provides various TCP-related utilities. The TCP mux code provided by this package originated with InfluxDB.
*/
package tcp

@ -0,0 +1,154 @@
package tcp
import (
"errors"
"fmt"
"io"
"log"
"net"
"os"
"sync"
"time"
)
const (
// DefaultTimeout is the default length of time to wait for first byte.
DefaultTimeout = 30 * time.Second
)
// Mux multiplexes a network connection.
type Mux struct {
ln net.Listener
m map[byte]*listener
wg sync.WaitGroup
// The amount of time to wait for the first header byte.
Timeout time.Duration
// Out-of-band error logger
Logger *log.Logger
}
// NewMux returns a new instance of Mux for ln.
func NewMux() *Mux {
return &Mux{
m: make(map[byte]*listener),
Timeout: DefaultTimeout,
Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags),
}
}
// Serve handles connections from ln and multiplexes then across registered listener.
func (mux *Mux) Serve(ln net.Listener) error {
for {
// Wait for the next connection.
// If it returns a temporary error then simply retry.
// If it returns any other error then exit immediately.
conn, err := ln.Accept()
if err, ok := err.(interface {
Temporary() bool
}); ok && err.Temporary() {
continue
}
if err != nil {
// Wait for all connections to be demuxed
mux.wg.Wait()
for _, ln := range mux.m {
close(ln.c)
}
return err
}
// Demux in a goroutine to
mux.wg.Add(1)
go mux.handleConn(conn)
}
}
func (mux *Mux) handleConn(conn net.Conn) {
defer mux.wg.Done()
// Set a read deadline so connections with no data don't timeout.
if err := conn.SetReadDeadline(time.Now().Add(mux.Timeout)); err != nil {
conn.Close()
mux.Logger.Printf("tcp.Mux: cannot set read deadline: %s", err)
return
}
// Read first byte from connection to determine handler.
var typ [1]byte
if _, err := io.ReadFull(conn, typ[:]); err != nil {
conn.Close()
mux.Logger.Printf("tcp.Mux: cannot read header byte: %s", err)
return
}
// Reset read deadline and let the listener handle that.
if err := conn.SetReadDeadline(time.Time{}); err != nil {
conn.Close()
mux.Logger.Printf("tcp.Mux: cannot reset set read deadline: %s", err)
return
}
// Retrieve handler based on first byte.
handler := mux.m[typ[0]]
if handler == nil {
conn.Close()
mux.Logger.Printf("tcp.Mux: handler not registered: %d", typ[0])
return
}
// Send connection to handler. The handler is responsible for closing the connection.
handler.c <- conn
}
// Listen returns a listener identified by header.
// Any connection accepted by mux is multiplexed based on the initial header byte.
func (mux *Mux) Listen(header byte) net.Listener {
// Ensure two listeners are not created for the same header byte.
if _, ok := mux.m[header]; ok {
panic(fmt.Sprintf("listener already registered under header byte: %d", header))
}
// Create a new listener and assign it.
ln := &listener{
c: make(chan net.Conn),
}
mux.m[header] = ln
return ln
}
// listener is a receiver for connections received by Mux.
type listener struct {
c chan net.Conn
}
// Accept waits for and returns the next connection to the listener.
func (ln *listener) Accept() (c net.Conn, err error) {
conn, ok := <-ln.c
if !ok {
return nil, errors.New("network connection closed")
}
return conn, nil
}
// Close is a no-op. The mux's listener should be closed instead.
func (ln *listener) Close() error { return nil }
// Addr always returns nil.
func (ln *listener) Addr() net.Addr { return nil }
// Dial connects to a remote mux listener with a given header byte.
func Dial(network, address string, header byte) (net.Conn, error) {
conn, err := net.Dial(network, address)
if err != nil {
return nil, err
}
if _, err := conn.Write([]byte{header}); err != nil {
return nil, fmt.Errorf("write mux header: %s", err)
}
return conn, nil
}

@ -0,0 +1,135 @@
package tcp
import (
"bytes"
"io"
"io/ioutil"
"log"
"net"
"strings"
"sync"
"testing"
"testing/quick"
"time"
)
// Ensure the muxer can split a listener's connections across multiple listeners.
func TestMux(t *testing.T) {
if err := quick.Check(func(n uint8, msg []byte) bool {
if testing.Verbose() {
if len(msg) == 0 {
log.Printf("n=%d, <no message>", n)
} else {
log.Printf("n=%d, hdr=%d, len=%d", n, msg[0], len(msg))
}
}
var wg sync.WaitGroup
// Open single listener on random port.
tcpListener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer tcpListener.Close()
// Setup muxer & listeners.
mux := NewMux()
mux.Timeout = 200 * time.Millisecond
if !testing.Verbose() {
mux.Logger = log.New(ioutil.Discard, "", 0)
}
for i := uint8(0); i < n; i++ {
ln := mux.Listen(byte(i))
wg.Add(1)
go func(i uint8, ln net.Listener) {
defer wg.Done()
// Wait for a connection for this listener.
conn, err := ln.Accept()
if conn != nil {
defer conn.Close()
}
// If there is no message or the header byte
// doesn't match then expect close.
if len(msg) == 0 || msg[0] != byte(i) {
if err == nil || err.Error() != "network connection closed" {
t.Fatalf("unexpected error: %s", err)
}
return
}
// If the header byte matches this listener
// then expect a connection and read the message.
var buf bytes.Buffer
if _, err := io.CopyN(&buf, conn, int64(len(msg)-1)); err != nil {
t.Fatal(err)
} else if !bytes.Equal(msg[1:], buf.Bytes()) {
t.Fatalf("message mismatch:\n\nexp=%x\n\ngot=%x\n\n", msg[1:], buf.Bytes())
}
// Write response.
if _, err := conn.Write([]byte("OK")); err != nil {
t.Fatal(err)
}
}(i, ln)
}
// Begin serving from the listener.
go mux.Serve(tcpListener)
// Write message to TCP listener and read OK response.
conn, err := net.Dial("tcp", tcpListener.Addr().String())
if err != nil {
t.Fatal(err)
} else if _, err = conn.Write(msg); err != nil {
t.Fatal(err)
}
// Read the response into the buffer.
var resp [2]byte
_, err = io.ReadFull(conn, resp[:])
// If the message header is less than n then expect a response.
// Otherwise we should get an EOF because the mux closed.
if len(msg) > 0 && uint8(msg[0]) < n {
if string(resp[:]) != `OK` {
t.Fatalf("unexpected response: %s", resp[:])
}
} else {
if err == nil || (err != io.EOF && !(strings.Contains(err.Error(), "connection reset by peer") ||
strings.Contains(err.Error(), "closed by the remote host"))) {
t.Fatalf("unexpected error: %s", err)
}
}
// Close connection.
if err := conn.Close(); err != nil {
t.Fatal(err)
}
// Close original TCP listener and wait for all goroutines to close.
tcpListener.Close()
wg.Wait()
return true
}, nil); err != nil {
t.Error(err)
}
}
// Ensure two handlers cannot be registered for the same header byte.
func TestMux_Listen_ErrAlreadyRegistered(t *testing.T) {
defer func() {
if r := recover(); r != `listener already registered under header byte: 5` {
t.Fatalf("unexpected recover: %#v", r)
}
}()
// Register two listeners with the same header byte.
mux := NewMux()
mux.Listen(5)
mux.Listen(5)
}
Loading…
Cancel
Save