1
0
Fork 0

Merge pull request #310 from rqlite/add_mux

HTTP service now supports registered statuses
master
Philip O'Toole 7 years ago committed by GitHub
commit da4a364823

@ -4,6 +4,7 @@
- [PR #309](https://github.com/rqlite/rqlite/pull/309): Tweak start-up logo.
- [PR #308](https://github.com/rqlite/rqlite/pull/308): Move to clearer command-line options.
- [PR #307](https://github.com/rqlite/rqlite/pull/307): Support node-to-node encryption. Fixes [issue #93](https://github.com/rqlite/rqlite/issues/93).
- [PR #310](https://github.com/rqlite/rqlite/pull/310): HTTP service supports registration of Status providers; Mux is first client.
## 3.14.0 (May 4th 2017)
- [PR #304](https://github.com/rqlite/rqlite/pull/304): Switch to Go 1.8.1.

@ -304,6 +304,12 @@ func main() {
log.Fatalf("failed to start HTTP server: %s", err.Error())
}
// Register cross-component statuses.
if err := s.RegisterStatus("mux", mux); err != nil {
log.Fatalf("failed to register mux status: %s", err.Error())
}
// Block until signalled.
terminate := make(chan os.Signal, 1)
signal.Notify(terminate, os.Interrupt)
<-terminate

@ -16,6 +16,7 @@ import (
"os"
"runtime"
"strings"
"sync"
"time"
sql "github.com/rqlite/rqlite/db"
@ -64,6 +65,11 @@ type CredentialStore interface {
HasPerm(username string, perm string) bool
}
// Statuser is the interface status providers must implement.
type Statuser interface {
Stats() (interface{}, error)
}
// Response represents a response from the HTTP service.
type Response struct {
Results interface{} `json:"results,omitempty"`
@ -134,6 +140,9 @@ type Service struct {
start time.Time // Start up time.
lastBackup time.Time // Time of last successful backup.
statusMu sync.RWMutex
statuses map[string]Statuser
CertFile string // Path to SSL certificate.
KeyFile string // Path to SSL private key.
@ -154,6 +163,7 @@ func New(addr string, store Store, credentials CredentialStore) *Service {
addr: addr,
store: store,
start: time.Now(),
statuses: make(map[string]Statuser),
credentialStore: credentials,
logger: log.New(os.Stderr, "[http] ", log.LstdFlags),
}
@ -243,6 +253,19 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}
// RegisterStatus allows other modules to register status for serving over HTTP.
func (s *Service) RegisterStatus(key string, stat Statuser) error {
s.statusMu.Lock()
defer s.statusMu.Unlock()
if _, ok := s.statuses[key]; ok {
return fmt.Errorf("status already registered with key %s", key)
}
s.statuses[key] = stat
return nil
}
// handleJoin handles cluster-join requests from other nodes.
func (s *Service) handleJoin(w http.ResponseWriter, r *http.Request) {
if !s.CheckRequestPerm(r, PermJoin) {
@ -470,6 +493,20 @@ func (s *Service) handleStatus(w http.ResponseWriter, r *http.Request) {
status["build"] = s.BuildInfo
}
// Add any registered statusers.
func() {
s.statusMu.RLock()
defer s.statusMu.RUnlock()
for k, v := range s.statuses {
stat, err := v.Stats()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
status[k] = stat
}
}()
pretty, _ := isPretty(r)
var b []byte
if pretty {

@ -332,6 +332,20 @@ func Test_401Routes_BasicAuthBadPerm(t *testing.T) {
}
}
func Test_RegisterStatus(t *testing.T) {
var stats *mockStatuser
m := &MockStore{}
s := New("127.0.0.1:0", m, nil)
if err := s.RegisterStatus("foo", stats); err != nil {
t.Fatalf("failed to register statuser: %s", err.Error())
}
if err := s.RegisterStatus("foo", stats); err == nil {
t.Fatal("successfully re-registered statuser")
}
}
type MockStore struct {
executeFn func(queries []string, tx bool) ([]*sql.Result, error)
queryFn func(queries []string, tx, leader, verify bool) ([]*sql.Rows, error)
@ -387,3 +401,10 @@ func (m *mockCredentialStore) Check(username, password string) bool {
func (m *mockCredentialStore) HasPerm(username, perm string) bool {
return m.HasPermOK
}
type mockStatuser struct {
}
func (m *mockStatuser) Stats() (interface{}, error) {
return nil, nil
}

@ -8,6 +8,7 @@ import (
"log"
"net"
"os"
"strconv"
"sync"
"time"
)
@ -104,7 +105,7 @@ func NewMux(ln net.Listener, adv net.Addr) (*Mux, error) {
addr: addr,
m: make(map[byte]*listener),
Timeout: DefaultTimeout,
Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags),
Logger: log.New(os.Stderr, "[mux] ", log.LstdFlags),
}, nil
}
@ -156,6 +157,23 @@ func (mux *Mux) Serve() error {
}
}
// Stats returns status of the mux.
func (mux *Mux) Stats() (interface{}, error) {
s := map[string]string{
"addr": mux.addr.String(),
"timeout": mux.Timeout.String(),
"encrypted": strconv.FormatBool(mux.remoteEncrypted),
}
if mux.remoteEncrypted {
s["certificate"] = mux.nodeX509Cert
s["key"] = mux.nodeX509Key
s["skip_verify"] = strconv.FormatBool(mux.InsecureSkipVerify)
}
return s, nil
}
func (mux *Mux) handleConn(conn net.Conn) {
defer mux.wg.Done()
// Set a read deadline so connections with no data don't timeout.

Loading…
Cancel
Save