package store

import (
	"fmt"
	"log"
	"net"
	"os"
	"path/filepath"
	"strings"

	"github.com/hashicorp/raft"
	"github.com/rqlite/rqlite/v8/command/chunking"
	sql "github.com/rqlite/rqlite/v8/db"
	rlog "github.com/rqlite/rqlite/v8/log"
	"github.com/rqlite/rqlite/v8/snapshot"
)

// IsNewNode returns whether a node using raftDir would be a brand-new node.
// If it would not be, the window for this node to join a different cluster
// has passed.
func IsNewNode(raftDir string) bool {
	// If there is any pre-existing Raft state, then this node
	// has already been created.
	return !pathExists(filepath.Join(raftDir, raftDBPath))
}
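
// A minimal usage sketch (the directory is hypothetical; pathExists and
// raftDBPath are defined elsewhere in this package):
//
//	if IsNewNode("/var/lib/rqlite/raft") {
//		// No pre-existing Raft state, so this node may still bootstrap
//		// or join a cluster.
//	}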

// HasData returns true if the given dir indicates that at least one FSM entry
// has been committed to the log. This is true if there are any snapshots, or
// if there are any entries in the log of raft.LogCommand type. This function
// will block if the Bolt database is already open.
func HasData(dir string) (bool, error) {
	if !dirExists(dir) {
		return false, nil
	}
	sstr, err := snapshot.NewStore(filepath.Join(dir, snapshotsDirName))
	if err != nil {
		return false, err
	}
	snaps, err := sstr.List()
	if err != nil {
		return false, err
	}
	if len(snaps) > 0 {
		return true, nil
	}
	logs, err := rlog.New(filepath.Join(dir, raftDBPath), false)
	if err != nil {
		return false, err
	}
	defer logs.Close()
	h, err := logs.HasCommand()
	if err != nil {
		return false, err
	}
	return h, nil
}
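
// A minimal usage sketch (the path is hypothetical). Because HasData opens
// the underlying Bolt-backed log, it should not be called while the store
// is running, or it may block:
//
//	has, err := HasData("/var/lib/rqlite/raft")
//	if err == nil && !has {
//		// No committed FSM entries; the node holds no data.
//	}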

// RecoverNode is used to manually force a new configuration, in the event that
// quorum cannot be restored. This borrows heavily from the RecoverCluster
// functionality of the Hashicorp Raft library, but has been customized for
// rqlite use.
func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable *rlog.Log,
	snaps raft.SnapshotStore, tn raft.Transport, conf raft.Configuration) error {
	logPrefix := logger.Prefix()
	logger.SetPrefix(fmt.Sprintf("%s[recovery] ", logPrefix))
	defer logger.SetPrefix(logPrefix)

	// Sanity check the Raft peer configuration.
	if err := checkRaftConfiguration(conf); err != nil {
		return err
	}

	// Get a path for the temporary database used during recovery.
	tmpDBPath := filepath.Join(dataDir, "recovery.db")
	defer os.Remove(tmpDBPath)

	// Attempt to restore from the latest snapshot, if one exists.
	var (
		snapshotIndex uint64
		snapshotTerm  uint64
	)

	snapshots, err := snaps.List()
	if err != nil {
		return fmt.Errorf("failed to list snapshots: %s", err)
	}
	logger.Printf("recovery detected %d snapshots", len(snapshots))
	if len(snapshots) > 0 {
		if err := func() error {
			snapID := snapshots[0].ID
			_, rc, err := snaps.Open(snapID)
			if err != nil {
				return fmt.Errorf("failed to open snapshot %s: %s", snapID, err)
			}
			defer rc.Close()
			_, err = copyFromReaderToFile(tmpDBPath, rc)
			if err != nil {
				return fmt.Errorf("failed to copy snapshot %s to temporary database: %s", snapID, err)
			}
			snapshotIndex = snapshots[0].Index
			snapshotTerm = snapshots[0].Term
			return nil
		}(); err != nil {
			return err
		}
	}

	// Now, open the database so we can replay any outstanding Raft log entries.
	db, err := sql.OpenSwappable(tmpDBPath, false, true)
	if err != nil {
		return fmt.Errorf("failed to open temporary database: %s", err)
	}
	defer db.Close()

	// Need a dechunker manager to handle any chunked load requests.
	decMgmr, err := chunking.NewDechunkerManager(dataDir)
	if err != nil {
		return fmt.Errorf("failed to create dechunker manager: %s", err.Error())
	}
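	// The command processor applies serialized Raft log commands, including
	// chunked load requests (via decMgmr), to the database.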
	cmdProc := NewCommandProcessor(logger, decMgmr)

	// The snapshot information is the best known end point for the data
	// until we play back the Raft log entries.
	lastIndex := snapshotIndex
	lastTerm := snapshotTerm

	// Apply any Raft log entries past the snapshot.
	lastLogIndex, err := logs.LastIndex()
	if err != nil {
		return fmt.Errorf("failed to find last log: %v", err)
	}
	logger.Printf("last index is %d, last index written to log is %d", lastIndex, lastLogIndex)
	for index := snapshotIndex + 1; index <= lastLogIndex; index++ {
		var entry raft.Log
		if err = logs.GetLog(index, &entry); err != nil {
			return fmt.Errorf("failed to get log at index %d: %v", index, err)
		}
		if entry.Type == raft.LogCommand {
			cmdProc.Process(entry.Data, db)
		}
		lastIndex = entry.Index
		lastTerm = entry.Term
	}

	// Create a new snapshot, placing the configuration in as if it were
	// committed at index 1.
	if err := db.Checkpoint(sql.CheckpointTruncate); err != nil {
		return fmt.Errorf("failed to checkpoint database: %s", err)
	}
	tmpDBFD, err := os.Open(tmpDBPath)
	if err != nil {
		return fmt.Errorf("failed to open temporary database file: %s", err)
	}
	fsmSnapshot := snapshot.NewSnapshot(tmpDBFD) // tmpDBPath contains full state now.
	sink, err := snaps.Create(1, lastIndex, lastTerm, conf, 1, tn)
	if err != nil {
		return fmt.Errorf("failed to create snapshot: %v", err)
	}
	if err = fsmSnapshot.Persist(sink); err != nil {
		return fmt.Errorf("failed to persist snapshot: %v", err)
	}
	if err = sink.Close(); err != nil {
		return fmt.Errorf("failed to finalize snapshot: %v", err)
	}
	logger.Printf("recovery snapshot created successfully using %s", tmpDBPath)

	// Compact the log so that we don't get bad interference from any
	// configuration change log entries that might be there.
	firstLogIndex, err := logs.FirstIndex()
	if err != nil {
		return fmt.Errorf("failed to get first log index: %v", err)
	}
	if err := logs.DeleteRange(firstLogIndex, lastLogIndex); err != nil {
		return fmt.Errorf("log compaction failed: %v", err)
	}

	// Erase the record of any previous update to the Applied Index too.
	if err := stable.SetAppliedIndex(0); err != nil {
		return fmt.Errorf("failed to zero applied index: %v", err)
	}
	return nil
}
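
// A minimal recovery sketch (illustrative only; logs, stable, snaps, and tn
// are assumed to be the already-constructed stores and transport the node
// normally uses, and the address shown is hypothetical):
//
//	conf := raft.Configuration{
//		Servers: []raft.Server{
//			{ID: "node1", Address: "localhost:4002", Suffrage: raft.Voter},
//		},
//	}
//	if err := RecoverNode(dataDir, logger, logs, stable, snaps, tn, conf); err != nil {
//		log.Fatalf("recovery failed: %v", err)
//	}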

// checkRaftConfiguration tests a cluster membership configuration for common
// errors.
func checkRaftConfiguration(configuration raft.Configuration) error {
	idSet := make(map[raft.ServerID]bool)
	addressSet := make(map[raft.ServerAddress]bool)
	var voters int
	for _, server := range configuration.Servers {
		if server.ID == "" {
			return fmt.Errorf("empty ID in configuration: %v", configuration)
		}
		if server.Address == "" {
			return fmt.Errorf("empty address in configuration: %v", server)
		}
		if strings.Contains(string(server.Address), "://") {
			return fmt.Errorf("protocol specified in address: %v", server.Address)
		}
		_, _, err := net.SplitHostPort(string(server.Address))
		if err != nil {
			return fmt.Errorf("invalid address in configuration: %v", server.Address)
		}
		if idSet[server.ID] {
			return fmt.Errorf("found duplicate ID in configuration: %v", server.ID)
		}
		idSet[server.ID] = true
		if addressSet[server.Address] {
			return fmt.Errorf("found duplicate address in configuration: %v", server.Address)
		}
		addressSet[server.Address] = true
		if server.Suffrage == raft.Voter {
			voters++
		}
	}
	if voters == 0 {
		return fmt.Errorf("need at least one voter in configuration: %v", configuration)
	}
	return nil
}
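
// A configuration that passes these checks looks like the following
// (addresses are illustrative host:port pairs, with no protocol scheme):
//
//	raft.Configuration{
//		Servers: []raft.Server{
//			{ID: "node1", Address: "10.0.0.1:4002", Suffrage: raft.Voter},
//			{ID: "node2", Address: "10.0.0.2:4002", Suffrage: raft.Nonvoter},
//		},
//	}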