|
|
|
@ -22,6 +22,30 @@ import (
|
|
|
|
|
log "code.google.com/p/log4go"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type FailedSqlStmt struct {
|
|
|
|
|
Sql string `json:"sql"`
|
|
|
|
|
Error string `json:"error"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type WriteResponse struct {
|
|
|
|
|
Time string `json:"time"`
|
|
|
|
|
Failures []FailedSqlStmt `json:"failures"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The raftd server is a combination of the Raft server and an HTTP
|
|
|
|
|
// server which acts as the transport.
|
|
|
|
|
type Server struct {
|
|
|
|
|
name string
|
|
|
|
|
host string
|
|
|
|
|
port int
|
|
|
|
|
path string
|
|
|
|
|
router *mux.Router
|
|
|
|
|
raftServer raft.Server
|
|
|
|
|
httpServer *http.Server
|
|
|
|
|
db *db.DB
|
|
|
|
|
mutex sync.RWMutex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// queryParam returns whether the given query param is set to true.
|
|
|
|
|
func queryParam(req *http.Request, param string) (bool, error) {
|
|
|
|
|
err := req.ParseForm()
|
|
|
|
@ -45,26 +69,6 @@ func isTransaction(req *http.Request) (bool, error) {
|
|
|
|
|
return queryParam(req, "transaction")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type WriteResponse struct {
|
|
|
|
|
Time string
|
|
|
|
|
Success int
|
|
|
|
|
Fail int
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// The raftd server is a combination of the Raft server and an HTTP
|
|
|
|
|
// server which acts as the transport.
|
|
|
|
|
type Server struct {
|
|
|
|
|
name string
|
|
|
|
|
host string
|
|
|
|
|
port int
|
|
|
|
|
path string
|
|
|
|
|
router *mux.Router
|
|
|
|
|
raftServer raft.Server
|
|
|
|
|
httpServer *http.Server
|
|
|
|
|
db *db.DB
|
|
|
|
|
mutex sync.RWMutex
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Creates a new server.
|
|
|
|
|
func New(dataDir string, dbfile string, host string, port int) *Server {
|
|
|
|
|
s := &Server{
|
|
|
|
@ -226,8 +230,7 @@ func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
log.Trace("writeHandler for URL: %s", req.URL)
|
|
|
|
|
|
|
|
|
|
var nSuccess int
|
|
|
|
|
var nFail int
|
|
|
|
|
var failures = make([]FailedSqlStmt, 0)
|
|
|
|
|
var startTime time.Time
|
|
|
|
|
|
|
|
|
|
// Read the value from the POST body.
|
|
|
|
@ -242,40 +245,35 @@ func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
|
|
|
|
|
stmts = stmts[:len(stmts)-1]
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Execute the command against the Raft server.
|
|
|
|
|
startTime = time.Now()
|
|
|
|
|
switch {
|
|
|
|
|
case len(stmts) == 0:
|
|
|
|
|
log.Trace("Execute statement contains %d commands", len(stmts))
|
|
|
|
|
if len(stmts) == 0 {
|
|
|
|
|
log.Trace("No database execute commands supplied")
|
|
|
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
|
|
|
return
|
|
|
|
|
case len(stmts) == 1:
|
|
|
|
|
log.Trace("Single statment, implicit transaction")
|
|
|
|
|
_, err = s.raftServer.Do(command.NewWriteCommand(stmts[0]))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
startTime = time.Now()
|
|
|
|
|
transaction, _ := isTransaction(req)
|
|
|
|
|
if transaction {
|
|
|
|
|
log.Trace("Transaction requested")
|
|
|
|
|
_, err = s.raftServer.Do(command.NewTransactionWriteCommandSet(stmts))
|
|
|
|
|
if err != nil {
|
|
|
|
|
nFail++
|
|
|
|
|
} else {
|
|
|
|
|
nSuccess++
|
|
|
|
|
failures = append(failures, FailedSqlStmt{stmts[0], err.Error()})
|
|
|
|
|
log.Trace("Transaction failed: %s", err.Error())
|
|
|
|
|
}
|
|
|
|
|
case len(stmts) > 1:
|
|
|
|
|
log.Trace("Multistatement, transaction possible")
|
|
|
|
|
transaction, _ := isTransaction(req)
|
|
|
|
|
if transaction {
|
|
|
|
|
log.Trace("Transaction requested")
|
|
|
|
|
_, err = s.raftServer.Do(command.NewTransactionWriteCommandSet(stmts))
|
|
|
|
|
} else {
|
|
|
|
|
log.Trace("No transaction requested")
|
|
|
|
|
for i := range stmts {
|
|
|
|
|
_, err = s.raftServer.Do(command.NewWriteCommand(stmts[i]))
|
|
|
|
|
if err != nil {
|
|
|
|
|
nFail++
|
|
|
|
|
} else {
|
|
|
|
|
nSuccess++
|
|
|
|
|
log.Trace("Execute statement %s failed: %s", stmts[i], err.Error())
|
|
|
|
|
failures = append(failures, FailedSqlStmt{stmts[i], err.Error()})
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
log.Trace("No transaction requested")
|
|
|
|
|
// Do each individually, returning JSON respoonse
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
duration := time.Since(startTime)
|
|
|
|
|
|
|
|
|
|
wr := WriteResponse{Time: duration.String(), Success: nSuccess, Fail: nFail}
|
|
|
|
|
wr := WriteResponse{Time: duration.String(), Failures: failures}
|
|
|
|
|
pretty, _ := isPretty(req)
|
|
|
|
|
if pretty {
|
|
|
|
|
b, err = json.MarshalIndent(wr, "", " ")
|
|
|
|
|