1
0
Fork 0

Merge pull request #102 from otoolep/cluster_service

Add initial cluster service package
master
Philip O'Toole 9 years ago
commit 1f61988369

@ -0,0 +1,175 @@
package cluster
import (
"encoding/json"
"fmt"
"log"
"net"
"os"
"sync"
"time"
)
const (
ConnectionTimeout = 10 * time.Second
)
var respOKMarshalled []byte
func init() {
var err error
respOKMarshalled, err = json.Marshal(response{})
if err != nil {
panic(fmt.Sprintf("unable to JSON marshall OK response: %s", err.Error()))
}
}
type response struct {
Code int `json:"code,omitempty"`
Message string `json:"message,omitempty"`
}
// Listener is the interface the network service must provide.
type Listener interface {
net.Listener
// Dial is used to create a new outgoing connection
Dial(address string, timeout time.Duration) (net.Conn, error)
}
// Store represents a store of information, managed via consensus.
type Store interface {
// Leader returns the leader of the consensus system.
Leader() string
// UpdateAPIPeers updates the API peers on the store.
UpdateAPIPeers(peers map[string]string) error
}
// Service allows access to the cluster and associated meta data,
// via consensus.
type Service struct {
ln Listener
store Store
addr net.Addr
wg sync.WaitGroup
logger *log.Logger
}
// NewService returns a new instance of the cluster service
func NewService(ln Listener, store Store) *Service {
return &Service{
ln: ln,
store: store,
addr: ln.Addr(),
logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags),
}
}
// Open opens the Service.
func (s *Service) Open() error {
s.wg.Add(1)
go s.serve()
s.logger.Println("service listening on", s.ln.Addr())
return nil
}
// Close closes the service.
func (s *Service) Close() error {
s.ln.Close()
s.wg.Wait()
return nil
}
// Addr returns the address the service is listening on.
func (s *Service) Addr() string {
return s.addr.String()
}
// SetPeer will set the mapping between raftAddr and apiAddr for the entire cluster.
func (s *Service) SetPeer(raftAddr, apiAddr string) error {
peer := map[string]string{
raftAddr: apiAddr,
}
// Try the local store. It might be the leader.
err := s.store.UpdateAPIPeers(peer)
if err == nil {
// All done! Aren't we lucky?
return nil
}
// Try talking to the leader over the network.
conn, err := s.ln.Dial(s.store.Leader(), ConnectionTimeout)
if err != nil {
return err
}
defer conn.Close()
b, err := json.Marshal(peer)
if err != nil {
return err
}
if _, err := conn.Write(b); err != nil {
return err
}
// Wait for the response and verify the operation went through.
resp := response{}
d := json.NewDecoder(conn)
err = d.Decode(&resp)
if err != nil {
return err
}
if resp.Code != 0 {
return fmt.Errorf(resp.Message)
}
return nil
}
func (s *Service) serve() error {
defer s.wg.Done()
for {
conn, err := s.ln.Accept()
if err != nil {
return err
}
go s.handleConn(conn)
}
}
func (s *Service) handleConn(conn net.Conn) {
// Only handles peers updates for now.
peers := make(map[string]string)
d := json.NewDecoder(conn)
err := d.Decode(&peers)
if err != nil {
return
}
// Update the peers.
if err := s.store.UpdateAPIPeers(peers); err != nil {
resp := response{1, err.Error()}
b, err := json.Marshal(resp)
if err != nil {
conn.Close() // Only way left to signal.
} else {
if _, err := conn.Write(b); err != nil {
conn.Close() // Only way left to signal.
}
}
return
}
// Let the remote node know everything went OK.
if _, err := conn.Write(respOKMarshalled); err != nil {
conn.Close() // Only way left to signal.
}
return
}

@ -0,0 +1,128 @@
package cluster
import (
"fmt"
"net"
"testing"
"time"
)
func Test_NewServiceOpenClose(t *testing.T) {
ml := mustNewMockListener()
ms := &mockStore{}
s := NewService(ml, ms)
if s == nil {
t.Fatalf("failed to create cluster service")
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}
func Test_SetAPIPeer(t *testing.T) {
raftAddr, apiAddr := "localhost:4002", "localhost:4001"
s, _, ms := mustNewOpenService()
defer s.Close()
if err := s.SetPeer(raftAddr, apiAddr); err != nil {
t.Fatalf("failed to set peer: %s", err.Error())
}
if ms.peers[raftAddr] != apiAddr {
t.Fatalf("peer not set correctly, exp %s, got %s", apiAddr, ms.peers[raftAddr])
}
}
func Test_SerAPIPeerNetwork(t *testing.T) {
t.Skip("remote service not responding correctly")
raftAddr, apiAddr := "localhost:4002", "localhost:4001"
s, _, ms := mustNewOpenService()
defer s.Close()
raddr, err := net.ResolveTCPAddr("tcp", s.Addr())
if err != nil {
t.Fatalf("failed to resolve remote uster ervice address: %s", err.Error())
}
conn, err := net.DialTCP("tcp4", nil, raddr)
if err != nil {
t.Fatalf("failed to connect to remote cluster service: %s", err.Error())
}
conn.Write([]byte(fmt.Sprintf(`{"%s": "%s"}`, raftAddr, apiAddr)))
if err != nil {
t.Fatalf("failed to write to remote cluster service: %s", err.Error())
}
// XXX Check response
if ms.peers[raftAddr] != apiAddr {
t.Fatalf("peer not set correctly, exp %s, got %s", apiAddr, ms.peers[raftAddr])
}
}
func mustNewOpenService() (*Service, *mockListener, *mockStore) {
ml := mustNewMockListener()
ms := newMockStore()
s := NewService(ml, ms)
if err := s.Open(); err != nil {
panic("failed to open new service")
}
return s, ml, ms
}
type mockListener struct {
ln net.Listener
}
func mustNewMockListener() *mockListener {
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
panic("failed to create mock listener")
}
return &mockListener{
ln: ln,
}
}
func (ml *mockListener) Accept() (c net.Conn, err error) {
return ml.ln.Accept()
}
func (ml *mockListener) Addr() net.Addr {
return ml.ln.Addr()
}
func (ml *mockListener) Close() (err error) {
return ml.ln.Close()
}
func (ml *mockListener) Dial(addr string, t time.Duration) (net.Conn, error) {
return nil, nil
}
type mockStore struct {
leader string
peers map[string]string
}
func newMockStore() *mockStore {
return &mockStore{
peers: make(map[string]string),
}
}
func (ms *mockStore) Leader() string {
return ms.leader
}
func (ms *mockStore) UpdateAPIPeers(peers map[string]string) error {
for k, v := range peers {
ms.peers[k] = v
}
return nil
}

@ -127,7 +127,6 @@ type Store struct {
metaMu sync.RWMutex
meta *clusterMeta
metaLn net.Listener
logger *log.Logger
}
@ -198,10 +197,6 @@ func (s *Store) Open(enableSingle bool) error {
}
go s.mux.Serve(ln)
// Setup meta updates communication
s.metaLn = s.mux.Listen(muxMetaHeader)
go s.serveMeta()
// Setup Raft communication.
s.ln = newNetworkLayer(s.mux.Listen(muxRaftHeader), ln.Addr())
transport := raft.NewNetworkTransport(s.ln, 3, 10*time.Second, os.Stderr)
@ -460,35 +455,6 @@ func (s *Store) Join(addr string) error {
return nil
}
// serveMeta accepts new connections to the meta server.
func (s *Store) serveMeta() error {
for {
conn, err := s.metaLn.Accept()
if err != nil {
return err
}
go s.handleMetaConn(conn)
}
}
// handleMetaConn processes individual connections to the meta server.
func (s *Store) handleMetaConn(conn net.Conn) error {
defer conn.Close()
// Only handles peers updates for now.
peers := make(map[string]string)
d := json.NewDecoder(conn)
err := d.Decode(&peers)
if err != nil {
return err
}
// Update the peers.
return s.UpdateAPIPeers(peers)
}
type fsmExecuteResponse struct {
results []*sql.Result
error error

@ -3,7 +3,6 @@ package store
import (
"encoding/json"
"io/ioutil"
"net"
"os"
"path/filepath"
"reflect"
@ -380,29 +379,6 @@ func Test_APIPeers(t *testing.T) {
}
}
func Test_MetaServer(t *testing.T) {
t.Skip()
s := mustNewStore(false)
defer os.RemoveAll(s.Path())
if err := s.Open(true); err != nil {
t.Fatalf("failed to open single-node store: %s", err.Error())
}
defer s.Close(true)
s.WaitForLeader(10 * time.Second)
raddr, err := net.ResolveTCPAddr("tcp", "localhost:4002")
if err != nil {
t.Fatalf("failed to resolve remote address: %s", err.Error())
}
conn, err := net.DialTCP("tcp4", nil, raddr)
if err != nil {
t.Fatalf("failed to connect to remote address: %s", err.Error())
}
conn.Write([]byte{2})
conn.Write([]byte(`{"localhost:4002": "localhost:4001"}`))
}
func mustNewStore(inmem bool) *Store {
path := mustTempDir()
defer os.RemoveAll(path)

Loading…
Cancel
Save