// Package store provides a distributed SQLite instance.
//
// Distributed consensus is provided via the Raft algorithm.
package store

import (
	"bytes"
	"compress/gzip"
	"encoding/binary"
	"errors"
	"expvar"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"sync"
	"time"
	"unsafe"

	"github.com/hashicorp/raft"
	"github.com/rqlite/rqlite/command"
	sql "github.com/rqlite/rqlite/db"
	rlog "github.com/rqlite/rqlite/log"
)

var (
	// ErrNotLeader is returned when a node attempts to execute a leader-only
	// operation.
	ErrNotLeader = errors.New("not leader")

	// ErrStaleRead is returned if executing the query would violate the
	// requested freshness.
	ErrStaleRead = errors.New("stale read")

	// ErrOpenTimeout is returned when the Store does not apply its initial
	// logs within the specified time.
	ErrOpenTimeout = errors.New("timeout waiting for initial logs application")

	// ErrInvalidBackupFormat is returned when the requested backup format
	// is not valid.
	ErrInvalidBackupFormat = errors.New("invalid backup format")
)

const (
	raftDBPath          = "raft.db" // Changing this will break backwards compatibility.
	peersPath           = "raft/peers.json"
	peersInfoPath       = "raft/peers.info"
	retainSnapshotCount = 2
	applyTimeout        = 10 * time.Second
	openTimeout         = 120 * time.Second
	sqliteFile          = "db.sqlite"
	leaderWaitDelay     = 100 * time.Millisecond
	appliedWaitDelay    = 100 * time.Millisecond
	connectionPoolCount = 5
	connectionTimeout   = 10 * time.Second
	raftLogCacheSize    = 512
	trailingScale       = 1.25
)

const (
	numSnaphots              = "num_snapshots"
	numBackups               = "num_backups"
	numRestores              = "num_restores"
	numRecoveries            = "num_recoveries"
	numUncompressedCommands  = "num_uncompressed_commands"
	numCompressedCommands    = "num_compressed_commands"
	numJoins                 = "num_joins"
	numIgnoredJoins          = "num_ignored_joins"
	numRemovedBeforeJoins    = "num_removed_before_joins"
	snapshotCreateDuration   = "snapshot_create_duration"
	snapshotPersistDuration  = "snapshot_persist_duration"
	snapshotDBSerializedSize = "snapshot_db_serialized_size"
	snapshotDBOnDiskSize     = "snapshot_db_ondisk_size"
)

// BackupFormat represents the format of database backup.
type BackupFormat int

const (
	// BackupSQL is the plaintext SQL command format.
	BackupSQL BackupFormat = iota

	// BackupBinary is a SQLite file backup format.
	BackupBinary
)

// stats captures stats for the Store.
var stats *expvar.Map

func init() {
	stats = expvar.NewMap("store")
	stats.Add(numSnaphots, 0)
	stats.Add(numBackups, 0)
	stats.Add(numRestores, 0)
	stats.Add(numRecoveries, 0)
	stats.Add(numUncompressedCommands, 0)
	stats.Add(numCompressedCommands, 0)
	stats.Add(numJoins, 0)
	stats.Add(numIgnoredJoins, 0)
	stats.Add(numRemovedBeforeJoins, 0)
	stats.Add(snapshotCreateDuration, 0)
	stats.Add(snapshotPersistDuration, 0)
	stats.Add(snapshotDBSerializedSize, 0)
	stats.Add(snapshotDBOnDiskSize, 0)
}

// ClusterState defines the possible Raft states the current node can be in.
type ClusterState int

// Represents the Raft cluster states.
const (
	Leader ClusterState = iota
	Follower
	Candidate
	Shutdown
	Unknown
)

// Store is a SQLite database, where all changes are made via Raft consensus.
type Store struct {
	raftDir       string
	peersPath     string
	peersInfoPath string

	raft   *raft.Raft // The consensus mechanism.
	ln     Listener
	raftTn *raft.NetworkTransport
	raftID string    // Node ID.
	dbConf *DBConfig // SQLite database config.
	dbPath string    // Path to underlying SQLite file, if not in-memory.
	db     *sql.DB   // The underlying SQLite store.
	queryTxMu sync.RWMutex

	// dbAppliedIndex is the latest log entry index actually reflected by the FSM.
	// Due to the Raft code, this value is not updated after a snapshot restore.
	dbAppliedIndexMu sync.RWMutex
	dbAppliedIndex   uint64

	fsmIndex   uint64
	fsmIndexMu sync.RWMutex

	reqMarshaller *command.RequestMarshaler // Request marshaler for writing to log.

	raftLog    raft.LogStore    // Persistent log store.
	raftStable raft.StableStore // Persistent k-v store.
	boltStore  *rlog.Log        // Physical store.

	onDiskCreated        bool      // On-disk database actually created?
	snapsExistOnOpen     bool      // Any snaps present when store opens?
	firstIdxOnOpen       uint64    // First index on log when Store opens.
	lastIdxOnOpen        uint64    // Last index on log when Store opens.
	lastCommandIdxOnOpen uint64    // Last command index on log when Store opens.
	firstLogAppliedT     time.Time // Time first log is applied.
	appliedOnOpen        uint64    // Number of logs applied at open.
	openT                time.Time // Timestamp when Store opens.

	logger *log.Logger

	ShutdownOnRemove   bool
	SnapshotThreshold  uint64
	SnapshotInterval   time.Duration
	LeaderLeaseTimeout time.Duration
	HeartbeatTimeout   time.Duration
	ElectionTimeout    time.Duration
	ApplyTimeout       time.Duration
	RaftLogLevel       string

	numTrailingLogs uint64

	// For whitebox testing
	numNoops       int
	numSnapshotsMu sync.Mutex
	numSnapshots   int
}

// IsNewNode returns whether a node using raftDir would be a brand-new node.
// It also means that the window for this node to join a different cluster has passed.
func IsNewNode(raftDir string) bool {
	// If there is any pre-existing Raft state, then this node
	// has already been created.
	return !pathExists(filepath.Join(raftDir, raftDBPath))
}

// Config represents the configuration of the underlying Store.
type Config struct {
	DBConf *DBConfig   // The DBConfig object for this Store.
	Dir    string      // The working directory for raft.
	Tn     Transport   // The underlying Transport for raft.
	ID     string      // Node ID.
	Logger *log.Logger // The logger to use to log stuff.
}

// New returns a new Store.
func New(ln Listener, c *Config) *Store {
	logger := c.Logger
	if logger == nil {
		logger = log.New(os.Stderr, "[store] ", log.LstdFlags)
	}

	dbPath := filepath.Join(c.Dir, sqliteFile)
	if c.DBConf.OnDiskPath != "" {
		dbPath = c.DBConf.OnDiskPath
	}

	return &Store{
		ln:            ln,
		raftDir:       c.Dir,
		peersPath:     filepath.Join(c.Dir, peersPath),
		peersInfoPath: filepath.Join(c.Dir, peersInfoPath),
		raftID:        c.ID,
		dbConf:        c.DBConf,
		dbPath:        dbPath,
		reqMarshaller: command.NewRequestMarshaler(),
		logger:        logger,
		ApplyTimeout:  applyTimeout,
	}
}

// Open opens the Store. If enableBootstrap is set, then this node becomes a
// standalone node. If not set, then the calling layer must know that this
// node has pre-existing state, or the calling layer will trigger a join
// operation after opening the Store.
func (s *Store) Open(enableBootstrap bool) error {
	s.openT = time.Now()
	s.logger.Printf("opening store with node ID %s", s.raftID)

	if !s.dbConf.Memory {
		s.logger.Printf("configured for an on-disk database at %s", s.dbPath)
		parentDir := filepath.Dir(s.dbPath)
		s.logger.Printf("ensuring directory at %s exists", parentDir)
		err := os.MkdirAll(parentDir, 0755)
		if err != nil {
			return err
		}
	} else {
		s.logger.Printf("configured for an in-memory database")
	}

	s.logger.Printf("ensuring directory at %s exists", s.raftDir)
	err := os.MkdirAll(filepath.Dir(s.peersPath), 0755)
	if err != nil {
		return err
	}

	// Create a Raft-compatible network layer.
	s.raftTn = raft.NewNetworkTransport(NewTransport(s.ln), connectionPoolCount, connectionTimeout, nil)

	// Don't allow control over trailing logs directly, just implement a policy.
	s.numTrailingLogs = uint64(float64(s.SnapshotThreshold) * trailingScale)

	config := s.raftConfig()
	config.LocalID = raft.ServerID(s.raftID)

	// Create the snapshot store. This allows Raft to truncate the log.
	snapshots, err := raft.NewFileSnapshotStore(s.raftDir, retainSnapshotCount, os.Stderr)
	if err != nil {
		return fmt.Errorf("file snapshot store: %s", err)
	}
	snaps, err := snapshots.List()
	if err != nil {
		return fmt.Errorf("list snapshots: %s", err)
	}
	s.logger.Printf("%d preexisting snapshots present", len(snaps))
	s.snapsExistOnOpen = len(snaps) > 0

	// Create the log store and stable store.
	s.boltStore, err = rlog.NewLog(filepath.Join(s.raftDir, raftDBPath))
	if err != nil {
		return fmt.Errorf("new log store: %s", err)
	}
	s.raftStable = s.boltStore
	s.raftLog, err = raft.NewLogCache(raftLogCacheSize, s.boltStore)
	if err != nil {
		return fmt.Errorf("new cached store: %s", err)
	}

	// Request to recover node?
	if pathExists(s.peersPath) {
		s.logger.Printf("attempting node recovery using %s", s.peersPath)
		config, err := raft.ReadConfigJSON(s.peersPath)
		if err != nil {
			return fmt.Errorf("failed to read peers file: %s", err.Error())
		}
		if err = RecoverNode(s.raftDir, s.logger, s.raftLog, s.raftStable, snapshots, s.raftTn, config); err != nil {
			return fmt.Errorf("failed to recover node: %s", err.Error())
		}
		if err := os.Rename(s.peersPath, s.peersInfoPath); err != nil {
			return fmt.Errorf("failed to move %s after recovery: %s", s.peersPath, err.Error())
		}
		s.logger.Printf("node recovered successfully using %s", s.peersPath)
		stats.Add(numRecoveries, 1)
	}

	// Get some info about the log, before any more entries are committed.
	if err := s.setLogInfo(); err != nil {
		return fmt.Errorf("set log info: %s", err)
	}
	s.logger.Printf("first log index: %d, last log index: %d, last command log index: %d",
		s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastCommandIdxOnOpen)

	// If an on-disk database has been requested, and there are no snapshots, and
	// there are no commands in the log, then this is the only opportunity to
	// create that on-disk database file before Raft initializes.
	if !s.dbConf.Memory && !s.snapsExistOnOpen && s.lastCommandIdxOnOpen == 0 {
		s.db, err = s.createOnDisk(nil)
		if err != nil {
			return fmt.Errorf("failed to create on-disk database: %s", err)
		}
		s.onDiskCreated = true
		s.logger.Printf("created on-disk database at open")
	} else {
		// We need an in-memory database, at least for bootstrapping purposes.
		s.db, err = s.createInMemory(nil)
		if err != nil {
			return fmt.Errorf("failed to create in-memory database: %s", err)
		}
		s.logger.Printf("created in-memory database at open")
	}

	// Instantiate the Raft system.
	ra, err := raft.NewRaft(config, s, s.raftLog, s.raftStable, snapshots, s.raftTn)
	if err != nil {
		return fmt.Errorf("new raft: %s", err)
	}

	if enableBootstrap {
		s.logger.Printf("executing new cluster bootstrap")
		configuration := raft.Configuration{
			Servers: []raft.Server{
				{
					ID:      config.LocalID,
					Address: s.raftTn.LocalAddr(),
				},
			},
		}
		ra.BootstrapCluster(configuration)
	} else {
		s.logger.Printf("no cluster bootstrap requested")
	}

	s.raft = ra
	return nil
}

// Close closes the store. If wait is true, waits for a graceful shutdown.
func (s *Store) Close(wait bool) error {
	f := s.raft.Shutdown()
	if wait {
		if e := f.(raft.Future); e.Error() != nil {
			return e.Error()
		}
	}

	// Only shut down Bolt and SQLite when Raft is done.
	if err := s.db.Close(); err != nil {
		return err
	}
	if err := s.boltStore.Close(); err != nil {
		return err
	}
	return nil
}

// WaitForAppliedFSM waits until the currently applied logs (at the time this
// function is called) are actually reflected by the FSM, or the timeout expires.
func (s *Store) WaitForAppliedFSM(timeout time.Duration) (uint64, error) {
	if timeout == 0 {
		return 0, nil
	}
	return s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout)
}

// WaitForInitialLogs waits for logs that were in the Store at time of open
// to be applied to the state machine.
func (s *Store) WaitForInitialLogs(timeout time.Duration) error {
	if timeout == 0 {
		return nil
	}
	s.logger.Printf("waiting for up to %s for application of initial logs (lcIdx=%d)",
		timeout, s.lastCommandIdxOnOpen)
	return s.WaitForApplied(timeout)
}

// WaitForApplied waits for all Raft log entries to be applied to the
// underlying database.
func (s *Store) WaitForApplied(timeout time.Duration) error {
	if timeout == 0 {
		return nil
	}
	return s.WaitForAppliedIndex(s.raft.LastIndex(), timeout)
}

// WaitForAppliedIndex blocks until a given log index has been applied,
// or the timeout expires.
func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
	tck := time.NewTicker(appliedWaitDelay)
	defer tck.Stop()
	tmr := time.NewTimer(timeout)
	defer tmr.Stop()

	for {
		select {
		case <-tck.C:
			if s.raft.AppliedIndex() >= idx {
				return nil
			}
		case <-tmr.C:
			return fmt.Errorf("timeout expired")
		}
	}
}

// IsLeader is used to determine if the current node is cluster leader.
func (s *Store) IsLeader() bool {
	return s.raft.State() == raft.Leader
}

// State returns the current node's Raft state.
func (s *Store) State() ClusterState {
	state := s.raft.State()
	switch state {
	case raft.Leader:
		return Leader
	case raft.Candidate:
		return Candidate
	case raft.Follower:
		return Follower
	case raft.Shutdown:
		return Shutdown
	default:
		return Unknown
	}
}

// Path returns the path to the store's storage directory.
func (s *Store) Path() string {
	return s.raftDir
}

// Addr returns the address of the store.
func (s *Store) Addr() string {
	return string(s.raftTn.LocalAddr())
}

// ID returns the Raft ID of the store.
func (s *Store) ID() string {
	return s.raftID
}

// LeaderAddr returns the address of the current leader. Returns a
// blank string if there is no leader.
func (s *Store) LeaderAddr() (string, error) {
	return string(s.raft.Leader()), nil
}

// LeaderID returns the node ID of the Raft leader. Returns a
// blank string if there is no leader, or an error.
func (s *Store) LeaderID() (string, error) {
	addr, err := s.LeaderAddr()
	if err != nil {
		return "", err
	}
	configFuture := s.raft.GetConfiguration()
	if err := configFuture.Error(); err != nil {
		s.logger.Printf("failed to get raft configuration: %v", err)
		return "", err
	}

	for _, srv := range configFuture.Configuration().Servers {
		if srv.Address == raft.ServerAddress(addr) {
			return string(srv.ID), nil
		}
	}
	return "", nil
}

// Nodes returns the slice of nodes in the cluster, sorted by ID ascending.
func (s *Store) Nodes() ([]*Server, error) {
	f := s.raft.GetConfiguration()
	if f.Error() != nil {
		return nil, f.Error()
	}

	rs := f.Configuration().Servers
	servers := make([]*Server, len(rs))
	for i := range rs {
		servers[i] = &Server{
			ID:       string(rs[i].ID),
			Addr:     string(rs[i].Address),
			Suffrage: rs[i].Suffrage.String(),
		}
	}

	sort.Sort(Servers(servers))
	return servers, nil
}

// WaitForLeader blocks until a leader is detected, or the timeout expires.
func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
	tck := time.NewTicker(leaderWaitDelay)
	defer tck.Stop()
	tmr := time.NewTimer(timeout)
	defer tmr.Stop()

	for {
		select {
		case <-tck.C:
			l, err := s.LeaderAddr()
			if err != nil {
				return "", err
			}
			if l != "" {
				return l, nil
			}
		case <-tmr.C:
			return "", fmt.Errorf("timeout expired")
		}
	}
}

// SetRequestCompression allows low-level control over the compression threshold
// for the request marshaler.
func (s *Store) SetRequestCompression(batch, size int) {
	s.reqMarshaller.BatchThreshold = batch
	s.reqMarshaller.SizeThreshold = size
}

// WaitForFSMIndex blocks until a given log index has been applied to the
// state machine or the timeout expires.
func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error) {
	tck := time.NewTicker(appliedWaitDelay)
	defer tck.Stop()
	tmr := time.NewTimer(timeout)
	defer tmr.Stop()

	var fsmIdx uint64
	for {
		select {
		case <-tck.C:
			s.fsmIndexMu.RLock()
			fsmIdx = s.fsmIndex
			s.fsmIndexMu.RUnlock()
			if fsmIdx >= idx {
				return fsmIdx, nil
			}
		case <-tmr.C:
			return 0, fmt.Errorf("timeout expired")
		}
	}
}

// Stats returns stats for the store.
func (s *Store) Stats() (map[string]interface{}, error) {
	fsmIdx := func() uint64 {
		s.fsmIndexMu.RLock()
		defer s.fsmIndexMu.RUnlock()
		return s.fsmIndex
	}()
	dbAppliedIdx := func() uint64 {
		s.dbAppliedIndexMu.RLock()
		defer s.dbAppliedIndexMu.RUnlock()
		return s.dbAppliedIndex
	}()

	dbStatus, err := s.db.Stats()
	if err != nil {
		return nil, err
	}

	nodes, err := s.Nodes()
	if err != nil {
		return nil, err
	}
	leaderID, err := s.LeaderID()
	if err != nil {
		return nil, err
	}

	// Perform type-conversion to actual numbers where possible.
	raftStats := make(map[string]interface{})
	for k, v := range s.raft.Stats() {
		if s, err := strconv.ParseInt(v, 10, 64); err != nil {
			raftStats[k] = v
		} else {
			raftStats[k] = s
		}
	}
	raftStats["log_size"], err = s.logSize()
	if err != nil {
		return nil, err
	}
	raftStats["bolt"] = s.boltStore.Stats()

	dirSz, err := dirSize(s.raftDir)
	if err != nil {
		return nil, err
	}

	leaderAddr, err := s.LeaderAddr()
	if err != nil {
		return nil, err
	}

	status := map[string]interface{}{
		"node_id":          s.raftID,
		"raft":             raftStats,
		"fsm_index":        fsmIdx,
		"db_applied_index": dbAppliedIdx,
		"addr":             s.Addr(),
		"leader": map[string]string{
			"node_id": leaderID,
			"addr":    leaderAddr,
		},
		"apply_timeout":      s.ApplyTimeout.String(),
		"heartbeat_timeout":  s.HeartbeatTimeout.String(),
		"election_timeout":   s.ElectionTimeout.String(),
		"snapshot_threshold": s.SnapshotThreshold,
		"snapshot_interval":  s.SnapshotInterval,
		"trailing_logs":      s.numTrailingLogs,
		"request_marshaler":  s.reqMarshaller.Stats(),
		"nodes":              nodes,
		"dir":                s.raftDir,
		"dir_size":           dirSz,
		"sqlite3":            dbStatus,
		"db_conf":            s.dbConf,
	}
	return status, nil
}

// Execute executes queries that return no rows, but do modify the database.
func (s *Store) Execute(ex *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
	if s.raft.State() != raft.Leader {
		return nil, ErrNotLeader
	}
	return s.execute(ex)
}

func (s *Store) execute(ex *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
	b, compressed, err := s.reqMarshaller.Marshal(ex)
	if err != nil {
		return nil, err
	}
	if compressed {
		stats.Add(numCompressedCommands, 1)
	} else {
		stats.Add(numUncompressedCommands, 1)
	}

	c := &command.Command{
		Type:       command.Command_COMMAND_TYPE_EXECUTE,
		SubCommand: b,
		Compressed: compressed,
	}

	b, err = command.Marshal(c)
	if err != nil {
		return nil, err
	}

	af := s.raft.Apply(b, s.ApplyTimeout).(raft.ApplyFuture)
	if af.Error() != nil {
		if af.Error() == raft.ErrNotLeader {
			return nil, ErrNotLeader
		}
		return nil, af.Error()
	}

	s.dbAppliedIndexMu.Lock()
	s.dbAppliedIndex = af.Index()
	s.dbAppliedIndexMu.Unlock()
	r := af.Response().(*fsmExecuteResponse)
	return r.results, r.error
}

// Query executes queries that return rows, and do not modify the database.
func (s *Store) Query(qr *command.QueryRequest) ([]*command.QueryRows, error) {
	if qr.Level == command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG {
		if s.raft.State() != raft.Leader {
			return nil, ErrNotLeader
		}

		b, compressed, err := s.reqMarshaller.Marshal(qr)
		if err != nil {
			return nil, err
		}
		if compressed {
			stats.Add(numCompressedCommands, 1)
		} else {
			stats.Add(numUncompressedCommands, 1)
		}

		c := &command.Command{
			Type:       command.Command_COMMAND_TYPE_QUERY,
			SubCommand: b,
			Compressed: compressed,
		}

		b, err = command.Marshal(c)
		if err != nil {
			return nil, err
		}

		af := s.raft.Apply(b, s.ApplyTimeout).(raft.ApplyFuture)
		if af.Error() != nil {
			if af.Error() == raft.ErrNotLeader {
				return nil, ErrNotLeader
			}
			return nil, af.Error()
		}

		s.dbAppliedIndexMu.Lock()
		s.dbAppliedIndex = af.Index()
		s.dbAppliedIndexMu.Unlock()
		r := af.Response().(*fsmQueryResponse)
		return r.rows, r.error
	}

	if qr.Level == command.QueryRequest_QUERY_REQUEST_LEVEL_WEAK && s.raft.State() != raft.Leader {
		return nil, ErrNotLeader
	}

	if qr.Level == command.QueryRequest_QUERY_REQUEST_LEVEL_NONE && qr.Freshness > 0 &&
		time.Since(s.raft.LastContact()).Nanoseconds() > qr.Freshness {
		return nil, ErrStaleRead
	}

	if qr.Request.Transaction {
		// Transaction requested during query, but not going through consensus. This means
		// we need to block any database serialization during the query.
		s.queryTxMu.RLock()
		defer s.queryTxMu.RUnlock()
	}

	return s.db.Query(qr.Request, qr.Timings)
}
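// A note on the read consistency levels handled by Query above: STRONG requests
// are marshaled into a command and sent through the Raft log, so they carry the
// same ordering and leadership guarantees as writes; WEAK requests only check
// that this node currently believes it is leader before querying the local
// database; NONE requests query the local database directly, and may return
// ErrStaleRead if a freshness bound is set and the time since last leader
// contact exceeds it.
//
// A minimal calling sketch (illustrative only; it assumes the calling layer has
// already built a command.QueryRequest named qr, and holds a Store named str):
//
//	qr.Level = command.QueryRequest_QUERY_REQUEST_LEVEL_NONE
//	qr.Freshness = (5 * time.Second).Nanoseconds()
//	rows, err := str.Query(qr)
//	if err == ErrStaleRead {
//		// This node has not heard from the leader within the freshness bound.
//	}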
// Backup writes a snapshot of the underlying database to dst.
//
// If leader is true, this operation is performed with a read consistency
// level equivalent to "weak". Otherwise no guarantees are made about the
// read consistency level.
func (s *Store) Backup(leader bool, format BackupFormat, dst io.Writer) error {
	if leader && s.raft.State() != raft.Leader {
		return ErrNotLeader
	}

	if format == BackupBinary {
		if err := s.database(leader, dst); err != nil {
			return err
		}
	} else if format == BackupSQL {
		if err := s.db.Dump(dst); err != nil {
			return err
		}
	} else {
		return ErrInvalidBackupFormat
	}

	stats.Add(numBackups, 1)
	return nil
}

// Join joins a node, identified by id and located at addr, to this store.
// The node must be ready to respond to Raft communications at that address.
func (s *Store) Join(id, addr string, voter bool) error {
	s.logger.Printf("received request from node with ID %s, at %s, to join this node", id, addr)

	if s.raft.State() != raft.Leader {
		return ErrNotLeader
	}

	configFuture := s.raft.GetConfiguration()
	if err := configFuture.Error(); err != nil {
		s.logger.Printf("failed to get raft configuration: %v", err)
		return err
	}

	for _, srv := range configFuture.Configuration().Servers {
		// If a node already exists with either the joining node's ID or address,
		// that node may need to be removed from the config first.
		if srv.ID == raft.ServerID(id) || srv.Address == raft.ServerAddress(addr) {
			// However if *both* the ID and the address are the same, then no
			// join is actually needed.
			if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(id) {
				stats.Add(numIgnoredJoins, 1)
				s.logger.Printf("node %s at %s already member of cluster, ignoring join request", id, addr)
				return nil
			}

			if err := s.remove(id); err != nil {
				s.logger.Printf("failed to remove node %s: %v", id, err)
				return err
			}
			stats.Add(numRemovedBeforeJoins, 1)
			s.logger.Printf("removed node %s prior to rejoin with changed ID or address", id)
		}
	}

	var f raft.IndexFuture
	if voter {
		f = s.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(addr), 0, 0)
	} else {
		f = s.raft.AddNonvoter(raft.ServerID(id), raft.ServerAddress(addr), 0, 0)
	}
	if e := f.(raft.Future); e.Error() != nil {
		if e.Error() == raft.ErrNotLeader {
			return ErrNotLeader
		}
		return e.Error()
	}

	stats.Add(numJoins, 1)
	s.logger.Printf("node with ID %s, at %s, joined successfully as %s", id, addr, prettyVoter(voter))
	return nil
}

// Remove removes a node from the store, specified by ID.
func (s *Store) Remove(id string) error {
	s.logger.Printf("received request to remove node %s", id)
	if err := s.remove(id); err != nil {
		s.logger.Printf("failed to remove node %s: %s", id, err.Error())
		return err
	}

	s.logger.Printf("node %s removed successfully", id)
	return nil
}

// Noop writes a noop command to the Raft log. A noop command simply
// consumes a slot in the Raft log, but has no other effect on the
// system.
func (s *Store) Noop(id string) error {
	n := &command.Noop{
		Id: id,
	}
	b, err := command.MarshalNoop(n)
	if err != nil {
		return err
	}

	c := &command.Command{
		Type:       command.Command_COMMAND_TYPE_NOOP,
		SubCommand: b,
	}
	bc, err := command.Marshal(c)
	if err != nil {
		return err
	}

	af := s.raft.Apply(bc, s.ApplyTimeout).(raft.ApplyFuture)
	if af.Error() != nil {
		if af.Error() == raft.ErrNotLeader {
			return ErrNotLeader
		}
		return af.Error()
	}
	return nil
}

// createInMemory returns an in-memory database. If b is non-nil and non-empty,
// then the database will be initialized with the contents of b.
func (s *Store) createInMemory(b []byte) (db *sql.DB, err error) {
	if len(b) == 0 {
		db, err = sql.OpenInMemory(s.dbConf.FKConstraints)
	} else {
		db, err = sql.DeserializeIntoMemory(b, s.dbConf.FKConstraints)
	}
	return
}

// createOnDisk opens an on-disk database file at the Store's configured path. If
// b is non-nil, any pre-existing file will first be overwritten with those contents.
// Otherwise any pre-existing file will be removed before the database is opened.
func (s *Store) createOnDisk(b []byte) (*sql.DB, error) {
	if err := os.Remove(s.dbPath); err != nil && !os.IsNotExist(err) {
		return nil, err
	}
	if b != nil {
		if err := ioutil.WriteFile(s.dbPath, b, 0660); err != nil {
			return nil, err
		}
	}
	return sql.Open(s.dbPath, s.dbConf.FKConstraints)
}

// setLogInfo records some key indexes about the log.
func (s *Store) setLogInfo() error {
	var err error
	s.firstIdxOnOpen, err = s.boltStore.FirstIndex()
	if err != nil {
		return fmt.Errorf("failed to get first index: %s", err)
	}
	s.lastIdxOnOpen, err = s.boltStore.LastIndex()
	if err != nil {
		return fmt.Errorf("failed to get last index: %s", err)
	}
	s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex()
	if err != nil {
		return fmt.Errorf("failed to get last command index: %s", err)
	}
	return nil
}

// remove removes the node, with the given ID, from the cluster.
func (s *Store) remove(id string) error {
	if s.raft.State() != raft.Leader {
		return ErrNotLeader
	}

	f := s.raft.RemoveServer(raft.ServerID(id), 0, 0)
	if f.Error() != nil {
		if f.Error() == raft.ErrNotLeader {
			return ErrNotLeader
		}
		return f.Error()
	}
	return nil
}

// raftConfig returns a new Raft config for the store.
func (s *Store) raftConfig() *raft.Config {
	config := raft.DefaultConfig()
	config.ShutdownOnRemove = s.ShutdownOnRemove
	config.LogLevel = s.RaftLogLevel
	if s.SnapshotThreshold != 0 {
		config.SnapshotThreshold = s.SnapshotThreshold
		config.TrailingLogs = s.numTrailingLogs
	}
	if s.SnapshotInterval != 0 {
		config.SnapshotInterval = s.SnapshotInterval
	}
	if s.LeaderLeaseTimeout != 0 {
		config.LeaderLeaseTimeout = s.LeaderLeaseTimeout
	}
	if s.HeartbeatTimeout != 0 {
		config.HeartbeatTimeout = s.HeartbeatTimeout
	}
	if s.ElectionTimeout != 0 {
		config.ElectionTimeout = s.ElectionTimeout
	}
	return config
}

type fsmExecuteResponse struct {
	results []*command.ExecuteResult
	error   error
}

type fsmQueryResponse struct {
	rows  []*command.QueryRows
	error error
}

type fsmGenericResponse struct {
	error error
}

// Apply applies a Raft log entry to the database.
func (s *Store) Apply(l *raft.Log) (e interface{}) {
	defer func() {
		s.fsmIndexMu.Lock()
		defer s.fsmIndexMu.Unlock()
		s.fsmIndex = l.Index

		if l.Index <= s.lastCommandIdxOnOpen {
			// In here means at least one command entry was in the log when the Store
			// opened.
			s.appliedOnOpen++
			if l.Index == s.lastCommandIdxOnOpen {
				s.logger.Printf("%d committed log entries applied in %s, took %s since open",
					s.appliedOnOpen, time.Since(s.firstLogAppliedT), time.Since(s.openT))

				// Last command log applied. Time to switch to an on-disk database?
				if s.dbConf.Memory {
					s.logger.Println("continuing use of in-memory database")
				} else {
					// Since we're here, an on-disk database was requested *and* there were
					// commands in the log. A snapshot may or may not have been applied, but
					// it wouldn't have created the on-disk database in that case, since there
					// were commands in the log. This is the very last chance to convert from
					// in-memory to on-disk.
					s.queryTxMu.Lock()
					defer s.queryTxMu.Unlock()

					b, _ := s.db.Serialize()
					err := s.db.Close()
					if err != nil {
						e = &fsmGenericResponse{error: fmt.Errorf("close failed: %s", err)}
						return
					}
					// Open a new on-disk database.
					s.db, err = s.createOnDisk(b)
					if err != nil {
						e = &fsmGenericResponse{error: fmt.Errorf("open on-disk failed: %s", err)}
						return
					}
					s.onDiskCreated = true
					s.logger.Println("successfully switched to on-disk database")
				}
			}
		}
	}()

	if s.firstLogAppliedT.IsZero() {
		s.firstLogAppliedT = time.Now()
	}

	typ, r := applyCommand(l.Data, s.db)
	if typ == command.Command_COMMAND_TYPE_NOOP {
		s.numNoops++
	}
	return r
}
// Database returns a copy of the underlying database. The caller MUST
// ensure that no transaction is taking place during this call, or an error may
// be returned. If leader is true, this operation is performed with a read
// consistency level equivalent to "weak". Otherwise no guarantees are made
// about the read consistency level.
//
// http://sqlite.org/howtocorrupt.html states it is safe to do this
// as long as no transaction is in progress.
func (s *Store) Database(leader bool) ([]byte, error) {
	if leader && s.raft.State() != raft.Leader {
		return nil, ErrNotLeader
	}
	return s.db.Serialize()
}

// Snapshot returns a snapshot of the database.
//
// The system must ensure that no transaction is taking place during this call.
// Hashicorp Raft guarantees that this function will not be called concurrently
// with Apply, as it states Apply() and Snapshot() are always called from the same
// thread. This means there is no need to synchronize this function with Execute().
// However, queries that involve a transaction must be blocked.
//
// http://sqlite.org/howtocorrupt.html states it is safe to copy or serialize the
// database as long as no transaction is in progress.
func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
	defer func() {
		s.numSnapshotsMu.Lock()
		defer s.numSnapshotsMu.Unlock()
		s.numSnapshots++
	}()

	s.queryTxMu.Lock()
	defer s.queryTxMu.Unlock()

	fsm := newFSMSnapshot(s.db, s.logger)
	dur := time.Since(fsm.startT)
	stats.Add(numSnaphots, 1)
	stats.Get(snapshotCreateDuration).(*expvar.Int).Set(dur.Milliseconds())
	stats.Get(snapshotDBSerializedSize).(*expvar.Int).Set(int64(len(fsm.database)))
	s.logger.Printf("node snapshot created in %s", dur)
	return fsm, nil
}

// Restore restores the node to a previous state. The Hashicorp docs state this
// will not be called concurrently with Apply(), so synchronization with Execute()
// is not necessary. To prevent problems during queries, which may not go through
// the log, it blocks all query requests.
func (s *Store) Restore(rc io.ReadCloser) error {
	startT := time.Now()

	b, err := dbBytesFromSnapshot(rc)
	if err != nil {
		return fmt.Errorf("restore failed: %s", err.Error())
	}
	if b == nil {
		s.logger.Println("no database data present in restored snapshot")
	}

	if err := s.db.Close(); err != nil {
		return fmt.Errorf("failed to close pre-restore database: %s", err)
	}

	var db *sql.DB
	if !s.dbConf.Memory && s.lastCommandIdxOnOpen == 0 {
		// A snapshot clearly exists (this function has been called) but there
		// are no command entries in the log -- so Apply will not be called.
		// Therefore this is the last opportunity to create the on-disk database
		// before Raft starts.
		db, err = s.createOnDisk(b)
		if err != nil {
			return fmt.Errorf("open on-disk file during restore: %s", err)
		}
		s.onDiskCreated = true
		s.logger.Println("successfully switched to on-disk database due to restore")
	} else {
		// Deserialize into an in-memory database because a) an in-memory database
		// has been requested, or b) while there was a snapshot, there are also
		// command entries in the log. So by sticking with an in-memory database
		// those entries will be applied in the fastest possible manner. We will
		// defer creation of any database on disk until the Apply function.
		db, err = s.createInMemory(b)
		if err != nil {
			return fmt.Errorf("createInMemory: %s", err)
		}
	}
	s.db = db

	stats.Add(numRestores, 1)
	s.logger.Printf("node restored in %s", time.Since(startT))
	return nil
}

// RegisterObserver registers an observer of Raft events.
func (s *Store) RegisterObserver(o *raft.Observer) {
	s.raft.RegisterObserver(o)
}

// DeregisterObserver deregisters an observer of Raft events.
func (s *Store) DeregisterObserver(o *raft.Observer) {
	s.raft.DeregisterObserver(o)
}

// logSize returns the size of the Raft log on disk.
func (s *Store) logSize() (int64, error) {
	fi, err := os.Stat(filepath.Join(s.raftDir, raftDBPath))
	if err != nil {
		return 0, err
	}
	return fi.Size(), nil
}

// database copies the contents of the underlying SQLite database to dst.
func (s *Store) database(leader bool, dst io.Writer) error {
	if leader && s.raft.State() != raft.Leader {
		return ErrNotLeader
	}

	f, err := ioutil.TempFile("", "rqlite-snap-")
	if err != nil {
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	defer os.Remove(f.Name())

	if err := s.db.Backup(f.Name()); err != nil {
		return err
	}

	of, err := os.Open(f.Name())
	if err != nil {
		return err
	}
	defer of.Close()

	_, err = io.Copy(dst, of)
	return err
}

func (s *Store) databaseTypePretty() string {
	if s.dbConf.Memory {
		return "in-memory"
	}
	return "on-disk"
}

type fsmSnapshot struct {
	startT time.Time
	logger *log.Logger

	database []byte
}

func newFSMSnapshot(db *sql.DB, logger *log.Logger) *fsmSnapshot {
	fsm := &fsmSnapshot{
		startT: time.Now(),
		logger: logger,
	}

	// The error code is not meaningful from Serialize(). The code needs to be
	// able to handle a nil byte slice being returned.
	fsm.database, _ = db.Serialize()
	return fsm
}

// Persist writes the snapshot to the given sink.
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
	defer func() {
		dur := time.Since(f.startT)
		stats.Get(snapshotPersistDuration).(*expvar.Int).Set(dur.Milliseconds())
		f.logger.Printf("snapshot and persist took %s", dur)
	}()

	err := func() error {
		b := new(bytes.Buffer)

		// Flag compressed database by writing max uint64 value first.
		// No SQLite database written by earlier versions will have this
		// as a size. *Surely*.
		err := writeUint64(b, math.MaxUint64)
		if err != nil {
			return err
		}
		if _, err := sink.Write(b.Bytes()); err != nil {
			return err
		}
		b.Reset() // Clear state of buffer for future use.

		// Get compressed copy of database.
		cdb, err := f.compressedDatabase()
		if err != nil {
			return err
		}

		if cdb != nil {
			// Write size of compressed database.
			err = writeUint64(b, uint64(len(cdb)))
			if err != nil {
				return err
			}
			if _, err := sink.Write(b.Bytes()); err != nil {
				return err
			}

			// Write compressed database to sink.
			if _, err := sink.Write(cdb); err != nil {
				return err
			}
			stats.Get(snapshotDBOnDiskSize).(*expvar.Int).Set(int64(len(cdb)))
		} else {
			f.logger.Println("no database data available for snapshot")
			err = writeUint64(b, uint64(0))
			if err != nil {
				return err
			}
			if _, err := sink.Write(b.Bytes()); err != nil {
				return err
			}
			stats.Get(snapshotDBOnDiskSize).(*expvar.Int).Set(0)
		}

		// Close the sink.
		return sink.Close()
	}()

	if err != nil {
		sink.Cancel()
		return err
	}

	return nil
}

func (f *fsmSnapshot) compressedDatabase() ([]byte, error) {
	if f.database == nil {
		return nil, nil
	}

	var buf bytes.Buffer
	gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
	if err != nil {
		return nil, err
	}

	if _, err := gz.Write(f.database); err != nil {
		return nil, err
	}
	if err := gz.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Release is a no-op.
func (f *fsmSnapshot) Release() {}
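// The snapshot byte stream produced by Persist, and consumed by Restore via
// dbBytesFromSnapshot, is laid out as follows (integers are little-endian uint64):
//
//	[ 8 bytes ] math.MaxUint64, marking the database that follows as compressed
//	[ 8 bytes ] length of the gzip-compressed SQLite image (0 if no data was available)
//	[ n bytes ] the gzip-compressed SQLite image itself
//
// dbBytesFromSnapshot treats the first 8 bytes as the database size unless they
// equal math.MaxUint64, so streams written without the marker are read as an
// uncompressed database of that size.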
// RecoverNode is used to manually force a new configuration, in the event that
// quorum cannot be restored. This borrows heavily from the RecoverCluster
// functionality of the Hashicorp Raft library, but has been customized for
// rqlite use.
func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable raft.StableStore,
	snaps raft.SnapshotStore, tn raft.Transport, conf raft.Configuration) error {
	logPrefix := logger.Prefix()
	logger.SetPrefix(fmt.Sprintf("%s[recovery] ", logPrefix))
	defer logger.SetPrefix(logPrefix)

	// Sanity check the Raft peer configuration.
	if err := checkRaftConfiguration(conf); err != nil {
		return err
	}

	// Attempt to restore any snapshots we find, newest to oldest.
	var (
		snapshotIndex  uint64
		snapshotTerm   uint64
		snapshots, err = snaps.List()
	)
	if err != nil {
		return fmt.Errorf("failed to list snapshots: %v", err)
	}
	logger.Printf("recovery detected %d snapshots", len(snapshots))

	var b []byte
	for _, snapshot := range snapshots {
		var source io.ReadCloser
		_, source, err = snaps.Open(snapshot.ID)
		if err != nil {
			// Skip this one and try the next. We will detect if we
			// couldn't open any snapshots.
			continue
		}

		b, err = dbBytesFromSnapshot(source)
		// Close the source after the restore has completed.
		source.Close()
		if err != nil {
			// Same here, skip and try the next one.
			continue
		}

		snapshotIndex = snapshot.Index
		snapshotTerm = snapshot.Term
		break
	}
	if len(snapshots) > 0 && (snapshotIndex == 0 || snapshotTerm == 0) {
		return fmt.Errorf("failed to restore any of the available snapshots")
	}

	// Now, create an in-memory database for temporary use, so we can generate new
	// snapshots later.
	var db *sql.DB
	if len(b) == 0 {
		db, err = sql.OpenInMemory(false)
	} else {
		db, err = sql.DeserializeIntoMemory(b, false)
	}
	if err != nil {
		return fmt.Errorf("create in-memory database failed: %s", err)
	}
	defer db.Close()

	// The snapshot information is the best known end point for the data
	// until we play back the Raft log entries.
	lastIndex := snapshotIndex
	lastTerm := snapshotTerm

	// Apply any Raft log entries past the snapshot.
	lastLogIndex, err := logs.LastIndex()
	if err != nil {
		return fmt.Errorf("failed to find last log: %v", err)
	}
	logger.Printf("recovery snapshot index is %d, last index is %d", snapshotIndex, lastLogIndex)

	for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
		var entry raft.Log
		if err = logs.GetLog(index, &entry); err != nil {
			return fmt.Errorf("failed to get log at index %d: %v", index, err)
		}
		if entry.Type == raft.LogCommand {
			applyCommand(entry.Data, db)
		}
		lastIndex = entry.Index
		lastTerm = entry.Term
	}

	// Create a new snapshot, placing the configuration in as if it was
	// committed at index 1.
	snapshot := newFSMSnapshot(db, logger)
	sink, err := snaps.Create(1, lastIndex, lastTerm, conf, 1, tn)
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %v", err)
	}
	if err = snapshot.Persist(sink); err != nil {
		return fmt.Errorf("failed to persist snapshot: %v", err)
	}
	if err = sink.Close(); err != nil {
		return fmt.Errorf("failed to finalize snapshot: %v", err)
	}
	logger.Printf("recovery snapshot created successfully")

	// Compact the log so that we don't get bad interference from any
	// configuration change log entries that might be there.
	firstLogIndex, err := logs.FirstIndex()
	if err != nil {
		return fmt.Errorf("failed to get first log index: %v", err)
	}
	if err := logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
		return fmt.Errorf("log compaction failed: %v", err)
	}

	return nil
}

func dbBytesFromSnapshot(rc io.ReadCloser) ([]byte, error) {
	var uint64Size uint64
	inc := int64(unsafe.Sizeof(uint64Size))

	// Read all the data into RAM, since we have to decode known-length
	// chunks of various forms.
	var offset int64
	b, err := ioutil.ReadAll(rc)
	if err != nil {
		return nil, fmt.Errorf("readall: %s", err)
	}

	// Get size of database, checking for compression.
	compressed := false
	sz, err := readUint64(b[offset : offset+inc])
	if err != nil {
		return nil, fmt.Errorf("read compression check: %s", err)
	}
	offset = offset + inc

	if sz == math.MaxUint64 {
		compressed = true
		// Database is actually compressed, read actual size next.
		sz, err = readUint64(b[offset : offset+inc])
		if err != nil {
			return nil, fmt.Errorf("read compressed size: %s", err)
		}
		offset = offset + inc
	}

	// Now read in the database file data, decompress if necessary, and restore.
	var database []byte
	if sz > 0 {
		if compressed {
			buf := new(bytes.Buffer)
			gz, err := gzip.NewReader(bytes.NewReader(b[offset : offset+int64(sz)]))
			if err != nil {
				return nil, err
			}

			if _, err := io.Copy(buf, gz); err != nil {
				return nil, fmt.Errorf("SQLite database decompress: %s", err)
			}

			if err := gz.Close(); err != nil {
				return nil, err
			}
			database = buf.Bytes()
		} else {
			database = b[offset : offset+int64(sz)]
		}
	} else {
		database = nil
	}

	return database, nil
}

func applyCommand(data []byte, db *sql.DB) (command.Command_Type, interface{}) {
	var c command.Command

	if err := command.Unmarshal(data, &c); err != nil {
		panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error()))
	}

	switch c.Type {
	case command.Command_COMMAND_TYPE_QUERY:
		var qr command.QueryRequest
		if err := command.UnmarshalSubCommand(&c, &qr); err != nil {
			panic(fmt.Sprintf("failed to unmarshal query subcommand: %s", err.Error()))
		}
		r, err := db.Query(qr.Request, qr.Timings)
		return c.Type, &fsmQueryResponse{rows: r, error: err}
	case command.Command_COMMAND_TYPE_EXECUTE:
		var er command.ExecuteRequest
		if err := command.UnmarshalSubCommand(&c, &er); err != nil {
			panic(fmt.Sprintf("failed to unmarshal execute subcommand: %s", err.Error()))
		}
		r, err := db.Execute(er.Request, er.Timings)
		return c.Type, &fsmExecuteResponse{results: r, error: err}
	case command.Command_COMMAND_TYPE_NOOP:
		return c.Type, &fsmGenericResponse{}
	default:
		return c.Type, &fsmGenericResponse{error: fmt.Errorf("unhandled command: %v", c.Type)}
	}
}

// checkRaftConfiguration tests a cluster membership configuration for common
// errors.
func checkRaftConfiguration(configuration raft.Configuration) error {
	idSet := make(map[raft.ServerID]bool)
	addressSet := make(map[raft.ServerAddress]bool)
	var voters int
	for _, server := range configuration.Servers {
		if server.ID == "" {
			return fmt.Errorf("empty ID in configuration: %v", configuration)
		}
		if server.Address == "" {
			return fmt.Errorf("empty address in configuration: %v", server)
		}
		if idSet[server.ID] {
			return fmt.Errorf("found duplicate ID in configuration: %v", server.ID)
		}
		idSet[server.ID] = true
		if addressSet[server.Address] {
			return fmt.Errorf("found duplicate address in configuration: %v", server.Address)
		}
		addressSet[server.Address] = true
		if server.Suffrage == raft.Voter {
			voters++
		}
	}
	if voters == 0 {
		return fmt.Errorf("need at least one voter in configuration: %v", configuration)
	}
	return nil
}

func readUint64(b []byte) (uint64, error) {
	var sz uint64
	if err := binary.Read(bytes.NewReader(b), binary.LittleEndian, &sz); err != nil {
		return 0, err
	}
	return sz, nil
}

func writeUint64(w io.Writer, v uint64) error {
	return binary.Write(w, binary.LittleEndian, v)
}

// enabledFromBool converts bool to "enabled" or "disabled".
func enabledFromBool(b bool) string {
	if b {
		return "enabled"
	}
	return "disabled"
}

// prettyVoter converts bool to "voter" or "non-voter".
func prettyVoter(v bool) string {
	if v {
		return "voter"
	}
	return "non-voter"
}

// pathExists returns true if the given path exists.
func pathExists(p string) bool {
	if _, err := os.Lstat(p); err != nil && os.IsNotExist(err) {
		return false
	}
	return true
}

// dirSize returns the total size of all files in the given directory.
func dirSize(path string) (int64, error) {
	var size int64
	err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			size += info.Size()
		}
		return err
	})
	return size, err
}
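// exampleOpenStore is an illustrative sketch only, and is not called by rqlite
// itself. It shows the minimal lifecycle a calling layer follows when creating
// a single-node Store using the API above. The directory path and node ID are
// placeholders, and the Listener is assumed to be supplied by the transport layer.
func exampleOpenStore(ln Listener) (*Store, error) {
	s := New(ln, &Config{
		DBConf: &DBConfig{Memory: true}, // In-memory SQLite; set OnDiskPath for on-disk.
		Dir:    "/path/to/raft/dir",
		ID:     "node-1",
	})

	// Bootstrap a brand-new, standalone node. Nodes joining an existing
	// cluster open with enableBootstrap=false and join afterwards.
	if err := s.Open(true); err != nil {
		return nil, err
	}
	if _, err := s.WaitForLeader(openTimeout); err != nil {
		s.Close(false)
		return nil, err
	}
	return s, nil
}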