|
|
|
@ -3,9 +3,9 @@ package server
|
|
|
|
|
import (
|
|
|
|
|
"bytes"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"errors"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io/ioutil"
|
|
|
|
|
"log"
|
|
|
|
|
"math/rand"
|
|
|
|
|
"net/http"
|
|
|
|
|
"path"
|
|
|
|
@ -18,6 +18,8 @@ import (
|
|
|
|
|
"github.com/otoolep/raft"
|
|
|
|
|
"github.com/otoolep/rqlite/command"
|
|
|
|
|
"github.com/otoolep/rqlite/db"
|
|
|
|
|
|
|
|
|
|
log "code.google.com/p/log4go"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// queryParam returns whether the given query param is set to true.
|
|
|
|
@ -89,13 +91,14 @@ func (s *Server) connectionString() string {
|
|
|
|
|
func (s *Server) ListenAndServe(leader string) error {
|
|
|
|
|
var err error
|
|
|
|
|
|
|
|
|
|
log.Printf("Initializing Raft Server: %s", s.path)
|
|
|
|
|
log.Info("Initializing Raft Server: %s", s.path)
|
|
|
|
|
|
|
|
|
|
// Initialize and start Raft server.
|
|
|
|
|
transporter := raft.NewHTTPTransporter("/raft", 200*time.Millisecond)
|
|
|
|
|
s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, s.db, "")
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Fatal(err)
|
|
|
|
|
log.Error("Failed to create new Raft server", err.Error())
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
transporter.Install(s.raftServer, s)
|
|
|
|
|
s.raftServer.Start()
|
|
|
|
@ -103,33 +106,35 @@ func (s *Server) ListenAndServe(leader string) error {
|
|
|
|
|
if leader != "" {
|
|
|
|
|
// Join to leader if specified.
|
|
|
|
|
|
|
|
|
|
log.Println("Attempting to join leader:", leader)
|
|
|
|
|
log.Info("Attempting to join leader:", leader)
|
|
|
|
|
|
|
|
|
|
if !s.raftServer.IsLogEmpty() {
|
|
|
|
|
log.Fatal("Cannot join with an existing log")
|
|
|
|
|
log.Error("Cannot join with an existing log")
|
|
|
|
|
return errors.New("Cannot join with an existing log")
|
|
|
|
|
}
|
|
|
|
|
if err := s.Join(leader); err != nil {
|
|
|
|
|
log.Fatal(err)
|
|
|
|
|
log.Error("Failed to join leader", err.Error())
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else if s.raftServer.IsLogEmpty() {
|
|
|
|
|
// Initialize the server by joining itself.
|
|
|
|
|
|
|
|
|
|
log.Println("Initializing new cluster")
|
|
|
|
|
log.Info("Initializing new cluster")
|
|
|
|
|
|
|
|
|
|
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
|
|
|
|
|
Name: s.raftServer.Name(),
|
|
|
|
|
ConnectionString: s.connectionString(),
|
|
|
|
|
})
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Fatal(err)
|
|
|
|
|
log.Error("Failed to join to self", err.Error())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
log.Println("Recovered from log")
|
|
|
|
|
log.Info("Recovered from log")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Println("Initializing HTTP server")
|
|
|
|
|
log.Info("Initializing HTTP server")
|
|
|
|
|
|
|
|
|
|
// Initialize and start HTTP server.
|
|
|
|
|
s.httpServer = &http.Server{
|
|
|
|
@ -141,7 +146,7 @@ func (s *Server) ListenAndServe(leader string) error {
|
|
|
|
|
s.router.HandleFunc("/db", s.writeHandler).Methods("POST")
|
|
|
|
|
s.router.HandleFunc("/join", s.joinHandler).Methods("POST")
|
|
|
|
|
|
|
|
|
|
log.Println("Listening at:", s.connectionString())
|
|
|
|
|
log.Info("Listening at %s", s.connectionString())
|
|
|
|
|
|
|
|
|
|
return s.httpServer.ListenAndServe()
|
|
|
|
|
}
|
|
|
|
@ -184,14 +189,17 @@ func (s *Server) joinHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
log.Debug("readHandler for URL: %s", req.URL)
|
|
|
|
|
b, err := ioutil.ReadAll(req.Body)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debug("Bad HTTP request", err.Error())
|
|
|
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
r, err := s.db.Query(string(b))
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debug("Bad HTTP request", err.Error())
|
|
|
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -202,6 +210,7 @@ func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
b, err = json.Marshal(r)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debug("Failed to marshal JSON data", err.Error())
|
|
|
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@ -209,9 +218,11 @@ func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
log.Debug("writeHandler for URL: %s", req.URL)
|
|
|
|
|
// Read the value from the POST body.
|
|
|
|
|
b, err := ioutil.ReadAll(req.Body)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Debug("Bad HTTP request", err.Error())
|
|
|
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@ -220,6 +231,7 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
// Execute the command against the Raft server.
|
|
|
|
|
switch {
|
|
|
|
|
case len(stmts) == 0:
|
|
|
|
|
log.Debug("No database execute commands supplied")
|
|
|
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
case len(stmts) == 1:
|
|
|
|
@ -227,8 +239,10 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
case len(stmts) > 1:
|
|
|
|
|
transaction, _ := isTransaction(req)
|
|
|
|
|
if transaction {
|
|
|
|
|
log.Debug("Transaction requested")
|
|
|
|
|
_, err = s.raftServer.Do(command.NewTransactionWriteCommandSet(stmts))
|
|
|
|
|
} else {
|
|
|
|
|
log.Debug("No transaction requested")
|
|
|
|
|
// Do each individually, returning JSON respoonse
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|