parent
3d81f386ac
commit
ed4a7950e4
@ -0,0 +1,30 @@
|
|||||||
|
package command
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/otoolep/raft"
|
||||||
|
"github.com/otoolep/rqlite/db"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This command encapsulates a sqlite statement.
|
||||||
|
type WriteCommand struct {
|
||||||
|
Stmt string `json:"stmt"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates a new write command.
|
||||||
|
func NewWriteCommand(stmt string) *WriteCommand {
|
||||||
|
return &WriteCommand{
|
||||||
|
Stmt: stmt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The name of the command in the log.
|
||||||
|
func (c *WriteCommand) CommandName() string {
|
||||||
|
return "write"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Executes an sqlite statement.
|
||||||
|
func (c *WriteCommand) Apply(server raft.Server) (interface{}, error) {
|
||||||
|
db := server.Context().(*db.DB)
|
||||||
|
db.Exec(c.Stmt)
|
||||||
|
return nil, nil
|
||||||
|
}
|
@ -0,0 +1,45 @@
|
|||||||
|
package db
|
||||||
|
|
||||||
|
import (
|
||||||
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
dbName = "db.sqlite"
|
||||||
|
)
|
||||||
|
|
||||||
|
// The SQL database.
|
||||||
|
type DB struct {
|
||||||
|
dbConn *sql.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates a new database.
|
||||||
|
func New(dir string) *DB {
|
||||||
|
path := path.Join(dir, dbName)
|
||||||
|
os.Remove(path)
|
||||||
|
|
||||||
|
fmt.Println("database path is", path)
|
||||||
|
dbc, err := sql.Open("sqlite3", path)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
return &DB{
|
||||||
|
dbConn: dbc,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Executes the query.
|
||||||
|
func (db *DB) Query(query string) string {
|
||||||
|
return "the query"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sets the value for a given key.
|
||||||
|
func (db *DB) Exec(stmt string) {
|
||||||
|
return
|
||||||
|
}
|
@ -0,0 +1,50 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"flag"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"math/rand"
|
||||||
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/otoolep/raft"
|
||||||
|
"github.com/otoolep/rqlite/command"
|
||||||
|
"github.com/otoolep/rqlite/server"
|
||||||
|
)
|
||||||
|
|
||||||
|
var host string
|
||||||
|
var port int
|
||||||
|
var join string
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
flag.StringVar(&host, "h", "localhost", "hostname")
|
||||||
|
flag.IntVar(&port, "p", 4001, "port")
|
||||||
|
flag.StringVar(&join, "join", "", "host:port of leader to join")
|
||||||
|
flag.Usage = func() {
|
||||||
|
fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> \n", os.Args[0])
|
||||||
|
flag.PrintDefaults()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
|
rand.Seed(time.Now().UnixNano())
|
||||||
|
|
||||||
|
// Setup commands.
|
||||||
|
raft.RegisterCommand(&command.WriteCommand{})
|
||||||
|
|
||||||
|
// Set the data directory.
|
||||||
|
if flag.NArg() == 0 {
|
||||||
|
flag.Usage()
|
||||||
|
log.Fatal("Data path argument required")
|
||||||
|
}
|
||||||
|
path := flag.Arg(0)
|
||||||
|
if err := os.MkdirAll(path, 0744); err != nil {
|
||||||
|
log.Fatalf("Unable to create path: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s := server.New(path, host, port)
|
||||||
|
log.Fatal(s.ListenAndServe(join))
|
||||||
|
}
|
@ -0,0 +1,184 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/otoolep/raft"
|
||||||
|
"github.com/otoolep/rqlite/command"
|
||||||
|
"github.com/otoolep/rqlite/db"
|
||||||
|
)
|
||||||
|
|
||||||
|
// The raftd server is a combination of the Raft server and an HTTP
|
||||||
|
// server which acts as the transport.
|
||||||
|
type Server struct {
|
||||||
|
name string
|
||||||
|
host string
|
||||||
|
port int
|
||||||
|
path string
|
||||||
|
router *mux.Router
|
||||||
|
raftServer raft.Server
|
||||||
|
httpServer *http.Server
|
||||||
|
db *db.DB
|
||||||
|
mutex sync.RWMutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates a new server.
|
||||||
|
func New(path string, host string, port int) *Server {
|
||||||
|
s := &Server{
|
||||||
|
host: host,
|
||||||
|
port: port,
|
||||||
|
path: path,
|
||||||
|
db: db.New(path),
|
||||||
|
router: mux.NewRouter(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read existing name or generate a new one.
|
||||||
|
if b, err := ioutil.ReadFile(filepath.Join(path, "name")); err == nil {
|
||||||
|
s.name = string(b)
|
||||||
|
} else {
|
||||||
|
s.name = fmt.Sprintf("%07x", rand.Int())[0:7]
|
||||||
|
if err = ioutil.WriteFile(filepath.Join(path, "name"), []byte(s.name), 0644); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns the connection string.
|
||||||
|
func (s *Server) connectionString() string {
|
||||||
|
return fmt.Sprintf("http://%s:%d", s.host, s.port)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Starts the server.
|
||||||
|
func (s *Server) ListenAndServe(leader string) error {
|
||||||
|
var err error
|
||||||
|
|
||||||
|
log.Printf("Initializing Raft Server: %s", s.path)
|
||||||
|
|
||||||
|
// Initialize and start Raft server.
|
||||||
|
transporter := raft.NewHTTPTransporter("/raft", 200*time.Millisecond)
|
||||||
|
s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, s.db, "")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
transporter.Install(s.raftServer, s)
|
||||||
|
s.raftServer.Start()
|
||||||
|
|
||||||
|
if leader != "" {
|
||||||
|
// Join to leader if specified.
|
||||||
|
|
||||||
|
log.Println("Attempting to join leader:", leader)
|
||||||
|
|
||||||
|
if !s.raftServer.IsLogEmpty() {
|
||||||
|
log.Fatal("Cannot join with an existing log")
|
||||||
|
}
|
||||||
|
if err := s.Join(leader); err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if s.raftServer.IsLogEmpty() {
|
||||||
|
// Initialize the server by joining itself.
|
||||||
|
|
||||||
|
log.Println("Initializing new cluster")
|
||||||
|
|
||||||
|
_, err := s.raftServer.Do(&raft.DefaultJoinCommand{
|
||||||
|
Name: s.raftServer.Name(),
|
||||||
|
ConnectionString: s.connectionString(),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
log.Println("Recovered from log")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Println("Initializing HTTP server")
|
||||||
|
|
||||||
|
// Initialize and start HTTP server.
|
||||||
|
s.httpServer = &http.Server{
|
||||||
|
Addr: fmt.Sprintf(":%d", s.port),
|
||||||
|
Handler: s.router,
|
||||||
|
}
|
||||||
|
|
||||||
|
s.router.HandleFunc("/db/{key}", s.readHandler).Methods("GET")
|
||||||
|
s.router.HandleFunc("/db/{key}", s.writeHandler).Methods("POST")
|
||||||
|
s.router.HandleFunc("/join", s.joinHandler).Methods("POST")
|
||||||
|
|
||||||
|
log.Println("Listening at:", s.connectionString())
|
||||||
|
|
||||||
|
return s.httpServer.ListenAndServe()
|
||||||
|
}
|
||||||
|
|
||||||
|
// This is a hack around Gorilla mux not providing the correct net/http
|
||||||
|
// HandleFunc() interface.
|
||||||
|
func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
|
||||||
|
s.router.HandleFunc(pattern, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Joins to the leader of an existing cluster.
|
||||||
|
func (s *Server) Join(leader string) error {
|
||||||
|
command := &raft.DefaultJoinCommand{
|
||||||
|
Name: s.raftServer.Name(),
|
||||||
|
ConnectionString: s.connectionString(),
|
||||||
|
}
|
||||||
|
|
||||||
|
var b bytes.Buffer
|
||||||
|
json.NewEncoder(&b).Encode(command)
|
||||||
|
resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) joinHandler(w http.ResponseWriter, req *http.Request) {
|
||||||
|
command := &raft.DefaultJoinCommand{}
|
||||||
|
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&command); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err := s.raftServer.Do(command); err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
|
||||||
|
value := s.db.Query("query")
|
||||||
|
w.Write([]byte(value))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
|
||||||
|
vars := mux.Vars(req)
|
||||||
|
var _ = vars
|
||||||
|
|
||||||
|
// Read the value from the POST body.
|
||||||
|
b, err := ioutil.ReadAll(req.Body)
|
||||||
|
if err != nil {
|
||||||
|
w.WriteHeader(http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
value := string(b)
|
||||||
|
var _ = value
|
||||||
|
|
||||||
|
// Execute the command against the Raft server.
|
||||||
|
_, err = s.raftServer.Do(command.NewWriteCommand("exec"))
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue