1
0
Fork 0

Merge pull request #1535 from rqlite/command-processor

Refactor into CommandProcessor
master
Philip O'Toole 9 months ago committed by GitHub
commit 2e40d22291
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -8,6 +8,7 @@ This release adds new control over Raft snapshotting, a key part of the Raft con
- [PR #1528](https://github.com/rqlite/rqlite/pull/1528): Support setting trailing logs for user-requested snapshot. - [PR #1528](https://github.com/rqlite/rqlite/pull/1528): Support setting trailing logs for user-requested snapshot.
- [PR #1529](https://github.com/rqlite/rqlite/pull/1529): Remove obsolete code related to user-triggered snapshots. - [PR #1529](https://github.com/rqlite/rqlite/pull/1529): Remove obsolete code related to user-triggered snapshots.
- [PR #1536](https://github.com/rqlite/rqlite/pull/1536): Store WAL path in store, to avoid races. - [PR #1536](https://github.com/rqlite/rqlite/pull/1536): Store WAL path in store, to avoid races.
- [PR #1535](https://github.com/rqlite/rqlite/pull/1535): Refactor using CommandProcessor.
## 8.13.5 (December 26th 2023) ## 8.13.5 (December 26th 2023)
### Implementation changes and bug fixes ### Implementation changes and bug fixes

@ -0,0 +1,142 @@
package store
import (
"fmt"
"log"
"os"
"github.com/rqlite/rqlite/v8/command"
"github.com/rqlite/rqlite/v8/command/chunking"
sql "github.com/rqlite/rqlite/v8/db"
)
// CommandProcessor processes commands by applying them to the underlying database.
type CommandProcessor struct {
logger *log.Logger
decMgmr *chunking.DechunkerManager
}
// NewCommandProcessor returns a new instance of CommandProcessor.
func NewCommandProcessor(logger *log.Logger, dm *chunking.DechunkerManager) *CommandProcessor {
return &CommandProcessor{
logger: logger,
decMgmr: dm}
}
// Process processes the given command against the given database.
func (c *CommandProcessor) Process(data []byte, pDB **sql.DB) (*command.Command, interface{}) {
db := *pDB
cmd := &command.Command{}
if err := command.Unmarshal(data, cmd); err != nil {
panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error()))
}
switch cmd.Type {
case command.Command_COMMAND_TYPE_QUERY:
var qr command.QueryRequest
if err := command.UnmarshalSubCommand(cmd, &qr); err != nil {
panic(fmt.Sprintf("failed to unmarshal query subcommand: %s", err.Error()))
}
r, err := db.Query(qr.Request, qr.Timings)
return cmd, &fsmQueryResponse{rows: r, error: err}
case command.Command_COMMAND_TYPE_EXECUTE:
var er command.ExecuteRequest
if err := command.UnmarshalSubCommand(cmd, &er); err != nil {
panic(fmt.Sprintf("failed to unmarshal execute subcommand: %s", err.Error()))
}
r, err := db.Execute(er.Request, er.Timings)
return cmd, &fsmExecuteResponse{results: r, error: err}
case command.Command_COMMAND_TYPE_EXECUTE_QUERY:
var eqr command.ExecuteQueryRequest
if err := command.UnmarshalSubCommand(cmd, &eqr); err != nil {
panic(fmt.Sprintf("failed to unmarshal execute-query subcommand: %s", err.Error()))
}
r, err := db.Request(eqr.Request, eqr.Timings)
return cmd, &fsmExecuteQueryResponse{results: r, error: err}
case command.Command_COMMAND_TYPE_LOAD:
var lr command.LoadRequest
if err := command.UnmarshalLoadRequest(cmd.SubCommand, &lr); err != nil {
panic(fmt.Sprintf("failed to unmarshal load subcommand: %s", err.Error()))
}
// Swap the underlying database to the new one.
if err := db.Close(); err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
}
if err := sql.RemoveFiles(db.Path()); err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)}
}
newDB, err := createOnDisk(lr.Data, db.Path(), db.FKEnabled(), db.WALEnabled())
if err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)}
}
*pDB = newDB
return cmd, &fsmGenericResponse{}
case command.Command_COMMAND_TYPE_LOAD_CHUNK:
var lcr command.LoadChunkRequest
if err := command.UnmarshalLoadChunkRequest(cmd.SubCommand, &lcr); err != nil {
panic(fmt.Sprintf("failed to unmarshal load-chunk subcommand: %s", err.Error()))
}
dec, err := c.decMgmr.Get(lcr.StreamId)
if err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to get dechunker: %s", err)}
}
if lcr.Abort {
path, err := dec.Close()
if err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to close dechunker: %s", err)}
}
c.decMgmr.Delete(lcr.StreamId)
defer os.Remove(path)
} else {
last, err := dec.WriteChunk(&lcr)
if err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to write chunk: %s", err)}
}
if last {
path, err := dec.Close()
if err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to close dechunker: %s", err)}
}
c.decMgmr.Delete(lcr.StreamId)
defer os.Remove(path)
// Check if reassembled dayabase is valid. If not, do not perform the load. This could
// happen a snapshot truncated earlier parts of the log which contained the earlier parts
// of a database load. If that happened then the database has already been loaded, and
// this load should be ignored.
if !sql.IsValidSQLiteFile(path) {
c.logger.Printf("invalid chunked database file - ignoring")
return cmd, &fsmGenericResponse{error: fmt.Errorf("invalid chunked database file - ignoring")}
}
// Close the underlying database before we overwrite it.
if err := db.Close(); err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
}
if err := sql.RemoveFiles(db.Path()); err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)}
}
if err := os.Rename(path, db.Path()); err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to rename temporary database file: %s", err)}
}
newDB, err := sql.Open(db.Path(), db.FKEnabled(), db.WALEnabled())
if err != nil {
return cmd, &fsmGenericResponse{error: fmt.Errorf("failed to open new on-disk database: %s", err)}
}
// Swap the underlying database to the new one.
*pDB = newDB
}
}
return cmd, &fsmGenericResponse{}
case command.Command_COMMAND_TYPE_NOOP:
return cmd, &fsmGenericResponse{}
default:
return cmd, &fsmGenericResponse{error: fmt.Errorf("unhandled command: %v", cmd.Type)}
}
}

@ -232,6 +232,7 @@ type Store struct {
appliedIdxUpdateDone chan struct{} appliedIdxUpdateDone chan struct{}
dechunkManager *chunking.DechunkerManager dechunkManager *chunking.DechunkerManager
cmdProc *CommandProcessor
// Channels that must be closed for the Store to be considered ready. // Channels that must be closed for the Store to be considered ready.
readyChans []<-chan struct{} readyChans []<-chan struct{}
@ -401,6 +402,7 @@ func (s *Store) Open() (retErr error) {
return err return err
} }
s.dechunkManager = decMgmr s.dechunkManager = decMgmr
s.cmdProc = NewCommandProcessor(s.logger, s.dechunkManager)
// Create the database directory, if it doesn't already exist. // Create the database directory, if it doesn't already exist.
parentDBDir := filepath.Dir(s.dbPath) parentDBDir := filepath.Dir(s.dbPath)
@ -1670,7 +1672,7 @@ func (s *Store) fsmApply(l *raft.Log) (e interface{}) {
s.logger.Printf("first log applied since node start, log at index %d", l.Index) s.logger.Printf("first log applied since node start, log at index %d", l.Index)
} }
cmd, r := applyCommand(s.logger, l.Data, &s.db, s.dechunkManager) cmd, r := s.cmdProc.Process(l.Data, &s.db)
if cmd.Type == command.Command_COMMAND_TYPE_NOOP { if cmd.Type == command.Command_COMMAND_TYPE_NOOP {
s.numNoops++ s.numNoops++
} }
@ -2131,6 +2133,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
if err != nil { if err != nil {
return fmt.Errorf("failed to create dechunker manager: %s", err.Error()) return fmt.Errorf("failed to create dechunker manager: %s", err.Error())
} }
cmdProc := NewCommandProcessor(logger, decMgmr)
// The snapshot information is the best known end point for the data // The snapshot information is the best known end point for the data
// until we play back the Raft log entries. // until we play back the Raft log entries.
@ -2150,7 +2153,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
return fmt.Errorf("failed to get log at index %d: %v", index, err) return fmt.Errorf("failed to get log at index %d: %v", index, err)
} }
if entry.Type == raft.LogCommand { if entry.Type == raft.LogCommand {
applyCommand(logger, entry.Data, &db, decMgmr) cmdProc.Process(entry.Data, &db)
} }
lastIndex = entry.Index lastIndex = entry.Index
lastTerm = entry.Term lastTerm = entry.Term
@ -2195,124 +2198,6 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
return nil return nil
} }
func applyCommand(logger *log.Logger, data []byte, pDB **sql.DB, decMgmr *chunking.DechunkerManager) (*command.Command, interface{}) {
c := &command.Command{}
db := *pDB
if err := command.Unmarshal(data, c); err != nil {
panic(fmt.Sprintf("failed to unmarshal cluster command: %s", err.Error()))
}
switch c.Type {
case command.Command_COMMAND_TYPE_QUERY:
var qr command.QueryRequest
if err := command.UnmarshalSubCommand(c, &qr); err != nil {
panic(fmt.Sprintf("failed to unmarshal query subcommand: %s", err.Error()))
}
r, err := db.Query(qr.Request, qr.Timings)
return c, &fsmQueryResponse{rows: r, error: err}
case command.Command_COMMAND_TYPE_EXECUTE:
var er command.ExecuteRequest
if err := command.UnmarshalSubCommand(c, &er); err != nil {
panic(fmt.Sprintf("failed to unmarshal execute subcommand: %s", err.Error()))
}
r, err := db.Execute(er.Request, er.Timings)
return c, &fsmExecuteResponse{results: r, error: err}
case command.Command_COMMAND_TYPE_EXECUTE_QUERY:
var eqr command.ExecuteQueryRequest
if err := command.UnmarshalSubCommand(c, &eqr); err != nil {
panic(fmt.Sprintf("failed to unmarshal execute-query subcommand: %s", err.Error()))
}
r, err := db.Request(eqr.Request, eqr.Timings)
return c, &fsmExecuteQueryResponse{results: r, error: err}
case command.Command_COMMAND_TYPE_LOAD:
var lr command.LoadRequest
if err := command.UnmarshalLoadRequest(c.SubCommand, &lr); err != nil {
panic(fmt.Sprintf("failed to unmarshal load subcommand: %s", err.Error()))
}
// Swap the underlying database to the new one.
if err := db.Close(); err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
}
if err := sql.RemoveFiles(db.Path()); err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)}
}
newDB, err := createOnDisk(lr.Data, db.Path(), db.FKEnabled(), db.WALEnabled())
if err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to create on-disk database: %s", err)}
}
*pDB = newDB
return c, &fsmGenericResponse{}
case command.Command_COMMAND_TYPE_LOAD_CHUNK:
var lcr command.LoadChunkRequest
if err := command.UnmarshalLoadChunkRequest(c.SubCommand, &lcr); err != nil {
panic(fmt.Sprintf("failed to unmarshal load-chunk subcommand: %s", err.Error()))
}
dec, err := decMgmr.Get(lcr.StreamId)
if err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to get dechunker: %s", err)}
}
if lcr.Abort {
path, err := dec.Close()
if err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to close dechunker: %s", err)}
}
decMgmr.Delete(lcr.StreamId)
defer os.Remove(path)
} else {
last, err := dec.WriteChunk(&lcr)
if err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to write chunk: %s", err)}
}
if last {
path, err := dec.Close()
if err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to close dechunker: %s", err)}
}
decMgmr.Delete(lcr.StreamId)
defer os.Remove(path)
// Check if reassembled dayabase is valid. If not, do not perform the load. This could
// happen a snapshot truncated earlier parts of the log which contained the earlier parts
// of a database load. If that happened then the database has already been loaded, and
// this load should be ignored.
if !sql.IsValidSQLiteFile(path) {
logger.Printf("invalid chunked database file - ignoring")
return c, &fsmGenericResponse{error: fmt.Errorf("invalid chunked database file - ignoring")}
}
// Close the underlying database before we overwrite it.
if err := db.Close(); err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to close post-load database: %s", err)}
}
if err := sql.RemoveFiles(db.Path()); err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to remove existing database files: %s", err)}
}
if err := os.Rename(path, db.Path()); err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to rename temporary database file: %s", err)}
}
newDB, err := sql.Open(db.Path(), db.FKEnabled(), db.WALEnabled())
if err != nil {
return c, &fsmGenericResponse{error: fmt.Errorf("failed to open new on-disk database: %s", err)}
}
// Swap the underlying database to the new one.
*pDB = newDB
}
}
return c, &fsmGenericResponse{}
case command.Command_COMMAND_TYPE_NOOP:
return c, &fsmGenericResponse{}
default:
return c, &fsmGenericResponse{error: fmt.Errorf("unhandled command: %v", c.Type)}
}
}
// checkRaftConfiguration tests a cluster membership configuration for common // checkRaftConfiguration tests a cluster membership configuration for common
// errors. // errors.
func checkRaftConfiguration(configuration raft.Configuration) error { func checkRaftConfiguration(configuration raft.Configuration) error {

Loading…
Cancel
Save