Start refactor for Hashicorp
Heavily influenced by hraftd. rqlited builds without error.master
parent
f864b15e23
commit
d7d46f79c0
@ -0,0 +1,107 @@
|
||||
/*
|
||||
rqlite -- a replicated SQLite database.
|
||||
|
||||
rqlite is a distributed system that provides a replicated SQLite database.
|
||||
rqlite is written in Go and uses Raft to achieve consensus across all the
|
||||
instances of the SQLite databases. rqlite ensures that every change made to
|
||||
the database is made to a majority of underlying SQLite files, or none-at-all.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"time"
|
||||
|
||||
httpd "github.com/otoolep/rqlite/http"
|
||||
"github.com/otoolep/rqlite/store"
|
||||
)
|
||||
|
||||
var httpAddr string
|
||||
var tcpAddr string
|
||||
var raftAddr string
|
||||
var joinAddr string
|
||||
var cpuprofile string
|
||||
var disableReporting bool
|
||||
|
||||
func init() {
|
||||
flag.StringVar(&httpAddr, "http", "localhost:4001", "HTTP query server bind address")
|
||||
flag.StringVar(&tcpAddr, "tcp", "localhost:4002", "TCP query server bind address")
|
||||
flag.StringVar(&raftAddr, "raft", "localhost:4003", "Raft communication bind address")
|
||||
flag.StringVar(&joinAddr, "join", "", "host:port of leader to join")
|
||||
flag.StringVar(&cpuprofile, "cpuprofile", "", "write CPU profile to file")
|
||||
flag.BoolVar(&disableReporting, "noreport", false, "Disable anonymised launch reporting")
|
||||
flag.Usage = func() {
|
||||
fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
|
||||
flag.PrintDefaults()
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
// Ensure the data path was set.
|
||||
if flag.NArg() == 0 {
|
||||
flag.Usage()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
dataPath := flag.Arg(0)
|
||||
|
||||
// Set up profiling, if requested.
|
||||
if cpuprofile != "" {
|
||||
log.Println("profiling enabled")
|
||||
f, err := os.Create(cpuprofile)
|
||||
if err != nil {
|
||||
log.Printf("unable to create path: %s", err.Error())
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
err = pprof.StartCPUProfile(f)
|
||||
if err != nil {
|
||||
log.Printf("unable to start CPU Profile: %s", err.Error())
|
||||
}
|
||||
|
||||
defer pprof.StopCPUProfile()
|
||||
}
|
||||
|
||||
// Create the store.
|
||||
store := store.New(dataPath, raftAddr)
|
||||
|
||||
// Create the HTTP query server.
|
||||
s := httpd.New(httpAddr, store)
|
||||
if err := s.Start(); err != nil {
|
||||
log.Printf("failed to start HTTP server: %s", err.Error())
|
||||
|
||||
}
|
||||
|
||||
if !disableReporting {
|
||||
reportLaunch()
|
||||
}
|
||||
|
||||
terminate := make(chan os.Signal, 1)
|
||||
signal.Notify(terminate, os.Interrupt)
|
||||
<-terminate
|
||||
log.Println("rqlite server stopped")
|
||||
}
|
||||
|
||||
func reportLaunch() {
|
||||
json := fmt.Sprintf(`{"os": "%s", "arch": "%s", "app": "rqlite"}`, runtime.GOOS, runtime.GOARCH)
|
||||
data := bytes.NewBufferString(json)
|
||||
client := http.Client{Timeout: time.Duration(5 * time.Second)}
|
||||
go func() {
|
||||
_, err := client.Post("https://logs-01.loggly.com/inputs/8a0edd84-92ba-46e4-ada8-c529d0f105af/tag/reporting/",
|
||||
"application/json", data)
|
||||
if err != nil {
|
||||
log.Printf("Report launch failed: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
@ -1,90 +0,0 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"github.com/otoolep/raft"
|
||||
"github.com/otoolep/rqlite/db"
|
||||
|
||||
"github.com/otoolep/rqlite/log"
|
||||
)
|
||||
|
||||
// ExecuteCommand encapsulates a sqlite statement.
|
||||
type ExecuteCommand struct {
|
||||
Stmt string `json:"stmt"`
|
||||
}
|
||||
|
||||
// NewExecuteCommand creates a new Execute command.
|
||||
func NewExecuteCommand(stmt string) *ExecuteCommand {
|
||||
return &ExecuteCommand{
|
||||
Stmt: stmt,
|
||||
}
|
||||
}
|
||||
|
||||
// CommandName of the ExecuteCommand in the log.
|
||||
func (c *ExecuteCommand) CommandName() string {
|
||||
return "execute"
|
||||
}
|
||||
|
||||
// Apply executes an sqlite statement.
|
||||
func (c *ExecuteCommand) Apply(server raft.Server) (interface{}, error) {
|
||||
log.Tracef("Applying ExecuteCommand: '%s'", c.Stmt)
|
||||
db := server.Context().(*db.DB)
|
||||
return nil, db.Execute(c.Stmt)
|
||||
}
|
||||
|
||||
// TransactionExecuteCommandSet encapsulates a set of sqlite statement, which are executed
|
||||
// within a transaction.
|
||||
type TransactionExecuteCommandSet struct {
|
||||
Stmts []string `json:"stmts"`
|
||||
}
|
||||
|
||||
// NewTransactionExecuteCommandSet Creates a new set of sqlite commands, which
|
||||
// execute within a transaction.
|
||||
func NewTransactionExecuteCommandSet(stmts []string) *TransactionExecuteCommandSet {
|
||||
return &TransactionExecuteCommandSet{
|
||||
Stmts: stmts,
|
||||
}
|
||||
}
|
||||
|
||||
// CommandName of the TransactionExecute command in the log.
|
||||
func (c *TransactionExecuteCommandSet) CommandName() string {
|
||||
return "transaction_execute"
|
||||
}
|
||||
|
||||
// Apply executes a set of sqlite statements, within a transaction. All statements
|
||||
// will take effect, or none.
|
||||
func (c *TransactionExecuteCommandSet) Apply(server raft.Server) (interface{}, error) {
|
||||
log.Tracef("Applying TransactionExecuteCommandSet of size %d", len(c.Stmts))
|
||||
|
||||
commitSuccess := false
|
||||
db := server.Context().(*db.DB)
|
||||
defer func() {
|
||||
if !commitSuccess {
|
||||
err := db.RollbackTransaction()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to rollback transaction: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err := db.StartTransaction()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to start transaction: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for i := range c.Stmts {
|
||||
err = db.Execute(c.Stmts[i])
|
||||
if err != nil {
|
||||
log.Errorf("Failed to execute statement within transaction: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if err = db.CommitTransaction(); err != nil {
|
||||
log.Errorf("Failed to commit transaction: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
commitSuccess = true
|
||||
return nil, nil
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
. "gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) {
|
||||
TestingT(t)
|
||||
}
|
||||
|
||||
type ExecuteCommandSuite struct{}
|
||||
|
||||
var _ = Suite(&ExecuteCommandSuite{})
|
||||
|
||||
/*
|
||||
* ExecuteCommand tests
|
||||
*
|
||||
* These are somewhat trivial right now.
|
||||
*/
|
||||
|
||||
func (s *ExecuteCommandSuite) Test_NewExecuteCommand(c *C) {
|
||||
e := NewExecuteCommand("stmt1")
|
||||
c.Assert(e, NotNil)
|
||||
c.Assert(e.Stmt, Equals, "stmt1")
|
||||
c.Assert(e.CommandName(), Equals, "execute")
|
||||
}
|
||||
|
||||
func (s *ExecuteCommandSuite) Test_NewTransactionExecuteCommandSet(c *C) {
|
||||
e := NewTransactionExecuteCommandSet([]string{"stmt1"})
|
||||
c.Assert(e, NotNil)
|
||||
c.Assert(reflect.DeepEqual(e.Stmts, []string{"stmt1"}), Equals, true)
|
||||
c.Assert(e.CommandName(), Equals, "transaction_execute")
|
||||
}
|
@ -0,0 +1,121 @@
|
||||
// Package http provides the HTTP server for accessing the distributed database.
|
||||
// It also provides the endpoint for other nodes to join an existing cluster.
|
||||
package http
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Store is the interface the Raft-driven database must implement.
|
||||
type Store interface {
|
||||
// Execute executes the set of statements, possibly within a transaction.
|
||||
Execute(stmts []string, tx bool) error
|
||||
|
||||
// Join joins the node, reachable at addr, to the cluster.
|
||||
Join(addr string) error
|
||||
}
|
||||
|
||||
// Service provides HTTP service.
|
||||
type Service struct {
|
||||
addr string
|
||||
ln net.Listener
|
||||
|
||||
store Store
|
||||
}
|
||||
|
||||
// New returns an uninitialized HTTP service.
|
||||
func New(addr string, store Store) *Service {
|
||||
return &Service{
|
||||
addr: addr,
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the service.
|
||||
func (s *Service) Start() error {
|
||||
server := http.Server{
|
||||
Handler: s,
|
||||
}
|
||||
|
||||
ln, err := net.Listen("tcp", s.addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.ln = ln
|
||||
|
||||
http.Handle("/", s)
|
||||
|
||||
go func() {
|
||||
err := server.Serve(s.ln)
|
||||
if err != nil {
|
||||
log.Fatalf("HTTP serve: %s", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the service.
|
||||
func (s *Service) Close() {
|
||||
s.ln.Close()
|
||||
return
|
||||
}
|
||||
|
||||
// ServeHTTP allows Service to serve HTTP requests.
|
||||
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if strings.HasPrefix(r.URL.Path, "/query") {
|
||||
s.handleQuery(w, r)
|
||||
} else if r.URL.Path == "/join" {
|
||||
s.handleJoin(w, r)
|
||||
} else {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) handleJoin(w http.ResponseWriter, r *http.Request) {
|
||||
b, err := ioutil.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
m := map[string]string{}
|
||||
if err := json.Unmarshal(b, &m); err != nil {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if len(m) != 1 {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
remoteAddr, ok := m["addr"]
|
||||
if !ok {
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
if err := s.store.Join(remoteAddr); err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) handleQuery(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != "GET" {
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Addr returns the address on which the Service is listening
|
||||
func (s *Service) Addr() net.Addr {
|
||||
return s.ln.Addr()
|
||||
}
|
@ -1,8 +0,0 @@
|
||||
package interfaces
|
||||
|
||||
import "github.com/rcrowley/go-metrics"
|
||||
|
||||
// Statistics is an interface for metrics statistics
|
||||
type Statistics interface {
|
||||
GetStatistics() (metrics.Registry, error)
|
||||
}
|
@ -1,103 +0,0 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
)
|
||||
|
||||
// Log levels
|
||||
const (
|
||||
TRACE = iota // 0
|
||||
DEBUG = iota // 1
|
||||
INFO = iota // 2
|
||||
WARN = iota // 3
|
||||
ERROR = iota // 4
|
||||
)
|
||||
|
||||
// Level set for logs
|
||||
var Level = TRACE
|
||||
|
||||
// SetLevel sets the log level
|
||||
// given a string
|
||||
func SetLevel(level string) {
|
||||
switch level {
|
||||
case "TRACE":
|
||||
Level = TRACE
|
||||
case "DEBUG":
|
||||
Level = DEBUG
|
||||
case "INFO":
|
||||
Level = INFO
|
||||
case "WARN":
|
||||
Level = WARN
|
||||
case "ERROR":
|
||||
Level = ERROR
|
||||
default:
|
||||
Level = TRACE
|
||||
}
|
||||
}
|
||||
|
||||
// SetOutput set the output destination
|
||||
// of logs
|
||||
func SetOutput(w io.Writer) {
|
||||
log.SetOutput(w)
|
||||
}
|
||||
|
||||
// Tracef writes a formatted log on TRACE level
|
||||
func Tracef(format string, v ...interface{}) {
|
||||
if Level <= TRACE {
|
||||
log.Printf("[TRACE] "+format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// Trace writes a log on TRACE level
|
||||
func Trace(s string) {
|
||||
Tracef(s)
|
||||
}
|
||||
|
||||
// Debugf writes a formatted log on DEBUG level
|
||||
func Debugf(format string, v ...interface{}) {
|
||||
if Level <= DEBUG {
|
||||
log.Printf("[DEBUG] "+format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// Debug writes a log on DEBUG level
|
||||
func Debug(s string) {
|
||||
Debugf(s)
|
||||
}
|
||||
|
||||
// Infof writes a formatted log on INFO level
|
||||
func Infof(format string, v ...interface{}) {
|
||||
if Level <= INFO {
|
||||
log.Printf("[INFO ] "+format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// Info write a log on INFO level
|
||||
func Info(s string) {
|
||||
Infof(s)
|
||||
}
|
||||
|
||||
// Warnf writes a formatted log on WARN level
|
||||
func Warnf(format string, v ...interface{}) {
|
||||
if Level <= WARN {
|
||||
log.Printf("[WARN ] "+format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// Warn write a log on WARN level
|
||||
func Warn(s string) {
|
||||
Warnf(s)
|
||||
}
|
||||
|
||||
// Errorf writes a formatted log on ERROR level
|
||||
func Errorf(format string, v ...interface{}) {
|
||||
if Level <= ERROR {
|
||||
log.Printf("[ERROR] "+format, v...)
|
||||
}
|
||||
}
|
||||
|
||||
// Error write a log on ERROR level
|
||||
func Error(s string) {
|
||||
Errorf(s)
|
||||
}
|
@ -1,131 +0,0 @@
|
||||
package log
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
. "gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) {
|
||||
TestingT(t)
|
||||
}
|
||||
|
||||
type LogSuite struct{}
|
||||
|
||||
var _ = Suite(&LogSuite{})
|
||||
|
||||
func (s *LogSuite) TestSetLevel(c *C) {
|
||||
SetLevel("DEBUG")
|
||||
c.Assert(Level, Equals, DEBUG)
|
||||
|
||||
SetLevel("INFO")
|
||||
c.Assert(Level, Equals, INFO)
|
||||
|
||||
SetLevel("WARN")
|
||||
c.Assert(Level, Equals, WARN)
|
||||
|
||||
SetLevel("ERROR")
|
||||
c.Assert(Level, Equals, ERROR)
|
||||
|
||||
SetLevel("TRACE")
|
||||
c.Assert(Level, Equals, TRACE)
|
||||
}
|
||||
|
||||
func (s *LogSuite) TestLog(c *C) {
|
||||
logFile, err := os.OpenFile("tmptestfile.txt", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||
c.Check(err, IsNil)
|
||||
defer logFile.Close()
|
||||
defer os.Remove("tmptestfile.txt")
|
||||
|
||||
SetOutput(logFile)
|
||||
|
||||
num, err := lineCounter(logFile)
|
||||
c.Check(num, Equals, 0)
|
||||
c.Check(err, IsNil)
|
||||
|
||||
tmpFile, _ := os.Open("tmptestfile.txt")
|
||||
Level = ERROR
|
||||
Trace("a")
|
||||
Debug("a")
|
||||
Info("a")
|
||||
Warn("a")
|
||||
Error("a")
|
||||
num, err = lineCounter(tmpFile)
|
||||
c.Check(num, Equals, 1)
|
||||
c.Check(err, IsNil)
|
||||
tmpFile.Close()
|
||||
|
||||
tmpFile, _ = os.Open("tmptestfile.txt")
|
||||
Level = WARN
|
||||
Trace("a")
|
||||
Debug("a")
|
||||
Info("a")
|
||||
Warn("a")
|
||||
Error("a")
|
||||
num, err = lineCounter(tmpFile)
|
||||
c.Check(num, Equals, 3)
|
||||
c.Check(err, IsNil)
|
||||
tmpFile.Close()
|
||||
|
||||
tmpFile, _ = os.Open("tmptestfile.txt")
|
||||
Level = INFO
|
||||
Trace("a")
|
||||
Debug("a")
|
||||
Info("a")
|
||||
Warn("a")
|
||||
Error("a")
|
||||
num, err = lineCounter(tmpFile)
|
||||
c.Check(num, Equals, 6)
|
||||
c.Check(err, IsNil)
|
||||
tmpFile.Close()
|
||||
|
||||
tmpFile, _ = os.Open("tmptestfile.txt")
|
||||
Level = DEBUG
|
||||
Trace("a")
|
||||
Debug("a")
|
||||
Info("a")
|
||||
Warn("a")
|
||||
Error("a")
|
||||
num, err = lineCounter(tmpFile)
|
||||
c.Check(num, Equals, 10)
|
||||
c.Check(err, IsNil)
|
||||
tmpFile.Close()
|
||||
|
||||
tmpFile, _ = os.Open("tmptestfile.txt")
|
||||
Level = TRACE
|
||||
Trace("a")
|
||||
Debug("a")
|
||||
Info("a")
|
||||
Warn("a")
|
||||
Error("a")
|
||||
num, err = lineCounter(tmpFile)
|
||||
c.Check(num, Equals, 15)
|
||||
c.Check(err, IsNil)
|
||||
tmpFile.Close()
|
||||
}
|
||||
|
||||
// Taken from http://stackoverflow.com/a/24563853/1187471
|
||||
func lineCounter(r io.Reader) (int, error) {
|
||||
buf := make([]byte, 8196)
|
||||
count := 0
|
||||
lineSep := []byte{'\n'}
|
||||
|
||||
for {
|
||||
c, err := r.Read(buf)
|
||||
if err != nil && err != io.EOF {
|
||||
return count, err
|
||||
}
|
||||
|
||||
count += bytes.Count(buf[:c], lineSep)
|
||||
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return count, nil
|
||||
}
|
@ -1,160 +0,0 @@
|
||||
/*
|
||||
rqlite -- a replicated SQLite database.
|
||||
|
||||
rqlite is a distributed system that provides a replicated SQLite database.
|
||||
rqlite is written in Go and uses Raft to achieve consensus across all the
|
||||
instances of the SQLite databases. rqlite ensures that every change made to
|
||||
the database is made to a majority of underlying SQLite files, or none-at-all.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"os/user"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/otoolep/rqlite/log"
|
||||
"github.com/otoolep/rqlite/server"
|
||||
)
|
||||
|
||||
var host string
|
||||
var port int
|
||||
var join string
|
||||
var dbfile string
|
||||
var cpuprofile string
|
||||
var logFile string
|
||||
var logLevel string
|
||||
var snapAfter int
|
||||
var disableReporting bool
|
||||
|
||||
func init() {
|
||||
flag.StringVar(&host, "h", "localhost", "hostname")
|
||||
flag.IntVar(&port, "p", 4001, "port")
|
||||
flag.StringVar(&join, "join", "", "host:port of leader to join")
|
||||
flag.StringVar(&dbfile, "dbfile", "db.sqlite", "sqlite filename")
|
||||
flag.StringVar(&cpuprofile, "cpuprofile", "", "write CPU profile to file")
|
||||
flag.StringVar(&logFile, "logfile", "stdout", "log file path")
|
||||
flag.StringVar(&logLevel, "loglevel", "INFO", "log level (ERROR|WARN|INFO|DEBUG|TRACE)")
|
||||
flag.IntVar(&snapAfter, "s", 100, "Snapshot and compact after this number of new log entries")
|
||||
flag.BoolVar(&disableReporting, "noreport", false, "Disable anonymised launch reporting")
|
||||
flag.Usage = func() {
|
||||
fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
|
||||
flag.PrintDefaults()
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
// Set up profiling, if requested.
|
||||
if cpuprofile != "" {
|
||||
log.Info("Profiling enabled")
|
||||
f, err := os.Create(cpuprofile)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to create path: %s", err.Error())
|
||||
}
|
||||
defer closeFile(f)
|
||||
|
||||
err = pprof.StartCPUProfile(f)
|
||||
if err != nil {
|
||||
log.Errorf("Unable to start CPU Profile: %s", err.Error())
|
||||
}
|
||||
|
||||
defer pprof.StopCPUProfile()
|
||||
}
|
||||
|
||||
// Set logging
|
||||
log.SetLevel(logLevel)
|
||||
if logFile != "stdout" {
|
||||
f := createFile(logFile)
|
||||
defer closeFile(f)
|
||||
|
||||
log.Infof("Redirecting logging to %s", logFile)
|
||||
log.SetOutput(f)
|
||||
}
|
||||
|
||||
// Set the data directory.
|
||||
if flag.NArg() == 0 {
|
||||
flag.Usage()
|
||||
println("Data path argument required")
|
||||
log.Error("No data path supplied -- aborting")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
dataPath := flag.Arg(0)
|
||||
createDir(dataPath)
|
||||
|
||||
s := server.NewServer(dataPath, dbfile, snapAfter, host, port)
|
||||
go func() {
|
||||
log.Error(s.ListenAndServe(join).Error())
|
||||
}()
|
||||
|
||||
if !disableReporting {
|
||||
reportLaunch()
|
||||
}
|
||||
|
||||
terminate := make(chan os.Signal, 1)
|
||||
signal.Notify(terminate, os.Interrupt)
|
||||
<-terminate
|
||||
log.Info("rqlite server stopped")
|
||||
}
|
||||
|
||||
func closeFile(f *os.File) {
|
||||
if err := f.Close(); err != nil {
|
||||
log.Errorf("Unable to close file: %s", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func createFile(path string) *os.File {
|
||||
usr, _ := user.Current()
|
||||
dir := usr.HomeDir
|
||||
|
||||
// Check in case of paths like "/something/~/something/"
|
||||
if path[:2] == "~/" {
|
||||
path = strings.Replace(path, "~/", dir+"/", 1)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Dir(path), 0744); err != nil {
|
||||
log.Errorf("Unable to create path: %s", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||
if err != nil && !strings.Contains(err.Error(), "is a directory") {
|
||||
log.Errorf("Unable to open file: %s", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
func createDir(path string) {
|
||||
if err := os.MkdirAll(path, 0744); err != nil {
|
||||
log.Errorf("Unable to create path: %s", err.Error())
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
func reportLaunch() {
|
||||
json := fmt.Sprintf(`{"os": "%s", "arch": "%s", "app": "rqlite"}`, runtime.GOOS, runtime.GOARCH)
|
||||
data := bytes.NewBufferString(json)
|
||||
client := http.Client{Timeout: time.Duration(5 * time.Second)}
|
||||
go func() {
|
||||
_, err := client.Post("https://logs-01.loggly.com/inputs/8a0edd84-92ba-46e4-ada8-c529d0f105af/tag/reporting/",
|
||||
"application/json", data)
|
||||
if err != nil {
|
||||
log.Errorf("Report launch failed: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
@ -1,633 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/otoolep/raft"
|
||||
"github.com/otoolep/rqlite/command"
|
||||
"github.com/otoolep/rqlite/db"
|
||||
"github.com/otoolep/rqlite/interfaces"
|
||||
"github.com/otoolep/rqlite/log"
|
||||
"github.com/rcrowley/go-metrics"
|
||||
)
|
||||
|
||||
// FailedSqlStmt contains a SQL query and an error.
|
||||
type FailedSqlStmt struct {
|
||||
Sql string `json:"sql"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
// StmtResponse contains a date and a list of failed
|
||||
// SQL statements.
|
||||
type StmtResponse struct {
|
||||
Time string `json:"time,omitempty"`
|
||||
Failures []FailedSqlStmt `json:"failures"`
|
||||
}
|
||||
|
||||
// QueryResponse contains the response to a query.
|
||||
type QueryResponse struct {
|
||||
Time string `json:"time,omitempty"`
|
||||
Failures []FailedSqlStmt `json:"failures"`
|
||||
Rows db.RowResults `json:"rows"`
|
||||
}
|
||||
|
||||
// Metrics are the server metrics user for statistics.
|
||||
type Metrics struct {
|
||||
registry metrics.Registry
|
||||
joinSuccess metrics.Counter
|
||||
joinFail metrics.Counter
|
||||
queryReceived metrics.Counter
|
||||
querySuccess metrics.Counter
|
||||
queryFail metrics.Counter
|
||||
executeReceived metrics.Counter
|
||||
executeTxReceived metrics.Counter
|
||||
executeSuccess metrics.Counter
|
||||
executeFail metrics.Counter
|
||||
snapshotCreated metrics.Counter
|
||||
}
|
||||
|
||||
// Diagnostics contains a start time of the server.
|
||||
type Diagnostics struct {
|
||||
startTime time.Time
|
||||
}
|
||||
|
||||
// SnapshotConf contains the index when the last snapshot happened
|
||||
// and a threshold for index entries since the last snapshot.
|
||||
type SnapshotConf struct {
|
||||
// The index when the last snapshot happened
|
||||
lastIndex uint64
|
||||
|
||||
// If the incremental number of index entries since the last
|
||||
// snapshot exceeds snapshotAfter rqlite will do a snapshot
|
||||
snapshotAfter uint64
|
||||
}
|
||||
|
||||
// Server is is a combination of the Raft server and an HTTP
|
||||
// server which acts as the transport.
|
||||
type Server struct {
|
||||
name string
|
||||
host string
|
||||
port int
|
||||
path string
|
||||
router *mux.Router
|
||||
raftServer raft.Server
|
||||
httpServer *http.Server
|
||||
dbPath string
|
||||
db *db.DB
|
||||
snapConf *SnapshotConf
|
||||
metrics *Metrics
|
||||
diagnostics *Diagnostics
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// ensurePrettyPrint returns a JSON representation of the object o. If
|
||||
// the HTTP request requested pretty-printing, it ensures that happens.
|
||||
func ensurePrettyPrint(req *http.Request, o map[string]interface{}) []byte {
|
||||
var b []byte
|
||||
pretty, _ := isPretty(req)
|
||||
if pretty {
|
||||
b, _ = json.MarshalIndent(o, "", " ")
|
||||
} else {
|
||||
b, _ = json.Marshal(o)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// queryParam returns whether the given query param is set to true.
|
||||
func queryParam(req *http.Request, param string) (bool, error) {
|
||||
err := req.ParseForm()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if _, ok := req.Form[param]; ok {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// stmtParam returns the value for URL param 'q', if present.
|
||||
func stmtParam(req *http.Request) (string, error) {
|
||||
q := req.URL.Query()
|
||||
stmt := strings.TrimSpace(q.Get("q"))
|
||||
if stmt == "" {
|
||||
return "", fmt.Errorf(`required parameter 'q' is missing`)
|
||||
}
|
||||
return stmt, nil
|
||||
}
|
||||
|
||||
// isPretty returns whether the HTTP response body should be pretty-printed.
|
||||
func isPretty(req *http.Request) (bool, error) {
|
||||
return queryParam(req, "pretty")
|
||||
}
|
||||
|
||||
// isExplain returns whether the HTTP response body should contain metainformation
|
||||
// how request processing.
|
||||
func isExplain(req *http.Request) (bool, error) {
|
||||
return queryParam(req, "explain")
|
||||
}
|
||||
|
||||
// isTransaction returns whether the client requested an explicit
|
||||
// transaction for the request.
|
||||
func isTransaction(req *http.Request) (bool, error) {
|
||||
return queryParam(req, "transaction")
|
||||
}
|
||||
|
||||
// NewMetrics creates a new Metrics object.
|
||||
func NewMetrics() *Metrics {
|
||||
m := &Metrics{
|
||||
registry: metrics.NewRegistry(),
|
||||
joinSuccess: metrics.NewCounter(),
|
||||
joinFail: metrics.NewCounter(),
|
||||
queryReceived: metrics.NewCounter(),
|
||||
querySuccess: metrics.NewCounter(),
|
||||
queryFail: metrics.NewCounter(),
|
||||
executeReceived: metrics.NewCounter(),
|
||||
executeTxReceived: metrics.NewCounter(),
|
||||
executeSuccess: metrics.NewCounter(),
|
||||
executeFail: metrics.NewCounter(),
|
||||
snapshotCreated: metrics.NewCounter(),
|
||||
}
|
||||
|
||||
_ = m.registry.Register("join_success", m.joinSuccess)
|
||||
_ = m.registry.Register("join_fail", m.joinFail)
|
||||
_ = m.registry.Register("query_received", m.queryReceived)
|
||||
_ = m.registry.Register("query_success", m.querySuccess)
|
||||
_ = m.registry.Register("query_fail", m.queryFail)
|
||||
_ = m.registry.Register("execute_received", m.executeReceived)
|
||||
_ = m.registry.Register("execute_tx_received", m.executeTxReceived)
|
||||
_ = m.registry.Register("execute_success", m.executeSuccess)
|
||||
_ = m.registry.Register("execute_fail", m.executeFail)
|
||||
_ = m.registry.Register("snapshot_created", m.snapshotCreated)
|
||||
|
||||
return m
|
||||
}
|
||||
|
||||
// NewDiagnostics creates a new Diagnostics object.
|
||||
func NewDiagnostics() *Diagnostics {
|
||||
d := &Diagnostics{
|
||||
startTime: time.Now(),
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// NewServer creates a new server.
|
||||
func NewServer(dataDir string, dbfile string, snapAfter int, host string, port int) *Server {
|
||||
dbPath := path.Join(dataDir, dbfile)
|
||||
|
||||
// Raft requires randomness.
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
log.Info("Raft random seed initialized")
|
||||
|
||||
// Setup commands.
|
||||
raft.RegisterCommand(&command.ExecuteCommand{})
|
||||
raft.RegisterCommand(&command.TransactionExecuteCommandSet{})
|
||||
log.Info("Raft commands registered")
|
||||
|
||||
s := &Server{
|
||||
host: host,
|
||||
port: port,
|
||||
path: dataDir,
|
||||
dbPath: dbPath,
|
||||
db: db.New(dbPath),
|
||||
snapConf: &SnapshotConf{snapshotAfter: uint64(snapAfter)},
|
||||
metrics: NewMetrics(),
|
||||
diagnostics: NewDiagnostics(),
|
||||
router: mux.NewRouter(),
|
||||
}
|
||||
|
||||
// Read existing name or generate a new one.
|
||||
if b, err := ioutil.ReadFile(filepath.Join(dataDir, "name")); err == nil {
|
||||
s.name = string(b)
|
||||
} else {
|
||||
s.name = fmt.Sprintf("%07x", rand.Int())[0:7]
|
||||
if err = ioutil.WriteFile(filepath.Join(dataDir, "name"), []byte(s.name), 0644); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// GetStatistics returns an object storing statistics, which supports JSON
|
||||
// marshalling.
|
||||
func (s *Server) GetStatistics() (metrics.Registry, error) {
|
||||
return s.metrics.registry, nil
|
||||
}
|
||||
|
||||
// connectionString returns the string used to connect to this server.
|
||||
func (s *Server) connectionString() string {
|
||||
return fmt.Sprintf("http://%s:%d", s.host, s.port)
|
||||
}
|
||||
|
||||
// logSnapshot logs about the snapshot that was taken.
|
||||
func (s *Server) logSnapshot(err error, currentIndex, count uint64) {
|
||||
info := fmt.Sprintf("%s: snapshot of %d events at index %d", s.connectionString(), count, currentIndex)
|
||||
if err != nil {
|
||||
log.Infof("%s attempted and failed: %v", info, err)
|
||||
} else {
|
||||
log.Infof("%s completed", info)
|
||||
}
|
||||
}
|
||||
|
||||
// ListenAndServe starts the server.
|
||||
func (s *Server) ListenAndServe(leader string) error {
|
||||
var err error
|
||||
|
||||
log.Infof("Initializing Raft Server: %s", s.path)
|
||||
|
||||
// Initialize and start Raft server.
|
||||
transporter := raft.NewHTTPTransporter("/raft", 200*time.Millisecond)
|
||||
stateMachine := NewDbStateMachine(s.dbPath)
|
||||
s.raftServer, err = raft.NewServer(s.name, s.path, transporter, stateMachine, s.db, "")
|
||||
if err != nil {
|
||||
log.Errorf("Failed to create new Raft server: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info("Loading latest snapshot, if any, from disk")
|
||||
if err := s.raftServer.LoadSnapshot(); err != nil && os.IsNotExist(err) {
|
||||
log.Info("no snapshot found")
|
||||
} else if err != nil {
|
||||
log.Errorf("Error loading snapshot: %s", err.Error())
|
||||
}
|
||||
|
||||
transporter.Install(s.raftServer, s)
|
||||
if err := s.raftServer.Start(); err != nil {
|
||||
log.Errorf("Error starting raft server: %s", err.Error())
|
||||
}
|
||||
|
||||
if leader != "" {
|
||||
// Join to leader if specified.
|
||||
|
||||
log.Infof("Attempting to join leader at %s", leader)
|
||||
|
||||
if !s.raftServer.IsLogEmpty() {
|
||||
log.Error("Cannot join with an existing log")
|
||||
return errors.New("Cannot join with an existing log")
|
||||
}
|
||||
if err := s.Join(leader); err != nil {
|
||||
log.Errorf("Failed to join leader: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
} else if s.raftServer.IsLogEmpty() {
|
||||
// Initialize the server by joining itself.
|
||||
|
||||
log.Info("Initializing new cluster")
|
||||
|
||||
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
|
||||
Name: s.raftServer.Name(),
|
||||
ConnectionString: s.connectionString(),
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("Failed to join to self: %s", err.Error())
|
||||
}
|
||||
|
||||
} else {
|
||||
log.Info("Recovered from log")
|
||||
}
|
||||
|
||||
log.Info("Initializing HTTP server")
|
||||
|
||||
// Initialize and start HTTP server.
|
||||
s.httpServer = &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", s.port),
|
||||
Handler: s.router,
|
||||
}
|
||||
|
||||
s.router.HandleFunc("/statistics", s.serveStatistics).Methods("GET")
|
||||
s.router.HandleFunc("/diagnostics", s.serveDiagnostics).Methods("GET")
|
||||
s.router.HandleFunc("/raft", s.serveRaftInfo).Methods("GET")
|
||||
s.router.HandleFunc("/db", s.readHandler).Methods("GET")
|
||||
s.router.HandleFunc("/db", s.writeHandler).Methods("POST")
|
||||
s.router.HandleFunc("/join", s.joinHandler).Methods("POST")
|
||||
|
||||
log.Infof("Listening at %s", s.connectionString())
|
||||
|
||||
return s.httpServer.ListenAndServe()
|
||||
}
|
||||
|
||||
// HandleFunc is a hack around Gorilla mux not providing the correct net/http
|
||||
// HandleFunc() interface.
|
||||
func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
|
||||
s.router.HandleFunc(pattern, handler)
|
||||
}
|
||||
|
||||
// Join joins to the leader of an existing cluster.
|
||||
func (s *Server) Join(leader string) error {
|
||||
command := &raft.DefaultJoinCommand{
|
||||
Name: s.raftServer.Name(),
|
||||
ConnectionString: s.connectionString(),
|
||||
}
|
||||
|
||||
var b bytes.Buffer
|
||||
if err := json.NewEncoder(&b).Encode(command); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = resp.Body.Close()
|
||||
}()
|
||||
|
||||
// Look for redirect.
|
||||
if resp.StatusCode == http.StatusTemporaryRedirect {
|
||||
leader := resp.Header.Get("Location")
|
||||
if leader == "" {
|
||||
return errors.New("Redirect requested, but no location header supplied")
|
||||
}
|
||||
u, err := url.Parse(leader)
|
||||
if err != nil {
|
||||
return errors.New("Failed to parse redirect location")
|
||||
}
|
||||
log.Infof("Redirecting to leader at %s", u.Host)
|
||||
return s.Join(u.Host)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) joinHandler(w http.ResponseWriter, req *http.Request) {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if s.raftServer.State() != "leader" {
|
||||
s.leaderRedirect(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
command := &raft.DefaultJoinCommand{}
|
||||
|
||||
if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
s.metrics.joinFail.Inc(1)
|
||||
return
|
||||
}
|
||||
if _, err := s.raftServer.Do(command); err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
s.metrics.joinFail.Inc(1)
|
||||
return
|
||||
}
|
||||
s.metrics.joinSuccess.Inc(1)
|
||||
}
|
||||
|
||||
func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
|
||||
log.Tracef("readHandler for URL: %s", req.URL)
|
||||
s.metrics.queryReceived.Inc(1)
|
||||
|
||||
var failures = make([]FailedSqlStmt, 0)
|
||||
|
||||
// Get the query statement
|
||||
stmt, err := stmtParam(req)
|
||||
if err != nil {
|
||||
log.Tracef("Bad HTTP request: %s", err.Error())
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
s.metrics.queryFail.Inc(1)
|
||||
return
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
r, err := s.db.Query(stmt)
|
||||
if err != nil {
|
||||
log.Tracef("Bad SQL statement: %s", err.Error())
|
||||
s.metrics.queryFail.Inc(1)
|
||||
failures = append(failures, FailedSqlStmt{stmt, err.Error()})
|
||||
} else {
|
||||
s.metrics.querySuccess.Inc(1)
|
||||
}
|
||||
duration := time.Since(startTime)
|
||||
|
||||
rr := QueryResponse{Failures: failures, Rows: r}
|
||||
if e, _ := isExplain(req); e {
|
||||
rr.Time = duration.String()
|
||||
}
|
||||
|
||||
pretty, _ := isPretty(req)
|
||||
var b []byte
|
||||
if pretty {
|
||||
b, err = json.MarshalIndent(rr, "", " ")
|
||||
} else {
|
||||
b, err = json.Marshal(rr)
|
||||
}
|
||||
if err != nil {
|
||||
log.Tracef("Failed to marshal JSON data: %s", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusBadRequest) // Internal error actually
|
||||
} else {
|
||||
_, err = w.Write([]byte(b))
|
||||
if err != nil {
|
||||
log.Errorf("Error writting JSON data: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) execute(tx bool, stmts []string) ([]FailedSqlStmt, error) {
|
||||
var failures = make([]FailedSqlStmt, 0)
|
||||
|
||||
if tx {
|
||||
log.Trace("Transaction requested")
|
||||
s.metrics.executeTxReceived.Inc(1)
|
||||
|
||||
_, err := s.raftServer.Do(command.NewTransactionExecuteCommandSet(stmts))
|
||||
if err != nil {
|
||||
log.Tracef("Transaction failed: %s", err.Error())
|
||||
s.metrics.executeFail.Inc(1)
|
||||
failures = append(failures, FailedSqlStmt{stmts[0], err.Error()})
|
||||
} else {
|
||||
s.metrics.executeSuccess.Inc(1)
|
||||
}
|
||||
} else {
|
||||
log.Trace("No transaction requested")
|
||||
for i := range stmts {
|
||||
_, err := s.raftServer.Do(command.NewExecuteCommand(stmts[i]))
|
||||
if err != nil {
|
||||
log.Tracef("Execute statement %s failed: %s", stmts[i], err.Error())
|
||||
s.metrics.executeFail.Inc(1)
|
||||
failures = append(failures, FailedSqlStmt{stmts[i], err.Error()})
|
||||
} else {
|
||||
s.metrics.executeSuccess.Inc(1)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
return failures, nil
|
||||
}
|
||||
|
||||
func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if s.raftServer.State() != "leader" {
|
||||
s.leaderRedirect(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
log.Tracef("writeHandler for URL: %s", req.URL)
|
||||
s.metrics.executeReceived.Inc(1)
|
||||
|
||||
currentIndex := s.raftServer.CommitIndex()
|
||||
count := currentIndex - s.snapConf.lastIndex
|
||||
if uint64(count) > s.snapConf.snapshotAfter {
|
||||
log.Info("Committed log entries snapshot threshold reached, starting snapshot")
|
||||
err := s.raftServer.TakeSnapshot()
|
||||
s.logSnapshot(err, currentIndex, count)
|
||||
s.snapConf.lastIndex = currentIndex
|
||||
s.metrics.snapshotCreated.Inc(1)
|
||||
}
|
||||
|
||||
// Read the value from the POST body.
|
||||
b, err := ioutil.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
log.Tracef("Bad HTTP request: %s", err.Error())
|
||||
s.metrics.executeFail.Inc(1)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
stmts := strings.Split(string(b), ";")
|
||||
if stmts[len(stmts)-1] == "" {
|
||||
stmts = stmts[:len(stmts)-1]
|
||||
}
|
||||
|
||||
log.Tracef("Execute statement contains %d commands", len(stmts))
|
||||
if len(stmts) == 0 {
|
||||
log.Trace("No database execute commands supplied")
|
||||
s.metrics.executeFail.Inc(1)
|
||||
w.WriteHeader(http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
transaction, _ := isTransaction(req)
|
||||
startTime := time.Now()
|
||||
failures, err := s.execute(transaction, stmts)
|
||||
if err != nil {
|
||||
log.Errorf("Database mutation failed: %s", err.Error())
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
duration := time.Since(startTime)
|
||||
|
||||
wr := StmtResponse{Failures: failures}
|
||||
if e, _ := isExplain(req); e {
|
||||
wr.Time = duration.String()
|
||||
}
|
||||
|
||||
pretty, _ := isPretty(req)
|
||||
if pretty {
|
||||
b, err = json.MarshalIndent(wr, "", " ")
|
||||
} else {
|
||||
b, err = json.Marshal(wr)
|
||||
}
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
} else {
|
||||
_, err = w.Write([]byte(b))
|
||||
if err != nil {
|
||||
log.Errorf("Error writting JSON data: %s", err.Error())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// serveStatistics returns the statistics for the program
|
||||
func (s *Server) serveStatistics(w http.ResponseWriter, req *http.Request) {
|
||||
statistics := make(map[string]interface{})
|
||||
resources := map[string]interfaces.Statistics{"server": s}
|
||||
for k, v := range resources {
|
||||
s, err := v.GetStatistics()
|
||||
if err != nil {
|
||||
log.Error("failed to get " + k + " stats")
|
||||
http.Error(w, "failed to get "+k+" stats", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
statistics[k] = s
|
||||
}
|
||||
|
||||
_, err := w.Write(ensurePrettyPrint(req, statistics))
|
||||
if err != nil {
|
||||
log.Error("failed to serve stats")
|
||||
http.Error(w, "failed to serve stats", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// serveDiagnostics returns basic server diagnostics
|
||||
func (s *Server) serveDiagnostics(w http.ResponseWriter, req *http.Request) {
|
||||
diagnostics := make(map[string]interface{})
|
||||
diagnostics["started"] = s.diagnostics.startTime.String()
|
||||
diagnostics["uptime"] = time.Since(s.diagnostics.startTime).String()
|
||||
diagnostics["host"] = s.host
|
||||
diagnostics["port"] = s.port
|
||||
diagnostics["data"] = s.path
|
||||
diagnostics["database"] = s.dbPath
|
||||
diagnostics["connection"] = s.connectionString()
|
||||
diagnostics["snapafter"] = s.snapConf.snapshotAfter
|
||||
diagnostics["snapindex"] = s.snapConf.lastIndex
|
||||
|
||||
_, err := w.Write(ensurePrettyPrint(req, diagnostics))
|
||||
if err != nil {
|
||||
log.Error("failed to serve diagnostics")
|
||||
http.Error(w, "failed to serve diagnostics", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// serveRaftInfo returns information about the underlying Raft server
|
||||
func (s *Server) serveRaftInfo(w http.ResponseWriter, req *http.Request) {
|
||||
var peers []interface{}
|
||||
for _, v := range s.raftServer.Peers() {
|
||||
peers = append(peers, v)
|
||||
}
|
||||
|
||||
info := make(map[string]interface{})
|
||||
info["name"] = s.raftServer.Name()
|
||||
info["state"] = s.raftServer.State()
|
||||
info["leader"] = s.raftServer.Leader()
|
||||
info["peers"] = peers
|
||||
|
||||
_, err := w.Write(ensurePrettyPrint(req, info))
|
||||
if err != nil {
|
||||
log.Error("failed to serve raft info")
|
||||
http.Error(w, "failed to serve raft info", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// leaderRedirect returns a 307 Temporary Redirect, with the full path
|
||||
// to the leader.
|
||||
func (s *Server) leaderRedirect(w http.ResponseWriter, r *http.Request) {
|
||||
peers := s.raftServer.Peers()
|
||||
leader := peers[s.raftServer.Leader()]
|
||||
|
||||
if leader == nil {
|
||||
// No leader available, give up.
|
||||
log.Error("attempted leader redirection, but no leader available")
|
||||
w.WriteHeader(http.StatusServiceUnavailable)
|
||||
w.Write([]byte("no leader available"))
|
||||
return
|
||||
}
|
||||
|
||||
var u string
|
||||
for _, p := range peers {
|
||||
if p.Name == leader.Name {
|
||||
u = p.ConnectionString
|
||||
break
|
||||
}
|
||||
}
|
||||
http.Redirect(w, r, u+r.URL.Path, http.StatusTemporaryRedirect)
|
||||
}
|
@ -1,117 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
. "gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
const (
|
||||
host = "localhost"
|
||||
port = 4001
|
||||
snapAfter = 1000
|
||||
dbfile = "rqlite-test"
|
||||
)
|
||||
|
||||
type SingleServerSuite struct{}
|
||||
|
||||
var _ = Suite(&SingleServerSuite{})
|
||||
|
||||
func getEndpoint(endpoint string) (*http.Response, error) {
|
||||
url := fmt.Sprintf("http://%s:%d%s", host, port, endpoint)
|
||||
return http.Get(url)
|
||||
}
|
||||
|
||||
func getEndpointQuery(endpoint string, query string) (*http.Response, error) {
|
||||
q := url.Values{"q": []string{query}}
|
||||
v, _ := url.Parse(fmt.Sprintf("http://%s:%d%s", host, port, endpoint))
|
||||
v.RawQuery = q.Encode()
|
||||
|
||||
req, err := http.Get(v.String())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func postEndpoint(endpoint string, body string) (*http.Response, error) {
|
||||
var jsonStr = []byte(body)
|
||||
url := fmt.Sprintf("http://%s:%d%s", host, port, endpoint)
|
||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
client := &http.Client{}
|
||||
return client.Do(req)
|
||||
}
|
||||
|
||||
func isJSONBody(res *http.Response) bool {
|
||||
b, err := ioutil.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
var o interface{}
|
||||
err = json.Unmarshal(b, &o)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (s *SingleServerSuite) Test_SingleServer(c *C) {
|
||||
dir, err := ioutil.TempDir("", "rqlite-test-")
|
||||
c.Assert(err, IsNil)
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
server := NewServer(dir, dbfile, snapAfter, host, port)
|
||||
c.Assert(server, NotNil)
|
||||
go func() { server.ListenAndServe("") }()
|
||||
|
||||
// Wait to ensure server is up. This is not ideal, and the server should
|
||||
// really use a channel to flag it is ready.
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Sanity-check admin API endpoints
|
||||
var res *http.Response
|
||||
res, err = getEndpoint("/statistics")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(res.StatusCode, Equals, 200)
|
||||
c.Assert(isJSONBody(res), Equals, true)
|
||||
|
||||
res, err = getEndpoint("/diagnostics")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(res.StatusCode, Equals, 200)
|
||||
c.Assert(isJSONBody(res), Equals, true)
|
||||
|
||||
res, err = getEndpoint("/raft")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(res.StatusCode, Equals, 200)
|
||||
c.Assert(isJSONBody(res), Equals, true)
|
||||
|
||||
// Create a database.
|
||||
res, err = postEndpoint("/db", "CREATE TABLE foo (id integer not null primary key, name text)")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(res.StatusCode, Equals, 200)
|
||||
|
||||
// Data write.
|
||||
res, err = postEndpoint("/db", "INSERT INTO foo(name) VALUES(\"fiona\")")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(res.StatusCode, Equals, 200)
|
||||
|
||||
// Data read
|
||||
res, err = getEndpointQuery("/db", "SELECT * from foo")
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(res.StatusCode, Equals, 200)
|
||||
c.Assert(isJSONBody(res), Equals, true)
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"github.com/otoolep/rqlite/log"
|
||||
)
|
||||
|
||||
// DbStateMachine contains the DB path.
|
||||
type DbStateMachine struct {
|
||||
dbpath string
|
||||
}
|
||||
|
||||
// NewDbStateMachine returns a StateMachine for capturing and restoring
|
||||
// the state of an sqlite database.
|
||||
func NewDbStateMachine(path string) *DbStateMachine {
|
||||
d := &DbStateMachine{
|
||||
dbpath: path,
|
||||
}
|
||||
log.Tracef("New DB state machine created with path: %s", path)
|
||||
return d
|
||||
}
|
||||
|
||||
// Save captures the state of the database. The caller must ensure that
|
||||
// no transaction is taking place during this call.
|
||||
//
|
||||
// http://sqlite.org/howtocorrupt.html states it is safe to do this
|
||||
// as long as no transaction is in progress.
|
||||
func (d *DbStateMachine) Save() ([]byte, error) {
|
||||
log.Tracef("Capturing database state from path: %s", d.dbpath)
|
||||
b, err := ioutil.ReadFile(d.dbpath)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to save state: %s", err.Error())
|
||||
return nil, err
|
||||
}
|
||||
log.Tracef("Database state successfully saved to %s", d.dbpath)
|
||||
return b, nil
|
||||
}
|
||||
|
||||
// Recovery restores the state of the database using the given data.
|
||||
func (d *DbStateMachine) Recovery(b []byte) error {
|
||||
log.Tracef("Restoring database state to path: %s", d.dbpath)
|
||||
err := ioutil.WriteFile(d.dbpath, b, os.ModePerm)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to recover state: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
log.Tracef("Database restored successfully to %s", d.dbpath)
|
||||
return nil
|
||||
}
|
@ -1,73 +0,0 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
. "gopkg.in/check.v1"
|
||||
)
|
||||
|
||||
// Hook up gocheck into the "go test" runner.
|
||||
func Test(t *testing.T) {
|
||||
TestingT(t)
|
||||
}
|
||||
|
||||
type SnapshotSuite struct{}
|
||||
|
||||
var _ = Suite(&SnapshotSuite{})
|
||||
|
||||
func (s *SnapshotSuite) Test_Snapshot(c *C) {
|
||||
dir, err := ioutil.TempDir("", "rqlite-test-")
|
||||
path1 := path.Join(dir, "test_db1")
|
||||
path2 := path.Join(dir, "test_db2")
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
// Create a small database.
|
||||
dbc, err := sql.Open("sqlite3", path1)
|
||||
c.Assert(err, IsNil)
|
||||
_, err = dbc.Exec("create table foo (id integer not null primary key, name text)")
|
||||
c.Assert(err, IsNil)
|
||||
_, err = dbc.Exec("INSERT INTO foo(name) VALUES(\"fiona\")")
|
||||
dbc.Close()
|
||||
|
||||
// Snapshot it.
|
||||
snapper1 := NewDbStateMachine(path1)
|
||||
c.Assert(snapper1, NotNil)
|
||||
snap, err := snapper1.Save()
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// Save it to a different location.
|
||||
snapper2 := NewDbStateMachine(path2)
|
||||
c.Assert(snapper2, NotNil)
|
||||
err = snapper2.Recovery(snap)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// Confirm two files are byte-for-byte identical.
|
||||
b1, err := ioutil.ReadFile(path1)
|
||||
c.Assert(err, IsNil)
|
||||
b2, err := ioutil.ReadFile(path2)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(reflect.DeepEqual(b1, b2), Equals, true)
|
||||
|
||||
// Open database using snapshot copy.
|
||||
dbc, err = sql.Open("sqlite3", path2)
|
||||
c.Assert(err, IsNil)
|
||||
rows, err := dbc.Query("SELECT name FROM foo")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
var nrows int
|
||||
for rows.Next() {
|
||||
var name string
|
||||
err = rows.Scan(&name)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(name, Equals, "fiona")
|
||||
nrows++
|
||||
}
|
||||
c.Assert(nrows, Equals, 1)
|
||||
dbc.Close()
|
||||
}
|
@ -0,0 +1,192 @@
|
||||
// Package store provides a distributed SQLite instance.
|
||||
//
|
||||
// Distributed consensus is provided via the Raft algorithm.
|
||||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/hashicorp/raft-boltdb"
|
||||
)
|
||||
|
||||
const (
|
||||
retainSnapshotCount = 2
|
||||
raftTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// Store is a SQLite database, where all changes are made via Raft consensus.
|
||||
type Store struct {
|
||||
raftDir string
|
||||
raftBind string
|
||||
|
||||
mu sync.Mutex
|
||||
|
||||
raft *raft.Raft // The consensus mechanism
|
||||
|
||||
logger *log.Logger
|
||||
}
|
||||
|
||||
// New returns a new Store.
|
||||
func New(dir, bind string) *Store {
|
||||
return &Store{
|
||||
raftDir: dir,
|
||||
raftBind: bind,
|
||||
logger: log.New(os.Stderr, "[store] ", log.LstdFlags),
|
||||
}
|
||||
}
|
||||
|
||||
// Open opens the store. If enableSingle is set, and there are no existing peers,
|
||||
// then this node becomesthe first node, and therefore leader, of the cluster.
|
||||
func (s *Store) Open(enableSingle bool) error {
|
||||
// Setup Raft configuration.
|
||||
config := raft.DefaultConfig()
|
||||
|
||||
// Check for any existing peers.
|
||||
peers, err := readPeersJSON(filepath.Join(s.raftDir, "peers.json"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Allow the node to entry single-mode, potentially electing itself, if
|
||||
// explicitly enabled and there is only 1 node in the cluster already.
|
||||
if enableSingle && len(peers) <= 1 {
|
||||
s.logger.Println("enabling single-node mode")
|
||||
config.EnableSingleNode = true
|
||||
config.DisableBootstrapAfterElect = false
|
||||
}
|
||||
|
||||
// Setup Raft communication.
|
||||
addr, err := net.ResolveTCPAddr("tcp", s.raftBind)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
transport, err := raft.NewTCPTransport(s.raftBind, addr, 3, 10*time.Second, os.Stderr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create peer storage.
|
||||
peerStore := raft.NewJSONPeers(s.raftDir, transport)
|
||||
|
||||
// Create the snapshot store. This allows the Raft to truncate the log.
|
||||
snapshots, err := raft.NewFileSnapshotStore(s.raftDir, retainSnapshotCount, os.Stderr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("file snapshot store: %s", err)
|
||||
}
|
||||
|
||||
// Create the log store and stable store.
|
||||
logStore, err := raftboltdb.NewBoltStore(filepath.Join(s.raftDir, "raft.db"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("new bolt store: %s", err)
|
||||
}
|
||||
|
||||
// Instantiate the Raft systems.
|
||||
ra, err := raft.NewRaft(config, (*fsm)(s), logStore, logStore, snapshots, peerStore, transport)
|
||||
if err != nil {
|
||||
return fmt.Errorf("new raft: %s", err)
|
||||
}
|
||||
s.raft = ra
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) Execute(stmts []string, tx bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Join joins a node, located at addr, to this store. The node must be ready to
|
||||
// respond to Raft communications at that address.
|
||||
func (s *Store) Join(addr string) error {
|
||||
s.logger.Printf("received join request for remote node as %s", addr)
|
||||
|
||||
f := s.raft.AddPeer(addr)
|
||||
if f.Error() != nil {
|
||||
return f.Error()
|
||||
}
|
||||
s.logger.Printf("node at %s joined successfully", addr)
|
||||
return nil
|
||||
}
|
||||
|
||||
type fsm Store
|
||||
|
||||
// Apply applies a Raft log entry to the database.
|
||||
func (f *fsm) Apply(l *raft.Log) interface{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Snapshot returns a snapshot of the database.
|
||||
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Restore restores the database to a previous state.
|
||||
func (f *fsm) Restore(rc io.ReadCloser) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type fsmSnapshot struct {
|
||||
store map[string]string
|
||||
}
|
||||
|
||||
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
|
||||
err := func() error {
|
||||
// Encode data.
|
||||
b, err := json.Marshal(f.store)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Write data to sink.
|
||||
if _, err := sink.Write(b); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Close the sink.
|
||||
if err := sink.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
sink.Cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fsmSnapshot) Release() {}
|
||||
|
||||
func readPeersJSON(path string) ([]string, error) {
|
||||
b, err := ioutil.ReadFile(path)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(b) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var peers []string
|
||||
dec := json.NewDecoder(bytes.NewReader(b))
|
||||
if err := dec.Decode(&peers); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return peers, nil
|
||||
}
|
Loading…
Reference in New Issue