1
0
Fork 0

Merge pull request #834 from rqlite/sync-db-conn

Sync use of DB connection
master
Philip O'Toole 3 years ago committed by GitHub
commit f0573b72c1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -9,6 +9,7 @@ import (
"io"
"os"
"strings"
"sync"
"time"
"github.com/rqlite/go-sqlite3"
@ -52,6 +53,7 @@ type DB struct {
path string // Path to database file.
dsn string // DSN, if any.
memory bool // In-memory only.
mu sync.Mutex // Serialize use of DB driver connection
}
// Result represents the outcome of an operation that changes rows.
@ -145,6 +147,8 @@ func DeserializeInMemoryWithDSN(b []byte, dsn string) (*DB, error) {
// Close closes the underlying database connection.
func (db *DB) Close() error {
db.mu.Lock()
defer db.mu.Unlock()
return db.sqlite3conn.Close()
}
@ -163,6 +167,9 @@ func open(dbPath string) (*DB, error) {
// EnableFKConstraints allows control of foreign key constraint checks.
func (db *DB) EnableFKConstraints(e bool) error {
db.mu.Lock()
defer db.mu.Unlock()
q := fkChecksEnabled
if !e {
q = fkChecksDisabled
@ -173,6 +180,9 @@ func (db *DB) EnableFKConstraints(e bool) error {
// FKConstraints returns whether FK constraints are set or not.
func (db *DB) FKConstraints() (bool, error) {
db.mu.Lock()
defer db.mu.Unlock()
r, err := db.sqlite3conn.Query(fkChecks, nil)
if err != nil {
return false, err
@ -246,6 +256,9 @@ func (db *DB) ExecuteStringStmt(query string) ([]*Result, error) {
// Execute executes queries that modify the database.
func (db *DB) Execute(req *command.Request, xTime bool) ([]*Result, error) {
db.mu.Lock()
defer db.mu.Unlock()
stats.Add(numExecutions, int64(len(req.Statements)))
tx := req.Transaction
@ -372,6 +385,9 @@ func (db *DB) QueryStringStmt(query string) ([]*Rows, error) {
// Query executes queries that return rows, but don't modify the database.
func (db *DB) Query(req *command.Request, xTime bool) ([]*Rows, error) {
db.mu.Lock()
defer db.mu.Unlock()
stats.Add(numQueries, int64(len(req.Statements)))
tx := req.Transaction
@ -464,13 +480,16 @@ func (db *DB) Query(req *command.Request, xTime bool) ([]*Rows, error) {
// Backup writes a consistent snapshot of the database to the given file.
// This function can be called when changes to the database are in flight.
func (db *DB) Backup(path string) error {
db.mu.Lock()
defer db.mu.Unlock()
dstDB, err := Open(path)
if err != nil {
return err
}
defer func(db *DB, err *error) {
cerr := db.Close()
cerr := db.sqlite3conn.Close()
if *err == nil {
*err = cerr
}
@ -487,6 +506,9 @@ func (db *DB) Backup(path string) error {
// on-disk database. This function can be called when changes to the source
// database are in flight.
func (db *DB) Copy(dstDB *DB) error {
db.mu.Lock()
defer db.mu.Unlock()
if err := copyDatabase(dstDB.sqlite3conn, db.sqlite3conn); err != nil {
return fmt.Errorf("copy database: %s", err)
}
@ -498,10 +520,10 @@ func (db *DB) Copy(dstDB *DB) error {
// disk file. For an in-memory database or a "TEMP" database, the serialization
// is the same sequence of bytes which would be written to disk if that database
// were backed up to disk.
//
// It is up to the caller to ensure no changes or transactions are in progress
// when this function is called.
func (db *DB) Serialize() ([]byte, error) {
db.mu.Lock()
defer db.mu.Unlock()
b := db.sqlite3conn.Serialize("")
if b == nil {
return nil, fmt.Errorf("failed to serialize database")
@ -522,13 +544,17 @@ func (db *DB) Dump(w io.Writer) error {
return err
}
defer func(db *DB, err *error) {
cerr := db.Close()
cerr := db.sqlite3conn.Close()
if *err == nil {
*err = cerr
}
}(dstDB, &err)
if err := copyDatabase(dstDB.sqlite3conn, db.sqlite3conn); err != nil {
if err := func() error {
db.mu.Lock()
defer db.mu.Unlock()
return copyDatabase(dstDB.sqlite3conn, db.sqlite3conn)
}(); err != nil {
return err
}
@ -587,7 +613,7 @@ func (db *DB) Dump(w io.Writer) error {
// Do indexes, triggers, and views.
query = `SELECT "name", "type", "sql" FROM "sqlite_master"
WHERE "sql" NOT NULL AND "type" IN ('index', 'trigger', 'view')`
rows, err = db.QueryStringStmt(query)
rows, err = dstDB.QueryStringStmt(query)
if err != nil {
return err
}

@ -18,7 +18,6 @@ import (
"path/filepath"
"sort"
"strconv"
"sync"
"time"
"unsafe"
@ -137,9 +136,6 @@ type Store struct {
numNoops int // For whitebox testing
txMu sync.RWMutex // Sync between snapshots and query-level transactions.
queryMu sync.RWMutex // Sync queries generally with other operations.
logger *log.Logger
ShutdownOnRemove bool
@ -628,9 +624,6 @@ func (s *Store) Backup(leader bool, fmt BackupFormat, dst io.Writer) error {
// Query executes queries that return rows, and do not modify the database.
func (s *Store) Query(qr *command.QueryRequest) ([]*sql.Rows, error) {
s.queryMu.RLock()
defer s.queryMu.RUnlock()
if qr.Level == command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG {
b, compressed, err := s.reqMarshaller.Marshal(qr)
if err != nil {
@ -674,12 +667,6 @@ func (s *Store) Query(qr *command.QueryRequest) ([]*sql.Rows, error) {
return nil, ErrStaleRead
}
// Read straight from database. If a transaction is requested, we must block
// certain other database operations.
if qr.Request.Transaction {
s.txMu.Lock()
defer s.txMu.Unlock()
}
return s.db.Query(qr.Request, qr.Timings)
}
@ -934,12 +921,6 @@ func (s *Store) Apply(l *raft.Log) (e interface{}) {
if err := command.UnmarshalSubCommand(&c, &qr); err != nil {
panic(fmt.Sprintf("failed to unmarshal query subcommand: %s", err.Error()))
}
// Read from database. If a transaction is requested, we must block
// certain other database operations.
if qr.Request.Transaction {
s.txMu.Lock()
defer s.txMu.Unlock()
}
r, err := s.db.Query(qr.Request, qr.Timings)
return &fsmQueryResponse{rows: r, error: err}
case command.Command_COMMAND_TYPE_EXECUTE:
@ -987,9 +968,6 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
logger: s.logger,
}
s.txMu.Lock()
defer s.txMu.Unlock()
fsm.database, _ = s.db.Serialize()
// The error code is not meaningful from Serialize(). The code needs to be able
// handle a nil byte slice being returned.
@ -1006,9 +984,6 @@ func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
func (s *Store) Restore(rc io.ReadCloser) error {
startT := time.Now()
s.queryMu.Lock()
defer s.queryMu.Unlock()
var uint64Size uint64
inc := int64(unsafe.Sizeof(uint64Size))

Loading…
Cancel
Save