1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

218 lines
4.8 KiB
Go

// Package store provides a distributed SQLite instance.
//
// Distributed consensus is provided via the Raft algorithm.
package store
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"sync"
"time"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
sql "github.com/otoolep/rqlite/db"
)
const (
	// retainSnapshotCount is the retention count handed to the Raft file
	// snapshot store when the store is opened.
	retainSnapshotCount = 2

	// raftTimeout is the timeout passed to Raft when applying a command.
	raftTimeout = 10 * time.Second

	// SQLiteDB is the default file name of the SQLite database; New copies it
	// into Store.SQLiteDB, and Open joins it onto the Raft directory.
	SQLiteDB = "db.sqlite"
)
// command is the JSON-encoded payload of a Raft log entry: a list of queries
// plus a flag that is forwarded to the database alongside them.
type command struct {
	// Tx is passed through to the database's Execute call as its tx argument.
	Tx bool `json:"tx,omitempty"`

	// Queries holds the statements to execute, in order.
	Queries []string `json:"queries,omitempty"`
}
// Store is a SQLite database, where all changes are made via Raft consensus.
type Store struct {
	raftDir  string // Directory holding peers.json, snapshots, raft.db and the SQLite file
	raftBind string // Address used for the Raft TCP transport

	mu sync.Mutex // Locked by the FSM while Snapshot runs

	raft *raft.Raft // The consensus mechanism
	db   *sql.DB    // The underlying SQLite store

	logger *log.Logger

	// SQLiteDB is the file name of the SQLite database inside raftDir.
	// New initialises it from the package-level SQLiteDB constant.
	SQLiteDB string
}
// New returns a new Store that keeps its files under dir and uses bind as its
// Raft address. The store logs to stderr with a "[store] " prefix and uses the
// default SQLite file name. Nothing is opened until Open is called.
func New(dir, bind string) *Store {
	s := new(Store)
	s.raftDir = dir
	s.raftBind = bind
	s.logger = log.New(os.Stderr, "[store] ", log.LstdFlags)
	s.SQLiteDB = SQLiteDB
	return s
}
// Open opens the store. If enableSingle is set, and there are no existing peers,
// then this node becomes the first node, and therefore leader, of the cluster.
//
// The SQLite database is opened and assigned before Raft is instantiated:
// once raft.NewRaft returns (or even before), Raft may invoke the FSM's Apply,
// which dereferences the database handle. If a step after the Bolt log store
// has been created fails, the log store is closed again so its file lock is
// not left held by a half-opened Store.
//
// It returns the first error encountered while reading the peers file,
// resolving the bind address, or creating any of the Raft components or the
// database.
func (s *Store) Open(enableSingle bool) error {
	// Setup Raft configuration.
	config := raft.DefaultConfig()

	// Check for any existing peers.
	peers, err := readPeersJSON(filepath.Join(s.raftDir, "peers.json"))
	if err != nil {
		return err
	}

	// Allow the node to enter single-mode, potentially electing itself, if
	// explicitly enabled and there is only 1 node in the cluster already.
	if enableSingle && len(peers) <= 1 {
		s.logger.Println("enabling single-node mode")
		config.EnableSingleNode = true
		config.DisableBootstrapAfterElect = false
	}

	// Setup Raft communication.
	addr, err := net.ResolveTCPAddr("tcp", s.raftBind)
	if err != nil {
		return err
	}
	transport, err := raft.NewTCPTransport(s.raftBind, addr, 3, 10*time.Second, os.Stderr)
	if err != nil {
		return err
	}

	// Create peer storage.
	peerStore := raft.NewJSONPeers(s.raftDir, transport)

	// Create the snapshot store. This allows the Raft to truncate the log.
	snapshots, err := raft.NewFileSnapshotStore(s.raftDir, retainSnapshotCount, os.Stderr)
	if err != nil {
		return fmt.Errorf("file snapshot store: %s", err)
	}

	// Create the log store and stable store.
	logStore, err := raftboltdb.NewBoltStore(filepath.Join(s.raftDir, "raft.db"))
	if err != nil {
		return fmt.Errorf("new bolt store: %s", err)
	}
	// closeLogStore releases the Bolt file when a later step fails; the
	// original error is what gets returned, so a close failure is only logged.
	closeLogStore := func() {
		if cerr := logStore.Close(); cerr != nil {
			s.logger.Printf("closing log store: %s", cerr)
		}
	}

	// Setup the SQLite database. This must happen before Raft is started,
	// because the FSM reads s.db as soon as Raft applies a log entry.
	db, err := sql.Open(filepath.Join(s.raftDir, s.SQLiteDB))
	if err != nil {
		closeLogStore()
		return err
	}
	s.db = db

	// Instantiate the Raft systems.
	ra, err := raft.NewRaft(config, (*fsm)(s), logStore, logStore, snapshots, peerStore, transport)
	if err != nil {
		// NOTE(review): the database handle stays open here; close it too
		// once the db package's close API is confirmed.
		closeLogStore()
		return fmt.Errorf("new raft: %s", err)
	}
	s.raft = ra

	return nil
}
// Execute runs queries against the database by way of Raft consensus.
//
// The queries and the tx flag are marshalled into a command, which is applied
// through Raft with raftTimeout. Execute fails with "not leader" when this
// node is not the Raft leader, and otherwise returns:
//   - the Raft error, if the log entry could not be applied;
//   - the error returned by the FSM, if executing the queries failed;
//   - the results produced by the FSM on success.
func (s *Store) Execute(queries []string, tx bool) ([]*sql.Result, error) {
	if s.raft.State() != raft.Leader {
		return nil, fmt.Errorf("not leader")
	}

	b, err := json.Marshal(&command{
		Tx:      tx,
		Queries: queries,
	})
	if err != nil {
		return nil, err
	}

	// Error blocks until the entry has been committed and applied (or has
	// failed); only after it returns nil is Response meaningful.
	f := s.raft.Apply(b, raftTimeout)
	if err := f.Error(); err != nil {
		return nil, err
	}

	// The FSM's Apply returns either an error or the database results.
	// NOTE(review): assumes the db package's Execute yields []*sql.Result,
	// matching this method's return type — confirm against that package.
	switch r := f.Response().(type) {
	case error:
		return nil, r
	case []*sql.Result:
		return r, nil
	default:
		return nil, fmt.Errorf("unexpected response type %T from FSM", r)
	}
}
// Query is intended to run read-only queries. It is not implemented yet: the
// arguments are ignored and it always returns nil rows and a nil error.
func (s *Store) Query(queries []string, tx bool) ([]*sql.Rows, error) {
	// Go straight to the local database. Optionally check if leader. Hard way?
	var rows []*sql.Rows
	return rows, nil
}
// Join adds the node reachable at addr as a Raft peer of this store. The
// remote node must already be able to answer Raft traffic on that address.
// Any error reported by Raft while adding the peer is returned unchanged.
func (s *Store) Join(addr string) error {
	s.logger.Printf("received join request for remote node as %s", addr)

	if err := s.raft.AddPeer(addr).Error(); err != nil {
		return err
	}

	s.logger.Printf("node at %s joined successfully", addr)
	return nil
}
// fsm is Store under a distinct named type. It carries the Apply, Snapshot and
// Restore methods, and Open hands a Store to Raft as its state machine through
// the conversion (*fsm)(s).
type fsm Store
// Apply decodes the JSON command carried by a Raft log entry and executes its
// queries against the database. The returned value is the execution result
// on success, or the error if execution failed. It panics if the entry's data
// cannot be decoded as a command.
func (f *fsm) Apply(l *raft.Log) interface{} {
	cmd := command{}
	if decodeErr := json.Unmarshal(l.Data, &cmd); decodeErr != nil {
		panic(fmt.Sprintf("failed to unmarshal command: %s", decodeErr.Error()))
	}

	results, execErr := f.db.Execute(cmd.Queries, cmd.Tx)
	if execErr == nil {
		return results
	}
	return execErr
}
// Snapshot is meant to return a snapshot of the database. Snapshotting is not
// implemented yet, so it reports an error instead of a nil snapshot with a
// nil error: Raft calls methods on the returned FSMSnapshot whenever the
// error is nil, which would panic on a nil value. Returning an error makes
// Raft abandon the snapshot attempt and keep its log intact.
func (f *fsm) Snapshot() (raft.FSMSnapshot, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	return nil, fmt.Errorf("snapshot not implemented")
}
// Restore restores the database to a previous state.
//
// NOTE(review): not implemented yet — rc is neither read nor closed here and
// nil is always returned, so no database state is actually replaced.
func (f *fsm) Restore(rc io.ReadCloser) error {
return nil
}
// fsmSnapshot is a placeholder snapshot type; it carries no state.
type fsmSnapshot struct{}

// Persist currently writes nothing to sink and always returns nil.
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
	return nil
}

// Release is a no-op; the snapshot holds no resources.
func (f *fsmSnapshot) Release() {
}
// readPeersJSON loads a list of peers stored as a JSON array of strings in the
// file at path. A missing or empty file yields (nil, nil). Any other read
// failure, or a failure to decode the contents, is returned as the error.
func readPeersJSON(path string) ([]string, error) {
	data, readErr := ioutil.ReadFile(path)
	switch {
	case readErr != nil && !os.IsNotExist(readErr):
		return nil, readErr
	case len(data) == 0:
		// Reached for an empty file and for a missing one (no data was read).
		return nil, nil
	}

	var peers []string
	if decodeErr := json.NewDecoder(bytes.NewReader(data)).Decode(&peers); decodeErr != nil {
		return nil, decodeErr
	}
	return peers, nil
}