Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2494 lines
71 KiB

// Package store provides a distributed SQLite instance.
// Distributed consensus is provided via the Raft algorithm.
package store
import (
sql "github.com/rqlite/rqlite/v8/db"
wal "github.com/rqlite/rqlite/v8/db/wal"
rlog "github.com/rqlite/rqlite/v8/log"
var (
// ErrNotOpen is returned when a Store is not open.
ErrNotOpen = errors.New("store not open")
// ErrOpen is returned when a Store is already open.
ErrOpen = errors.New("store already open")
// ErrNotReady is returned when a Store is not ready to accept requests.
ErrNotReady = errors.New("store not ready")
// ErrNotLeader is returned when a node attempts to execute a leader-only
// operation.
ErrNotLeader = errors.New("not leader")
// ErrNotSingleNode is returned when a node attempts to execute a single-node
// only operation.
ErrNotSingleNode = errors.New("not single-node")
// ErrStaleRead is returned if the executing the query would violate the
// requested freshness.
ErrStaleRead = errors.New("stale read")
// ErrOpenTimeout is returned when the Store does not apply its initial
// logs within the specified time.
ErrOpenTimeout = errors.New("timeout waiting for initial logs application")
// ErrWaitForRemovalTimeout is returned when the Store does not confirm removal
// of a node within the specified time.
ErrWaitForRemovalTimeout = errors.New("timeout waiting for node removal confirmation")
// ErrWaitForLeaderTimeout is returned when the Store cannot determine the leader
// within the specified time.
ErrWaitForLeaderTimeout = errors.New("timeout waiting for leader")
// ErrInvalidBackupFormat is returned when the requested backup format
// is not valid.
ErrInvalidBackupFormat = errors.New("invalid backup format")
// ErrInvalidVacuumFormat is returned when the requested backup format is not
// compatible with vacuum.
ErrInvalidVacuum = errors.New("invalid vacuum")
// ErrLoadInProgress is returned when a load is already in progress and the
// requested operation cannot be performed.
ErrLoadInProgress = errors.New("load in progress")
const (
snapshotsDirName = "rsnapshots"
restoreScratchPattern = "rqlite-restore-*"
bootScatchPattern = "rqlite-boot-*"
backupScatchPattern = "rqlite-backup-*"
vacuumScatchPattern = "rqlite-vacuum-*"
raftDBPath = "raft.db" // Changing this will break backwards compatibility.
peersPath = "raft/peers.json"
peersInfoPath = "raft/peers.info"
retainSnapshotCount = 1
applyTimeout = 10 * time.Second
openTimeout = 120 * time.Second
sqliteFile = "db.sqlite"
leaderWaitDelay = 100 * time.Millisecond
appliedWaitDelay = 100 * time.Millisecond
commitEquivalenceDelay = 50 * time.Millisecond
appliedIndexUpdateInterval = 5 * time.Second
connectionPoolCount = 5
connectionTimeout = 10 * time.Second
raftLogCacheSize = 512
trailingScale = 1.25
observerChanLen = 50
baseVacuumTimeKey = "rqlite_base_vacuum"
lastVacuumTimeKey = "rqlite_last_vacuum"
const (
numSnapshots = "num_snapshots"
numSnapshotsFailed = "num_snapshots_failed"
numUserSnapshots = "num_user_snapshots"
numUserSnapshotsFailed = "num_user_snapshots_failed"
numWALSnapshots = "num_wal_snapshots"
numWALSnapshotsFailed = "num_wal_snapshots_failed"
numSnapshotsFull = "num_snapshots_full"
numSnapshotsIncremental = "num_snapshots_incremental"
numFullCheckpointFailed = "num_full_checkpoint_failed"
numWALCheckpointTruncateFailed = "num_wal_checkpoint_truncate_failed"
numAutoVacuums = "num_auto_vacuums"
numAutoVacuumsFailed = "num_auto_vacuums_failed"
autoVacuumDuration = "auto_vacuum_duration"
numBoots = "num_boots"
numBackups = "num_backups"
numLoads = "num_loads"
numRestores = "num_restores"
numRestoresFailed = "num_restores_failed"
numAutoRestores = "num_auto_restores"
numAutoRestoresSkipped = "num_auto_restores_skipped"
numAutoRestoresFailed = "num_auto_restores_failed"
numRecoveries = "num_recoveries"
numProviderChecks = "num_provider_checks"
numProviderProvides = "num_provider_provides"
numProviderProvidesFail = "num_provider_provides_fail"
numUncompressedCommands = "num_uncompressed_commands"
numCompressedCommands = "num_compressed_commands"
numJoins = "num_joins"
numIgnoredJoins = "num_ignored_joins"
numRemovedBeforeJoins = "num_removed_before_joins"
numDBStatsErrors = "num_db_stats_errors"
snapshotCreateDuration = "snapshot_create_duration"
snapshotCreateChkTruncateDuration = "snapshot_create_chk_truncate_duration"
snapshotCreateWALCompactDuration = "snapshot_create_wal_compact_duration"
snapshotPersistDuration = "snapshot_persist_duration"
snapshotPrecompactWALSize = "snapshot_precompact_wal_size"
snapshotWALSize = "snapshot_wal_size"
leaderChangesObserved = "leader_changes_observed"
leaderChangesDropped = "leader_changes_dropped"
failedHeartbeatObserved = "failed_heartbeat_observed"
nodesReapedOK = "nodes_reaped_ok"
nodesReapedFailed = "nodes_reaped_failed"
// stats captures stats for the Store.
var stats *expvar.Map
func init() {
stats = expvar.NewMap("store")
// ResetStats resets the expvar stats for this module. Mostly for test purposes.
func ResetStats() {
stats.Add(numSnapshots, 0)
stats.Add(numSnapshotsFailed, 0)
stats.Add(numUserSnapshots, 0)
stats.Add(numUserSnapshotsFailed, 0)
stats.Add(numWALSnapshots, 0)
stats.Add(numWALSnapshotsFailed, 0)
stats.Add(numSnapshotsFull, 0)
stats.Add(numSnapshotsIncremental, 0)
stats.Add(numFullCheckpointFailed, 0)
stats.Add(numWALCheckpointTruncateFailed, 0)
stats.Add(numAutoVacuums, 0)
stats.Add(numAutoVacuumsFailed, 0)
stats.Add(autoVacuumDuration, 0)
stats.Add(numBoots, 0)
stats.Add(numBackups, 0)
stats.Add(numLoads, 0)
stats.Add(numRestores, 0)
stats.Add(numRestoresFailed, 0)
stats.Add(numRecoveries, 0)
stats.Add(numProviderChecks, 0)
stats.Add(numProviderProvides, 0)
stats.Add(numProviderProvidesFail, 0)
stats.Add(numAutoRestores, 0)
stats.Add(numAutoRestoresSkipped, 0)
stats.Add(numAutoRestoresFailed, 0)
stats.Add(numUncompressedCommands, 0)
stats.Add(numCompressedCommands, 0)
stats.Add(numJoins, 0)
stats.Add(numIgnoredJoins, 0)
stats.Add(numRemovedBeforeJoins, 0)
stats.Add(numDBStatsErrors, 0)
stats.Add(snapshotCreateDuration, 0)
stats.Add(snapshotCreateChkTruncateDuration, 0)
stats.Add(snapshotCreateWALCompactDuration, 0)
stats.Add(snapshotPersistDuration, 0)
stats.Add(snapshotPrecompactWALSize, 0)
stats.Add(snapshotWALSize, 0)
stats.Add(leaderChangesObserved, 0)
stats.Add(leaderChangesDropped, 0)
stats.Add(failedHeartbeatObserved, 0)
stats.Add(nodesReapedOK, 0)
stats.Add(nodesReapedFailed, 0)
// SnapshotStore is the interface Snapshot stores must implement.
type SnapshotStore interface {
// FullNeeded returns true if a full snapshot is needed.
FullNeeded() (bool, error)
// SetFullNeeded explicitly sets that a full snapshot is needed.
SetFullNeeded() error
// Stats returns stats about the Snapshot Store.
Stats() (map[string]interface{}, error)
// ClusterState defines the possible Raft states the current node can be in
type ClusterState int
// Represents the Raft cluster states
const (
Leader ClusterState = iota
// Store is a SQLite database, where all changes are made via Raft consensus.
type Store struct {
open *AtomicBool
raftDir string
snapshotDir string
peersPath string
peersInfoPath string
restorePath string
restoreDoneCh chan struct{}
raft *raft.Raft // The consensus mechanism.
ly Layer
raftTn *NodeTransport
raftID string // Node ID.
dbConf *DBConfig // SQLite database config.
dbPath string // Path to underlying SQLite file.
walPath string // Path to WAL file.
dbDir string // Path to directory containing SQLite file.
db *sql.SwappableDB // The underlying SQLite store.
dechunkManager *chunking.DechunkerManager
cmdProc *CommandProcessor
// Channels that must be closed for the Store to be considered ready.
readyChans []<-chan struct{}
numClosedReadyChannels int
readyChansMu sync.Mutex
// Channels for WAL-size triggered snapshotting
snapshotWClose chan struct{}
snapshotWDone chan struct{}
// Snapshotting synchronization
queryTxMu sync.RWMutex
snapshotCAS *CheckAndSet
// Latest log entry index actually reflected by the FSM. Due to Raft code
// this value is not updated after a Snapshot-restore.
fsmIdx *atomic.Uint64
fsmUpdateTime *AtomicTime // This is node-local time.
// appendedAtTimeis the Leader's clock time when that Leader appended the log entry.
// The Leader that actually appended the log entry is not necessarily the current Leader.
appendedAtTime *AtomicTime
// Latest log entry index which actually changed the database.
dbAppliedIdx *atomic.Uint64
appliedIdxUpdateDone chan struct{}
reqMarshaller *command.RequestMarshaler // Request marshaler for writing to log.
raftLog raft.LogStore // Persistent log store.
raftStable raft.StableStore // Persistent k-v store.
boltStore *rlog.Log // Physical store.
snapshotStore SnapshotStore // Snapshot store.
// Raft changes observer
leaderObserversMu sync.RWMutex
leaderObservers []chan<- struct{}
observerClose chan struct{}
observerDone chan struct{}
observerChan chan raft.Observation
observer *raft.Observer
firstIdxOnOpen uint64 // First index on log when Store opens.
lastIdxOnOpen uint64 // Last index on log when Store opens.
lastCommandIdxOnOpen uint64 // Last command index before applied index when Store opens.
lastAppliedIdxOnOpen uint64 // Last applied index on log when Store opens.
firstLogAppliedT time.Time // Time first log is applied
appliedOnOpen uint64 // Number of logs applied at open.
openT time.Time // Timestamp when Store opens.
logger *log.Logger
logIncremental bool
notifyMu sync.Mutex
BootstrapExpect int
bootstrapped bool
notifyingNodes map[string]*Server
ShutdownOnRemove bool
SnapshotThreshold uint64
SnapshotThresholdWALSize uint64
SnapshotInterval time.Duration
LeaderLeaseTimeout time.Duration
HeartbeatTimeout time.Duration
ElectionTimeout time.Duration
ApplyTimeout time.Duration
RaftLogLevel string
NoFreeListSync bool
AutoVacInterval time.Duration
// Node-reaping configuration
ReapTimeout time.Duration
ReapReadOnlyTimeout time.Duration
numTrailingLogs uint64
// For whitebox testing
numAutoVacuums int
numIgnoredJoins int
numNoops *atomic.Uint64
numSnapshotsMu sync.Mutex
numSnapshots int
// Config represents the configuration of the underlying Store.
type Config struct {
DBConf *DBConfig // The DBConfig object for this Store.
Dir string // The working directory for raft.
Tn Transport // The underlying Transport for raft.
ID string // Node ID.
Logger *log.Logger // The logger to use to log stuff.
// New returns a new Store.
func New(ly Layer, c *Config) *Store {
logger := c.Logger
if logger == nil {
logger = log.New(os.Stderr, "[store] ", log.LstdFlags)
dbPath := filepath.Join(c.Dir, sqliteFile)
if c.DBConf.OnDiskPath != "" {
dbPath = c.DBConf.OnDiskPath
return &Store{
open: NewAtomicBool(),
ly: ly,
raftDir: c.Dir,
snapshotDir: filepath.Join(c.Dir, snapshotsDirName),
peersPath: filepath.Join(c.Dir, peersPath),
peersInfoPath: filepath.Join(c.Dir, peersInfoPath),
restoreDoneCh: make(chan struct{}),
raftID: c.ID,
dbConf: c.DBConf,
dbPath: dbPath,
walPath: sql.WALPath(dbPath),
dbDir: filepath.Dir(dbPath),
leaderObservers: make([]chan<- struct{}, 0),
reqMarshaller: command.NewRequestMarshaler(),
logger: logger,
notifyingNodes: make(map[string]*Server),
ApplyTimeout: applyTimeout,
snapshotCAS: NewCheckAndSet(),
fsmIdx: &atomic.Uint64{},
fsmUpdateTime: NewAtomicTime(),
appendedAtTime: NewAtomicTime(),
dbAppliedIdx: &atomic.Uint64{},
numNoops: &atomic.Uint64{},
// SetRestorePath sets the path to a file containing a copy of a
// SQLite database. This database will be loaded if and when the
// node becomes the Leader for the first time only. The Store will
// also delete the file when it's finished with it.
// This function should only be called before the Store is opened
// and setting the restore path means the Store will not report
// itself as ready until a restore has been attempted.
func (s *Store) SetRestorePath(path string) error {
if s.open.Is() {
return ErrOpen
if !sql.IsValidSQLiteFile(path) {
return fmt.Errorf("file %s is not a valid SQLite file", path)
s.restorePath = path
return nil
// Open opens the Store.
func (s *Store) Open() (retErr error) {
defer func() {
if retErr == nil {
if s.open.Is() {
return ErrOpen
s.openT = time.Now()
s.logger.Printf("opening store with node ID %s, listening on %s", s.raftID, s.ly.Addr().String())
// Create all the required Raft directories.
s.logger.Printf("ensuring data directory exists at %s", s.raftDir)
if err := os.MkdirAll(s.raftDir, 0755); err != nil {
return err
if err := os.MkdirAll(filepath.Dir(s.peersPath), 0755); err != nil {
return err
decMgmr, err := chunking.NewDechunkerManager(filepath.Dir(s.dbPath))
if err != nil {
return err
s.dechunkManager = decMgmr
s.cmdProc = NewCommandProcessor(s.logger, s.dechunkManager)
// Create the database directory, if it doesn't already exist.
parentDBDir := filepath.Dir(s.dbPath)
if !dirExists(parentDBDir) {
s.logger.Printf("creating directory for database at %s", parentDBDir)
err := os.MkdirAll(parentDBDir, 0755)
if err != nil {
return err
// Create Raft-compatible network layer.
nt := raft.NewNetworkTransport(NewTransport(s.ly), connectionPoolCount, connectionTimeout, nil)
s.raftTn = NewNodeTransport(nt)
// Don't allow control over trailing logs directly, just implement a policy.
s.numTrailingLogs = uint64(float64(s.SnapshotThreshold) * trailingScale)
config := s.raftConfig()
config.LocalID = raft.ServerID(s.raftID)
// Upgrade any pre-existing snapshots.
oldSnapshotDir := filepath.Join(s.raftDir, "snapshots")
if err := snapshot.Upgrade(oldSnapshotDir, s.snapshotDir, s.logger); err != nil {
return fmt.Errorf("failed to upgrade snapshots: %s", err)
// Create store for the Snapshots.
snapshotStore, err := snapshot.NewStore(filepath.Join(s.snapshotDir))
if err != nil {
return fmt.Errorf("failed to create snapshot store: %s", err)
s.snapshotStore = snapshotStore
snaps, err := s.snapshotStore.List()
if err != nil {
return fmt.Errorf("list snapshots: %s", err)
s.logger.Printf("%d preexisting snapshots present", len(snaps))
// Create the Raft log store and stable store.
s.boltStore, err = rlog.New(filepath.Join(s.raftDir, raftDBPath), s.NoFreeListSync)
if err != nil {
return fmt.Errorf("new log store: %s", err)
s.raftStable = s.boltStore
s.raftLog, err = raft.NewLogCache(raftLogCacheSize, s.boltStore)
if err != nil {
return fmt.Errorf("new cached store: %s", err)
// Request to recover node?
if pathExists(s.peersPath) {
s.logger.Printf("attempting node recovery using %s", s.peersPath)
config, err := raft.ReadConfigJSON(s.peersPath)
if err != nil {
return fmt.Errorf("failed to read peers file: %s", err.Error())
if err = RecoverNode(s.raftDir, s.logger, s.raftLog, s.boltStore, s.snapshotStore, s.raftTn, config); err != nil {
return fmt.Errorf("failed to recover node: %s", err.Error())
if err := os.Rename(s.peersPath, s.peersInfoPath); err != nil {
return fmt.Errorf("failed to move %s after recovery: %s", s.peersPath, err.Error())
s.logger.Printf("node recovered successfully using %s", s.peersPath)
stats.Add(numRecoveries, 1)
// Get some info about the log, before any more entries are committed.
if err := s.setLogInfo(); err != nil {
return fmt.Errorf("set log info: %s", err)
s.logger.Printf("first log index: %d, last log index: %d, last applied index: %d, last command log index: %d:",
s.firstIdxOnOpen, s.lastIdxOnOpen, s.lastAppliedIdxOnOpen, s.lastCommandIdxOnOpen)
s.db, err = createOnDisk(s.dbPath, s.dbConf.FKConstraints, true)
if err != nil {
return fmt.Errorf("failed to create on-disk database: %s", err)
// Clean up any files from aborted operations. This tries to catch the case where scratch files
// were created in the Raft directory, not cleaned up, and then the node was restarted with an
// explicit SQLite path set.
for _, pattern := range []string{
vacuumScatchPattern} {
for _, dir := range []string{s.raftDir, s.dbDir} {
files, err := filepath.Glob(filepath.Join(dir, pattern))
if err != nil {
return fmt.Errorf("failed to locate temporary files for pattern %s: %s", pattern, err.Error())
for _, f := range files {
if err := os.Remove(f); err != nil {
return fmt.Errorf("failed to remove temporary file %s: %s", f, err.Error())
// Instantiate the Raft system.
ra, err := raft.NewRaft(config, NewFSM(s), s.raftLog, s.raftStable, s.snapshotStore, s.raftTn)
if err != nil {
return fmt.Errorf("creating the raft system failed: %s", err)
s.raft = ra
// Open the observer channels.
s.observerChan = make(chan raft.Observation, observerChanLen)
s.observer = raft.NewObserver(s.observerChan, false, func(o *raft.Observation) bool {
_, isLeaderChange := o.Data.(raft.LeaderObservation)
_, isFailedHeartBeat := o.Data.(raft.FailedHeartbeatObservation)
return isLeaderChange || isFailedHeartBeat
// Register and listen for leader changes.
s.observerClose, s.observerDone = s.observe()
// WAL-size triggered snapshotting.
s.snapshotWClose, s.snapshotWDone = s.runWALSnapshotting()
// Periodically update the applied index for faster startup.
s.appliedIdxUpdateDone = s.updateAppliedIndex()
if err := s.initVacuumTime(); err != nil {
return fmt.Errorf("failed to initialize auto-vacuum times: %s", err.Error())
return nil
// Bootstrap executes a cluster bootstrap on this node, using the given
// Servers as the configuration.
func (s *Store) Bootstrap(servers ...*Server) error {
raftServers := make([]raft.Server, len(servers))
for i := range servers {
raftServers[i] = raft.Server{
ID: raft.ServerID(servers[i].ID),
Address: raft.ServerAddress(servers[i].Addr),
Servers: raftServers,
return nil
// Stepdown forces this node to relinquish leadership to another node in
// the cluster. If this node is not the leader, and 'wait' is true, an error
// will be returned.
func (s *Store) Stepdown(wait bool) error {
if !s.open.Is() {
return ErrNotOpen
f := s.raft.LeadershipTransfer()
if !wait {
return nil
return f.Error()
// RegisterReadyChannel registers a channel that must be closed before the
// store is considered "ready" to serve requests.
func (s *Store) RegisterReadyChannel(ch <-chan struct{}) {
defer s.readyChansMu.Unlock()
s.readyChans = append(s.readyChans, ch)
go func() {
// Ready returns true if the store is ready to serve requests. Ready is
// defined as having no open channels registered via RegisterReadyChannel
// and having a Leader.
func (s *Store) Ready() bool {
l, err := s.LeaderAddr()
if err != nil || l == "" {
return false
return func() bool {
defer s.readyChansMu.Unlock()
if s.numClosedReadyChannels != len(s.readyChans) {
return false
s.readyChans = nil
s.numClosedReadyChannels = 0
return true
// Committed blocks until the local commit index is greater than or
// equal to the Leader index, as checked when the function is called.
// It returns the committed index. If the Leader index is 0, then the
// system waits until the commit index is at least 1.
func (s *Store) Committed(timeout time.Duration) (uint64, error) {
lci, err := s.LeaderCommitIndex()
if err != nil {
return lci, err
return lci, s.WaitForCommitIndex(max(1, lci), timeout)
// Close closes the store. If wait is true, waits for a graceful shutdown.
func (s *Store) Close(wait bool) (retErr error) {
defer func() {
if retErr == nil {
s.logger.Printf("store closed with node ID %s, listening on %s", s.raftID, s.ly.Addr().String())
if !s.open.Is() {
// Protect against closing already-closed resource, such as channels.
return nil
f := s.raft.Shutdown()
if wait {
if f.Error() != nil {
return f.Error()
if err := s.raftTn.Close(); err != nil {
return err
// Only shutdown Bolt and SQLite when Raft is done.
if err := s.db.Close(); err != nil {
return err
if err := s.boltStore.Close(); err != nil {
return err
return nil
// WaitForAppliedFSM waits until the currently applied logs (at the time this
// function is called) are actually reflected by the FSM, or the timeout expires.
func (s *Store) WaitForAppliedFSM(timeout time.Duration) (uint64, error) {
if timeout == 0 {
return 0, nil
return s.WaitForFSMIndex(s.raft.AppliedIndex(), timeout)
// WaitForApplied waits for all Raft log entries to be applied to the
// underlying database.
func (s *Store) WaitForAllApplied(timeout time.Duration) error {
if timeout == 0 {
return nil
return s.WaitForAppliedIndex(s.raft.LastIndex(), timeout)
// WaitForAppliedIndex blocks until a given log index has been applied,
// or the timeout expires.
func (s *Store) WaitForAppliedIndex(idx uint64, timeout time.Duration) error {
tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
for {
select {
case <-tck.C:
if s.raft.AppliedIndex() >= idx {
return nil
case <-tmr.C:
return fmt.Errorf("timeout expired")
// WaitForCommitIndex blocks until the local Raft commit index is equal to
// or greater the given index, or the timeout expires.
func (s *Store) WaitForCommitIndex(idx uint64, timeout time.Duration) error {
tck := time.NewTicker(commitEquivalenceDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
checkFn := func() bool {
return s.raft.CommitIndex() >= idx
// Try the fast path.
if checkFn() {
return nil
for {
select {
case <-tck.C:
if checkFn() {
return nil
case <-tmr.C:
return fmt.Errorf("timeout expired")
// DBAppliedIndex returns the index of the last Raft log that changed the
// underlying database. If the index is unknown then 0 is returned.
func (s *Store) DBAppliedIndex() uint64 {
return s.dbAppliedIdx.Load()
// IsLeader is used to determine if the current node is cluster leader
func (s *Store) IsLeader() bool {
if !s.open.Is() {
return false
return s.raft.State() == raft.Leader
// HasLeader returns true if the cluster has a leader, false otherwise.
func (s *Store) HasLeader() bool {
if !s.open.Is() {
return false
return s.raft.Leader() != ""
// IsVoter returns true if the current node is a voter in the cluster. If there
// is no reference to the current node in the current cluster configuration then
// false will also be returned.
func (s *Store) IsVoter() (bool, error) {
if !s.open.Is() {
return false, ErrNotOpen
cfg := s.raft.GetConfiguration()
if err := cfg.Error(); err != nil {
return false, err
for _, srv := range cfg.Configuration().Servers {
if srv.ID == raft.ServerID(s.raftID) {
return srv.Suffrage == raft.Voter, nil
return false, nil
// State returns the current node's Raft state
func (s *Store) State() ClusterState {
if !s.open.Is() {
return Unknown
state := s.raft.State()
switch state {
case raft.Leader:
return Leader
case raft.Candidate:
return Candidate
case raft.Follower:
return Follower
case raft.Shutdown:
return Shutdown
return Unknown
// LastVacuumTime returns the time of the last automatic VACUUM.
func (s *Store) LastVacuumTime() (time.Time, error) {
return s.getKeyTime(lastVacuumTimeKey)
// Path returns the path to the store's storage directory.
func (s *Store) Path() string {
return s.raftDir
// Addr returns the address of the store.
func (s *Store) Addr() string {
if !s.open.Is() {
return ""
return string(s.raftTn.LocalAddr())
// ID returns the Raft ID of the store.
func (s *Store) ID() string {
return s.raftID
// LeaderAddr returns the address of the current leader. Returns a
// blank string if there is no leader or if the Store is not open.
func (s *Store) LeaderAddr() (string, error) {
if !s.open.Is() {
return "", nil
addr, _ := s.raft.LeaderWithID()
return string(addr), nil
// LeaderID returns the node ID of the Raft leader. Returns a
// blank string if there is no leader, or an error.
func (s *Store) LeaderID() (string, error) {
if !s.open.Is() {
return "", nil
_, id := s.raft.LeaderWithID()
return string(id), nil
// LeaderWithID is used to return the current leader address and ID of the cluster.
// It may return empty strings if there is no current leader or the leader is unknown.
func (s *Store) LeaderWithID() (string, string) {
if !s.open.Is() {
return "", ""
addr, id := s.raft.LeaderWithID()
return string(addr), string(id)
// CommitIndex returns the Raft commit index.
func (s *Store) CommitIndex() (uint64, error) {
if !s.open.Is() {
return 0, ErrNotOpen
return s.raft.CommitIndex(), nil
// LeaderCommitIndex returns the Raft leader commit index, as indicated
// by the latest AppendEntries RPC. If this node is the Leader then the
// commit index is returned directly from the Raft object.
func (s *Store) LeaderCommitIndex() (uint64, error) {
if !s.open.Is() {
return 0, ErrNotOpen
if s.raft.State() == raft.Leader {
return s.raft.CommitIndex(), nil
return s.raftTn.LeaderCommitIndex(), nil
// Nodes returns the slice of nodes in the cluster, sorted by ID ascending.
func (s *Store) Nodes() ([]*Server, error) {
if !s.open.Is() {
return nil, ErrNotOpen
f := s.raft.GetConfiguration()
if f.Error() != nil {
return nil, f.Error()
rs := f.Configuration().Servers
servers := make([]*Server, len(rs))
for i := range rs {
servers[i] = &Server{
ID: string(rs[i].ID),
Addr: string(rs[i].Address),
Suffrage: rs[i].Suffrage.String(),
return servers, nil
// WaitForRemoval blocks until a node with the given ID is removed from the
// cluster or the timeout expires.
func (s *Store) WaitForRemoval(id string, timeout time.Duration) error {
check := func() bool {
nodes, err := s.Nodes()
if err == nil && !Servers(nodes).Contains(id) {
return true
return false
// try the fast path
if check() {
return nil
tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
for {
select {
case <-tck.C:
if check() {
return nil
case <-tmr.C:
return ErrWaitForRemovalTimeout
// WaitForLeader blocks until a leader is detected, or the timeout expires.
func (s *Store) WaitForLeader(timeout time.Duration) (string, error) {
var err error
var leaderAddr string
check := func() bool {
leaderAddr, err = s.LeaderAddr()
if err == nil && leaderAddr != "" {
return true
return false
// try the fast path
if check() {
return leaderAddr, nil
tck := time.NewTicker(leaderWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
for {
select {
case <-tck.C:
if check() {
return leaderAddr, nil
case <-tmr.C:
if err != nil {
s.logger.Printf("timed out waiting for leader, last error: %s", err.Error())
return "", ErrWaitForLeaderTimeout
// SetRequestCompression allows low-level control over the compression threshold
// for the request marshaler.
func (s *Store) SetRequestCompression(batch, size int) {
s.reqMarshaller.BatchThreshold = batch
s.reqMarshaller.SizeThreshold = size
// WaitForFSMIndex blocks until a given log index has been applied to our
// state machine or the timeout expires.
func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, error) {
tck := time.NewTicker(appliedWaitDelay)
defer tck.Stop()
tmr := time.NewTimer(timeout)
defer tmr.Stop()
for {
select {
case <-tck.C:
if fsmIdx := s.fsmIdx.Load(); fsmIdx >= idx {
return fsmIdx, nil
case <-tmr.C:
return 0, fmt.Errorf("timeout expired")
// Stats returns stats for the store.
func (s *Store) Stats() (map[string]interface{}, error) {
if !s.open.Is() {
return map[string]interface{}{
"open": false,
}, nil
dbStatus, err := s.db.Stats()
if err != nil {
stats.Add(numDBStatsErrors, 1)
s.logger.Printf("failed to get database stats: %s", err.Error())
nodes, err := s.Nodes()
if err != nil {
return nil, err
leaderAddr, leaderID := s.LeaderWithID()
// Perform type-conversion to actual numbers where possible.
raftStats := make(map[string]interface{})
for k, v := range s.raft.Stats() {
if s, err := strconv.ParseInt(v, 10, 64); err != nil {
raftStats[k] = v
} else {
raftStats[k] = s
raftStats["log_size"], err = s.logSize()
if err != nil {
return nil, err
raftStats["voter"], err = s.IsVoter()
if err != nil {
return nil, err
raftStats["bolt"] = s.boltStore.Stats()
raftStats["transport"] = s.raftTn.Stats()
dirSz, err := dirSize(s.raftDir)
if err != nil {
return nil, err
lAppliedIdx, err := s.boltStore.GetAppliedIndex()
if err != nil {
return nil, err
status := map[string]interface{}{
"open": s.open,
"node_id": s.raftID,
"raft": raftStats,
"fsm_index": s.fsmIdx.Load(),
"fsm_update_time": s.fsmUpdateTime.Load(),
"db_applied_index": s.dbAppliedIdx.Load(),
"last_applied_index": lAppliedIdx,
"addr": s.Addr(),
"leader": map[string]string{
"node_id": leaderID,
"addr": leaderAddr,
"leader_appended_at_time": s.appendedAtTime.Load(),
"ready": s.Ready(),
"observer": map[string]uint64{
"observed": s.observer.GetNumObserved(),
"dropped": s.observer.GetNumDropped(),
"apply_timeout": s.ApplyTimeout.String(),
"heartbeat_timeout": s.HeartbeatTimeout.String(),
"election_timeout": s.ElectionTimeout.String(),
"snapshot_threshold": s.SnapshotThreshold,
"snapshot_interval": s.SnapshotInterval.String(),
"reap_timeout": s.ReapTimeout.String(),
"reap_read_only_timeout": s.ReapReadOnlyTimeout.String(),
"no_freelist_sync": s.NoFreeListSync,
"trailing_logs": s.numTrailingLogs,
"request_marshaler": s.reqMarshaller.Stats(),
"nodes": nodes,
"dir": s.raftDir,
"dir_size": dirSz,
"dir_size_friendly": friendlyBytes(uint64(dirSz)),
"sqlite3": dbStatus,
"db_conf": s.dbConf,
if s.AutoVacInterval > 0 {
bt, err := s.getKeyTime(baseVacuumTimeKey)
if err != nil {
return nil, err
avm := map[string]interface{}{}
if lvt, err := s.LastVacuumTime(); err == nil {
avm["last_vacuum"] = lvt
bt = lvt
avm["next_vacuum_after"] = bt.Add(s.AutoVacInterval)
status["auto_vacuum"] = avm
// Snapshot stats may be in flux if a snapshot is in progress. Only
// report them if they are available.
snapsStats, err := s.snapshotStore.Stats()
if err == nil {
status["snapshot_store"] = snapsStats
return status, nil
// Execute executes queries that return no rows, but do modify the database.
func (s *Store) Execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error) {
if !s.open.Is() {
return nil, ErrNotOpen
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
if !s.Ready() {
return nil, ErrNotReady
return s.execute(ex)
func (s *Store) execute(ex *proto.ExecuteRequest) ([]*proto.ExecuteResult, error) {
b, compressed, err := s.tryCompress(ex)
if err != nil {
return nil, err
c := &proto.Command{
Type: proto.Command_COMMAND_TYPE_EXECUTE,
SubCommand: b,
Compressed: compressed,
b, err = command.Marshal(c)
if err != nil {
return nil, err
af := s.raft.Apply(b, s.ApplyTimeout)
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return nil, ErrNotLeader
return nil, af.Error()
r := af.Response().(*fsmExecuteResponse)
return r.results, r.error
// Query executes queries that return rows, and do not modify the database.
func (s *Store) Query(qr *proto.QueryRequest) ([]*proto.QueryRows, error) {
if !s.open.Is() {
return nil, ErrNotOpen
if qr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
if !s.Ready() {
return nil, ErrNotReady
b, compressed, err := s.tryCompress(qr)
if err != nil {
return nil, err
c := &proto.Command{
Type: proto.Command_COMMAND_TYPE_QUERY,
SubCommand: b,
Compressed: compressed,
b, err = command.Marshal(c)
if err != nil {
return nil, err
af := s.raft.Apply(b, s.ApplyTimeout)
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return nil, ErrNotLeader
return nil, af.Error()
r := af.Response().(*fsmQueryResponse)
return r.rows, r.error
if qr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK && s.raft.State() != raft.Leader {
return nil, ErrNotLeader
if qr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE && s.isStaleRead(qr.Freshness, qr.FreshnessStrict) {
return nil, ErrStaleRead
if qr.Request.Transaction {
// Transaction requested during query, but not going through consensus. This means
// we need to block any database serialization during the query.
defer s.queryTxMu.RUnlock()
return s.db.Query(qr.Request, qr.Timings)
// Request processes a request that may contain both Executes and Queries.
func (s *Store) Request(eqr *proto.ExecuteQueryRequest) ([]*proto.ExecuteQueryResponse, error) {
if !s.open.Is() {
return nil, ErrNotOpen
nRW, _ := s.RORWCount(eqr)
isLeader := s.raft.State() == raft.Leader
if nRW == 0 && eqr.Level != proto.QueryRequest_QUERY_REQUEST_LEVEL_STRONG {
// It's a little faster just to do a Query of the DB if we know there is no need
// for consensus.
if eqr.Request.Transaction {
// Transaction requested during query, but not going through consensus. This means
// we need to block any database serialization during the query.
defer s.queryTxMu.RUnlock()
convertFn := func(qr []*proto.QueryRows) []*proto.ExecuteQueryResponse {
resp := make([]*proto.ExecuteQueryResponse, len(qr))
for i := range qr {
resp[i] = &proto.ExecuteQueryResponse{
Result: &proto.ExecuteQueryResponse_Q{Q: qr[i]},
return resp
if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_NONE && s.isStaleRead(eqr.Freshness, eqr.FreshnessStrict) {
return nil, ErrStaleRead
} else if eqr.Level == proto.QueryRequest_QUERY_REQUEST_LEVEL_WEAK {
if !isLeader {
return nil, ErrNotLeader
qr, err := s.db.Query(eqr.Request, eqr.Timings)
return convertFn(qr), err
// At least one write in the request, or STRONG consistency requested, so
// we need to go through consensus. Check that we can do that.
if !isLeader {
return nil, ErrNotLeader
if !s.Ready() {
return nil, ErrNotReady
// Send the request through consensus.
b, compressed, err := s.tryCompress(eqr)
if err != nil {
return nil, err
c := &proto.Command{
SubCommand: b,
Compressed: compressed,
b, err = command.Marshal(c)
if err != nil {
return nil, err
af := s.raft.Apply(b, s.ApplyTimeout)
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return nil, ErrNotLeader
return nil, af.Error()
r := af.Response().(*fsmExecuteQueryResponse)
return r.results, r.error
// Backup writes a consistent snapshot of the underlying database to dst. This
// can be called while writes are being made to the system. The backup may fail
// if the system is actively snapshotting. The client can just retry in this case.
// If vacuum is not true the copy is written directly to dst, optionally in compressed
// form, without any intermediate temporary files.
// If vacuum is true, then a VACUUM is performed on the database before the backup
// is made. If compression false, and dst is an os.File, then the vacuumed copy
// will be written directly to that file. Otherwise a temporary file will be created,
// and that temporary file copied to dst.
func (s *Store) Backup(br *proto.BackupRequest, dst io.Writer) (retErr error) {
if !s.open.Is() {
return ErrNotOpen
if br.Vacuum && br.Format != proto.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY {
return ErrInvalidBackupFormat
startT := time.Now()
defer func() {
if retErr == nil {
stats.Add(numBackups, 1)
s.logger.Printf("database backed up in %s", time.Since(startT))
if br.Leader && s.raft.State() != raft.Leader {
return ErrNotLeader
if br.Format == proto.BackupRequest_BACKUP_REQUEST_FORMAT_BINARY {
var srcFD *os.File
var err error
if br.Vacuum {
if !br.Compress {
if f, ok := dst.(*os.File); ok {
// Fast path, just vacuum directly to the destination.
return s.db.Backup(f.Name(), br.Vacuum)
srcFD, err = createTemp(s.dbDir, backupScatchPattern)
if err != nil {
return err
defer os.Remove(srcFD.Name())
defer srcFD.Close()
if err := s.db.Backup(srcFD.Name(), br.Vacuum); err != nil {
return err
} else {
// Snapshot to ensure the main SQLite file has all the latest data.
if err := s.Snapshot(0); err != nil {
if err != raft.ErrNothingNewToSnapshot &&
!strings.Contains(err.Error(), "wait until the configuration entry at") {
return fmt.Errorf("pre-backup snapshot failed: %s", err.Error())
// Pause any snapshotting and which will allow us to read the SQLite
// file without it changing underneath us. Any new writes will be
// sent to the WAL.
if err := s.snapshotCAS.Begin(); err != nil {
return err
defer s.snapshotCAS.End()
// Now we can copy the SQLite file directly.
srcFD, err = os.Open(s.dbPath)
if err != nil {
return fmt.Errorf("failed to open database file: %s", err.Error())
defer srcFD.Close()
if br.Compress {
var dstGz *gzip.Writer
dstGz, err = gzip.NewWriterLevel(dst, gzip.BestSpeed)
if err != nil {
return err
defer dstGz.Close()
_, err = io.Copy(dstGz, srcFD)
} else {
_, err = io.Copy(dst, srcFD)
return err
} else if br.Format == proto.BackupRequest_BACKUP_REQUEST_FORMAT_SQL {
return s.db.Dump(dst)
return ErrInvalidBackupFormat
// Loads an entire SQLite file into the database, sending the request
// through the Raft log.
func (s *Store) Load(lr *proto.LoadRequest) error {
if !s.open.Is() {
return ErrNotOpen
if !s.Ready() {
return ErrNotReady
if err := s.load(lr); err != nil {
return err
stats.Add(numLoads, 1)
return nil
// load loads an entire SQLite file into the database, and is for internal use
// only. It does not check for readiness, and does not update statistics.
func (s *Store) load(lr *proto.LoadRequest) error {
startT := time.Now()
b, err := command.MarshalLoadRequest(lr)
if err != nil {
s.logger.Printf("load failed during load-request marshalling %s", err.Error())
return err
c := &proto.Command{
Type: proto.Command_COMMAND_TYPE_LOAD,
SubCommand: b,
b, err = command.Marshal(c)
if err != nil {
return err
af := s.raft.Apply(b, s.ApplyTimeout)
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return ErrNotLeader
s.logger.Printf("load failed during Apply: %s", af.Error())
return af.Error()
s.logger.Printf("node loaded in %s (%d bytes)", time.Since(startT), len(b))
return nil
// ReadFrom reads data from r, and loads it into the database, bypassing Raft consensus.
// Once the data is loaded, a snapshot is triggered, which then results in a system as
// if the data had been loaded through Raft consensus.
func (s *Store) ReadFrom(r io.Reader) (int64, error) {
// Check the constraints.
if s.raft.State() != raft.Leader {
return 0, ErrNotLeader
nodes, err := s.Nodes()
if err != nil {
return 0, err
if len(nodes) != 1 {
return 0, ErrNotSingleNode
// Write the data to a temporary file.
f, err := createTemp(s.dbDir, bootScatchPattern)
if err != nil {
return 0, err
defer os.Remove(f.Name())
defer f.Close()
cw := progress.NewCountingWriter(f)
cm := progress.StartCountingMonitor(func(n int64) {
s.logger.Printf("boot process installed %d bytes", n)
}, cw)
n, err := func() (int64, error) {
defer cm.StopAndWait()
defer f.Close()
return io.Copy(cw, r)
if err != nil {
return n, err
// Confirm the data is a valid SQLite database.
if !sql.IsValidSQLiteFile(f.Name()) {
return n, fmt.Errorf("invalid SQLite data")
// Raft won't snapshot unless there is at least one unsnappshotted log entry,
// so prep that now before we do anything destructive.
if af, err := s.Noop("boot"); err != nil {
return n, err
} else if err := af.Error(); err != nil {
return n, err
// Swap in new database file.
if err := s.db.Swap(f.Name(), s.dbConf.FKConstraints, true); err != nil {
return n, fmt.Errorf("error swapping database file: %v", err)
// Snapshot, so we load the new database into the Raft system.
if err := s.snapshotStore.SetFullNeeded(); err != nil {
return n, err
if err := s.Snapshot(1); err != nil {
return n, err
stats.Add(numBoots, 1)
return n, nil
// Vacuum performs a VACUUM operation on the underlying database. It does
// this by performing a VACUUM INTO a temporary file, and then swapping
// the temporary file with the existing database file. The database is then
// re-opened.
func (s *Store) Vacuum() error {
fd, err := createTemp(s.dbDir, vacuumScatchPattern)
if err != nil {
return err
if err := fd.Close(); err != nil {
return err
defer os.Remove(fd.Name())
if err := s.db.VacuumInto(fd.Name()); err != nil {
return err
// Verify that the VACUUMed database is valid.
if !sql.IsValidSQLiteFile(fd.Name()) {
return fmt.Errorf("invalid SQLite file post VACUUM")
// Swap in new database file.
if err := s.db.Swap(fd.Name(), s.dbConf.FKConstraints, true); err != nil {
return fmt.Errorf("error swapping database file: %v", err)
if err := s.snapshotStore.SetFullNeeded(); err != nil {
return err
return nil
// Database returns a copy of the underlying database. The caller MUST
// ensure that no transaction is taking place during this call, or an error may
// be returned. If leader is true, this operation is performed with a read
// consistency level equivalent to "weak". Otherwise, no guarantees are made
// about the read consistency level.
// http://sqlite.org/howtocorrupt.html states it is safe to do this
// as long as the database is not written to during the call.
func (s *Store) Database(leader bool) ([]byte, error) {
if leader && s.raft.State() != raft.Leader {
return nil, ErrNotLeader
return s.db.Serialize()
// Notify notifies this Store that a node is ready for bootstrapping at the
// given address. Once the number of known nodes reaches the expected level
// bootstrapping will be attempted using this Store. "Expected level" includes
// this node, so this node must self-notify to ensure the cluster bootstraps
// with the *advertised Raft address* which the Store doesn't know about.
// Notifying is idempotent. A node may repeatedly notify the Store without issue.
func (s *Store) Notify(nr *proto.NotifyRequest) error {
if !s.open.Is() {
return ErrNotOpen
defer s.notifyMu.Unlock()
if s.BootstrapExpect == 0 || s.bootstrapped || s.HasLeader() {
// There is no reason this node will bootstrap.
// - Read-only nodes require that BootstrapExpect is set to 0, so this
// block ensures that notifying a read-only node will not cause a bootstrap.
// - If the node is already bootstrapped, then there is nothing to do.
// - If the node already has a leader, then no bootstrapping is required.
return nil
if _, ok := s.notifyingNodes[nr.Id]; ok {
return nil
// Confirm that this node can resolve the remote address. This can happen due
// to incomplete DNS records across the underlying infrastructure. If it can't
// then don't consider this Notify attempt successful -- so the notifying node
// will presumably try again.
if addr, err := resolvableAddress(nr.Address); err != nil {
return fmt.Errorf("failed to resolve %s: %w", addr, err)
s.notifyingNodes[nr.Id] = &Server{nr.Id, nr.Address, "voter"}
if len(s.notifyingNodes) < s.BootstrapExpect {
return nil
raftServers := make([]raft.Server, 0)
for _, n := range s.notifyingNodes {
raftServers = append(raftServers, raft.Server{
ID: raft.ServerID(n.ID),
Address: raft.ServerAddress(n.Addr),
s.logger.Printf("reached expected bootstrap count of %d, starting cluster bootstrap",
bf := s.raft.BootstrapCluster(raft.Configuration{
Servers: raftServers,
if bf.Error() != nil {
s.logger.Printf("cluster bootstrap failed: %s", bf.Error())
} else {
s.logger.Printf("cluster bootstrap successful, servers: %s", raftServers)
s.bootstrapped = true
return nil
// Join joins a node, identified by id and located at addr, to this store.
// The node must be ready to respond to Raft communications at that address.
func (s *Store) Join(jr *proto.JoinRequest) error {
if !s.open.Is() {
return ErrNotOpen
if s.raft.State() != raft.Leader {
return ErrNotLeader
id := jr.Id
addr := jr.Address
voter := jr.Voter
// Confirm that this node can resolve the remote address. This can happen due
// to incomplete DNS records across the underlying infrastructure. If it can't
// then don't consider this join attempt successful -- so the joining node
// will presumably try again.
if addr, err := resolvableAddress(addr); err != nil {
return fmt.Errorf("failed to resolve %s: %w", addr, err)
configFuture := s.raft.GetConfiguration()
if err := configFuture.Error(); err != nil {
s.logger.Printf("failed to get raft configuration: %v", err)
return err
for _, srv := range configFuture.Configuration().Servers {
// If a node already exists with either the joining node's ID or address,
// that node may need to be removed from the config first.
if srv.ID == raft.ServerID(id) || srv.Address == raft.ServerAddress(addr) {
// However, if *both* the ID and the address are the same, then no
// join is actually needed.
if srv.Address == raft.ServerAddress(addr) && srv.ID == raft.ServerID(id) {
stats.Add(numIgnoredJoins, 1)
s.logger.Printf("node %s at %s already member of cluster, ignoring join request", id, addr)
return nil
if err := s.remove(id); err != nil {
s.logger.Printf("failed to remove node %s: %v", id, err)
return err
stats.Add(numRemovedBeforeJoins, 1)
s.logger.Printf("removed node %s prior to rejoin with changed ID or address", id)
var f raft.IndexFuture
if voter {
f = s.raft.AddVoter(raft.ServerID(id), raft.ServerAddress(addr), 0, 0)
} else {
f = s.raft.AddNonvoter(raft.ServerID(id), raft.ServerAddress(addr), 0, 0)
if e := f.(raft.Future); e.Error() != nil {
if e.Error() == raft.ErrNotLeader {
return ErrNotLeader
return e.Error()
stats.Add(numJoins, 1)
s.logger.Printf("node with ID %s, at %s, joined successfully as %s", id, addr, prettyVoter(voter))
return nil
// Remove removes a node from the store.
func (s *Store) Remove(rn *proto.RemoveNodeRequest) error {
if !s.open.Is() {
return ErrNotOpen
id := rn.Id
s.logger.Printf("received request to remove node %s", id)
if err := s.remove(id); err != nil {
return err
s.logger.Printf("node %s removed successfully", id)
return nil
// Noop writes a noop command to the Raft log. A noop command simply
// consumes a slot in the Raft log, but has no other effect on the
// system.
func (s *Store) Noop(id string) (raft.ApplyFuture, error) {
n := &proto.Noop{
Id: id,
b, err := command.MarshalNoop(n)
if err != nil {
return nil, err
c := &proto.Command{
Type: proto.Command_COMMAND_TYPE_NOOP,
SubCommand: b,
bc, err := command.Marshal(c)
if err != nil {
return nil, err
return s.raft.Apply(bc, s.ApplyTimeout), nil
// RORWCount returns the number of read-only and read-write statements in the
// given ExecuteQueryRequest.
func (s *Store) RORWCount(eqr *proto.ExecuteQueryRequest) (nRW, nRO int) {
for _, stmt := range eqr.Request.Statements {
sql := stmt.Sql
if sql == "" {
ro, err := s.db.StmtReadOnly(sql)
if err == nil && ro {
} else {
// setLogInfo records some key indexes about the log.
func (s *Store) setLogInfo() error {
var err error
s.firstIdxOnOpen, err = s.boltStore.FirstIndex()
if err != nil {
return fmt.Errorf("failed to get last index: %s", err)
s.lastAppliedIdxOnOpen, err = s.boltStore.GetAppliedIndex()
if err != nil {
return fmt.Errorf("failed to get last applied index: %s", err)
s.lastIdxOnOpen, err = s.boltStore.LastIndex()
if err != nil {
return fmt.Errorf("failed to get last index: %s", err)
s.lastCommandIdxOnOpen, err = s.boltStore.LastCommandIndex(s.firstIdxOnOpen, s.lastAppliedIdxOnOpen)
if err != nil {
return fmt.Errorf("failed to get last command index: %s", err)
return nil
// remove removes the node, with the given ID, from the cluster.
func (s *Store) remove(id string) error {
f := s.raft.RemoveServer(raft.ServerID(id), 0, 0)
if f.Error() != nil && f.Error() == raft.ErrNotLeader {
return ErrNotLeader
return f.Error()
// initVacuumTime initializes the last vacuum times in the Config store.
// If auto-vacuum is disabled, then all auto-vacuum related state is removed.
// If enabled, but no last vacuum time is set, then the auto-bac baseline
// time i.e. now is set. If a last vacuum time is set, then it is left as is.
func (s *Store) initVacuumTime() error {
if s.AutoVacInterval == 0 {
if err := s.clearKeyTime(baseVacuumTimeKey); err != nil {
return fmt.Errorf("failed to clear base vacuum time: %s", err)
if err := s.clearKeyTime(lastVacuumTimeKey); err != nil {
return fmt.Errorf("failed to clear last vacuum time: %s", err)
return nil
if _, err := s.LastVacuumTime(); err != nil {
return s.setKeyTime(baseVacuumTimeKey, time.Now())
return nil
func (s *Store) setKeyTime(key string, t time.Time) error {
buf := bytes.NewBuffer(make([]byte, 0, 8))
if err := binary.Write(buf, binary.LittleEndian, t.UnixNano()); err != nil {
return err
return s.boltStore.Set([]byte(key), buf.Bytes())
func (s *Store) getKeyTime(key string) (time.Time, error) {
kt, err := s.boltStore.Get([]byte(key))
if err != nil {
return time.Time{}, fmt.Errorf("failed to get key %s: %s", key, err)
} else if kt == nil {
return time.Time{}, fmt.Errorf("key %s is nil", key)
n := int64(binary.LittleEndian.Uint64(kt))
return time.Unix(0, n), nil
func (s *Store) clearKeyTime(key string) error {
return s.boltStore.Set([]byte(key), nil)
// raftConfig returns a new Raft config for the store.
func (s *Store) raftConfig() *raft.Config {
config := raft.DefaultConfig()
config.ShutdownOnRemove = s.ShutdownOnRemove
config.LogLevel = s.RaftLogLevel
if s.SnapshotThreshold != 0 {
config.SnapshotThreshold = s.SnapshotThreshold
config.TrailingLogs = s.numTrailingLogs
if s.SnapshotInterval != 0 {
config.SnapshotInterval = s.SnapshotInterval
if s.LeaderLeaseTimeout != 0 {
config.LeaderLeaseTimeout = s.LeaderLeaseTimeout
if s.HeartbeatTimeout != 0 {
config.HeartbeatTimeout = s.HeartbeatTimeout
if s.ElectionTimeout != 0 {
config.ElectionTimeout = s.ElectionTimeout
opts := hclog.DefaultOptions
opts.Name = ""
opts.Level = hclog.LevelFromString(s.RaftLogLevel)
s.logIncremental = opts.Level < hclog.Warn
config.Logger = hclog.FromStandardLogger(log.New(os.Stderr, "[raft] ", log.LstdFlags), opts)
return config
func (s *Store) updateAppliedIndex() chan struct{} {
done := make(chan struct{})
go func() {
ticker := time.NewTicker(appliedIndexUpdateInterval)
defer ticker.Stop()
var idx uint64
for {
select {
case <-ticker.C:
newIdx := s.raft.AppliedIndex()
if newIdx == idx {
idx = newIdx
if err := s.boltStore.SetAppliedIndex(idx); err != nil {
s.logger.Printf("failed to set applied index: %s", err.Error())
case <-done:
return done
func (s *Store) isStaleRead(freshness int64, strict bool) bool {
if s.raft.State() == raft.Leader {
return false
return IsStaleRead(
type fsmExecuteResponse struct {
results []*proto.ExecuteResult
error error
type fsmQueryResponse struct {
rows []*proto.QueryRows
error error
type fsmExecuteQueryResponse struct {
results []*proto.ExecuteQueryResponse
error error
type fsmGenericResponse struct {
error error
// fsmApply applies a Raft log entry to the database.
func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
defer func() {
if l.Index <= s.lastCommandIdxOnOpen {
// In here means at least one command entry was in the log when the Store
// opened.
if l.Index == s.lastCommandIdxOnOpen {
s.logger.Printf("%d confirmed committed log entries applied in %s, took %s since open",
s.appliedOnOpen, time.Since(s.firstLogAppliedT), time.Since(s.openT))
if s.firstLogAppliedT.IsZero() {
s.firstLogAppliedT = time.Now()
s.logger.Printf("first log applied since node start, log at index %d", l.Index)
cmd, mutated, r := s.cmdProc.Process(l.Data, s.db)
if mutated {
if cmd.Type == proto.Command_COMMAND_TYPE_NOOP {
} else if cmd.Type == proto.Command_COMMAND_TYPE_LOAD {
// Swapping in a new database invalidates any existing snapshot.
err := s.snapshotStore.SetFullNeeded()
if err != nil {
return &fsmGenericResponse{
error: fmt.Errorf("failed to set full snapshot needed: %s", err.Error()),
return r
// fsmSnapshot returns a snapshot of the database.
// The system must ensure that no transaction is taking place during this call.
// Hashicorp Raft guarantees that this function will not be called concurrently
// with Apply, as it states Apply() and Snapshot() are always called from the same
// thread. This means there is no need to synchronize this function with Execute().
// However, queries that involve a transaction must be blocked.
// http://sqlite.org/howtocorrupt.html states it is safe to copy or serialize the
// database as long as no writes to the database are in progress.
func (s *Store) fsmSnapshot() (fSnap raft.FSMSnapshot, retErr error) {
defer s.queryTxMu.Unlock()
if err := s.snapshotCAS.Begin(); err != nil {
return nil, err
defer s.snapshotCAS.End()
startT := time.Now()
defer func() {
if retErr != nil {
stats.Add(numSnapshotsFailed, 1)
// Automatic VACUUM needed? This is deliberately done in the context of a Snapshot
// as it guarantees that the database is not being written to.
if avn, err := s.autoVacNeeded(time.Now()); err != nil {
return nil, err
} else if avn {
vacStart := time.Now()
if err := s.Vacuum(); err != nil {
stats.Add(numAutoVacuumsFailed, 1)
return nil, err
s.logger.Printf("database vacuumed in %s", time.Since(vacStart))
stats.Add(numAutoVacuums, 1)
if err := s.setKeyTime(lastVacuumTimeKey, time.Now()); err != nil {
return nil, err
fullNeeded, err := s.snapshotStore.FullNeeded()
if err != nil {
return nil, err
fPLog := fullPretty(fullNeeded)
defer func() {
defer s.numSnapshotsMu.Unlock()
var fsmSnapshot raft.FSMSnapshot
if fullNeeded {
chkStartTime := time.Now()
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
stats.Add(numFullCheckpointFailed, 1)
return nil, err
dbFD, err := os.Open(s.db.Path())
if err != nil {
return nil, err
fsmSnapshot = snapshot.NewSnapshot(dbFD)
stats.Add(numSnapshotsFull, 1)
} else {
compactedBuf := bytes.NewBuffer(nil)
var err error
if pathExistsWithData(s.walPath) {
compactStartTime := time.Now()
// Read a compacted version of the WAL into memory, and write it
// to the Snapshot store.
walFD, err := os.Open(s.walPath)
if err != nil {
return nil, err
defer walFD.Close()
scanner, err := wal.NewFastCompactingScanner(walFD)
if err != nil {
return nil, err
compactedBytes, err := scanner.Bytes()
if err != nil {
return nil, err
compactedBuf = bytes.NewBuffer(compactedBytes)
// Now that we're written a (compacted) copy of the WAL to the Snapshot,
// we can truncate the WAL. We use truncate mode so that the next WAL
// contains just changes since the this snapshot.
walSz, err := fileSize(s.walPath)
if err != nil {
return nil, err
chkTStartTime := time.Now()
if err := s.db.Checkpoint(sql.CheckpointTruncate); err != nil {
stats.Add(numWALCheckpointTruncateFailed, 1)
return nil, fmt.Errorf("snapshot can't complete due to WAL checkpoint failure (will retry): %s",
fsmSnapshot = snapshot.NewSnapshot(io.NopCloser(compactedBuf))
if err != nil {
return nil, err
stats.Add(numSnapshotsIncremental, 1)
stats.Add(numSnapshots, 1)
dur := time.Since(startT)
fs := FSMSnapshot{
FSMSnapshot: fsmSnapshot,
if fullNeeded || s.logIncremental {
s.logger.Printf("%s snapshot created in %s on node ID %s", fPLog, dur, s.raftID)
fs.logger = s.logger
return &fs, nil
// fsmRestore restores the node to a previous state. The Hashicorp docs state this
// will not be called concurrently with Apply(), so synchronization with Execute()
// is not necessary.
func (s *Store) fsmRestore(rc io.ReadCloser) (retErr error) {
defer func() {
if retErr != nil {
stats.Add(numRestoresFailed, 1)
s.logger.Printf("initiating node restore on node ID %s", s.raftID)
startT := time.Now()
// Create a scatch file to write the restore data to it.
tmpFile, err := createTemp(s.dbDir, restoreScratchPattern)
if err != nil {
return fmt.Errorf("error creating temporary file for restore operation: %v", err)
defer os.Remove(tmpFile.Name())
defer tmpFile.Close()
// Copy it from the reader to the temporary file.
_, err = io.Copy(tmpFile, rc)
if err != nil {
return fmt.Errorf("error copying restore data: %v", err)
if tmpFile.Close(); err != nil {
return fmt.Errorf("error creating temporary file for restore operation: %v", err)
if err := s.db.Swap(tmpFile.Name(), s.dbConf.FKConstraints, true); err != nil {
return fmt.Errorf("error swapping database file: %v", err)
s.logger.Printf("successfully opened database at %s due to restore", s.db.Path())
// Take conservative approach and assume that everything has changed, so update
// the indexes. It is possible that dbAppliedIdx is now ahead of some other nodes'
// same value, since the last index is not necessarily a database-changing index,
// but that is OK. Worse that can happen is that anything paying attention to the
// index might consider the database to be changed when it is not, *logically* speaking.
li, err := snapshot.LatestIndex(s.snapshotDir)
if err != nil {
return fmt.Errorf("failed to get latest snapshot index post restore: %s", err)
if err := s.boltStore.SetAppliedIndex(li); err != nil {
return fmt.Errorf("failed to set applied index: %s", err)
stats.Add(numRestores, 1)
s.logger.Printf("node restored in %s", time.Since(startT))
return nil
// RegisterObserver registers an observer of Raft events
func (s *Store) RegisterObserver(o *raft.Observer) {
// DeregisterObserver deregisters an observer of Raft events
func (s *Store) DeregisterObserver(o *raft.Observer) {
// RegisterLeaderChange registers the given channel which will
// receive a signal when this node detects that the Leader changes.
func (s *Store) RegisterLeaderChange(c chan<- struct{}) {
defer s.leaderObserversMu.Unlock()
s.leaderObservers = append(s.leaderObservers, c)
func (s *Store) observe() (closeCh, doneCh chan struct{}) {
closeCh = make(chan struct{})
doneCh = make(chan struct{})
go func() {
defer close(doneCh)
for {
select {
case o := <-s.observerChan:
switch signal := o.Data.(type) {
case raft.FailedHeartbeatObservation:
stats.Add(failedHeartbeatObserved, 1)
nodes, err := s.Nodes()
if err != nil {
s.logger.Printf("failed to get nodes configuration during reap check: %s", err.Error())
servers := Servers(nodes)
id := string(signal.PeerID)
dur := time.Since(signal.LastContact)
isReadOnly, found := servers.IsReadOnly(id)
if !found {
s.logger.Printf("node %s is not present in configuration", id)
if (isReadOnly && s.ReapReadOnlyTimeout > 0 && dur > s.ReapReadOnlyTimeout) ||
(!isReadOnly && s.ReapTimeout > 0 && dur > s.ReapTimeout) {
pn := "voting node"
if isReadOnly {
pn = "non-voting node"
if err := s.remove(id); err != nil {
stats.Add(nodesReapedFailed, 1)
s.logger.Printf("failed to reap %s %s: %s", pn, id, err.Error())
} else {
stats.Add(nodesReapedOK, 1)
s.logger.Printf("successfully reaped %s %s", pn, id)
case raft.LeaderObservation:
for i := range s.leaderObservers {
select {
case s.leaderObservers[i] <- struct{}{}:
stats.Add(leaderChangesObserved, 1)
stats.Add(leaderChangesDropped, 1)
s.selfLeaderChange(signal.LeaderID == raft.ServerID(s.raftID))
case <-closeCh:
return closeCh, doneCh
// Snapshot performs a snapshot, leaving n trailing logs behind. If n
// is greater than zero, that many logs are left in the log after
// snapshotting. If n is zero, then the number set at Store creation is used.
// Finally, once this function returns, the trailing log configuration value
// is reset to the value set at Store creation.
func (s *Store) Snapshot(n uint64) (retError error) {
defer func() {
if retError != nil {
stats.Add(numUserSnapshotsFailed, 1)
if n > 0 {
cfg := s.raft.ReloadableConfig()
defer func() {
cfg.TrailingLogs = s.numTrailingLogs
if err := s.raft.ReloadConfig(cfg); err != nil {
s.logger.Printf("failed to reload Raft config: %s", err.Error())
cfg.TrailingLogs = n
if err := s.raft.ReloadConfig(cfg); err != nil {
return fmt.Errorf("failed to reload Raft config: %s", err.Error())
if err := s.raft.Snapshot().Error(); err != nil {
if strings.Contains(err.Error(), ErrLoadInProgress.Error()) {
return ErrLoadInProgress
return err
stats.Add(numUserSnapshots, 1)
return nil
// runWALSnapshotting runs the periodic check to see if a snapshot should be
// triggered due to WAL size.
func (s *Store) runWALSnapshotting() (closeCh, doneCh chan struct{}) {
closeCh = make(chan struct{})
doneCh = make(chan struct{})
ticker := time.NewTicker(time.Hour) // Just need an initialized ticker to start with.
if s.SnapshotInterval > 0 && s.SnapshotThresholdWALSize > 0 {
go func() {
defer close(doneCh)
defer ticker.Stop()
for {
select {
case <-ticker.C:
sz, err := fileSizeExists(s.walPath)
if err != nil {
s.logger.Printf("failed to check WAL size: %s", err.Error())
if uint64(sz) >= s.SnapshotThresholdWALSize {
if err := s.Snapshot(0); err != nil {
stats.Add(numWALSnapshotsFailed, 1)
s.logger.Printf("failed to snapshot due to WAL threshold: %s", err.Error())
stats.Add(numWALSnapshots, 1)
case <-closeCh:
return closeCh, doneCh
// selfLeaderChange is called when this node detects that its leadership
// status has changed.
func (s *Store) selfLeaderChange(leader bool) {
if s.restorePath != "" {
defer func() {
// Whatever happens, this is a one-shot attempt to perform a restore
err := os.Remove(s.restorePath)
if err != nil {
s.logger.Printf("failed to remove restore path after restore %s: %s",
s.restorePath, err.Error())
s.restorePath = ""
if !leader {
s.logger.Printf("different node became leader, not performing auto-restore")
stats.Add(numAutoRestoresSkipped, 1)
} else {
s.logger.Printf("this node is now leader, auto-restoring from %s", s.restorePath)
if err := s.installRestore(); err != nil {
s.logger.Printf("failed to auto-restore from %s: %s", s.restorePath, err.Error())
stats.Add(numAutoRestoresFailed, 1)
stats.Add(numAutoRestores, 1)
s.logger.Printf("node auto-restored successfully from %s", s.restorePath)
func (s *Store) installRestore() error {
f, err := os.Open(s.restorePath)
if err != nil {
return err
defer f.Close()
b, err := io.ReadAll(f)
if err != nil {
return err
lr := &proto.LoadRequest{
Data: b,
return s.load(lr)
// logSize returns the size of the Raft log on disk.
func (s *Store) logSize() (int64, error) {
fi, err := os.Stat(filepath.Join(s.raftDir, raftDBPath))
if err != nil {
return 0, err
return fi.Size(), nil
// tryCompress attempts to compress the given command. If the command is
// successfully compressed, the compressed byte slice is returned, along with
// a boolean true. If the command cannot be compressed, the uncompressed byte
// slice is returned, along with a boolean false. The stats are updated
// accordingly.
func (s *Store) tryCompress(rq command.Requester) ([]byte, bool, error) {
b, compressed, err := s.reqMarshaller.Marshal(rq)
if err != nil {
return nil, false, err
if compressed {
stats.Add(numCompressedCommands, 1)
} else {
stats.Add(numUncompressedCommands, 1)
return b, compressed, nil
// autoVacNeeded returns true if an automatic VACUUM is needed.
func (s *Store) autoVacNeeded(t time.Time) (bool, error) {
if s.AutoVacInterval == 0 {
return false, nil
vt, err := s.LastVacuumTime()
if err == nil {
return t.Sub(vt) > s.AutoVacInterval, nil
// OK, check if we have a base time from which we can start.
bt, err := s.getKeyTime(baseVacuumTimeKey)
if err != nil {
return false, err
return t.Sub(bt) > s.AutoVacInterval, nil
// createOnDisk opens an on-disk database file at the configured path. Any
// preexisting file will be removed before the database is opened.
func createOnDisk(path string, fkConstraints, wal bool) (*sql.SwappableDB, error) {
if err := sql.RemoveFiles(path); err != nil {
return nil, err
return sql.OpenSwappable(path, fkConstraints, wal)
func createTemp(dir, pattern string) (*os.File, error) {
fd, err := os.CreateTemp(dir, pattern)
if err != nil {
return nil, err
if err := os.Chmod(fd.Name(), 0644); err != nil {
return nil, err
return fd, nil
func copyFromReaderToFile(path string, r io.Reader) (int64, error) {
fd, err := os.Create(path)
if err != nil {
return 0, err
defer fd.Close()
return io.Copy(fd, r)
// prettyVoter converts bool to "voter" or "non-voter"
func prettyVoter(v bool) string {
if v {
return "voter"
return "non-voter"
// pathExists returns true if the given path exists.
func pathExists(p string) bool {
if _, err := os.Lstat(p); err != nil && os.IsNotExist(err) {
return false
return true
// pathExistsWithData returns true if the given path exists and has data.
func pathExistsWithData(p string) bool {
if !pathExists(p) {
return false
if size, err := fileSize(p); err != nil || size == 0 {
return false
return true
func dirExists(path string) bool {
stat, err := os.Stat(path)
return err == nil && stat.IsDir()
func fileSize(path string) (int64, error) {
stat, err := os.Stat(path)
if err != nil {
return 0, err
return stat.Size(), nil
// fileSizeExists returns the size of the given file, or 0 if the file does not
// exist. Any other error is returned.
func fileSizeExists(path string) (int64, error) {
if !pathExists(path) {
return 0, nil
return fileSize(path)
// dirSize returns the total size of all files in the given directory
func dirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
if err != nil {
// If the file doesn't exist, we can ignore it. Snapshot files might
// disappear during walking.
if os.IsNotExist(err) {
return nil
return err
if !info.IsDir() {
size += info.Size()
return err
return size, err
func fullPretty(full bool) string {
if full {
return "full"
return "incremental"
func resolvableAddress(addr string) (string, error) {
h, _, err := net.SplitHostPort(addr)
if err != nil {
// Just try the given address directly.
h = addr
_, err = net.LookupHost(h)
return h, err
func friendlyBytes(n uint64) string {
return humanize.Bytes(n)