1
0
Fork 0

Basic cluster RPC service

master
Philip O'Toole 3 years ago
parent 38b2abca84
commit e934630a58

@ -0,0 +1,108 @@
package cluster
import (
"io/ioutil"
"log"
"net"
"os"
"sync"
"time"
)
// Transport is the interface the network layer must provide.
type Transport interface {
net.Listener
// Dial is used to create a connection to a service listening
// on an address.
Dial(address string, timeout time.Duration) (net.Conn, error)
}
// Service provides information about the node and cluster.
type Service struct {
tn Transport // Network layer this service uses
addr net.Addr // Address on which this service is listening
timeout time.Duration
mu sync.RWMutex
apiAddr string // host:port this node serves the HTTP API.
wg sync.WaitGroup
logger *log.Logger
}
// NewService returns a new instance of the cluster service
func NewService(tn Transport) *Service {
return &Service{
tn: tn,
addr: tn.Addr(),
timeout: 10 * time.Second,
logger: log.New(os.Stderr, "[cluster] ", log.LstdFlags),
}
}
// Open opens the Service.
func (s *Service) Open() error {
s.wg.Add(1)
go s.serve()
s.logger.Println("service listening on", s.tn.Addr())
return nil
}
// Close closes the service.
func (s *Service) Close() error {
s.tn.Close()
s.wg.Wait()
return nil
}
// Addr returns the address the service is listening on.
func (s *Service) Addr() string {
return s.addr.String()
}
func (s *Service) SetAPIAddr(addr string) {
s.mu.Lock()
defer s.mu.Unlock()
s.apiAddr = addr
}
func (s *Service) GetAPIAddr() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.apiAddr
}
func (s *Service) GetNodeAPIAddr(nodeAddr string) (string, error) {
conn, err := s.tn.Dial(nodeAddr, s.timeout)
if err != nil {
return "", err
}
defer conn.Close()
b, err := ioutil.ReadAll(conn)
if err != nil {
return "", err
}
return string(b), nil
}
func (s *Service) serve() error {
defer s.wg.Done()
for {
conn, err := s.tn.Accept()
if err != nil {
return err
}
go s.handleConn(conn)
}
}
func (s *Service) handleConn(conn net.Conn) {
// This is where we'd actually switch on incoming command in protobuf format
// and write a protobuf back out.
conn.Write([]byte(s.apiAddr))
conn.Close()
}

@ -0,0 +1,55 @@
package cluster
import (
"fmt"
"net"
"testing"
"github.com/rqlite/rqlite/tcp"
)
func Test_NewServiceSetGetNodeAPIAddrMuxed(t *testing.T) {
ln, mux := mustNewMux()
go mux.Serve()
tn := mux.Listen(1) // Could be any byte value.
s := NewService(tn)
if s == nil {
t.Fatalf("failed to create cluster service")
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
s.SetAPIAddr("foo")
addr, err := s.GetNodeAPIAddr(s.Addr())
if err != nil {
t.Fatalf("failed to get node API address: %s", err)
}
if addr != "foo" {
t.Fatalf("failed to get correct node API address")
}
if err := ln.Close(); err != nil {
t.Fatalf("failed to close Mux's listener: %s", err)
}
if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}
func mustNewMux() (net.Listener, *tcp.Mux) {
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
panic("failed to create mock listener")
}
mux, err := tcp.NewMux(ln, nil)
if err != nil {
panic(fmt.Sprintf("failed to create mux: %s", err))
}
return ln, mux
}

@ -0,0 +1,102 @@
package cluster
import (
"net"
"testing"
"time"
)
func Test_NewServiceOpenClose(t *testing.T) {
ml := mustNewMockTransport()
s := NewService(ml)
if s == nil {
t.Fatalf("failed to create cluster service")
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
if ml.Addr().String() != s.Addr() {
t.Fatalf("service returned incorrect address")
}
if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}
func Test_NewServiceSetGetAPIAddr(t *testing.T) {
ml := mustNewMockTransport()
s := NewService(ml)
if s == nil {
t.Fatalf("failed to create cluster service")
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
s.SetAPIAddr("foo")
if exp, got := "foo", s.GetAPIAddr(); exp != got {
t.Fatalf("got incorrect API address, exp %s, got %s", exp, got)
}
if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}
func Test_NewServiceSetGetNodeAPIAddr(t *testing.T) {
ml := mustNewMockTransport()
s := NewService(ml)
if s == nil {
t.Fatalf("failed to create cluster service")
}
if err := s.Open(); err != nil {
t.Fatalf("failed to open cluster service")
}
s.SetAPIAddr("foo")
addr, err := s.GetNodeAPIAddr(s.Addr())
if err != nil {
t.Fatalf("failed to get node API address: %s", err)
}
if addr != "foo" {
t.Fatalf("failed to get correct node API address")
}
if err := s.Close(); err != nil {
t.Fatalf("failed to close cluster service")
}
}
type mockTransport struct {
tn net.Listener
}
func mustNewMockTransport() *mockTransport {
tn, err := net.Listen("tcp", "localhost:0")
if err != nil {
panic("failed to create mock listener")
}
return &mockTransport{
tn: tn,
}
}
func (ml *mockTransport) Accept() (c net.Conn, err error) {
return ml.tn.Accept()
}
func (ml *mockTransport) Addr() net.Addr {
return ml.tn.Addr()
}
func (ml *mockTransport) Close() (err error) {
return ml.tn.Close()
}
func (ml *mockTransport) Dial(addr string, t time.Duration) (net.Conn, error) {
return net.DialTimeout("tcp", addr, 5*time.Second)
}
Loading…
Cancel
Save