From 26545006ee75214e2d58580f1cae277828fe9981 Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 24 Apr 2016 18:43:26 -0700 Subject: [PATCH 1/8] Add initial cluster service package --- cluster/cluster.go | 120 ++++++++++++++++++++++++++++++++++++++++ cluster/cluster_test.go | 9 +++ 2 files changed, 129 insertions(+) create mode 100644 cluster/cluster.go create mode 100644 cluster/cluster_test.go diff --git a/cluster/cluster.go b/cluster/cluster.go new file mode 100644 index 00000000..9ddcac61 --- /dev/null +++ b/cluster/cluster.go @@ -0,0 +1,120 @@ +package cluster + +import ( + "encoding/json" + "log" + "net" + "time" +) + +const ( + ConnectionTimeout = 10 * time.Second +) + +// Listener is the interface the network service must provide. +type Listener interface { + net.Listener + + // Dial is used to create a new outgoing connection + Dial(address string, timeout time.Duration) (net.Conn, error) +} + +// Store represents a store of information, managed via consensus. +type Store interface { + // Leader returns the leader of the consensus system. + Leader() string + + // UpdateAPIPeers updates the API peers on the store. + UpdateAPIPeers(peers map[string]string) error +} + +// Service allows access to the cluster and associated meta data, +// via consensus. +type Service struct { + ln Listener + store Store + + logger *log.Logger +} + +// NewService returns a new instance of the cluster service +func NewService(ln Listener, store Store) *Service { + return &Service{ + ln: ln, + store: store, + } +} + +// Open opens the Service. +func (s *Service) Open() error { + go s.serve() + return nil +} + +// Close closes the service. +func (s *Service) Close() error { + s.ln.Close() + return nil +} + +// SetPeers will set the mapping between raftAddr and apiAddr for the entire cluster. +func (s *Service) SetPeers(raftAddr, apiAddr string) error { + peer := map[string]string{ + raftAddr: apiAddr, + } + + // Try the local store. It might be the leader. + err := s.store.UpdateAPIPeers(peer) + if err == nil { + // All done! Aren't we lucky? + return nil + } + + // Try talking to the leader over the network. + conn, err := s.ln.Dial(s.store.Leader(), ConnectionTimeout) + if err != nil { + return err + } + defer conn.Close() + + b, err := json.Marshal(peer) + if err != nil { + return err + } + + if _, err := conn.Write(b); err != nil { + return err + } + + // XXXX Wait for response and check for error. + return nil +} + +func (s *Service) serve() error { + for { + conn, err := s.ln.Accept() + if err != nil { + return err + } + + go s.handleConn(conn) + } +} + +func (s *Service) handleConn(conn net.Conn) { + // Only handles peers updates for now. + peers := make(map[string]string) + d := json.NewDecoder(conn) + + err := d.Decode(&peers) + if err != nil { + return + } + + // Update the peers. + if err := s.store.UpdateAPIPeers(peers); err != nil { + // Write error back down conn + return + } + // write OK back down conn +} diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go new file mode 100644 index 00000000..7a78f8f1 --- /dev/null +++ b/cluster/cluster_test.go @@ -0,0 +1,9 @@ +package cluster + +import ( + "testing" +) + +func Test_NewService(t *testing.T) { + return +} From 20f5e3b298303c7460c8862d0e6ab8b6f4389c6b Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 24 Apr 2016 19:03:19 -0700 Subject: [PATCH 2/8] Error check remote operations --- cluster/cluster.go | 46 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 42 insertions(+), 4 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 9ddcac61..5032d272 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -2,6 +2,7 @@ package cluster import ( "encoding/json" + "fmt" "log" "net" "time" @@ -11,6 +12,21 @@ const ( ConnectionTimeout = 10 * time.Second ) +var respOKMarshalled []byte + +func init() { + var err error + respOKMarshalled, err = json.Marshal(response{}) + if err != nil { + panic(fmt.Sprintf("unable to JSON marshall OK response: %s", err.Error())) + } +} + +type response struct { + Code int `json:"code,omitempty"` + Message string `json:"message,omitempty"` +} + // Listener is the interface the network service must provide. type Listener interface { net.Listener @@ -86,7 +102,17 @@ func (s *Service) SetPeers(raftAddr, apiAddr string) error { return err } - // XXXX Wait for response and check for error. + // Wait for the response and verify the operation went through. + resp := response{} + d := json.NewDecoder(conn) + err = d.Decode(&resp) + if err != nil { + return err + } + + if resp.Code != 0 { + return fmt.Errorf(resp.Message) + } return nil } @@ -105,7 +131,6 @@ func (s *Service) handleConn(conn net.Conn) { // Only handles peers updates for now. peers := make(map[string]string) d := json.NewDecoder(conn) - err := d.Decode(&peers) if err != nil { return @@ -113,8 +138,21 @@ func (s *Service) handleConn(conn net.Conn) { // Update the peers. if err := s.store.UpdateAPIPeers(peers); err != nil { - // Write error back down conn + resp := response{1, err.Error()} + b, err := json.Marshal(resp) + if err != nil { + conn.Close() // Only way left to signal. + } else { + if _, err := conn.Write(b); err != nil { + conn.Close() // Only way left to signal. + } + } return } - // write OK back down conn + + // Let the remote node know everything went OK. + if _, err := conn.Write(respOKMarshalled); err != nil { + conn.Close() // Only way left to signal. + } + return } From a02f44f0897f0f2779ff9df713d480c11c379aad Mon Sep 17 00:00:00 2001 From: Philip O'Toole Date: Sun, 24 Apr 2016 19:06:33 -0700 Subject: [PATCH 3/8] Remove cluster-like code from Store --- store/store.go | 34 ---------------------------------- store/store_test.go | 24 ------------------------ 2 files changed, 58 deletions(-) diff --git a/store/store.go b/store/store.go index 984ad3b4..e281076c 100644 --- a/store/store.go +++ b/store/store.go @@ -127,7 +127,6 @@ type Store struct { metaMu sync.RWMutex meta *clusterMeta - metaLn net.Listener logger *log.Logger } @@ -198,10 +197,6 @@ func (s *Store) Open(enableSingle bool) error { } go s.mux.Serve(ln) - // Setup meta updates communication - s.metaLn = s.mux.Listen(muxMetaHeader) - go s.serveMeta() - // Setup Raft communication. s.ln = newNetworkLayer(s.mux.Listen(muxRaftHeader), ln.Addr()) transport := raft.NewNetworkTransport(s.ln, 3, 10*time.Second, os.Stderr) @@ -460,35 +455,6 @@ func (s *Store) Join(addr string) error { return nil } -// serveMeta accepts new connections to the meta server. -func (s *Store) serveMeta() error { - for { - conn, err := s.metaLn.Accept() - if err != nil { - return err - } - - go s.handleMetaConn(conn) - } -} - -// handleMetaConn processes individual connections to the meta server. -func (s *Store) handleMetaConn(conn net.Conn) error { - defer conn.Close() - - // Only handles peers updates for now. - peers := make(map[string]string) - d := json.NewDecoder(conn) - - err := d.Decode(&peers) - if err != nil { - return err - } - - // Update the peers. - return s.UpdateAPIPeers(peers) -} - type fsmExecuteResponse struct { results []*sql.Result error error diff --git a/store/store_test.go b/store/store_test.go index e30f74e6..d19b6ca7 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -3,7 +3,6 @@ package store import ( "encoding/json" "io/ioutil" - "net" "os" "path/filepath" "reflect" @@ -380,29 +379,6 @@ func Test_APIPeers(t *testing.T) { } } -func Test_MetaServer(t *testing.T) { - t.Skip() - s := mustNewStore(false) - defer os.RemoveAll(s.Path()) - - if err := s.Open(true); err != nil { - t.Fatalf("failed to open single-node store: %s", err.Error()) - } - defer s.Close(true) - s.WaitForLeader(10 * time.Second) - - raddr, err := net.ResolveTCPAddr("tcp", "localhost:4002") - if err != nil { - t.Fatalf("failed to resolve remote address: %s", err.Error()) - } - conn, err := net.DialTCP("tcp4", nil, raddr) - if err != nil { - t.Fatalf("failed to connect to remote address: %s", err.Error()) - } - conn.Write([]byte{2}) - conn.Write([]byte(`{"localhost:4002": "localhost:4001"}`)) -} - func mustNewStore(inmem bool) *Store { path := mustTempDir() defer os.RemoveAll(path) From d3ec6856c90998df038310e908467a7ba60e1524 Mon Sep 17 00:00:00 2001 From: Philip O Toole Date: Sun, 24 Apr 2016 20:46:26 -0700 Subject: [PATCH 4/8] Start fleshing out tests --- cluster/cluster.go | 7 ++++++ cluster/cluster_test.go | 50 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/cluster/cluster.go b/cluster/cluster.go index 5032d272..283e6a2d 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -49,6 +49,7 @@ type Store interface { type Service struct { ln Listener store Store + addr net.Addr logger *log.Logger } @@ -58,6 +59,7 @@ func NewService(ln Listener, store Store) *Service { return &Service{ ln: ln, store: store, + addr: ln.Addr(), } } @@ -73,6 +75,11 @@ func (s *Service) Close() error { return nil } +// Addr returns the address the service is listening on. +func (s *Service) Addr() string { + return s.addr.String() +} + // SetPeers will set the mapping between raftAddr and apiAddr for the entire cluster. func (s *Service) SetPeers(raftAddr, apiAddr string) error { peer := map[string]string{ diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 7a78f8f1..1cb9a848 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -1,9 +1,59 @@ package cluster import ( + "net" "testing" + "time" ) func Test_NewService(t *testing.T) { + ml := &mockListener{} + ms := &mockStore{} + s := NewService(ml, ms) + if s == nil { + t.Fatalf("failed to create cluster service") + } return } + +type mockListener struct { + ln net.Listener +} + +func mustNewMockListener() *mockListener { + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + panic("failed to create mock listener") + } + return &mockListener{ + ln: ln, + } +} + +func (ml *mockListener) Accept() (c net.Conn, err error) { + return nil, nil +} + +func (ml *mockListener) Addr() net.Addr { + return nil +} + +func (ml *mockListener) Close() (err error) { + return ml.ln.Close() +} + +func (ml *mockListener) Dial(addr string, t time.Duration) (net.Conn, error) { + return nil, nil +} + +type mockStore struct { + leader string +} + +func (ms *mockStore) Leader() string { + return ms.leader +} + +func (ms *mockStore) UpdateAPIPeers(peers map[string]string) error { + return nil +} From f50dc9a533400667981cf2c4ba769d04395e0e97 Mon Sep 17 00:00:00 2001 From: Philip O Toole Date: Mon, 25 Apr 2016 10:16:36 -0700 Subject: [PATCH 5/8] More cluster package testing --- cluster/cluster.go | 16 +++++++++++++--- cluster/cluster_test.go | 16 +++++++++++----- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 283e6a2d..b38e7ac2 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -5,6 +5,8 @@ import ( "fmt" "log" "net" + "os" + "sync" "time" ) @@ -51,27 +53,33 @@ type Service struct { store Store addr net.Addr + wg sync.WaitGroup + logger *log.Logger } // NewService returns a new instance of the cluster service func NewService(ln Listener, store Store) *Service { return &Service{ - ln: ln, - store: store, - addr: ln.Addr(), + ln: ln, + store: store, + addr: ln.Addr(), + logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags), } } // Open opens the Service. func (s *Service) Open() error { + s.wg.Add(1) go s.serve() + s.logger.Println("service listening on", s.ln.Addr()) return nil } // Close closes the service. func (s *Service) Close() error { s.ln.Close() + s.wg.Wait() return nil } @@ -124,6 +132,8 @@ func (s *Service) SetPeers(raftAddr, apiAddr string) error { } func (s *Service) serve() error { + defer s.wg.Done() + for { conn, err := s.ln.Accept() if err != nil { diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 1cb9a848..61fa82d6 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -6,14 +6,20 @@ import ( "time" ) -func Test_NewService(t *testing.T) { - ml := &mockListener{} +func Test_NewServiceOpenClose(t *testing.T) { + ml := mustNewMockListener() ms := &mockStore{} s := NewService(ml, ms) if s == nil { t.Fatalf("failed to create cluster service") } - return + + if err := s.Open(); err != nil { + t.Fatalf("failed to open cluster service") + } + if err := s.Close(); err != nil { + t.Fatalf("failed to close cluster service") + } } type mockListener struct { @@ -31,11 +37,11 @@ func mustNewMockListener() *mockListener { } func (ml *mockListener) Accept() (c net.Conn, err error) { - return nil, nil + return ml.ln.Accept() } func (ml *mockListener) Addr() net.Addr { - return nil + return ml.ln.Addr() } func (ml *mockListener) Close() (err error) { From d5e3889ac835c6d35150b397cf72b64215d74210 Mon Sep 17 00:00:00 2001 From: Philip O Toole Date: Mon, 25 Apr 2016 10:41:50 -0700 Subject: [PATCH 6/8] Unit test setting peers on cluster service --- cluster/cluster.go | 4 +-- cluster/cluster_test.go | 63 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 2 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index b38e7ac2..acb3b2a1 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -88,8 +88,8 @@ func (s *Service) Addr() string { return s.addr.String() } -// SetPeers will set the mapping between raftAddr and apiAddr for the entire cluster. -func (s *Service) SetPeers(raftAddr, apiAddr string) error { +// SetPeer will set the mapping between raftAddr and apiAddr for the entire cluster. +func (s *Service) SetPeer(raftAddr, apiAddr string) error { peer := map[string]string{ raftAddr: apiAddr, } diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 61fa82d6..df51931b 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -1,6 +1,7 @@ package cluster import ( + "fmt" "net" "testing" "time" @@ -22,6 +23,58 @@ func Test_NewServiceOpenClose(t *testing.T) { } } +func Test_SetAPIPeer(t *testing.T) { + raftAddr, apiAddr := "localhost:4002", "localhost:4001" + + s, _, ms := mustNewOpenService() + defer s.Close() + if err := s.SetPeer(raftAddr, apiAddr); err != nil { + t.Fatalf("failed to set peer: %s", err.Error()) + } + + if ms.peers[raftAddr] != apiAddr { + t.Fatalf("peer not set correctly, exp %s, got %s", apiAddr, ms.peers[raftAddr]) + } +} + +func Test_SerAPIPeerNetwork(t *testing.T) { + t.Skip("service not responding correctly") + + raftAddr, apiAddr := "localhost:4002", "localhost:4001" + + s, _, ms := mustNewOpenService() + defer s.Close() + + raddr, err := net.ResolveTCPAddr("tcp", s.Addr()) + if err != nil { + t.Fatalf("failed to resolve remote uster ervice address: %s", err.Error()) + } + + conn, err := net.DialTCP("tcp4", nil, raddr) + if err != nil { + t.Fatalf("failed to connect to remote cluster service: %s", err.Error()) + } + conn.Write([]byte(fmt.Sprintf(`{"%s": "%d"}`, raftAddr, apiAddr))) + if err != nil { + t.Fatalf("failed to write to remote cluster service: %s", err.Error()) + } + // XXX Check response + + if ms.peers[raftAddr] != apiAddr { + t.Fatalf("peer not set correctly, exp %s, got %s", apiAddr, ms.peers[raftAddr]) + } +} + +func mustNewOpenService() (*Service, *mockListener, *mockStore) { + ml := mustNewMockListener() + ms := newMockStore() + s := NewService(ml, ms) + if err := s.Open(); err != nil { + panic("failed to open new service") + } + return s, ml, ms +} + type mockListener struct { ln net.Listener } @@ -54,6 +107,13 @@ func (ml *mockListener) Dial(addr string, t time.Duration) (net.Conn, error) { type mockStore struct { leader string + peers map[string]string +} + +func newMockStore() *mockStore { + return &mockStore{ + peers: make(map[string]string), + } } func (ms *mockStore) Leader() string { @@ -61,5 +121,8 @@ func (ms *mockStore) Leader() string { } func (ms *mockStore) UpdateAPIPeers(peers map[string]string) error { + for k, v := range peers { + ms.peers[k] = v + } return nil } From 5ce4f4d0a5e102fbb47e56187c9b3d1e8d89a562 Mon Sep 17 00:00:00 2001 From: Philip O Toole Date: Mon, 25 Apr 2016 10:42:29 -0700 Subject: [PATCH 7/8] 'go fmt' fixes --- cluster/cluster_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index df51931b..16a4ac8f 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -38,7 +38,7 @@ func Test_SetAPIPeer(t *testing.T) { } func Test_SerAPIPeerNetwork(t *testing.T) { - t.Skip("service not responding correctly") + t.Skip("service not responding correctly") raftAddr, apiAddr := "localhost:4002", "localhost:4001" @@ -58,7 +58,7 @@ func Test_SerAPIPeerNetwork(t *testing.T) { if err != nil { t.Fatalf("failed to write to remote cluster service: %s", err.Error()) } - // XXX Check response + // XXX Check response if ms.peers[raftAddr] != apiAddr { t.Fatalf("peer not set correctly, exp %s, got %s", apiAddr, ms.peers[raftAddr]) From 2fee305a89ebfcb14fe37f6f24313002dbbf9969 Mon Sep 17 00:00:00 2001 From: Philip O Toole Date: Mon, 25 Apr 2016 10:47:17 -0700 Subject: [PATCH 8/8] 'go vet' fixes --- cluster/cluster_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 16a4ac8f..4892ff20 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -38,7 +38,7 @@ func Test_SetAPIPeer(t *testing.T) { } func Test_SerAPIPeerNetwork(t *testing.T) { - t.Skip("service not responding correctly") + t.Skip("remote service not responding correctly") raftAddr, apiAddr := "localhost:4002", "localhost:4001" @@ -54,7 +54,7 @@ func Test_SerAPIPeerNetwork(t *testing.T) { if err != nil { t.Fatalf("failed to connect to remote cluster service: %s", err.Error()) } - conn.Write([]byte(fmt.Sprintf(`{"%s": "%d"}`, raftAddr, apiAddr))) + conn.Write([]byte(fmt.Sprintf(`{"%s": "%s"}`, raftAddr, apiAddr))) if err != nil { t.Fatalf("failed to write to remote cluster service: %s", err.Error()) }