1
0
Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

876 lines
21 KiB
Go

// Package store provides a distributed SQLite instance.
//
// Distributed consensus is provided via the Raft algorithm.
package store
import (
"bytes"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"sync"
"time"
"github.com/hashicorp/raft"
"github.com/hashicorp/raft-boltdb"
sql "github.com/rqlite/rqlite/db"
)
var (
8 years ago
// ErrNotLeader is returned when a node attempts to execute a leader-only
// operation.
8 years ago
ErrNotLeader = errors.New("not leader")
// ErrOpenTimeout is returned when the Store does not apply its initial
// logs within the specified time.
ErrOpenTimeout = errors.New("timeout waiting for initial logs application")
)
const (
retainSnapshotCount = 2
applyTimeout = 10 * time.Second
openTimeout = 120 * time.Second
sqliteFile = "db.sqlite"
leaderWaitDelay = 100 * time.Millisecond
appliedWaitDelay = 100 * time.Millisecond
)
// QueryRequest represents a query that returns rows, and does not modify
// the database.
type QueryRequest struct {
Queries []string
Timings bool
Tx bool
Lvl ConsistencyLevel
}
// ExecuteRequest represents a query that returns now rows, but does modify
// the database.
type ExecuteRequest struct {
Queries []string
Timings bool
Tx bool
}
// Transport is the interface the network service must provide.
type Transport interface {
net.Listener
// Dial is used to create a new outgoing connection
Dial(address string, timeout time.Duration) (net.Conn, error)
}
// commandType are commands that affect the state of the cluster, and must go through Raft.
type commandType int
const (
execute commandType = iota // Commands which modify the database.
query // Commands which query the database.
peer // Commands that modify peers map.
)
type command struct {
Typ commandType `json:"typ,omitempty"`
Sub json.RawMessage `json:"sub,omitempty"`
}
func newCommand(t commandType, d interface{}) (*command, error) {
b, err := json.Marshal(d)
if err != nil {
return nil, err
}
return &command{
Typ: t,
Sub: b,
}, nil
}
// databaseSub is a command sub which involves interaction with the database.
type databaseSub struct {
Tx bool `json:"tx,omitempty"`
Queries []string `json:"queries,omitempty"`
Timings bool `json:"timings,omitempty"`
}
// peersSub is a command which sets the API address for a Raft address.
type peersSub map[string]string
// ConsistencyLevel represents the available read consistency levels.
type ConsistencyLevel int
8 years ago
// Represents the available consistency levels.
const (
None ConsistencyLevel = iota
Weak
Strong
)
// ClusterState defines the possible Raft states the current node can be in
type ClusterState int
// Represents the Raft cluster states
const (
Leader ClusterState = iota
Follower
Candidate
Shutdown
Unknown
)
// clusterMeta represents cluster meta which must be kept in consensus.
type clusterMeta struct {
APIPeers map[string]string // Map from Raft address to API address
}
// NewClusterMeta returns an initialized cluster meta store.
8 years ago
func newClusterMeta() *clusterMeta {
return &clusterMeta{
APIPeers: make(map[string]string),
}
}
func (c *clusterMeta) AddrForPeer(addr string) string {
if api, ok := c.APIPeers[addr]; ok && api != "" {
return api
}
// Go through each entry, and see if any key resolves to addr.
for k, v := range c.APIPeers {
resv, err := net.ResolveTCPAddr("tcp", k)
if err != nil {
continue
}
if resv.String() == addr {
return v
}
}
return ""
}
// DBConfig represents the configuration of the underlying SQLite database.
type DBConfig struct {
9 years ago
DSN string // Any custom DSN
Memory bool // Whether the database is in-memory only.
}
// NewDBConfig returns a new DB config instance.
func NewDBConfig(dsn string, memory bool) *DBConfig {
return &DBConfig{DSN: dsn, Memory: memory}
}
// Store is a SQLite database, where all changes are made via Raft consensus.
type Store struct {
raftDir string
mu sync.RWMutex // Sync access between queries and snapshots.
raft *raft.Raft // The consensus mechanism.
raftTransport Transport
peerStore raft.PeerStore
dbConf *DBConfig // SQLite database config.
dbPath string // Path to underlying SQLite file, if not in-memory.
db *sql.DB // The underlying SQLite store.
joinRequired bool // Whether an explicit join is required.
metaMu sync.RWMutex
meta *clusterMeta
logger *log.Logger
SnapshotThreshold uint64
HeartbeatTimeout time.Duration
ApplyTimeout time.Duration
OpenTimeout time.Duration
}
// StoreConfig represents the configuration of the underlying Store.
type StoreConfig struct {
DBConf *DBConfig // The DBConfig object for this Store.
Dir string // The working directory for raft.
Tn Transport // The underlying Transport for raft.
Logger *log.Logger // The logger to use to log stuff.
PeerStore raft.PeerStore // The PeerStore to use for raft.
}
// New returns a new Store.
func New(c *StoreConfig) *Store {
logger := c.Logger
if logger == nil {
logger = log.New(os.Stderr, "[store] ", log.LstdFlags)
}
return &Store{
raftDir: c.Dir,
raftTransport: c.Tn,
dbConf: c.DBConf,
dbPath: filepath.Join(c.Dir, sqliteFile),
8 years ago
meta: newClusterMeta(),
logger: logger,
peerStore: c.PeerStore,
ApplyTimeout: applyTimeout,
OpenTimeout: openTimeout,
}
}
// Open opens the store. If enableSingle is set, and there are no existing peers,
9 years ago
// then this node becomes the first node, and therefore leader, of the cluster.
func (s *Store) Open(enableSingle bool) error {
if err := os.MkdirAll(s.raftDir, 0755); err != nil {
return err
}
db, err := s.open()
if err != nil {
return err
}
s.db = db
// Setup Raft communication.
transport := raft.NewNetworkTransport(s.raftTransport, 3, 10*time.Second, os.Stderr)
// Create peer storage if necesssary.
if s.peerStore == nil {
s.peerStore = raft.NewJSONPeers(s.raftDir, transport)
}
// Get the Raft configuration for this store.
config := s.raftConfig()
// Check for any existing peers.
peers, err := s.peerStore.Peers()
if err != nil {
return err
}
s.joinRequired = len(peers) <= 1
// Allow the node to entry single-mode, potentially electing itself, if
// explicitly enabled and there is only 1 node in the cluster already.
if enableSingle && len(peers) <= 1 {
s.logger.Println("enabling single-node mode")
config.EnableSingleNode = true
config.DisableBootstrapAfterElect = false
}
// Create the snapshot store. This allows Raft to truncate the log.
snapshots, err := raft.NewFileSnapshotStore(s.raftDir, retainSnapshotCount, os.Stderr)
if err != nil {
return fmt.Errorf("file snapshot store: %s", err)
}
// Create the log store and stable store.
logStore, err := raftboltdb.NewBoltStore(filepath.Join(s.raftDir, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
// Instantiate the Raft system.
ra, err := raft.NewRaft(config, s, logStore, logStore, snapshots, s.peerStore, transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
s.raft = ra
if s.OpenTimeout != 0 {
// Wait until the initial logs are applied.
s.logger.Printf("waiting for up to %s for application of initial logs", s.OpenTimeout)
if err := s.WaitForAppliedIndex(s.raft.LastIndex(), s.OpenTimeout); err != nil {
return ErrOpenTimeout
}
} else {
s.logger.Println("not waiting for application of initial logs")
}
return nil
}
// Close closes the store. If wait is true, waits for a graceful shutdown.
func (s *Store) Close(wait bool) error {
if err := s.db.Close(); err != nil {
return err
}
f := s.raft.Shutdown()
if wait {
if e := f.(raft.Future); e.Error() != nil {
return e.Error()
}
}
return nil
}
// IsLeader is used to determine if the current node is cluster leader
func (s *Store) IsLeader() bool {
return s.raft.State() == raft.Leader
}
// State returns the current node's Raft state
func (s *Store) State() ClusterState {
state := s.raft.State()
switch state {
case raft.Leader:
return Leader
case raft.Candidate:
return Candidate
case raft.Follower:
return Follower
case raft.Shutdown:
return Shutdown
default:
return Unknown
}
}
// JoinRequired returns whether the node needs to join a cluster after being opened.
func (s *Store) JoinRequired() bool {
return s.joinRequired
}
// Path returns the path to the store's storage directory.
func (s *Store) Path() string {
return s.raftDir
}
8 years ago
// Addr returns the address of the store.
func (s *Store) Addr() net.Addr {
return s.raftTransport.Addr()
}
// Leader returns the current leader. Returns a blank string if there is
// no leader.
func (s *Store) Leader() string {
return s.raft.Leader()
}
// Peer returns the API address for the given addr. If there is no peer
// for the address, it returns the empty string.
func (s *Store) Peer(addr string) string {
return s.meta.AddrForPeer(addr)
}
// APIPeers return the map of Raft addresses to API addresses.
func (s *Store) APIPeers() (map[string]string, error) {
s.metaMu.RLock()
defer s.metaMu.RUnlock()
peers := make(map[string]string, len(s.meta.APIPeers))
for k, v := range s.meta.APIPeers {
peers[k] = v
}
return peers, nil
}
// Nodes returns the list of current peers.
func (s *Store) Nodes() ([]string, error) {
return s.peerStore.Peers()
}
// WaitForLeader blocks until a leader is detected, or the timeout expires.
func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
tck := time.NewTicker(leaderWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
for {
select {
case <-tck.C:
l := s.Leader()
if l != "" {
return l, nil
}
case <-tmr.C:
return "", fmt.Errorf("timeout expired")
}
}
}
// WaitForAppliedIndex blocks until a given log index has been applied,
// or the timeout expires.
func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
for {
select {
case <-tck.C:
if s.raft.AppliedIndex() >= idx {
return nil
}
case <-tmr.C:
return fmt.Errorf("timeout expired")
}
}
}
// Stats returns stats for the store.
func (s *Store) Stats() (map[string]interface{}, error) {
fkEnabled, err := s.db.FKConstraints()
if err != nil {
return nil, err
}
dbStatus := map[string]interface{}{
"dns": s.dbConf.DSN,
"fk_constraints": enabledFromBool(fkEnabled),
"version": sql.DBVersion,
}
if !s.dbConf.Memory {
dbStatus["path"] = s.dbPath
stat, err := os.Stat(s.dbPath)
if err != nil {
return nil, err
}
dbStatus["size"] = stat.Size()
} else {
dbStatus["path"] = ":memory:"
}
s.metaMu.RLock()
defer s.metaMu.RUnlock()
peers, err := s.peerStore.Peers()
if err != nil {
return nil, err
}
status := map[string]interface{}{
"raft": s.raft.Stats(),
"addr": s.Addr().String(),
"leader": s.Leader(),
"apply_timeout": s.ApplyTimeout.String(),
"open_timeout": s.OpenTimeout.String(),
"heartbeat_timeout": s.HeartbeatTimeout.String(),
"snapshot_threshold": s.SnapshotThreshold,
"meta": s.meta,
"peers": peers,
"dir": s.raftDir,
"sqlite3": dbStatus,
"db_conf": s.dbConf,
}
return status, nil
}
9 years ago
// Execute executes queries that return no rows, but do modify the database.
func (s *Store) Execute(ex *ExecuteRequest) ([]*sql.Result, error) {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
d := &databaseSub{
Tx: ex.Tx,
Queries: ex.Queries,
Timings: ex.Timings,
}
c, err := newCommand(execute, d)
if err != nil {
return nil, err
}
b, err := json.Marshal(c)
if err != nil {
return nil, err
}
f := s.raft.Apply(b, s.ApplyTimeout)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
return nil, ErrNotLeader
}
return nil, e.Error()
}
r := f.Response().(*fsmExecuteResponse)
return r.results, r.error
}
// Backup return a snapshot of the underlying database.
//
// If leader is true, this operation is performed with a read consistency
// level equivalent to "weak". Otherwise no guarantees are made about the
// read consistency level.
func (s *Store) Backup(leader bool) ([]byte, error) {
if leader && s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
8 years ago
f, err := ioutil.TempFile("", "rqlite-bak-")
if err != nil {
return nil, err
}
f.Close()
defer os.Remove(f.Name())
if err := s.db.Backup(f.Name()); err != nil {
return nil, err
}
b, err := ioutil.ReadFile(f.Name())
if err != nil {
return nil, err
}
return b, nil
}
9 years ago
// Query executes queries that return rows, and do not modify the database.
func (s *Store) Query(qr *QueryRequest) ([]*sql.Rows, error) {
// Allow concurrent queries.
s.mu.RLock()
defer s.mu.RUnlock()
if qr.Lvl == Strong {
d := &databaseSub{
Tx: qr.Tx,
Queries: qr.Queries,
Timings: qr.Timings,
}
c, err := newCommand(query, d)
if err != nil {
return nil, err
}
b, err := json.Marshal(c)
if err != nil {
return nil, err
}
f := s.raft.Apply(b, s.ApplyTimeout)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
return nil, ErrNotLeader
}
return nil, e.Error()
}
r := f.Response().(*fsmQueryResponse)
return r.rows, r.error
}
if qr.Lvl == Weak && s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
r, err := s.db.Query(qr.Queries, qr.Tx, qr.Timings)
return r, err
}
// UpdateAPIPeers updates the cluster-wide peer information.
func (s *Store) UpdateAPIPeers(peers map[string]string) error {
c, err := newCommand(peer, peers)
if err != nil {
return err
}
b, err := json.Marshal(c)
if err != nil {
return err
}
f := s.raft.Apply(b, s.ApplyTimeout)
return f.Error()
}
// Join joins a node, located at addr, to this store. The node must be ready to
// respond to Raft communications at that address.
func (s *Store) Join(addr string) error {
s.logger.Printf("received request to join node at %s", addr)
if s.raft.State() != raft.Leader {
return ErrNotLeader
}
f := s.raft.AddPeer(addr)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
return ErrNotLeader
}
e.Error()
}
s.logger.Printf("node at %s joined successfully", addr)
return nil
}
// Remove removes a node from the store, specified by addr.
func (s *Store) Remove(addr string) error {
s.logger.Printf("received request to remove node %s", addr)
f := s.raft.RemovePeer(addr)
if f.Error() != nil {
return f.Error()
}
s.logger.Printf("node %s removed successfully", addr)
return nil
}
// open opens the the in-memory or file-based database.
func (s *Store) open() (*sql.DB, error) {
var db *sql.DB
var err error
if !s.dbConf.Memory {
// as it will be rebuilt from (possibly) a snapshot and committed log entries.
if err := os.Remove(s.dbPath); err != nil && !os.IsNotExist(err) {
return nil, err
}
db, err = sql.OpenWithDSN(s.dbPath, s.dbConf.DSN)
if err != nil {
return nil, err
}
s.logger.Println("SQLite database opened at", s.dbPath)
} else {
db, err = sql.OpenInMemoryWithDSN(s.dbConf.DSN)
if err != nil {
return nil, err
}
s.logger.Println("SQLite in-memory database opened")
}
return db, nil
}
// raftConfig returns a new Raft config for the store.
func (s *Store) raftConfig() *raft.Config {
config := raft.DefaultConfig()
if s.SnapshotThreshold != 0 {
config.SnapshotThreshold = s.SnapshotThreshold
}
if s.HeartbeatTimeout != 0 {
config.HeartbeatTimeout = s.HeartbeatTimeout
}
return config
}
type fsmExecuteResponse struct {
results []*sql.Result
error error
}
type fsmQueryResponse struct {
rows []*sql.Rows
error error
}
type fsmGenericResponse struct {
error error
}
// Apply applies a Raft log entry to the database.
func (s *Store) Apply(l *raft.Log) interface{} {
var c command
if err := json.Unmarshal(l.Data, &c); err != nil {
panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error()))
}
switch c.Typ {
case execute, query:
var d databaseSub
if err := json.Unmarshal(c.Sub, &d); err != nil {
return &fsmGenericResponse{error: err}
}
if c.Typ == execute {
r, err := s.db.Execute(d.Queries, d.Tx, d.Timings)
return &fsmExecuteResponse{results: r, error: err}
}
r, err := s.db.Query(d.Queries, d.Tx, d.Timings)
return &fsmQueryResponse{rows: r, error: err}
case peer:
var d peersSub
if err := json.Unmarshal(c.Sub, &d); err != nil {
return &fsmGenericResponse{error: err}
}
func() {
s.metaMu.Lock()
defer s.metaMu.Unlock()
for k, v := range d {
s.meta.APIPeers[k] = v
}
}()
return &fsmGenericResponse{}
default:
return &fsmGenericResponse{error: fmt.Errorf("unknown command: %v", c.Typ)}
}
}
8 years ago
// Database returns a copy of the underlying database. The caller should
// ensure that no transaction is taking place during this call, or an error may
// be returned. If leader is true, this operation is performed with a read
// consistency level equivalent to "weak". Otherwise no guarantees are made
// about the read consistency level.
//
// http://sqlite.org/howtocorrupt.html states it is safe to do this
// as long as no transaction is in progress.
func (s *Store) Database(leader bool) ([]byte, error) {
if leader && s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
// Ensure only one snapshot can take place at once, and block all queries.
s.mu.Lock()
defer s.mu.Unlock()
f, err := ioutil.TempFile("", "rqlilte-snap-")
if err != nil {
return nil, err
}
f.Close()
defer os.Remove(f.Name())
if err := s.db.Backup(f.Name()); err != nil {
return nil, err
}
return ioutil.ReadFile(f.Name())
}
// Snapshot returns a snapshot of the database. The caller must ensure that
// no transaction is taking place during this call. Hashicorp Raft guarantees
// that this function will not be called concurrently with Apply.
//
// http://sqlite.org/howtocorrupt.html states it is safe to do this
// as long as no transaction is in progress.
func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
fsm := &fsmSnapshot{}
var err error
fsm.database, err = s.Database(false)
if err != nil {
s.logger.Printf("failed to read database for snapshot: %s", err.Error())
return nil, err
}
fsm.meta, err = json.Marshal(s.meta)
if err != nil {
s.logger.Printf("failed to encode meta for snapshot: %s", err.Error())
return nil, err
}
return fsm, nil
}
// Restore restores the node to a previous state.
func (s *Store) Restore(rc io.ReadCloser) error {
if err := s.db.Close(); err != nil {
return err
}
// Get size of database.
var sz uint64
if err := binary.Read(rc, binary.LittleEndian, &sz); err != nil {
return err
}
// Now read in the database file data and restore.
database := make([]byte, sz)
if _, err := io.ReadFull(rc, database); err != nil {
return err
}
var db *sql.DB
var err error
if !s.dbConf.Memory {
// Write snapshot over any existing database file.
if err := ioutil.WriteFile(s.dbPath, database, 0660); err != nil {
return err
}
// Re-open it.
db, err = sql.OpenWithDSN(s.dbPath, s.dbConf.DSN)
if err != nil {
return err
}
} else {
// In memory. Copy to temporary file, and then load memory from file.
f, err := ioutil.TempFile("", "rqlilte-snap-")
if err != nil {
return err
}
f.Close()
defer os.Remove(f.Name())
if err := ioutil.WriteFile(f.Name(), database, 0660); err != nil {
return err
}
// Load an in-memory database from the snapshot now on disk.
db, err = sql.LoadInMemoryWithDSN(f.Name(), s.dbConf.DSN)
if err != nil {
return err
}
}
s.db = db
// Read remaining bytes, and set to cluster meta.
b, err := ioutil.ReadAll(rc)
if err != nil {
return err
}
return func() error {
s.metaMu.Lock()
defer s.metaMu.Unlock()
return json.Unmarshal(b, &s.meta)
}()
}
// RegisterObserver registers an observer of Raft events
func (s *Store) RegisterObserver(o *raft.Observer) {
s.raft.RegisterObserver(o)
}
// DeregisterObserver deregisters an observer of Raft events
func (s *Store) DeregisterObserver(o *raft.Observer) {
s.raft.DeregisterObserver(o)
}
type fsmSnapshot struct {
database []byte
meta []byte
}
// Persist writes the snapshot to the given sink.
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
err := func() error {
// Start by writing size of database.
b := new(bytes.Buffer)
sz := uint64(len(f.database))
err := binary.Write(b, binary.LittleEndian, sz)
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
// Next write database to sink.
if _, err := sink.Write(f.database); err != nil {
return err
}
// Finally write the meta.
if _, err := sink.Write(f.meta); err != nil {
return err
}
// Close the sink.
return sink.Close()
}()
if err != nil {
sink.Cancel()
return err
}
return nil
}
9 years ago
// Release is a no-op.
func (f *fsmSnapshot) Release() {}
// enabledFromBool converts bool to "enabled" or "disabled".
func enabledFromBool(b bool) string {
if b {
return "enabled"
}
return "disabled"
}