|
|
|
@ -1672,7 +1672,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
snapshotNeeded := false
|
|
|
|
|
cmd, r := applyCommand(l.Data, &s.db, s.dechunkManager)
|
|
|
|
|
cmd, r := applyCommand(s.logger, l.Data, &s.db, s.dechunkManager)
|
|
|
|
|
switch cmd.Type {
|
|
|
|
|
case command.Command_COMMAND_TYPE_NOOP:
|
|
|
|
|
s.numNoops++
|
|
|
|
@ -2180,7 +2180,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
|
|
|
|
|
return fmt.Errorf("failed to get log at index %d: %v", index, err)
|
|
|
|
|
}
|
|
|
|
|
if entry.Type == raft.LogCommand {
|
|
|
|
|
applyCommand(entry.Data, &db, decMgmr)
|
|
|
|
|
applyCommand(logger, entry.Data, &db, decMgmr)
|
|
|
|
|
}
|
|
|
|
|
lastIndex = entry.Index
|
|
|
|
|
lastTerm = entry.Term
|
|
|
|
@ -2225,7 +2225,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func applyCommand(data []byte, pDB **sql.DB, decMgmr *chunking.DechunkerManager) (*command.Command, interface{}) {
|
|
|
|
|
func applyCommand(logger *log.Logger, data []byte, pDB **sql.DB, decMgmr *chunking.DechunkerManager) (*command.Command, interface{}) {
|
|
|
|
|
c := &command.Command{}
|
|
|
|
|
db := *pDB
|
|
|
|
|
|
|
|
|
@ -2306,6 +2306,15 @@ func applyCommand(data []byte, pDB **sql.DB, decMgmr *chunking.DechunkerManager)
|
|
|
|
|
decMgmr.Delete(lcr.StreamId)
|
|
|
|
|
defer os.Remove(path)
|
|
|
|
|
|
|
|
|
|
// Check if reassembled dayabase is valid. If not, do not perform the load. This could
|
|
|
|
|
// happen a snapshot truncated earlier parts of the log which contained the earlier parts
|
|
|
|
|
// of a database load. If that happened then the database has already been loaded, and
|
|
|
|
|
// this load should be ignored.
|
|
|
|
|
if !sql.IsValidSQLiteFile(path) {
|
|
|
|
|
logger.Printf("invalid chunked database file - ignoring")
|
|
|
|
|
return c, &fsmGenericResponse{error: fmt.Errorf("invalid chunked database file - ignoring")}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Close the underlying database before we overwrite it.
|
|
|
|
|
if err := db.Close(); err != nil {
|
|
|
|
|
return c, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
|
|
|
|
|