From 16471b1ee29422b9c4e75f9b4a5d07fdc91fed20 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Apr 2016 10:44:26 -0700 Subject: [PATCH 1/9] Add mux for TCP connections --- tcp/doc.go | 4 ++ tcp/mux.go | 154 ++++++++++++++++++++++++++++++++++++++++++++++++ tcp/mux_test.go | 135 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 293 insertions(+) create mode 100644 tcp/doc.go create mode 100644 tcp/mux.go create mode 100644 tcp/mux_test.go diff --git a/tcp/doc.go b/tcp/doc.go new file mode 100644 index 00000000..8630ff9b --- /dev/null +++ b/tcp/doc.go @@ -0,0 +1,4 @@ +/* +Package tcp provides various TCP-related utilities. The TCP mux code provided by this package originated with InfluxDB. +*/ +package tcp diff --git a/tcp/mux.go b/tcp/mux.go new file mode 100644 index 00000000..98ce9917 --- /dev/null +++ b/tcp/mux.go @@ -0,0 +1,154 @@ +package tcp + +import ( + "errors" + "fmt" + "io" + "log" + "net" + "os" + "sync" + "time" +) + +const ( + // DefaultTimeout is the default length of time to wait for first byte. + DefaultTimeout = 30 * time.Second +) + +// Mux multiplexes a network connection. +type Mux struct { + ln net.Listener + m map[byte]*listener + + wg sync.WaitGroup + + // The amount of time to wait for the first header byte. + Timeout time.Duration + + // Out-of-band error logger + Logger *log.Logger +} + +// NewMux returns a new instance of Mux for ln. +func NewMux() *Mux { + return &Mux{ + m: make(map[byte]*listener), + Timeout: DefaultTimeout, + Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags), + } +} + +// Serve handles connections from ln and multiplexes then across registered listener. +func (mux *Mux) Serve(ln net.Listener) error { + for { + // Wait for the next connection. + // If it returns a temporary error then simply retry. + // If it returns any other error then exit immediately. + conn, err := ln.Accept() + if err, ok := err.(interface { + Temporary() bool + }); ok && err.Temporary() { + continue + } + if err != nil { + // Wait for all connections to be demux + mux.wg.Wait() + for _, ln := range mux.m { + close(ln.c) + } + return err + } + + // Demux in a goroutine to + mux.wg.Add(1) + go mux.handleConn(conn) + } +} + +func (mux *Mux) handleConn(conn net.Conn) { + defer mux.wg.Done() + // Set a read deadline so connections with no data don't timeout. + if err := conn.SetReadDeadline(time.Now().Add(mux.Timeout)); err != nil { + conn.Close() + mux.Logger.Printf("tcp.Mux: cannot set read deadline: %s", err) + return + } + + // Read first byte from connection to determine handler. + var typ [1]byte + if _, err := io.ReadFull(conn, typ[:]); err != nil { + conn.Close() + mux.Logger.Printf("tcp.Mux: cannot read header byte: %s", err) + return + } + + // Reset read deadline and let the listener handle that. + if err := conn.SetReadDeadline(time.Time{}); err != nil { + conn.Close() + mux.Logger.Printf("tcp.Mux: cannot reset set read deadline: %s", err) + return + } + + // Retrieve handler based on first byte. + handler := mux.m[typ[0]] + if handler == nil { + conn.Close() + mux.Logger.Printf("tcp.Mux: handler not registered: %d", typ[0]) + return + } + + // Send connection to handler. The handler is responsible for closing the connection. + handler.c <- conn +} + +// Listen returns a listener identified by header. +// Any connection accepted by mux is multiplexed based on the initial header byte. +func (mux *Mux) Listen(header byte) net.Listener { + // Ensure two listeners are not created for the same header byte. + if _, ok := mux.m[header]; ok { + panic(fmt.Sprintf("listener already registered under header byte: %d", header)) + } + + // Create a new listener and assign it. + ln := &listener{ + c: make(chan net.Conn), + } + mux.m[header] = ln + + return ln +} + +// listener is a receiver for connections received by Mux. +type listener struct { + c chan net.Conn +} + +// Accept waits for and returns the next connection to the listener. +func (ln *listener) Accept() (c net.Conn, err error) { + conn, ok := <-ln.c + if !ok { + return nil, errors.New("network connection closed") + } + return conn, nil +} + +// Close is a no-op. The mux's listener should be closed instead. +func (ln *listener) Close() error { return nil } + +// Addr always returns nil. +func (ln *listener) Addr() net.Addr { return nil } + +// Dial connects to a remote mux listener with a given header byte. +func Dial(network, address string, header byte) (net.Conn, error) { + conn, err := net.Dial(network, address) + if err != nil { + return nil, err + } + + if _, err := conn.Write([]byte{header}); err != nil { + return nil, fmt.Errorf("write mux header: %s", err) + } + + return conn, nil +} diff --git a/tcp/mux_test.go b/tcp/mux_test.go new file mode 100644 index 00000000..d72bf9dd --- /dev/null +++ b/tcp/mux_test.go @@ -0,0 +1,135 @@ +package tcp + +import ( + "bytes" + "io" + "io/ioutil" + "log" + "net" + "strings" + "sync" + "testing" + "testing/quick" + "time" +) + +// Ensure the muxer can split a listener's connections across multiple listeners. +func TestMux(t *testing.T) { + if err := quick.Check(func(n uint8, msg []byte) bool { + if testing.Verbose() { + if len(msg) == 0 { + log.Printf("n=%d, ", n) + } else { + log.Printf("n=%d, hdr=%d, len=%d", n, msg[0], len(msg)) + } + } + + var wg sync.WaitGroup + + // Open single listener on random port. + tcpListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer tcpListener.Close() + + // Setup muxer & listeners. + mux := NewMux() + mux.Timeout = 200 * time.Millisecond + if !testing.Verbose() { + mux.Logger = log.New(ioutil.Discard, "", 0) + } + for i := uint8(0); i < n; i++ { + ln := mux.Listen(byte(i)) + + wg.Add(1) + go func(i uint8, ln net.Listener) { + defer wg.Done() + + // Wait for a connection for this listener. + conn, err := ln.Accept() + if conn != nil { + defer conn.Close() + } + + // If there is no message or the header byte + // doesn't match then expect close. + if len(msg) == 0 || msg[0] != byte(i) { + if err == nil || err.Error() != "network connection closed" { + t.Fatalf("unexpected error: %s", err) + } + return + } + + // If the header byte matches this listener + // then expect a connection and read the message. + var buf bytes.Buffer + if _, err := io.CopyN(&buf, conn, int64(len(msg)-1)); err != nil { + t.Fatal(err) + } else if !bytes.Equal(msg[1:], buf.Bytes()) { + t.Fatalf("message mismatch:\n\nexp=%x\n\ngot=%x\n\n", msg[1:], buf.Bytes()) + } + + // Write response. + if _, err := conn.Write([]byte("OK")); err != nil { + t.Fatal(err) + } + }(i, ln) + } + + // Begin serving from the listener. + go mux.Serve(tcpListener) + + // Write message to TCP listener and read OK response. + conn, err := net.Dial("tcp", tcpListener.Addr().String()) + if err != nil { + t.Fatal(err) + } else if _, err = conn.Write(msg); err != nil { + t.Fatal(err) + } + + // Read the response into the buffer. + var resp [2]byte + _, err = io.ReadFull(conn, resp[:]) + + // If the message header is less than n then expect a response. + // Otherwise we should get an EOF because the mux closed. + if len(msg) > 0 && uint8(msg[0]) < n { + if string(resp[:]) != `OK` { + t.Fatalf("unexpected response: %s", resp[:]) + } + } else { + if err == nil || (err != io.EOF && !(strings.Contains(err.Error(), "connection reset by peer") || + strings.Contains(err.Error(), "closed by the remote host"))) { + t.Fatalf("unexpected error: %s", err) + } + } + + // Close connection. + if err := conn.Close(); err != nil { + t.Fatal(err) + } + + // Close original TCP listener and wait for all goroutines to close. + tcpListener.Close() + wg.Wait() + + return true + }, nil); err != nil { + t.Error(err) + } +} + +// Ensure two handlers cannot be registered for the same header byte. +func TestMux_Listen_ErrAlreadyRegistered(t *testing.T) { + defer func() { + if r := recover(); r != `listener already registered under header byte: 5` { + t.Fatalf("unexpected recover: %#v", r) + } + }() + + // Register two listeners with the same header byte. + mux := NewMux() + mux.Listen(5) + mux.Listen(5) +} From 099c7c18ac6fb22d9131afb7ef873ef1a60ef88f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Apr 2016 11:24:01 -0700 Subject: [PATCH 2/9] Enable TCP muxing layer for Raft commns --- store/network_layer.go | 17 ++++++++++++++--- store/store.go | 15 +++++++++++++-- tcp/mux.go | 2 +- 3 files changed, 28 insertions(+), 6 deletions(-) diff --git a/store/network_layer.go b/store/network_layer.go index d892c42a..b6040b06 100644 --- a/store/network_layer.go +++ b/store/network_layer.go @@ -12,10 +12,10 @@ type networkLayer struct { } // newNetworkLayer returns a new instance of networkLayer. -func newNetworkLayer(ln net.Listener) *networkLayer { +func newNetworkLayer(ln net.Listener, addr net.Addr) *networkLayer { return &networkLayer{ ln: ln, - addr: ln.Addr(), + addr: addr, } } @@ -26,7 +26,18 @@ func (l *networkLayer) Addr() net.Addr { // Dial creates a new network connection. func (l *networkLayer) Dial(addr string, timeout time.Duration) (net.Conn, error) { - return net.DialTimeout("tcp", addr, timeout) + conn, err := net.DialTimeout("tcp", addr, timeout) + if err != nil { + return nil, err + } + + // Write a marker byte for raft messages. + _, err = conn.Write([]byte{muxRaftHeader}) + if err != nil { + conn.Close() + return nil, err + } + return conn, err } // Accept waits for the next connection. diff --git a/store/store.go b/store/store.go index 3d3c5c10..c3317c79 100644 --- a/store/store.go +++ b/store/store.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/raft" "github.com/hashicorp/raft-boltdb" sql "github.com/otoolep/rqlite/db" + mux "github.com/otoolep/rqlite/tcp" ) const ( @@ -30,6 +31,11 @@ const ( appliedWaitDelay = 100 * time.Millisecond ) +const ( + muxRaftHeader = 1 // Raft consensus communications + muxClusterHeader = 2 // Cluster communications +) + var ( // ErrFieldsRequired is returned when a node attempts to execute a leader-only // operation. @@ -73,6 +79,7 @@ func NewDBConfig(dsn string, memory bool) *DBConfig { type Store struct { raftDir string raftBind string + mux *mux.Mux mu sync.RWMutex // Sync access between queries and snapshots. @@ -90,6 +97,7 @@ func New(dbConf *DBConfig, dir, bind string) *Store { return &Store{ raftDir: dir, raftBind: bind, + mux: mux.NewMux(), dbConf: dbConf, dbPath: filepath.Join(dir, sqliteFile), logger: log.New(os.Stderr, "[store] ", log.LstdFlags), @@ -142,12 +150,15 @@ func (s *Store) Open(enableSingle bool) error { config.DisableBootstrapAfterElect = false } - // Setup Raft communication. + // Set up TCP communication between nodes. ln, err := net.Listen("tcp", s.raftBind) if err != nil { return err } - s.ln = newNetworkLayer(ln) + go s.mux.Serve(ln) + + // Setup Raft communication. + s.ln = newNetworkLayer(s.mux.Listen(muxRaftHeader), ln.Addr()) transport := raft.NewNetworkTransport(s.ln, 3, 10*time.Second, os.Stderr) // Create peer storage. diff --git a/tcp/mux.go b/tcp/mux.go index 98ce9917..8f62f82a 100644 --- a/tcp/mux.go +++ b/tcp/mux.go @@ -52,7 +52,7 @@ func (mux *Mux) Serve(ln net.Listener) error { continue } if err != nil { - // Wait for all connections to be demux + // Wait for all connections to be demuxed mux.wg.Wait() for _, ln := range mux.m { close(ln.c) From ed56f77714e82d419f83eeb3b6be471be5d6079f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Apr 2016 14:52:49 -0700 Subject: [PATCH 3/9] Add hierarchy to Raft commands --- store/store.go | 89 +++++++++++++++++++++++++++++++++++--------------- 1 file changed, 63 insertions(+), 26 deletions(-) diff --git a/store/store.go b/store/store.go index c3317c79..3edfe144 100644 --- a/store/store.go +++ b/store/store.go @@ -36,12 +36,40 @@ const ( muxClusterHeader = 2 // Cluster communications ) -var ( - // ErrFieldsRequired is returned when a node attempts to execute a leader-only - // operation. - ErrNotLeader = errors.New("not leader") +// commandType are commands that affect the state of the cluster, and must go through Raft. +type commandType int + +const ( + execute commandType = iota // Commands which modify the database. + query // Commands which query the database. + meta // Commands that affect cluster meta state. ) +type command struct { + Typ commandType `json:"typ,omitempty"` + Sub json.RawMessage `json:"sub,omitempty"` +} + +func newCommand(t commandType, d interface{}) (*command, error) { + b, err := json.Marshal(d) + if err != nil { + return nil, err + } + return &command{ + Typ: t, + Sub: b, + }, nil + +} + +// databaseSub is a command sub which involves interaction with the database. +type databaseSub struct { + Tx bool `json:"tx,omitempty"` + Queries []string `json:"queries,omitempty"` + Timings bool `json:"timings,omitempty"` +} + +// ConsistencyLevel represents the available read consistency levels. type ConsistencyLevel int const ( @@ -50,20 +78,12 @@ const ( Strong ) -type commandType int - -const ( - execute commandType = iota - query +var ( + // ErrFieldsRequired is returned when a node attempts to execute a leader-only + // operation. + ErrNotLeader = errors.New("not leader") ) -type command struct { - Typ commandType `json:"typ,omitempty"` - Tx bool `json:"tx,omitempty"` - Queries []string `json:"queries,omitempty"` - Timings bool `json:"timings,omitempty"` -} - // DBConfig represents the configuration of the underlying SQLite database. type DBConfig struct { DSN string // Any custom DSN @@ -287,12 +307,15 @@ func (s *Store) Execute(queries []string, timings, tx bool) ([]*sql.Result, erro return nil, ErrNotLeader } - c := &command{ - Typ: execute, + d := &databaseSub{ Tx: tx, Queries: queries, Timings: timings, } + c, err := newCommand(execute, d) + if err != nil { + return nil, err + } b, err := json.Marshal(c) if err != nil { return nil, err @@ -338,12 +361,15 @@ func (s *Store) Query(queries []string, timings, tx bool, lvl ConsistencyLevel) defer s.mu.RUnlock() if lvl == Strong { - c := &command{ - Typ: query, + d := &databaseSub{ Tx: tx, Queries: queries, Timings: timings, } + c, err := newCommand(query, d) + if err != nil { + return nil, err + } b, err := json.Marshal(c) if err != nil { return nil, err @@ -393,15 +419,26 @@ type fsmQueryResponse struct { func (s *Store) Apply(l *raft.Log) interface{} { var c command if err := json.Unmarshal(l.Data, &c); err != nil { - panic(fmt.Sprintf("failed to unmarshal command: %s", err.Error())) + panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error())) } - if c.Typ == execute { - r, err := s.db.Execute(c.Queries, c.Tx, c.Timings) - return &fsmExecuteResponse{results: r, error: err} + switch c.Typ { + case execute, query: + var d databaseSub + if err := json.Unmarshal(c.Sub, &d); err != nil { + panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error())) + } + if c.Typ == execute { + r, err := s.db.Execute(d.Queries, d.Tx, d.Timings) + return &fsmExecuteResponse{results: r, error: err} + } + r, err := s.db.Query(d.Queries, d.Tx, d.Timings) + return &fsmQueryResponse{rows: r, error: err} + case meta: + fallthrough + default: + panic("unsupported!") } - r, err := s.db.Query(c.Queries, c.Tx, c.Timings) - return &fsmQueryResponse{rows: r, error: err} } // Snapshot returns a snapshot of the database. The caller must ensure that From 99ebaf9767531ce3a87f8a4253b537f3be1a40bb Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Apr 2016 15:06:49 -0700 Subject: [PATCH 4/9] Add support for peersSub --- store/store.go | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/store/store.go b/store/store.go index 3edfe144..dcb60a69 100644 --- a/store/store.go +++ b/store/store.go @@ -42,7 +42,7 @@ type commandType int const ( execute commandType = iota // Commands which modify the database. query // Commands which query the database. - meta // Commands that affect cluster meta state. + peer // Commands that modify peers map. ) type command struct { @@ -69,6 +69,9 @@ type databaseSub struct { Timings bool `json:"timings,omitempty"` } +// peersSub is a command which sets the API address for a Raft address. +type peersSub map[string]string + // ConsistencyLevel represents the available read consistency levels. type ConsistencyLevel int @@ -109,6 +112,9 @@ type Store struct { dbPath string // Path to underlying SQLite file, if not in-memory. db *sql.DB // The underlying SQLite store. + peersMu sync.RWMutex // Sync access between queries and snapshots. + peers map[string]string // Mapping from Raft addresses to API addresses + logger *log.Logger } @@ -415,6 +421,10 @@ type fsmQueryResponse struct { error error } +type fsmGenericResponse struct { + error error +} + // Apply applies a Raft log entry to the database. func (s *Store) Apply(l *raft.Log) interface{} { var c command @@ -426,7 +436,7 @@ func (s *Store) Apply(l *raft.Log) interface{} { case execute, query: var d databaseSub if err := json.Unmarshal(c.Sub, &d); err != nil { - panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error())) + return &fsmGenericResponse{error: err} } if c.Typ == execute { r, err := s.db.Execute(d.Queries, d.Tx, d.Timings) @@ -434,10 +444,21 @@ func (s *Store) Apply(l *raft.Log) interface{} { } r, err := s.db.Query(d.Queries, d.Tx, d.Timings) return &fsmQueryResponse{rows: r, error: err} - case meta: - fallthrough + case peer: + var d peersSub + if err := json.Unmarshal(c.Sub, &d); err != nil { + return &fsmGenericResponse{error: err} + } + func() { + s.peersMu.Lock() + defer s.peersMu.Unlock() + for k, v := range d { + s.peers[k] = v + } + }() + return &fsmGenericResponse{} default: - panic("unsupported!") + return &fsmGenericResponse{error: fmt.Errorf("unknown command: %v", c.Typ)} } } From 16e312514974c018c9586450d3b4d58b774e537e Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sat, 23 Apr 2016 16:21:11 -0700 Subject: [PATCH 5/9] Add support for cluster meta This change required that the cluster meta become part of the Raft snapshot. --- store/store.go | 102 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 79 insertions(+), 23 deletions(-) diff --git a/store/store.go b/store/store.go index dcb60a69..419a41a2 100644 --- a/store/store.go +++ b/store/store.go @@ -5,6 +5,7 @@ package store import ( "bytes" + "encoding/binary" "encoding/json" "errors" "fmt" @@ -23,6 +24,12 @@ import ( mux "github.com/otoolep/rqlite/tcp" ) +var ( + // ErrFieldsRequired is returned when a node attempts to execute a leader-only + // operation. + ErrNotLeader = errors.New("not leader") +) + const ( retainSnapshotCount = 2 raftTimeout = 10 * time.Second @@ -81,11 +88,16 @@ const ( Strong ) -var ( - // ErrFieldsRequired is returned when a node attempts to execute a leader-only - // operation. - ErrNotLeader = errors.New("not leader") -) +// clusterMeta represents cluster meta which must be kept in consensus. +type clusterMeta struct { + APIPeers map[string]string // Map from Raft address to API address +} + +func newClusterMeta() *clusterMeta { + return &clusterMeta{ + APIPeers: make(map[string]string), + } +} // DBConfig represents the configuration of the underlying SQLite database. type DBConfig struct { @@ -112,8 +124,8 @@ type Store struct { dbPath string // Path to underlying SQLite file, if not in-memory. db *sql.DB // The underlying SQLite store. - peersMu sync.RWMutex // Sync access between queries and snapshots. - peers map[string]string // Mapping from Raft addresses to API addresses + metaMu sync.RWMutex + meta *clusterMeta logger *log.Logger } @@ -126,6 +138,7 @@ func New(dbConf *DBConfig, dir, bind string) *Store { mux: mux.NewMux(), dbConf: dbConf, dbPath: filepath.Join(dir, sqliteFile), + meta: newClusterMeta(), logger: log.New(os.Stderr, "[store] ", log.LstdFlags), } } @@ -450,10 +463,10 @@ func (s *Store) Apply(l *raft.Log) interface{} { return &fsmGenericResponse{error: err} } func() { - s.peersMu.Lock() - defer s.peersMu.Unlock() + s.metaMu.Lock() + defer s.metaMu.Unlock() for k, v := range d { - s.peers[k] = v + s.meta.APIPeers[k] = v } }() return &fsmGenericResponse{} @@ -484,29 +497,45 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) { return nil, err } - b, err := ioutil.ReadFile(f.Name()) + fsm := &fsmSnapshot{} + fsm.database, err = ioutil.ReadFile(f.Name()) if err != nil { - log.Printf("Failed to generate snapshot: %s", err.Error()) + log.Printf("Failed to read database for snapshot: %s", err.Error()) return nil, err } - return &fsmSnapshot{data: b}, nil + + fsm.meta, err = json.Marshal(s.meta) + if err != nil { + log.Printf("Failed to encode meta for snapshot: %s", err.Error()) + return nil, err + } + + return fsm, nil } -// Restore restores the database to a previous state. +// Restore restores the node to a previous state. func (s *Store) Restore(rc io.ReadCloser) error { if err := s.db.Close(); err != nil { return err } - b, err := ioutil.ReadAll(rc) - if err != nil { + // Get size of database. + var sz uint64 + if err := binary.Read(rc, binary.LittleEndian, &sz); err != nil { + return err + } + + // Now read in the database file data and restore. + database := make([]byte, sz) + if _, err := io.ReadFull(rc, database); err != nil { return err } var db *sql.DB + var err error if !s.dbConf.Memory { // Write snapshot over any existing database file. - if err := ioutil.WriteFile(s.dbPath, b, 0660); err != nil { + if err := ioutil.WriteFile(s.dbPath, database, 0660); err != nil { return err } @@ -524,7 +553,7 @@ func (s *Store) Restore(rc io.ReadCloser) error { f.Close() defer os.Remove(f.Name()) - if err := ioutil.WriteFile(f.Name(), b, 0660); err != nil { + if err := ioutil.WriteFile(f.Name(), database, 0660); err != nil { return err } @@ -536,18 +565,45 @@ func (s *Store) Restore(rc io.ReadCloser) error { } s.db = db - return nil + // Read remaining bytes, and set to cluster meta. + b, err := ioutil.ReadAll(rc) + if err != nil { + return err + } + + return func() error { + s.metaMu.Lock() + defer s.metaMu.Unlock() + return json.Unmarshal(b, &s.meta) + }() } type fsmSnapshot struct { - data []byte + database []byte + meta []byte } -// Persist writes the snapshot to the give sink. +// Persist writes the snapshot to the given sink. func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error { err := func() error { - // Write data to sink. - if _, err := sink.Write(f.data); err != nil { + // Start by writing size of database. + b := new(bytes.Buffer) + sz := uint64(len(f.database)) + err := binary.Write(b, binary.LittleEndian, sz) + if err != nil { + return err + } + if _, err := sink.Write(b.Bytes()); err != nil { + return err + } + + // Next write database to sink. + if _, err := sink.Write(f.database); err != nil { + return err + } + + // Finally write the meta. + if _, err := sink.Write(f.meta); err != nil { return err } From 0bffa38319bdbc71a98dafd573df7f2dbbcebe29 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 24 Apr 2016 10:18:43 -0700 Subject: [PATCH 6/9] Start implementing cluster meta comms --- store/store.go | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/store/store.go b/store/store.go index 419a41a2..737a4264 100644 --- a/store/store.go +++ b/store/store.go @@ -39,8 +39,8 @@ const ( ) const ( - muxRaftHeader = 1 // Raft consensus communications - muxClusterHeader = 2 // Cluster communications + muxRaftHeader = 1 // Raft consensus communications + muxMetaHeader = 2 // Cluster meta communications ) // commandType are commands that affect the state of the cluster, and must go through Raft. @@ -126,6 +126,7 @@ type Store struct { metaMu sync.RWMutex meta *clusterMeta + metaLn net.Listener logger *log.Logger } @@ -196,6 +197,10 @@ func (s *Store) Open(enableSingle bool) error { } go s.mux.Serve(ln) + // Setup meta updates communication + s.metaLn = s.mux.Listen(muxMetaHeader) + go s.serveMeta() + // Setup Raft communication. s.ln = newNetworkLayer(s.mux.Listen(muxRaftHeader), ln.Addr()) transport := raft.NewNetworkTransport(s.ln, 3, 10*time.Second, os.Stderr) @@ -424,6 +429,21 @@ func (s *Store) Join(addr string) error { return nil } +func (s *Store) serveMeta() error { + for { + conn, err := s.metaLn.Accept() + if err != nil { + return err + } + + go s.handleMetaConn(conn) + } +} + +func (s *Store) handleMetaConn(conn net.Conn) error { + return nil +} + type fsmExecuteResponse struct { results []*sql.Result error error From 0a3454a67555f632cbb9473d248696423843f04f Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 24 Apr 2016 16:04:26 -0700 Subject: [PATCH 7/9] Methods on store to set and get API peers --- store/store.go | 30 ++++++++++++++++++++++++++++++ store/store_test.go | 31 +++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/store/store.go b/store/store.go index 737a4264..34f70245 100644 --- a/store/store.go +++ b/store/store.go @@ -259,6 +259,18 @@ func (s *Store) Leader() string { return s.raft.Leader() } +// APIPeers return the map of Raft addresses to API addresses. +func (s *Store) APIPeers() (map[string]string, error) { + s.metaMu.RLock() + defer s.metaMu.RUnlock() + + peers := make(map[string]string, len(s.meta.APIPeers)) + for k, v := range s.meta.APIPeers { + peers[k] = v + } + return peers, nil +} + // WaitForLeader blocks until a leader is detected, or the timeout expires. func (s *Store) WaitForLeader(timeout time.Duration) (string, error) { tck := time.NewTicker(leaderWaitDelay) @@ -416,6 +428,24 @@ func (s *Store) Query(queries []string, timings, tx bool, lvl ConsistencyLevel) return r, err } +// UpdateAPIPeers updates the cluster-wide peer information. +func (s *Store) UpdateAPIPeers(peers map[string]string) error { + c, err := newCommand(peer, peers) + if err != nil { + return err + } + b, err := json.Marshal(c) + if err != nil { + return err + } + + f := s.raft.Apply(b, raftTimeout) + if e := f.(raft.Future); e.Error() != nil { + return e.Error() + } + return nil +} + // Join joins a node, located at addr, to this store. The node must be ready to // respond to Raft communications at that address. func (s *Store) Join(addr string) error { diff --git a/store/store_test.go b/store/store_test.go index 1cba908f..7cf35628 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "os" "path/filepath" + "reflect" "testing" "time" ) @@ -349,6 +350,36 @@ func Test_SingleNodeSnapshotInMem(t *testing.T) { } } +func Test_APIPeers(t *testing.T) { + s := mustNewStore(false) + defer os.RemoveAll(s.Path()) + + if err := s.Open(true); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s.Close(true) + s.WaitForLeader(10 * time.Second) + + peers := map[string]string{ + "localhost:4002": "localhost:4001", + "localhost:4004": "localhost:4003", + } + if err := s.UpdateAPIPeers(peers); err != nil { + t.Fatalf("failed to update API peers: %s", err.Error()) + } + + // Retrieve peers and verify them. + apiPeers, err := s.APIPeers() + if err != nil { + t.Fatalf("failed to retrieve API peers: %s", err.Error()) + } + if !reflect.DeepEqual(peers, apiPeers) { + t.Fatalf("set and retrieved API peers not identical, got %v, exp %v", + apiPeers, peers) + } + +} + func mustNewStore(inmem bool) *Store { path := mustTempDir() defer os.RemoveAll(path) From bdeda477740d0f057f923470bb0d87bfe930af56 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 24 Apr 2016 16:33:18 -0700 Subject: [PATCH 8/9] Initial implementation of Meta server This should really be a distinct cluster package. --- store/store.go | 19 ++++++++++++++++++- store/store_test.go | 23 +++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/store/store.go b/store/store.go index 34f70245..f770a8fc 100644 --- a/store/store.go +++ b/store/store.go @@ -93,6 +93,7 @@ type clusterMeta struct { APIPeers map[string]string // Map from Raft address to API address } +// newClusterMeta returns an initialized cluster meta store. func newClusterMeta() *clusterMeta { return &clusterMeta{ APIPeers: make(map[string]string), @@ -459,6 +460,7 @@ func (s *Store) Join(addr string) error { return nil } +// serveMeta accepts new connections to the meta server. func (s *Store) serveMeta() error { for { conn, err := s.metaLn.Accept() @@ -470,8 +472,23 @@ func (s *Store) serveMeta() error { } } +// handleMetaConn processes individual connections to the meta server. func (s *Store) handleMetaConn(conn net.Conn) error { - return nil + fmt.Println("handling meta connection") + defer conn.Close() + + // Only handles peers updates for now. + peers := make(map[string]string) + d := json.NewDecoder(conn) + + err := d.Decode(&peers) + if err != nil { + return err + } + + // Update the peers. + fmt.Printf("%v", peers) + return s.UpdateAPIPeers(peers) } type fsmExecuteResponse struct { diff --git a/store/store_test.go b/store/store_test.go index 7cf35628..e30f74e6 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -3,6 +3,7 @@ package store import ( "encoding/json" "io/ioutil" + "net" "os" "path/filepath" "reflect" @@ -377,7 +378,29 @@ func Test_APIPeers(t *testing.T) { t.Fatalf("set and retrieved API peers not identical, got %v, exp %v", apiPeers, peers) } +} + +func Test_MetaServer(t *testing.T) { + t.Skip() + s := mustNewStore(false) + defer os.RemoveAll(s.Path()) + + if err := s.Open(true); err != nil { + t.Fatalf("failed to open single-node store: %s", err.Error()) + } + defer s.Close(true) + s.WaitForLeader(10 * time.Second) + raddr, err := net.ResolveTCPAddr("tcp", "localhost:4002") + if err != nil { + t.Fatalf("failed to resolve remote address: %s", err.Error()) + } + conn, err := net.DialTCP("tcp4", nil, raddr) + if err != nil { + t.Fatalf("failed to connect to remote address: %s", err.Error()) + } + conn.Write([]byte{2}) + conn.Write([]byte(`{"localhost:4002": "localhost:4001"}`)) } func mustNewStore(inmem bool) *Store { From 9164550aabec8b90e858a6a8690effa800f529b9 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 24 Apr 2016 17:03:09 -0700 Subject: [PATCH 9/9] Remove debug print --- store/store.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/store/store.go b/store/store.go index f770a8fc..984ad3b4 100644 --- a/store/store.go +++ b/store/store.go @@ -474,7 +474,6 @@ func (s *Store) serveMeta() error { // handleMetaConn processes individual connections to the meta server. func (s *Store) handleMetaConn(conn net.Conn) error { - fmt.Println("handling meta connection") defer conn.Close() // Only handles peers updates for now. @@ -487,7 +486,6 @@ func (s *Store) handleMetaConn(conn net.Conn) error { } // Update the peers. - fmt.Printf("%v", peers) return s.UpdateAPIPeers(peers) }