Fork 0
You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

220 lines
6.8 KiB

package store
import (
sql "github.com/rqlite/rqlite/v8/db"
rlog "github.com/rqlite/rqlite/v8/log"
// IsNewNode returns whether a node using raftDir would be a brand-new node.
// It also means that the window for this node joining a different cluster has passed.
func IsNewNode(raftDir string) bool {
// If there is any pre-existing Raft state, then this node
// has already been created.
return !pathExists(filepath.Join(raftDir, raftDBPath))
8 months ago
// HasData returns true if the given dir indicates that at least one FSM entry
// has been committed to the log. This is true if there are any snapshots, or
// if there are any entries in the log of raft.LogCommand type. This function
// will block if the Bolt database is already open.
func HasData(dir string) (bool, error) {
if !dirExists(dir) {
return false, nil
sstr, err := snapshot.NewStore(filepath.Join(dir, snapshotsDirName))
if err != nil {
return false, err
snaps, err := sstr.List()
if err != nil {
return false, err
if len(snaps) > 0 {
return true, nil
logs, err := rlog.New(filepath.Join(dir, raftDBPath), false)
if err != nil {
return false, err
defer logs.Close()
h, err := logs.HasCommand()
if err != nil {
return false, err
return h, nil
// RecoverNode is used to manually force a new configuration, in the event that
// quorum cannot be restored. This borrows heavily from RecoverCluster functionality
// of the Hashicorp Raft library, but has been customized for rqlite use.
func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable *rlog.Log,
snaps raft.SnapshotStore, tn raft.Transport, conf raft.Configuration) error {
logPrefix := logger.Prefix()
logger.SetPrefix(fmt.Sprintf("%s[recovery] ", logPrefix))
defer logger.SetPrefix(logPrefix)
// Sanity check the Raft peer configuration.
if err := checkRaftConfiguration(conf); err != nil {
return err
// Get a path to a temporary file to use for a temporary database.
tmpDBPath := filepath.Join(dataDir, "recovery.db")
defer os.Remove(tmpDBPath)
// Attempt to restore any latest snapshot.
var (
snapshotIndex uint64
snapshotTerm uint64
snapshots, err := snaps.List()
if err != nil {
return fmt.Errorf("failed to list snapshots: %s", err)
logger.Printf("recovery detected %d snapshots", len(snapshots))
if len(snapshots) > 0 {
if err := func() error {
snapID := snapshots[0].ID
_, rc, err := snaps.Open(snapID)
if err != nil {
return fmt.Errorf("failed to open snapshot %s: %s", snapID, err)
defer rc.Close()
_, err = copyFromReaderToFile(tmpDBPath, rc)
if err != nil {
return fmt.Errorf("failed to copy snapshot %s to temporary database: %s", snapID, err)
snapshotIndex = snapshots[0].Index
snapshotTerm = snapshots[0].Term
return nil
}(); err != nil {
return err
// Now, open the database so we can replay any outstanding Raft log entries.
db, err := sql.OpenSwappable(tmpDBPath, false, true)
if err != nil {
return fmt.Errorf("failed to open temporary database: %s", err)
defer db.Close()
// Need a dechunker manager to handle any chunked load requests.
decMgmr, err := chunking.NewDechunkerManager(dataDir)
if err != nil {
return fmt.Errorf("failed to create dechunker manager: %s", err.Error())
cmdProc := NewCommandProcessor(logger, decMgmr)
// The snapshot information is the best known end point for the data
// until we play back the Raft log entries.
lastIndex := snapshotIndex
lastTerm := snapshotTerm
// Apply any Raft log entries past the snapshot.
lastLogIndex, err := logs.LastIndex()
if err != nil {
return fmt.Errorf("failed to find last log: %v", err)
logger.Printf("last index is %d, last index written to log is %d", lastIndex, lastLogIndex)
for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
var entry raft.Log
if err = logs.GetLog(index, &entry); err != nil {
return fmt.Errorf("failed to get log at index %d: %v", index, err)
if entry.Type == raft.LogCommand {
cmdProc.Process(entry.Data, db)
lastIndex = entry.Index
lastTerm = entry.Term
// Create a new snapshot, placing the configuration in as if it was
// committed at index 1.
if err := db.Checkpoint(); err != nil {
return fmt.Errorf("failed to checkpoint database: %s", err)
tmpDBFD, err := os.Open(tmpDBPath)
if err != nil {
return fmt.Errorf("failed to open temporary database file: %s", err)
fsmSnapshot := snapshot.NewSnapshot(tmpDBFD) // tmpDBPath contains full state now.
sink, err := snaps.Create(1, lastIndex, lastTerm, conf, 1, tn)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)
if err = fsmSnapshot.Persist(sink); err != nil {
return fmt.Errorf("failed to persist snapshot: %v", err)
if err = sink.Close(); err != nil {
return fmt.Errorf("failed to finalize snapshot: %v", err)
logger.Printf("recovery snapshot created successfully using %s", tmpDBPath)
// Compact the log so that we don't get bad interference from any
// configuration change log entries that might be there.
firstLogIndex, err := logs.FirstIndex()
if err != nil {
return fmt.Errorf("failed to get first log index: %v", err)
if err := logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
return fmt.Errorf("log compaction failed: %v", err)
// Erase record of previous updating of Applied Index too.
if err := stable.SetAppliedIndex(0); err != nil {
return fmt.Errorf("failed to zero applied index: %v", err)
return nil
// checkRaftConfiguration tests a cluster membership configuration for common
// errors.
func checkRaftConfiguration(configuration raft.Configuration) error {
idSet := make(map[raft.ServerID]bool)
addressSet := make(map[raft.ServerAddress]bool)
var voters int
for _, server := range configuration.Servers {
if server.ID == "" {
return fmt.Errorf("empty ID in configuration: %v", configuration)
if server.Address == "" {
return fmt.Errorf("empty address in configuration: %v", server)
if strings.Contains(string(server.Address), "://") {
return fmt.Errorf("protocol specified in address: %v", server.Address)
_, _, err := net.SplitHostPort(string(server.Address))
if err != nil {
return fmt.Errorf("invalid address in configuration: %v", server.Address)
if idSet[server.ID] {
return fmt.Errorf("found duplicate ID in configuration: %v", server.ID)
idSet[server.ID] = true
if addressSet[server.Address] {
return fmt.Errorf("found duplicate address in configuration: %v", server.Address)
addressSet[server.Address] = true
if server.Suffrage == raft.Voter {
if voters == 0 {
return fmt.Errorf("need at least one voter in configuration: %v", configuration)
return nil