1
0
Fork 0

Merge pull request #1298 from rqlite/fsm-dual

Move FSMSnapshot to own source file
master
Philip O'Toole 1 year ago committed by GitHub
commit d7c47d8080
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,3 +1,7 @@
## 7.20.3 (unreleased))
### Implementation changes and bug fixes
- [PR #1298](https://github.com/rqlite/rqlite/pull/1298): Move FSMSnapshot to own source file.
## 7.20.2 (June 9th 2023)
### Implementation changes and bug fixes
- [PR #1296](https://github.com/rqlite/rqlite/pull/1296): Use correct connection when checking a SQL statement for "read-only" status, otherwise "database locked" could result. Also refactors much of the DB-level unit tests.

@ -0,0 +1,125 @@
package store
import (
"bytes"
"compress/gzip"
"expvar"
"log"
"math"
"time"
"github.com/hashicorp/raft"
sql "github.com/rqlite/rqlite/db"
)
// FSMSnapshot is a snapshot of the SQLite database.
type FSMSnapshot struct {
startT time.Time
logger *log.Logger
database []byte
}
// NewFSMSnapshot creates a new FSMSnapshot.
func NewFSMSnapshot(db *sql.DB, logger *log.Logger) *FSMSnapshot {
fsm := &FSMSnapshot{
startT: time.Now(),
logger: logger,
}
// The error code is not meaningful from Serialize(). The code needs to be able
// to handle a nil byte slice being returned.
fsm.database, _ = db.Serialize()
return fsm
}
// Persist writes the snapshot to the given sink.
func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) error {
defer func() {
dur := time.Since(f.startT)
stats.Get(snapshotPersistDuration).(*expvar.Int).Set(dur.Milliseconds())
f.logger.Printf("snapshot and persist took %s", dur)
}()
err := func() error {
b := new(bytes.Buffer)
// Flag compressed database by writing max uint64 value first.
// No SQLite database written by earlier versions will have this
// as a size. *Surely*.
err := writeUint64(b, math.MaxUint64)
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
b.Reset() // Clear state of buffer for future use.
// Get compressed copy of database.
cdb, err := f.compressedDatabase()
if err != nil {
return err
}
if cdb != nil {
// Write size of compressed database.
err = writeUint64(b, uint64(len(cdb)))
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
// Write compressed database to sink.
if _, err := sink.Write(cdb); err != nil {
return err
}
stats.Get(snapshotDBOnDiskSize).(*expvar.Int).Set(int64(len(cdb)))
} else {
f.logger.Println("no database data available for snapshot")
err = writeUint64(b, uint64(0))
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
stats.Get(snapshotDBOnDiskSize).(*expvar.Int).Set(0)
}
// Close the sink.
return sink.Close()
}()
if err != nil {
sink.Cancel()
return err
}
return nil
}
func (f *FSMSnapshot) compressedDatabase() ([]byte, error) {
if f.database == nil {
return nil, nil
}
var buf bytes.Buffer
gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
if err != nil {
return nil, err
}
if _, err := gz.Write(f.database); err != nil {
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Release is a no-op.
func (f *FSMSnapshot) Release() {}

@ -1528,7 +1528,7 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
s.queryTxMu.Lock()
defer s.queryTxMu.Unlock()
fsm := newFSMSnapshot(s.db, s.logger)
fsm := NewFSMSnapshot(s.db, s.logger)
dur := time.Since(fsm.startT)
stats.Add(numSnaphots, 1)
stats.Get(snapshotCreateDuration).(*expvar.Int).Set(dur.Milliseconds())
@ -1741,116 +1741,6 @@ func (s *Store) tryCompress(rq command.Requester) ([]byte, bool, error) {
return b, compressed, nil
}
type fsmSnapshot struct {
startT time.Time
logger *log.Logger
database []byte
}
func newFSMSnapshot(db *sql.DB, logger *log.Logger) *fsmSnapshot {
fsm := &fsmSnapshot{
startT: time.Now(),
logger: logger,
}
// The error code is not meaningful from Serialize(). The code needs to be able
// to handle a nil byte slice being returned.
fsm.database, _ = db.Serialize()
return fsm
}
// Persist writes the snapshot to the given sink.
func (f *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
defer func() {
dur := time.Since(f.startT)
stats.Get(snapshotPersistDuration).(*expvar.Int).Set(dur.Milliseconds())
f.logger.Printf("snapshot and persist took %s", dur)
}()
err := func() error {
b := new(bytes.Buffer)
// Flag compressed database by writing max uint64 value first.
// No SQLite database written by earlier versions will have this
// as a size. *Surely*.
err := writeUint64(b, math.MaxUint64)
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
b.Reset() // Clear state of buffer for future use.
// Get compressed copy of database.
cdb, err := f.compressedDatabase()
if err != nil {
return err
}
if cdb != nil {
// Write size of compressed database.
err = writeUint64(b, uint64(len(cdb)))
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
// Write compressed database to sink.
if _, err := sink.Write(cdb); err != nil {
return err
}
stats.Get(snapshotDBOnDiskSize).(*expvar.Int).Set(int64(len(cdb)))
} else {
f.logger.Println("no database data available for snapshot")
err = writeUint64(b, uint64(0))
if err != nil {
return err
}
if _, err := sink.Write(b.Bytes()); err != nil {
return err
}
stats.Get(snapshotDBOnDiskSize).(*expvar.Int).Set(0)
}
// Close the sink.
return sink.Close()
}()
if err != nil {
sink.Cancel()
return err
}
return nil
}
func (f *fsmSnapshot) compressedDatabase() ([]byte, error) {
if f.database == nil {
return nil, nil
}
var buf bytes.Buffer
gz, err := gzip.NewWriterLevel(&buf, gzip.BestCompression)
if err != nil {
return nil, err
}
if _, err := gz.Write(f.database); err != nil {
return nil, err
}
if err := gz.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Release is a no-op.
func (f *fsmSnapshot) Release() {}
// RecoverNode is used to manually force a new configuration, in the event that
// quorum cannot be restored. This borrows heavily from RecoverCluster functionality
// of the Hashicorp Raft library, but has been customized for rqlite use.
@ -1941,7 +1831,7 @@ func RecoverNode(dataDir string, logger *log.Logger, logs raft.LogStore, stable
// Create a new snapshot, placing the configuration in as if it was
// committed at index 1.
snapshot := newFSMSnapshot(db, logger)
snapshot := NewFSMSnapshot(db, logger)
sink, err := snaps.Create(1, lastIndex, lastTerm, conf, 1, tn)
if err != nil {
return fmt.Errorf("failed to create snapshot: %v", err)

Loading…
Cancel
Save